OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Buffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <cassert>
20 #include <stdexcept>
21 
23 #include "Logger/Logger.h"
24 
25 namespace Buffer_Namespace {
26 
28  BufferList::iterator seg_it,
29  const int device_id,
30  const size_t page_size,
31  const size_t num_bytes)
32  : AbstractBuffer(device_id)
33  , mem_(nullptr)
34  , bm_(bm)
35  , seg_it_(seg_it)
36  , page_size_(page_size)
37  , num_pages_(0)
38  , pin_count_(0) {
39  pin();
40  // so that the pointer value of this Buffer is stored
41  seg_it_->buffer = this;
42  if (num_bytes > 0) {
43  reserve(num_bytes);
44  }
45 }
46 
48 
49 void Buffer::reserve(const size_t num_bytes) {
50 #ifdef BUFFER_MUTEX
51  boost::unique_lock<boost::shared_mutex> write_lock(read_write_mutex_);
52 #endif
53  size_t num_pages = (num_bytes + page_size_ - 1) / page_size_;
54  // std::cout << "NumPages reserved: " << numPages << std::endl;
55  if (num_pages > num_pages_) {
56  // When running out of cpu buffers, reserveBuffer() will fail and
57  // trigger a SlabTooBig exception, so pageDirtyFlags_ and numPages_
58  // MUST NOT be set until reserveBuffer() returns; otherwise, this
59  // buffer is not properly resized, so any call to FileMgr::fetchBuffer()
60  // will proceed to read(), corrupt heap memory and cause core dump later.
61  seg_it_ = bm_->reserveBuffer(seg_it_, page_size_ * num_pages);
62  page_dirty_flags_.resize(num_pages);
63  num_pages_ = num_pages;
64  }
65 }
66 
67 void Buffer::read(int8_t* const dst,
68  const size_t num_bytes,
69  const size_t offset,
70  const MemoryLevel dst_buffer_type,
71  const int dst_device_id) {
72  if (num_bytes == 0) {
73  return;
74  }
75  CHECK(dst && mem_);
76 #ifdef BUFFER_MUTEX
77  boost::shared_lock<boost::shared_mutex> read_lock(read_write_mutex_);
78 #endif
79 
80  if (num_bytes + offset > size_) {
81  LOG(FATAL) << "Buffer: Out of bounds read error";
82  }
83  readData(dst, num_bytes, offset, dst_buffer_type, dst_device_id);
84 }
85 
86 void Buffer::write(int8_t* src,
87  const size_t num_bytes,
88  const size_t offset,
89  const MemoryLevel src_buffer_type,
90  const int src_device_id) {
91  CHECK_GT(num_bytes, size_t(0)); // cannot write 0 bytes
92 #ifdef BUFFER_MUTEX
93  boost::unique_lock<boost::shared_mutex> write_lock(read_write_mutex_);
94 #endif
95  if (num_bytes + offset > reservedSize()) {
96  reserve(num_bytes + offset);
97  }
98  // write source contents to buffer
99  writeData(src, num_bytes, offset, src_buffer_type, src_device_id);
100 
101  // update dirty flags for buffer and each affected page
102  setDirty();
103  if (offset < size_) {
104  setUpdated();
105  }
106  if (offset + num_bytes > size_) {
107  setAppended();
108  size_ = offset + num_bytes;
109  }
110 
111  // std::cout << "Size after write: " << size_ << std::endl;
112 
113  size_t first_dirty_page = offset / page_size_;
114  size_t last_dirty_page = (offset + num_bytes - 1) / page_size_;
115  for (size_t i = first_dirty_page; i <= last_dirty_page; ++i) {
116  page_dirty_flags_[i] = true;
117  }
118 }
119 
120 void Buffer::append(int8_t* src,
121  const size_t num_bytes,
122  const MemoryLevel src_buffer_type,
123  const int src_device_id) {
124 #ifdef BUFFER_MUTEX
125  boost::shared_lock<boost::shared_mutex> read_lock(
126  read_write_mutex_); // keep another thread from getting a write lock
127  boost::unique_lock<boost::shared_mutex> append_lock(
128  append_mutex_); // keep another thread from getting an append lock
129 #endif
130 
131  setAppended();
132 
133  if (num_bytes + size_ > reservedSize()) {
134  reserve(num_bytes + size_);
135  }
136 
137  writeData(src, num_bytes, size_, src_buffer_type, src_device_id);
138  size_ += num_bytes;
139  // Do we worry about dirty flags here or does append avoid them
140 }
141 
143  return mem_;
144 }
145 
146 void Buffer::setMemoryPtr(int8_t* new_ptr) {
147  mem_ = new_ptr;
148 }
149 } // namespace Buffer_Namespace
virtual void writeData(int8_t *const src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
size_t num_pages_
the number of pages currently reserved for the buffer
Definition: Buffer.h:165
void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int deviceId=-1) override
Definition: Buffer.cpp:120
BufferList::iterator seg_it_
Definition: Buffer.h:163
int8_t * getMemoryPtr() override
Returns a raw pointer to the underlying buffer memory.
Definition: Buffer.cpp:142
~Buffer() override
Destructor.
Definition: Buffer.cpp:47
#define LOG(tag)
Definition: Logger.h:285
#define CHECK_GT(x, y)
Definition: Logger.h:305
Note(s): Forbid Copying Idiom 4.1.
Definition: BufferMgr.h:96
BufferList::iterator reserveBuffer(BufferList::iterator &seg_it, const size_t num_bytes)
Definition: BufferMgr.cpp:198
std::vector< bool > page_dirty_flags_
per-page dirty flags; write() sets the flag of every page it touches
Definition: Buffer.h:167
An AbstractBuffer is a unit of data management for a data manager.
virtual void readData(int8_t *const dst, const size_t num_bytes, const size_t offset=0, const MemoryLevel dst_buffer_type=CPU_LEVEL, const int dst_device_id=-1)=0
This file includes the class specification for the buffer manager (BufferMgr), and related data struc...
#define CHECK(condition)
Definition: Logger.h:291
void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1) override
Writes (copies) data from src into the buffer. Writes (copies) nbytes of data into the buffer at the ...
Definition: Buffer.cpp:86
void read(int8_t *const dst, const size_t num_bytes, const size_t offset=0, const MemoryLevel dst_buffer_type=CPU_LEVEL, const int device_id=-1) override
Reads (copies) data from the buffer to the destination (dst) memory location. Reads (copies) nbytes o...
Definition: Buffer.cpp:67
Buffer(BufferMgr *bm, BufferList::iterator seg_it, const int device_id, const size_t page_size=512, const size_t num_bytes=0)
Constructs a Buffer object. The constructor requires a memory address (provided by BufferMgr)...
Definition: Buffer.cpp:27
size_t reservedSize() const override
Returns the total number of bytes allocated for the buffer.
Definition: Buffer.h:118
void reserve(const size_t num_bytes) override
Definition: Buffer.cpp:49
void setMemoryPtr(int8_t *new_ptr) override
Definition: Buffer.cpp:146
int pin() override
Definition: Buffer.h:126