OmniSciDB  04ee39c94c
Buffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //
18 // Buffer.cpp
19 // mapd2
20 //
21 // @author Steven Stewart <steve@map-d.com>
22 // @author Todd Mostak <todd@map-d.com>
23 //
24 // Copyright (c) 2014 MapD Technologies, Inc. All rights reserved.
25 //
26 #include <cassert>
27 #include <stdexcept>
28 
29 #include "Buffer.h"
30 #include "BufferMgr.h"
31 #include "Shared/Logger.h"
32 
33 namespace Buffer_Namespace {
34 
36  BufferList::iterator segIt,
37  const int deviceId,
38  const size_t pageSize,
39  const size_t numBytes)
40  : AbstractBuffer(deviceId)
41  , mem_(0)
42  , bm_(bm)
43  , segIt_(segIt)
44  , pageSize_(pageSize)
45  , numPages_(0)
46  , pinCount_(0) {
47  pin();
48  // so that the pointer value of this Buffer is stored
49  segIt_->buffer = this;
50  if (numBytes > 0) {
51  reserve(numBytes);
52  }
53 }
54 
55 /*
56  Buffer::Buffer(const int8_t * mem, const size_t numPages, const size_t pageSize, const
57  int epoch): mem_(mem), pageSize_(pageSize), used_(0), epoch_(epoch), dirty_(false)
58  {
59  assert(pageSize_ > 0);
60  pageDirtyFlags_.resize(numPages);
61  for (size_t i = 0; i < numPages; ++i)
62  pages_.push_back(Page(mem + (i * pageSize), false));
63 
64  }
65  */
66 
68 
69 void Buffer::reserve(const size_t numBytes) {
70 #ifdef BUFFER_MUTEX
71  boost::unique_lock<boost::shared_mutex> writeLock(readWriteMutex_);
72 #endif
73  size_t numPages = (numBytes + pageSize_ - 1) / pageSize_;
74  // std::cout << "NumPages reserved: " << numPages << std::endl;
75  if (numPages > numPages_) {
76  // When running out of cpu buffers, reserveBuffer() will fail and
77  // trigger a SlabTooBig exception, so pageDirtyFlags_ and numPages_
78  // MUST NOT be set until reserveBuffer() returns; otherwise, this
79  // buffer is not properly resized, so any call to FileMgr::fetchBuffer()
80  // will proceed to read(), corrupt heap memory and cause core dump later.
81  segIt_ = bm_->reserveBuffer(segIt_, pageSize_ * numPages);
82  pageDirtyFlags_.resize(numPages);
83  numPages_ = numPages;
84  }
85 }
86 
87 void Buffer::read(int8_t* const dst,
88  const size_t numBytes,
89  const size_t offset,
90  const MemoryLevel dstBufferType,
91  const int dstDeviceId) {
92  if (numBytes == 0) {
93  return;
94  }
95  assert(dst && mem_);
96 #ifdef BUFFER_MUTEX
97  boost::shared_lock<boost::shared_mutex> readLock(readWriteMutex_);
98 #endif
99 
100  if (numBytes + offset > size_) {
101  LOG(FATAL) << "Buffer: Out of bounds read error";
102  }
103  readData(dst, numBytes, offset, dstBufferType, dstDeviceId);
104 }
105 
106 void Buffer::write(int8_t* src,
107  const size_t numBytes,
108  const size_t offset,
109  const MemoryLevel srcBufferType,
110  const int srcDeviceId) {
111  assert(numBytes > 0); // cannot write 0 bytes
112 #ifdef BUFFER_MUTEX
113  boost::unique_lock<boost::shared_mutex> writeLock(readWriteMutex_);
114 #endif
115  if (numBytes + offset > reservedSize()) {
116  reserve(numBytes + offset);
117  }
118  // write source contents to buffer
119  writeData(src, numBytes, offset, srcBufferType, srcDeviceId);
120 
121  // update dirty flags for buffer and each affected page
122  isDirty_ = true;
123  if (offset < size_) {
124  isUpdated_ = true;
125  }
126  if (offset + numBytes > size_) {
127  isAppended_ = true;
128  size_ = offset + numBytes;
129  }
130  // std::cout << "Size after write: " << size_ << std::endl;
131 
132  size_t firstDirtyPage = offset / pageSize_;
133  size_t lastDirtyPage = (offset + numBytes - 1) / pageSize_;
134  for (size_t i = firstDirtyPage; i <= lastDirtyPage; ++i) {
135  pageDirtyFlags_[i] = true;
136  }
137 }
138 
139 void Buffer::append(int8_t* src,
140  const size_t numBytes,
141  const MemoryLevel srcBufferType,
142  const int srcDeviceId) {
143 #ifdef BUFFER_MUTEX
144  boost::shared_lock<boost::shared_mutex> readLock(
145  readWriteMutex_); // keep another thread from getting a write lock
146  boost::unique_lock<boost::shared_mutex> appendLock(
147  appendMutex_); // keep another thread from getting an append lock
148 #endif
149 
150  isDirty_ = true;
151  isAppended_ = true;
152 
153  if (numBytes + size_ > reservedSize()) {
154  reserve(numBytes + size_);
155  }
156 
157  writeData(src, numBytes, size_, srcBufferType, srcDeviceId);
158  size_ += numBytes;
159  // Do we worry about dirty flags here or does append avoid them
160 }
161 
163  return mem_;
164 }
165 } // namespace Buffer_Namespace
virtual void readData(int8_t *const dst, const size_t numBytes, const size_t offset=0, const MemoryLevel dstBufferType=CPU_LEVEL, const int dstDeviceId=-1)=0
int8_t * getMemoryPtr() override
Returns a raw pointer to the underlying buffer memory.
Definition: Buffer.cpp:162
~Buffer() override
Destructor.
Definition: Buffer.cpp:67
#define LOG(tag)
Definition: Logger.h:182
size_t numPages_
the number of pages currently reserved for the buffer
Definition: Buffer.h:169
int64_t * src
void reserve(const size_t numBytes) override
Definition: Buffer.cpp:69
Note(s): Forbid Copying Idiom 4.1.
Definition: BufferMgr.h:96
BufferList::iterator segIt_
Definition: Buffer.h:166
void read(int8_t *const dst, const size_t numBytes, const size_t offset=0, const MemoryLevel dstBufferType=CPU_LEVEL, const int deviceId=-1) override
Reads (copies) data from the buffer to the destination (dst) memory location. Reads (copies) nbytes o...
Definition: Buffer.cpp:87
Buffer(BufferMgr *bm, BufferList::iterator segIt, const int deviceId, const size_t pageSize=512, const size_t numBytes=0)
Constructs a Buffer object. The constructor requires a memory address (provided by BufferMgr)...
Definition: Buffer.cpp:35
virtual void writeData(int8_t *const src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcBufferType=CPU_LEVEL, const int srcDeviceId=-1)=0
An AbstractBuffer is a unit of data management for a data manager.
std::vector< bool > pageDirtyFlags_
per-page dirty flags; entry i is set when page i has been written
Definition: Buffer.h:173
BufferList::iterator reserveBuffer(BufferList::iterator &segIt, const size_t numBytes)
Definition: BufferMgr.cpp:179
size_t reservedSize() const override
Returns the total number of bytes allocated for the buffer.
Definition: Buffer.h:122
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcBufferType=CPU_LEVEL, const int deviceId=-1) override
Definition: Buffer.cpp:139
int pin() override
Definition: Buffer.h:134
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcBufferType=CPU_LEVEL, const int deviceId=-1) override
Writes (copies) data from src into the buffer. Writes (copies) nbytes of data into the buffer at the ...
Definition: Buffer.cpp:106