34 namespace Buffer_Namespace {
36 std::string BufferMgr::keyToString(
const ChunkKey& key) {
37 std::ostringstream oss;
40 for (
auto sub_key : key) {
41 oss << sub_key <<
",";
47 BufferMgr::BufferMgr(
const int device_id,
48 const size_t max_buffer_pool_size,
49 const size_t min_slab_size,
50 const size_t max_slab_size,
51 const size_t page_size,
52 AbstractBufferMgr* parent_mgr)
53 : AbstractBufferMgr(device_id)
54 , max_buffer_pool_size_(max_buffer_pool_size)
55 , min_slab_size_(min_slab_size)
56 , max_slab_size_(max_slab_size)
57 , page_size_(page_size)
58 , num_pages_allocated_(0)
59 , allocations_capped_(
false)
60 , parent_mgr_(parent_mgr)
99 delete buf.second->buffer;
102 chunk_index_.clear();
111 const size_t chunk_page_size,
112 const size_t initial_size) {
114 size_t actual_chunk_page_size = chunk_page_size;
115 if (actual_chunk_page_size == 0) {
124 buffer_seg.chunk_key = chunk_key;
140 buffer_it->second->buffer =
153 const size_t num_pages_requested,
154 const int slab_num) {
158 auto evict_it = evict_start;
159 size_t num_pages = 0;
160 size_t start_page = evict_start->start_page;
161 while (num_pages < num_pages_requested) {
162 if (evict_it->mem_status ==
USED) {
163 CHECK(evict_it->buffer->getPinCount() < 1);
165 num_pages += evict_it->num_pages;
166 if (evict_it->mem_status ==
USED && evict_it->chunk_key.size() > 0) {
169 if (evict_it->buffer !=
nullptr) {
172 delete evict_it->buffer;
184 if (num_pages_requested < num_pages) {
185 size_t excess_pages = num_pages - num_pages_requested;
187 evict_it->mem_status ==
FREE) {
188 evict_it->start_page = start_page + num_pages_requested;
189 evict_it->num_pages += excess_pages;
191 BufferSeg free_seg(start_page + num_pages_requested, excess_pages,
FREE);
199 BufferList::iterator& seg_it,
200 const size_t num_bytes) {
203 size_t num_pages_extra_needed = num_pages_requested - seg_it->num_pages;
205 if (num_pages_requested < seg_it->num_pages) {
210 int slab_num = seg_it->slab_num;
212 BufferList::iterator next_it = std::next(seg_it);
214 next_it->num_pages >= num_pages_extra_needed) {
216 size_t leftover_pages = next_it->num_pages - num_pages_extra_needed;
217 seg_it->num_pages = num_pages_requested;
218 next_it->num_pages = leftover_pages;
219 next_it->start_page = seg_it->start_page + seg_it->num_pages;
228 new_seg_it->buffer = seg_it->buffer;
229 new_seg_it->chunk_key = seg_it->chunk_key;
230 int8_t* old_mem = new_seg_it->buffer->mem_;
231 new_seg_it->buffer->mem_ =
236 if (seg_it->start_page >= 0 && seg_it->buffer->mem_ != 0) {
237 new_seg_it->buffer->writeData(old_mem,
238 new_seg_it->buffer->size(),
240 new_seg_it->buffer->getType(),
254 const size_t num_pages_requested) {
258 if (buffer_it->mem_status ==
FREE && buffer_it->num_pages >= num_pages_requested) {
260 size_t excess_pages = buffer_it->num_pages - num_pages_requested;
261 buffer_it->num_pages = num_pages_requested;
262 buffer_it->mem_status =
USED;
264 buffer_it->slab_num = slab_num;
265 if (excess_pages > 0) {
267 buffer_it->start_page + num_pages_requested, excess_pages,
FREE);
268 auto temp_it = buffer_it;
289 for (
size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
304 if (num_pages_requested <=
311 << alloc_ms <<
" ms " << getStringMgrType() <<
":" << device_id_;
319 num_pages_requested);
321 }
catch (std::runtime_error& error) {
324 << getStringMgrType() <<
":" << device_id_;
341 << getStringMgrType() <<
":" << device_id_;
353 size_t min_score = std::numeric_limits<size_t>::max();
357 BufferList::iterator best_eviction_start =
slab_segments_[0].end();
358 int best_eviction_start_slab = -1;
362 ++slab_it, ++slab_num) {
363 for (
auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) {
371 size_t page_count = 0;
373 bool solution_found =
false;
374 auto evict_it = buffer_it;
379 if (evict_it->mem_status ==
USED && evict_it->buffer->getPinCount() > 0) {
382 page_count += evict_it->num_pages;
383 if (evict_it->mem_status ==
USED) {
391 score = std::max(score, static_cast<size_t>(evict_it->last_touched));
393 if (page_count >= num_pages_requested) {
394 solution_found =
true;
398 if (solution_found && score < min_score) {
400 best_eviction_start = buffer_it;
401 best_eviction_start_slab = slab_num;
415 LOG(
ERROR) <<
"ALLOCATION failed to find " << num_bytes <<
"B throwing out of memory "
416 << getStringMgrType() <<
":" << device_id_;
420 LOG(
INFO) <<
"ALLOCATION failed to find " << num_bytes <<
"B free. Forcing Eviction."
421 <<
" Eviction start " << best_eviction_start->start_page
422 <<
" Number pages requested " << num_pages_requested
423 <<
" Best Eviction Start Slab " << best_eviction_start_slab <<
" "
424 << getStringMgrType() <<
":" << device_id_;
425 best_eviction_start =
426 evict(best_eviction_start, num_pages_requested, best_eviction_start_slab);
427 return best_eviction_start;
431 std::ostringstream tss;
433 tss <<
"Slab St.Page Pages Touch" << std::endl;
435 tss << setfill(
' ') << setw(4) << slab_num;
437 tss << setfill(
' ') << setw(8) << segment.start_page;
438 tss << setfill(
' ') << setw(8) << segment.num_pages;
441 tss << setfill(
' ') << setw(7) << segment.last_touched;
443 if (segment.mem_status ==
FREE) {
447 tss <<
" PC: " << setfill(
' ') << setw(2) << segment.buffer->getPinCount();
448 tss <<
" USED - Chunk: ";
450 for (
auto&& key_elem : segment.chunk_key) {
451 tss << key_elem <<
",";
460 std::ostringstream tss;
462 <<
"Slabs Contents: "
463 <<
" " << getStringMgrType() <<
":" << device_id_ << std::endl;
465 for (
size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
468 tss <<
"--------------------" << std::endl;
473 bool pinned_exists =
false;
475 for (
auto& segment : segment_list) {
476 if (segment.mem_status ==
FREE) {
478 }
else if (segment.buffer->getPinCount() < 1) {
481 pinned_exists =
true;
485 if (!pinned_exists) {
516 for (
auto& segment : segment_list) {
517 if (segment.mem_status !=
FREE) {
526 std::ostringstream tss;
527 tss <<
"SN: " << setfill(
' ') << setw(2) << seg_it->slab_num;
528 tss <<
" SP: " << setfill(
' ') << setw(7) << seg_it->start_page;
529 tss <<
" NP: " << setfill(
' ') << setw(7) << seg_it->num_pages;
530 tss <<
" LT: " << setfill(
' ') << setw(7) << seg_it->last_touched;
531 tss <<
" PC: " << setfill(
' ') << setw(2) << seg_it->buffer->getPinCount();
532 if (seg_it->mem_status ==
FREE) {
536 tss <<
" USED - Chunk: ";
537 for (
auto vec_it = seg_it->chunk_key.begin(); vec_it != seg_it->chunk_key.end();
539 tss << *vec_it <<
",";
547 std::ostringstream tss;
551 <<
" " << getStringMgrType() <<
":" << device_id_ << std::endl;
554 ++seg_it, ++seg_num) {
564 tss <<
"--------------------" << std::endl;
571 LOG(
INFO) << std::endl <<
" " << getStringMgrType() <<
":" << device_id_;
573 ++slab_it, ++slab_num) {
574 LOG(
INFO) <<
"Slab Num: " << slab_num <<
" " << getStringMgrType() <<
":"
576 for (
auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) {
577 LOG(
INFO) <<
"Segment: " << seg_num <<
" " << getStringMgrType() <<
":"
580 LOG(
INFO) <<
" " << getStringMgrType() <<
":" << device_id_;
582 LOG(
INFO) <<
"--------------------"
583 <<
" " << getStringMgrType() <<
":" << device_id_;
604 auto seg_it = buffer_it->second;
606 chunk_index_lock.unlock();
608 if (seg_it->buffer) {
609 delete seg_it->buffer;
618 std::lock_guard<std::mutex> sized_segs_lock(
623 auto startChunkIt =
chunk_index_.lower_bound(key_prefix);
628 auto buffer_it = startChunkIt;
630 std::search(buffer_it->first.begin(),
631 buffer_it->first.begin() + key_prefix.size(),
633 key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
634 auto seg_it = buffer_it->second;
635 if (seg_it->buffer) {
636 if (seg_it->buffer->getPinCount() != 0) {
642 delete seg_it->buffer;
643 seg_it->buffer =
nullptr;
652 int slab_num = seg_it->slab_num;
659 auto prev_it = std::prev(seg_it);
662 if (prev_it->mem_status ==
FREE) {
663 seg_it->start_page = prev_it->start_page;
664 seg_it->num_pages += prev_it->num_pages;
668 auto next_it = std::next(seg_it);
670 if (next_it->mem_status ==
FREE) {
671 seg_it->num_pages += next_it->num_pages;
675 seg_it->mem_status =
FREE;
687 auto& buffer_itr = chunk_itr.second;
688 if (buffer_itr->chunk_key[0] != -1 && buffer_itr->buffer->isDirty()) {
689 parent_mgr_->putBuffer(buffer_itr->chunk_key, buffer_itr->buffer);
690 buffer_itr->buffer->clearDirtyBits();
700 key_prefix.push_back(db_id);
701 key_prefix.push_back(tb_id);
702 auto start_chunk_it =
chunk_index_.lower_bound(key_prefix);
707 auto buffer_it = start_chunk_it;
709 std::search(buffer_it->first.begin(),
710 buffer_it->first.begin() + key_prefix.size(),
712 key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
713 if (buffer_it->second->chunk_key[0] != -1 &&
714 buffer_it->second->buffer->isDirty()) {
717 parent_mgr_->putBuffer(buffer_it->second->chunk_key, buffer_it->second->buffer);
718 buffer_it->second->buffer->clearDirtyBits();
733 chunk_index_lock.unlock();
735 CHECK(buffer_it->second->buffer);
736 buffer_it->second->buffer->pin();
737 sized_segs_lock.unlock();
741 auto buffer_size = buffer_it->second->buffer->size();
742 if (buffer_size < num_bytes) {
744 VLOG(1) << ToString(getMgrType())
745 <<
": Fetching buffer from parent manager. Reason: increased buffer size. "
747 << buffer_size <<
", num bytes to fetch: " << num_bytes
749 parent_mgr_->fetchBuffer(key, buffer_it->second->buffer, num_bytes);
751 return buffer_it->second->buffer;
753 sized_segs_lock.unlock();
757 VLOG(1) << ToString(getMgrType())
758 <<
": Fetching buffer from parent manager. Reason: cache miss. Num bytes "
760 << num_bytes <<
", chunk key: " <<
keyToString(key);
762 key, buffer, num_bytes);
766 <<
" from foreign storage. Error was " << error.what();
768 }
catch (
const std::exception& error) {
770 <<
" in buffer pool or parent buffer pools. Error was " << error.what();
778 const size_t num_bytes) {
785 chunk_index_lock.unlock();
788 sized_segs_lock.unlock();
792 VLOG(1) << ToString(getMgrType())
793 <<
": Fetching buffer from parent manager. Reason: cache miss. Num bytes "
795 << num_bytes <<
", chunk key: " <<
keyToString(key);
800 <<
" from foreign storage. Error was " << error.what();
802 }
catch (std::runtime_error& error) {
804 <<
" error: " << error.what();
807 buffer = buffer_it->second->buffer;
809 auto buffer_size = buffer->
size();
810 if (num_bytes > buffer_size) {
812 VLOG(1) << ToString(getMgrType())
813 <<
": Fetching buffer from parent manager. Reason: increased buffer "
814 "size. Buffer size: "
815 << buffer_size <<
", num bytes to fetch: " << num_bytes
820 <<
" from foreign storage. Error was " << error.what();
822 }
catch (std::runtime_error& error) {
824 <<
" error: " << error.what();
827 sized_segs_lock.unlock();
830 buffer->
copyTo(dest_buffer, num_bytes);
836 const size_t num_bytes) {
840 chunk_index_lock.unlock();
845 buffer = buffer_it->second->buffer;
847 size_t old_buffer_size = buffer->
size();
848 size_t new_buffer_size = num_bytes == 0 ? src_buffer->
size() : num_bytes;
860 CHECK(old_buffer_size < new_buffer_size);
862 new_buffer_size - old_buffer_size,
888 if (casted_buffer == 0) {
889 LOG(
FATAL) <<
"Wrong buffer type - expects base class pointer to Buffer type.";
913 LOG(
FATAL) <<
"getChunkMetadataVecForPrefix not supported for BufferMgr.";
size_t max_buffer_pool_num_pages_
size_t getAllocated() override
size_t min_num_pages_per_slab_
AbstractBufferMgr * parent_mgr_
std::vector< int > ChunkKey
~BufferMgr() override
Destructor.
virtual void allocateBuffer(BufferList::iterator seg_it, const size_t page_size, const size_t num_bytes)=0
AbstractBuffer * createBuffer(const ChunkKey &key, const size_t page_size=0, const size_t initial_size=0) override
Creates a chunk with the specified key and page size.
BufferList::iterator seg_it_
BufferList::iterator findFreeBuffer(size_t num_bytes)
Gets a buffer of required size and returns an iterator to it.
const size_t max_buffer_pool_size_
size_t getMaxSize() override
size_t max_num_pages_per_slab_
void syncEncoder(const AbstractBuffer *src_buffer)
AbstractBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t num_bytes=0) override
static TimeT::rep execution(F func, Args &&...args)
size_t getNumChunks() override
virtual void addSlab(const size_t slab_size)=0
virtual int8_t * getMemoryPtr()=0
virtual MemoryLevel getType() const =0
size_t num_pages_allocated_
void removeSegment(BufferList::iterator &seg_it)
unsigned int buffer_epoch_
std::vector< BufferList > slab_segments_
BufferList::iterator findFreeBufferInSlab(const size_t slab_num, const size_t num_pages_requested)
size_t size()
Returns the total number of bytes allocated.
std::map< ChunkKey, BufferList::iterator > chunk_index_
std::string printSeg(BufferList::iterator &seg_it)
const size_t min_slab_size_
max number of bytes allocated for the buffer pool
BufferList::iterator reserveBuffer(BufferList::iterator &seg_it, const size_t num_bytes)
bool isAllocationCapped() override
std::string printSlabs() override
An AbstractBuffer is a unit of data management for a data manager.
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
bool isBufferOnDevice(const ChunkKey &key) override
Puts the contents of d into the Buffer with ChunkKey key.
This file includes the class specification for the buffer manager (BufferMgr), and related data struc...
std::mutex buffer_id_mutex_
AbstractBuffer * getBuffer(const ChunkKey &key, const size_t num_bytes=0) override
Returns a pointer to the chunk with the specified key.
BufferList::iterator evict(BufferList::iterator &evict_start, const size_t num_pages_requested, const int slab_num)
void free(AbstractBuffer *buffer) override
std::mutex sized_segs_mutex_
void checkpoint() override
std::mutex unsized_segs_mutex_
void removeTableRelatedDS(const int db_id, const int table_id) override
size_t current_max_slab_page_size_
const std::vector< BufferList > & getSlabSegments()
void deleteBuffersWithPrefix(const ChunkKey &key_prefix, const bool purge=true) override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
size_t getInUseSize() override
bool g_enable_watchdog false
size_t getMaxBufferSize()
AbstractBuffer * alloc(const size_t num_bytes=0) override
client is responsible for deleting memory allocated for b->mem_
const size_t max_slab_size_
std::string printSlab(size_t slab_num)
std::mutex chunk_index_mutex_
void fetchBuffer(const ChunkKey &key, AbstractBuffer *dest_buffer, const size_t num_bytes=0) override
std::string keyToString(const ChunkKey &key)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata_vec, const ChunkKey &key_prefix) override
std::vector< int8_t * > slabs_
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Note(s): Forbid Copying Idiom 4.1.
virtual void freeAllMem()=0