35 namespace Buffer_Namespace {
37 std::string BufferMgr::keyToString(
const ChunkKey& key) {
38 std::ostringstream oss;
41 for (
auto sub_key : key) {
42 oss << sub_key <<
",";
48 BufferMgr::BufferMgr(
const int device_id,
49 const size_t max_buffer_pool_size,
50 const size_t min_slab_size,
51 const size_t max_slab_size,
52 const size_t page_size,
53 AbstractBufferMgr* parent_mgr)
54 : AbstractBufferMgr(device_id)
55 , max_buffer_pool_size_(max_buffer_pool_size)
56 , min_slab_size_(min_slab_size)
57 , max_slab_size_(max_slab_size)
58 , page_size_(page_size)
59 , num_pages_allocated_(0)
60 , allocations_capped_(
false)
61 , parent_mgr_(parent_mgr)
99 delete buf.second->buffer;
102 chunk_index_.clear();
111 const size_t chunk_page_size,
112 const size_t initial_size) {
114 size_t actual_chunk_page_size = chunk_page_size;
115 if (actual_chunk_page_size == 0) {
124 buffer_seg.chunk_key = chunk_key;
140 buffer_it->second->buffer =
153 const size_t num_pages_requested,
154 const int slab_num) {
158 auto evict_it = evict_start;
159 size_t num_pages = 0;
160 size_t start_page = evict_start->start_page;
161 while (num_pages < num_pages_requested) {
162 if (evict_it->mem_status ==
USED) {
163 CHECK(evict_it->buffer->getPinCount() < 1);
165 num_pages += evict_it->num_pages;
166 if (evict_it->mem_status ==
USED && evict_it->chunk_key.size() > 0) {
179 if (num_pages_requested < num_pages) {
180 size_t excess_pages = num_pages - num_pages_requested;
182 evict_it->mem_status ==
FREE) {
183 evict_it->start_page = start_page + num_pages_requested;
184 evict_it->num_pages += excess_pages;
186 BufferSeg free_seg(start_page + num_pages_requested, excess_pages,
FREE);
194 BufferList::iterator& seg_it,
195 const size_t num_bytes) {
198 size_t num_pages_extra_needed = num_pages_requested - seg_it->num_pages;
200 if (num_pages_requested < seg_it->num_pages) {
205 int slab_num = seg_it->slab_num;
207 BufferList::iterator next_it = std::next(seg_it);
209 next_it->num_pages >= num_pages_extra_needed) {
211 size_t leftover_pages = next_it->num_pages - num_pages_extra_needed;
212 seg_it->num_pages = num_pages_requested;
213 next_it->num_pages = leftover_pages;
214 next_it->start_page = seg_it->start_page + seg_it->num_pages;
223 new_seg_it->buffer = seg_it->buffer;
224 new_seg_it->chunk_key = seg_it->chunk_key;
225 int8_t* old_mem = new_seg_it->buffer->mem_;
226 new_seg_it->buffer->mem_ =
231 if (seg_it->start_page >= 0 && seg_it->buffer->mem_ != 0) {
232 new_seg_it->buffer->writeData(old_mem,
233 new_seg_it->buffer->size(),
235 new_seg_it->buffer->getType(),
249 const size_t num_pages_requested) {
253 if (buffer_it->mem_status ==
FREE && buffer_it->num_pages >= num_pages_requested) {
255 size_t excess_pages = buffer_it->num_pages - num_pages_requested;
256 buffer_it->num_pages = num_pages_requested;
257 buffer_it->mem_status =
USED;
259 buffer_it->slab_num = slab_num;
260 if (excess_pages > 0) {
262 buffer_it->start_page + num_pages_requested, excess_pages,
FREE);
263 auto temp_it = buffer_it;
284 for (
size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
299 if (num_pages_requested <=
306 << alloc_ms <<
" ms " << getStringMgrType() <<
":" << device_id_;
314 num_pages_requested);
316 }
catch (std::runtime_error& error) {
319 << getStringMgrType() <<
":" << device_id_;
336 << getStringMgrType() <<
":" << device_id_;
348 size_t min_score = std::numeric_limits<size_t>::max();
352 BufferList::iterator best_eviction_start =
slab_segments_[0].end();
353 int best_eviction_start_slab = -1;
357 ++slab_it, ++slab_num) {
358 for (
auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) {
366 size_t page_count = 0;
368 bool solution_found =
false;
369 auto evict_it = buffer_it;
374 if (evict_it->mem_status ==
USED && evict_it->buffer->getPinCount() > 0) {
377 page_count += evict_it->num_pages;
378 if (evict_it->mem_status ==
USED) {
386 score = std::max(score, static_cast<size_t>(evict_it->last_touched));
388 if (page_count >= num_pages_requested) {
389 solution_found =
true;
393 if (solution_found && score < min_score) {
395 best_eviction_start = buffer_it;
396 best_eviction_start_slab = slab_num;
410 LOG(
ERROR) <<
"ALLOCATION failed to find " << num_bytes <<
"B throwing out of memory "
411 << getStringMgrType() <<
":" << device_id_;
415 LOG(
INFO) <<
"ALLOCATION failed to find " << num_bytes <<
"B free. Forcing Eviction."
416 <<
" Eviction start " << best_eviction_start->start_page
417 <<
" Number pages requested " << num_pages_requested
418 <<
" Best Eviction Start Slab " << best_eviction_start_slab <<
" "
419 << getStringMgrType() <<
":" << device_id_;
420 best_eviction_start =
421 evict(best_eviction_start, num_pages_requested, best_eviction_start_slab);
422 return best_eviction_start;
426 std::ostringstream tss;
428 tss <<
"Slab St.Page Pages Touch" << std::endl;
430 tss << setfill(
' ') << setw(4) << slab_num;
432 tss << setfill(
' ') << setw(8) << segment.start_page;
433 tss << setfill(
' ') << setw(8) << segment.num_pages;
436 tss << setfill(
' ') << setw(7) << segment.last_touched;
438 if (segment.mem_status ==
FREE) {
442 tss <<
" PC: " << setfill(
' ') << setw(2) << segment.buffer->getPinCount();
443 tss <<
" USED - Chunk: ";
445 for (
auto&& key_elem : segment.chunk_key) {
446 tss << key_elem <<
",";
455 std::ostringstream tss;
457 <<
"Slabs Contents: "
458 <<
" " << getStringMgrType() <<
":" << device_id_ << std::endl;
460 for (
size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
463 tss <<
"--------------------" << std::endl;
468 bool pinned_exists =
false;
470 for (
auto& segment : segment_list) {
471 if (segment.mem_status ==
FREE) {
473 }
else if (segment.buffer->getPinCount() < 1) {
476 pinned_exists =
true;
480 if (!pinned_exists) {
511 for (
auto& segment : segment_list) {
512 if (segment.mem_status !=
FREE) {
521 std::ostringstream tss;
522 tss <<
"SN: " << setfill(
' ') << setw(2) << seg_it->slab_num;
523 tss <<
" SP: " << setfill(
' ') << setw(7) << seg_it->start_page;
524 tss <<
" NP: " << setfill(
' ') << setw(7) << seg_it->num_pages;
525 tss <<
" LT: " << setfill(
' ') << setw(7) << seg_it->last_touched;
526 tss <<
" PC: " << setfill(
' ') << setw(2) << seg_it->buffer->getPinCount();
527 if (seg_it->mem_status ==
FREE) {
531 tss <<
" USED - Chunk: ";
532 for (
auto vec_it = seg_it->chunk_key.begin(); vec_it != seg_it->chunk_key.end();
534 tss << *vec_it <<
",";
542 std::ostringstream tss;
546 <<
" " << getStringMgrType() <<
":" << device_id_ << std::endl;
549 ++seg_it, ++seg_num) {
559 tss <<
"--------------------" << std::endl;
566 LOG(
INFO) << std::endl <<
" " << getStringMgrType() <<
":" << device_id_;
568 ++slab_it, ++slab_num) {
569 LOG(
INFO) <<
"Slab Num: " << slab_num <<
" " << getStringMgrType() <<
":"
571 for (
auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) {
572 LOG(
INFO) <<
"Segment: " << seg_num <<
" " << getStringMgrType() <<
":"
575 LOG(
INFO) <<
" " << getStringMgrType() <<
":" << device_id_;
577 LOG(
INFO) <<
"--------------------"
578 <<
" " << getStringMgrType() <<
":" << device_id_;
599 auto seg_it = buffer_it->second;
601 chunk_index_lock.unlock();
603 if (seg_it->buffer) {
604 delete seg_it->buffer;
613 std::lock_guard<std::mutex> sized_segs_lock(
618 auto startChunkIt =
chunk_index_.lower_bound(key_prefix);
623 auto buffer_it = startChunkIt;
625 std::search(buffer_it->first.begin(),
626 buffer_it->first.begin() + key_prefix.size(),
628 key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
629 auto seg_it = buffer_it->second;
630 if (seg_it->buffer) {
631 delete seg_it->buffer;
632 seg_it->buffer =
nullptr;
641 int slab_num = seg_it->slab_num;
648 auto prev_it = std::prev(seg_it);
651 if (prev_it->mem_status ==
FREE) {
652 seg_it->start_page = prev_it->start_page;
653 seg_it->num_pages += prev_it->num_pages;
657 auto next_it = std::next(seg_it);
659 if (next_it->mem_status ==
FREE) {
660 seg_it->num_pages += next_it->num_pages;
664 seg_it->mem_status =
FREE;
676 auto& buffer_itr = chunk_itr.second;
677 if (buffer_itr->chunk_key[0] != -1 && buffer_itr->buffer->isDirty()) {
678 parent_mgr_->putBuffer(buffer_itr->chunk_key, buffer_itr->buffer);
679 buffer_itr->buffer->clearDirtyBits();
689 key_prefix.push_back(db_id);
690 key_prefix.push_back(tb_id);
691 auto start_chunk_it =
chunk_index_.lower_bound(key_prefix);
696 auto buffer_it = start_chunk_it;
698 std::search(buffer_it->first.begin(),
699 buffer_it->first.begin() + key_prefix.size(),
701 key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
702 if (buffer_it->second->chunk_key[0] != -1 &&
703 buffer_it->second->buffer->isDirty()) {
706 parent_mgr_->putBuffer(buffer_it->second->chunk_key, buffer_it->second->buffer);
707 buffer_it->second->buffer->clearDirtyBits();
722 chunk_index_lock.unlock();
724 CHECK(buffer_it->second->buffer);
725 buffer_it->second->buffer->pin();
726 sized_segs_lock.unlock();
730 if (buffer_it->second->buffer->size() < num_bytes) {
732 parent_mgr_->fetchBuffer(key, buffer_it->second->buffer, num_bytes);
734 return buffer_it->second->buffer;
736 sized_segs_lock.unlock();
741 key, buffer, num_bytes);
745 <<
" from foreign storage. Error was " << error.what();
747 }
catch (
const std::exception& error) {
749 <<
" in buffer pool or parent buffer pools. Error was " << error.what();
757 const size_t num_bytes) {
764 chunk_index_lock.unlock();
767 sized_segs_lock.unlock();
772 }
catch (std::runtime_error& error) {
776 buffer = buffer_it->second->buffer;
778 if (num_bytes > buffer->
size()) {
781 }
catch (std::runtime_error& error) {
785 sized_segs_lock.unlock();
788 buffer->
copyTo(dest_buffer, num_bytes);
794 const size_t num_bytes) {
798 chunk_index_lock.unlock();
803 buffer = buffer_it->second->buffer;
805 size_t old_buffer_size = buffer->
size();
806 size_t new_buffer_size = num_bytes == 0 ? src_buffer->
size() : num_bytes;
818 CHECK(old_buffer_size < new_buffer_size);
820 new_buffer_size - old_buffer_size,
846 if (casted_buffer == 0) {
847 LOG(
FATAL) <<
"Wrong buffer type - expects base class pointer to Buffer type.";
871 LOG(
FATAL) <<
"getChunkMetadataVecForPrefix not supported for BufferMgr.";
size_t max_buffer_pool_num_pages_
size_t getAllocated() override
size_t min_num_pages_per_slab_
AbstractBufferMgr * parent_mgr_
std::vector< int > ChunkKey
~BufferMgr() override
Destructor.
virtual void allocateBuffer(BufferList::iterator seg_it, const size_t page_size, const size_t num_bytes)=0
AbstractBuffer * createBuffer(const ChunkKey &key, const size_t page_size=0, const size_t initial_size=0) override
Creates a chunk with the specified key and page size.
BufferList::iterator seg_it_
BufferList::iterator findFreeBuffer(size_t num_bytes)
Gets a buffer of required size and returns an iterator to it.
const size_t max_buffer_pool_size_
size_t getMaxSize() override
size_t max_num_pages_per_slab_
void syncEncoder(const AbstractBuffer *src_buffer)
AbstractBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t num_bytes=0) override
static TimeT::rep execution(F func, Args &&...args)
size_t getNumChunks() override
virtual void addSlab(const size_t slab_size)=0
virtual int8_t * getMemoryPtr()=0
virtual MemoryLevel getType() const =0
size_t num_pages_allocated_
void removeSegment(BufferList::iterator &seg_it)
unsigned int buffer_epoch_
std::vector< BufferList > slab_segments_
BufferList::iterator findFreeBufferInSlab(const size_t slab_num, const size_t num_pages_requested)
size_t size()
Returns the total number of bytes allocated.
std::map< ChunkKey, BufferList::iterator > chunk_index_
std::string printSeg(BufferList::iterator &seg_it)
const size_t min_slab_size_
max number of bytes allocated for the buffer pool
BufferList::iterator reserveBuffer(BufferList::iterator &seg_it, const size_t num_bytes)
bool isAllocationCapped() override
std::string printSlabs() override
An AbstractBuffer is a unit of data management for a data manager.
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
bool isBufferOnDevice(const ChunkKey &key) override
Puts the contents of d into the Buffer with ChunkKey key.
std::mutex buffer_id_mutex_
AbstractBuffer * getBuffer(const ChunkKey &key, const size_t num_bytes=0) override
Returns the a pointer to the chunk with the specified key.
BufferList::iterator evict(BufferList::iterator &evict_start, const size_t num_pages_requested, const int slab_num)
void free(AbstractBuffer *buffer) override
std::mutex sized_segs_mutex_
void checkpoint() override
std::mutex unsized_segs_mutex_
void removeTableRelatedDS(const int db_id, const int table_id) override
size_t current_max_slab_page_size_
const std::vector< BufferList > & getSlabSegments()
void deleteBuffersWithPrefix(const ChunkKey &key_prefix, const bool purge=true) override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
size_t getInUseSize() override
bool g_enable_watchdog false
size_t getMaxBufferSize()
AbstractBuffer * alloc(const size_t num_bytes=0) override
client is responsible for deleting memory allocated for b->mem_
const size_t max_slab_size_
std::string printSlab(size_t slab_num)
std::mutex chunk_index_mutex_
void clearSlabs() override
void fetchBuffer(const ChunkKey &key, AbstractBuffer *dest_buffer, const size_t num_bytes=0) override
std::string keyToString(const ChunkKey &key)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata_vec, const ChunkKey &key_prefix) override
std::vector< int8_t * > slabs_
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Note(s): Forbid Copying Idiom 4.1.
virtual void freeAllMem()=0