OmniSciDB  1dac507f6e
File_Namespace::FileBuffer Class Reference

Represents/provides access to contiguous data stored in the file system. More...

#include <FileBuffer.h>


Public Member Functions

 FileBuffer (FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
 Constructs a FileBuffer object. More...
 
 FileBuffer (FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const SQLTypeInfo sqlType, const size_t initialSize=0)
 
 FileBuffer (FileMgr *fm, const ChunkKey &chunkKey, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
 
 ~FileBuffer () override
 Destructor. More...
 
Page addNewMultiPage (const int epoch)
 
void reserve (const size_t numBytes) override
 
void freePages ()
 
void read (int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 
void write (int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 Writes the contents of source (src) into new versions of the affected logical pages. More...
 
void append (int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 
void copyPage (Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
 
Data_Namespace::MemoryLevel getType () const override
 
int8_t * getMemoryPtr () override
 Not implemented for FileBuffer; logs a fatal error ("Operation not supported.") instead of returning a pointer. More...
 
size_t pageCount () const override
 Returns the number of pages in the FileBuffer. More...
 
size_t pageSize () const override
 Returns the size in bytes of each page in the FileBuffer. More...
 
virtual size_t pageDataSize () const
 Returns the size in bytes of the data portion of each page in the FileBuffer. More...
 
virtual size_t reservedHeaderSize () const
 
virtual std::vector< MultiPage > getMultiPage () const
 Returns vector of MultiPages in the FileBuffer. More...
 
size_t size () const override
 Returns the total number of used bytes in the FileBuffer. More...
 
size_t reservedSize () const override
 Returns the total number of bytes allocated for the FileBuffer. More...
 
bool isDirty () const override
 Returns whether or not the FileBuffer has been modified since the last flush/checkpoint. More...
 
- Public Member Functions inherited from Data_Namespace::AbstractBuffer
 AbstractBuffer (const int device_id)
 
 AbstractBuffer (const int device_id, const SQLTypeInfo sql_type)
 
virtual ~AbstractBuffer ()
 
virtual int getDeviceId () const
 
virtual int pin ()
 
virtual int unPin ()
 
virtual int getPinCount ()
 
virtual bool isAppended () const
 
virtual bool isUpdated () const
 
virtual void setDirty ()
 
virtual void setUpdated ()
 
virtual void setAppended ()
 
void setSize (const size_t size)
 
void clearDirtyBits ()
 
void initEncoder (const SQLTypeInfo tmp_sql_type)
 
void syncEncoder (const AbstractBuffer *src_buffer)
 

Private Member Functions

void writeHeader (Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
 Writes the header at the top of the given page. More...
 
void writeMetadata (const int epoch)
 
void readMetadata (const Page &page)
 
void calcHeaderBuffer ()
 

Private Attributes

FileMgr * fm_
 
MultiPage metadataPages_
 
std::vector< MultiPage > multiPages_
 
size_t pageSize_
 
size_t pageDataSize_
 
size_t reservedHeaderSize_
 
ChunkKey chunkKey_
 

Static Private Attributes

static size_t headerBufferOffset_ = 32
 

Friends

class FileMgr
 

Additional Inherited Members

- Public Attributes inherited from Data_Namespace::AbstractBuffer
std::unique_ptr< Encoder > encoder
 
bool has_encoder
 
SQLTypeInfo sql_type
 
- Protected Attributes inherited from Data_Namespace::AbstractBuffer
size_t size_
 
bool is_dirty_
 
bool is_appended_
 
bool is_updated_
 
int device_id_
 

Detailed Description

Represents/provides access to contiguous data stored in the file system.

The FileBuffer consists of logical pages, each of which can map to any identically-sized page in any file of the underlying file system. A page's metadata (file and page number) is stored in MultiPage objects, and each MultiPage holds the page metadata for multiple versions of the same logical page.
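
The relationship between a logical page and its versions can be sketched as follows. This is a simplified, self-contained model rather than the OmniSciDB definitions: the field names follow the members referenced on this page (`fileId`, `pageNum`, `epochs`, `pageVersions`, `current()`), while the helper `addVersion` is hypothetical and exists only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Simplified stand-in for File_Namespace::Page: one physical page in one file.
struct Page {
  int fileId;           // file holding the page (-1 would mean uninitialized)
  std::size_t pageNum;  // index of the page within that file
};

// Simplified stand-in for File_Namespace::MultiPage: all stored versions of
// one logical page, ordered from oldest to newest.
struct MultiPage {
  std::size_t pageSize;
  std::deque<Page> pageVersions;  // physical location of each version
  std::deque<int> epochs;         // epoch at which each version was written

  explicit MultiPage(std::size_t pageSizeIn) : pageSize(pageSizeIn) {}

  // Hypothetical helper: record a new version written at `epoch`.
  void addVersion(const Page& page, int epoch) {
    pageVersions.push_back(page);
    epochs.push_back(epoch);
  }

  // The newest version is the one a reader of the logical page should use.
  Page current() const {
    assert(!pageVersions.empty());
    return pageVersions.back();
  }
};
```

In this model a FileBuffer would hold one `MultiPage` per logical page, so the position in that vector is the logical page number and `current()` resolves it to a physical location.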

Note that a "Chunk" is brought into a FileBuffer by the FileMgr.

Note(s): Forbid Copying Idiom 4.1

Definition at line 55 of file FileBuffer.h.

Constructor & Destructor Documentation

File_Namespace::FileBuffer::FileBuffer ( FileMgr *  fm,
const size_t  pageSize,
const ChunkKey &  chunkKey,
const size_t  initialSize = 0 
)

Constructs a FileBuffer object.

Definition at line 39 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK(), fm_, pageDataSize_, pageSize_, and reservedHeaderSize_.

43  : AbstractBuffer(fm->getDeviceId())
44  , fm_(fm)
47  , chunkKey_(chunkKey) {
48  // Create a new FileBuffer
49  CHECK(fm_);
52  //@todo reintroduce initialSize - need to develop easy way of
53  // differentiating these pre-allocated pages from "written-to" pages
54  /*
55  if (initalSize > 0) {
56  // should expand to initialSize bytes
57  size_t initialNumPages = (initalSize + pageSize_ -1) / pageSize_;
58  int epoch = fm_->epoch();
59  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
60  Page page = addNewMultiPage(epoch);
61  writeHeader(page,pageNum,epoch);
62  }
63  }
64  */
65 }

File_Namespace::FileBuffer::FileBuffer ( FileMgr *  fm,
const size_t  pageSize,
const ChunkKey &  chunkKey,
const SQLTypeInfo  sqlType,
const size_t  initialSize = 0 
)

Definition at line 67 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK(), fm_, pageDataSize_, pageSize_, and reservedHeaderSize_.

72  : AbstractBuffer(fm->getDeviceId(), sqlType)
73  , fm_(fm)
76  , chunkKey_(chunkKey) {
77  CHECK(fm_);
80 }

File_Namespace::FileBuffer::FileBuffer ( FileMgr *  fm,
const ChunkKey &  chunkKey,
const std::vector< HeaderInfo >::const_iterator &  headerStartIt,
const std::vector< HeaderInfo >::const_iterator &  headerEndIt 
)

Definition at line 82 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK(), File_Namespace::MultiPage::epochs, logger::FATAL, fm_, LOG, metadataPages_, multiPages_, pageDataSize_, pageSize_, File_Namespace::MultiPage::pageVersions, readMetadata(), reservedHeaderSize_, and showChunk().

86  : AbstractBuffer(fm->getDeviceId())
87  , fm_(fm)
89  , pageSize_(0)
90  , chunkKey_(chunkKey) {
91  // We are being assigned an existing FileBuffer on disk
92 
93  CHECK(fm_);
95  // MultiPage multiPage(pageSize_); // why was this here?
96  int lastPageId = -1;
97  // Page lastMetadataPage;
98  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
99  int curPageId = vecIt->pageId;
100 
101  // We only want to read last metadata page
102  if (curPageId == -1) { // stats page
103  metadataPages_.epochs.push_back(vecIt->versionEpoch);
104  metadataPages_.pageVersions.push_back(vecIt->page);
105  } else {
106  if (curPageId != lastPageId) {
107  // protect from bad data on disk, and give diagnostics
108  if (curPageId != lastPageId + 1) {
109  LOG(FATAL) << "Failure reading DB file " << showChunk(chunkKey)
110  << " Current page " << curPageId << " last page " << lastPageId
111  << " epoch " << vecIt->versionEpoch;
112  }
113  if (lastPageId == -1) {
114  // If we are on first real page
115  CHECK(metadataPages_.pageVersions.back().fileId != -1); // was initialized
118  }
119  MultiPage multiPage(pageSize_);
120  multiPages_.push_back(multiPage);
121  lastPageId = curPageId;
122  }
123  multiPages_.back().epochs.push_back(vecIt->versionEpoch);
124  multiPages_.back().pageVersions.push_back(vecIt->page);
125  }
126  if (curPageId == -1) { // meaning there was only a metadata page
129  }
130  }
131  // auto lastHeaderIt = std::prev(headerEndIt);
132  // size_ = lastHeaderIt->chunkSize;
133 }
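
The grouping performed by this constructor can be illustrated with a reduced model. The sketch below keeps only the page id and epoch of each header and assumes the input is already ordered with metadata entries (`pageId == -1`) first, then data pages by ascending page id. `HeaderEntry`, `GroupedHeaders` and `groupHeaders` are hypothetical names, and an exception stands in for `LOG(FATAL)`.

```cpp
#include <cassert>
#include <stdexcept>
#include <vector>

// Reduced stand-in for HeaderInfo: only the fields the grouping logic needs.
struct HeaderEntry {
  int pageId;        // -1 marks a metadata (stats) page
  int versionEpoch;  // epoch at which this page version was written
};

struct GroupedHeaders {
  std::vector<int> metadataEpochs;           // epochs of the metadata versions
  std::vector<std::vector<int>> pageEpochs;  // per logical page: version epochs
};

GroupedHeaders groupHeaders(const std::vector<HeaderEntry>& headers) {
  GroupedHeaders out;
  int lastPageId = -1;
  for (const HeaderEntry& h : headers) {
    if (h.pageId == -1) {  // metadata page version
      out.metadataEpochs.push_back(h.versionEpoch);
      continue;
    }
    if (h.pageId != lastPageId) {
      // Guard against bad data on disk: page ids must be contiguous.
      if (h.pageId != lastPageId + 1) {
        throw std::runtime_error("non-contiguous page id in headers");
      }
      // The first data page requires that a metadata page was seen earlier.
      assert(lastPageId != -1 || !out.metadataEpochs.empty());
      out.pageEpochs.emplace_back();  // start a new logical page
      lastPageId = h.pageId;
    }
    out.pageEpochs.back().push_back(h.versionEpoch);
  }
  return out;
}
```

Each inner vector of `pageEpochs` plays the role of one `MultiPage`: consecutive headers with the same page id are versions of the same logical page.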

File_Namespace::FileBuffer::~FileBuffer ( )
override

Destructor.

Definition at line 135 of file FileBuffer.cpp.

135  {
136  // need to free pages
137  // NOP
138 }

Member Function Documentation

Page File_Namespace::FileBuffer::addNewMultiPage ( const int  epoch)

Definition at line 361 of file FileBuffer.cpp.

References File_Namespace::MultiPage::epochs, fm_, multiPages_, pageSize_, File_Namespace::MultiPage::pageVersions, and File_Namespace::FileMgr::requestFreePage().

Referenced by append(), reserve(), and write().

361  {
362  Page page = fm_->requestFreePage(pageSize_, false);
363  MultiPage multiPage(pageSize_);
364  multiPage.epochs.push_back(epoch);
365  multiPage.pageVersions.push_back(page);
366  multiPages_.push_back(multiPage);
367  return page;
368 }

void File_Namespace::FileBuffer::append ( int8_t *  src,
const size_t  numBytes,
const MemoryLevel  srcMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 457 of file FileBuffer.cpp.

References addNewMultiPage(), CHECK(), File_Namespace::FileMgr::epoch(), File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), Data_Namespace::AbstractBuffer::is_appended_, Data_Namespace::AbstractBuffer::is_dirty_, multiPages_, pageDataSize_, File_Namespace::Page::pageNum, pageSize_, reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, src, File_Namespace::FileInfo::write(), and writeHeader().

460  {
461  is_dirty_ = true;
462  is_appended_ = true;
463 
464  size_t startPage = size_ / pageDataSize_;
465  size_t startPageOffset = size_ % pageDataSize_;
466  size_t numPagesToWrite =
467  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
468  size_t bytesLeft = numBytes;
469  int8_t* curPtr = src; // a pointer to the current location in dst being written to
470  size_t initialNumPages = multiPages_.size();
471  size_ = size_ + numBytes;
472  int epoch = fm_->epoch();
473  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
474  Page page;
475  if (pageNum >= initialNumPages) {
476  page = addNewMultiPage(epoch);
477  writeHeader(page, pageNum, epoch);
478  } else {
479  // we already have a new page at current
480  // epoch for this page - just grab this page
481  page = multiPages_[pageNum].current();
482  }
483  CHECK(page.fileId >= 0); // make sure page was initialized
484  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
485  size_t bytesWritten;
486  if (pageNum == startPage) {
487  bytesWritten = fileInfo->write(
488  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
489  min(pageDataSize_ - startPageOffset, bytesLeft),
490  curPtr);
491  } else {
492  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
493  min(pageDataSize_, bytesLeft),
494  curPtr);
495  }
496  curPtr += bytesWritten;
497  bytesLeft -= bytesWritten;
498  }
499  CHECK(bytesLeft == 0);
500 }
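
The page arithmetic at the top of append() (and of read() and write()) can be isolated into a small function. The sketch below is illustrative only: `PageSpan` and `computePageSpan` are hypothetical names, and for append() the `offset` argument corresponds to the current `size_` of the buffer.

```cpp
#include <cassert>
#include <cstddef>

// Which logical pages a transfer of `numBytes` starting at `offset` touches.
struct PageSpan {
  std::size_t startPage;        // first logical page touched
  std::size_t startPageOffset;  // byte offset inside that page's data portion
  std::size_t numPages;         // number of logical pages touched
};

PageSpan computePageSpan(std::size_t offset,
                         std::size_t numBytes,
                         std::size_t pageDataSize) {
  assert(pageDataSize > 0);
  PageSpan span;
  span.startPage = offset / pageDataSize;
  span.startPageOffset = offset % pageDataSize;
  // Ceiling division over the bytes measured from the start of the first page.
  span.numPages =
      (numBytes + span.startPageOffset + pageDataSize - 1) / pageDataSize;
  return span;
}
```

For example, with a data portion of 100 bytes per page, appending 120 bytes to a buffer that already holds 250 bytes starts at page 2, offset 50, and touches two pages.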

void File_Namespace::FileBuffer::calcHeaderBuffer ( )
private

Definition at line 151 of file FileBuffer.cpp.

References chunkKey_, headerBufferOffset_, and reservedHeaderSize_.

Referenced by FileBuffer().

151  {
152  // 3 * sizeof(int) is for headerSize, for pageId and versionEpoch
153  // sizeof(size_t) is for chunkSize
154  // reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int) + sizeof(size_t);
155  reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int);
156  size_t headerMod = reservedHeaderSize_ % headerBufferOffset_;
157  if (headerMod > 0) {
159  }
160  // pageDataSize_ = pageSize_-reservedHeaderSize_;
161 }
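
The listing above omits the body of the `headerMod > 0` branch. A plausible reading, given the name `headerBufferOffset_` and its value of 32, is that the header size is padded up to the next multiple of that value. The sketch below shows that interpretation only; the padding step is an assumption, not something confirmed by this page, and `paddedHeaderSize` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper modelled on calcHeaderBuffer(): bytes reserved for the
// header of each page, given the number of ints in the chunk key.
std::size_t paddedHeaderSize(std::size_t chunkKeyLen,
                             std::size_t alignment = 32) {
  assert(alignment > 0);
  // Chunk key ints plus three more ints: headerSize, pageId and versionEpoch.
  std::size_t headerSize = (chunkKeyLen + 3) * sizeof(int);
  const std::size_t headerMod = headerSize % alignment;
  if (headerMod > 0) {
    // ASSUMPTION: the elided line rounds up to the next multiple of
    // `alignment`; the original statement is not shown in the listing.
    headerSize += alignment - headerMod;
  }
  return headerSize;
}
```

Under this assumption the data portion of a page would begin on a 32-byte boundary relative to the start of the page.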

void File_Namespace::FileBuffer::copyPage ( Page &  srcPage,
Page &  destPage,
const size_t  numBytes,
const size_t  offset = 0 
)

Definition at line 341 of file FileBuffer.cpp.

References CHECK(), File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), pageDataSize_, File_Namespace::Page::pageNum, pageSize_, File_Namespace::FileInfo::read(), reservedHeaderSize_, and File_Namespace::FileInfo::write().

Referenced by write().

344  {
345  // FILE *srcFile = fm_->files_[srcPage.fileId]->f;
346  // FILE *destFile = fm_->files_[destPage.fileId]->f;
347  CHECK(offset + numBytes < pageDataSize_);
348  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
349  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
350 
351  int8_t* buffer = new int8_t[numBytes];
352  size_t bytesRead = srcFileInfo->read(
353  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
354  CHECK(bytesRead == numBytes);
355  size_t bytesWritten = destFileInfo->write(
356  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
357  CHECK(bytesWritten == numBytes);
358  delete[] buffer;
359 }
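
Both the read and the write in copyPage() address the same position inside their respective pages. A minimal sketch of that address calculation follows; `dataFileOffset` is a hypothetical name, and the bounds check mirrors the strict `CHECK(offset + numBytes < pageDataSize_)` in the listing.

```cpp
#include <cassert>
#include <cstddef>

// Byte position, within a file, of `offset` bytes into the data portion of
// physical page `pageNum`. Every page starts with a reserved header.
std::size_t dataFileOffset(std::size_t pageNum,
                           std::size_t pageSize,
                           std::size_t reservedHeaderSize,
                           std::size_t offset) {
  assert(reservedHeaderSize < pageSize);
  return pageNum * pageSize + offset + reservedHeaderSize;
}

// True when a copy of `numBytes` at `offset` passes the strict check used by
// copyPage(): the copied range must end before the end of the data portion.
bool copyFitsInPage(std::size_t offset,
                    std::size_t numBytes,
                    std::size_t pageDataSize) {
  return offset + numBytes < pageDataSize;
}
```

Because the comparison is strict, a copy that would fill the data portion exactly fails the check in this model.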

void File_Namespace::FileBuffer::freePages ( )

Definition at line 163 of file FileBuffer.cpp.

References fm_, File_Namespace::FileInfo::freePage(), File_Namespace::FileMgr::getFileInfoForFileId(), metadataPages_, multiPages_, and File_Namespace::MultiPage::pageVersions.

163  {
164  // Need to zero headers (actually just first four bytes of header)
165 
166  // First delete metadata pages
167  for (auto metaPageIt = metadataPages_.pageVersions.begin();
168  metaPageIt != metadataPages_.pageVersions.end();
169  ++metaPageIt) {
170  FileInfo* fileInfo = fm_->getFileInfoForFileId(metaPageIt->fileId);
171  fileInfo->freePage(metaPageIt->pageNum);
172  }
173 
174  // Now delete regular pages
175  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
176  ++multiPageIt) {
177  for (auto pageIt = multiPageIt->pageVersions.begin();
178  pageIt != multiPageIt->pageVersions.end();
179  ++pageIt) {
180  FileInfo* fileInfo = fm_->getFileInfoForFileId(pageIt->fileId);
181  fileInfo->freePage(pageIt->pageNum);
182  }
183  }
184 }

int8_t* File_Namespace::FileBuffer::getMemoryPtr ( )
inline override virtual

Not implemented for FileBuffer; logs a fatal error ("Operation not supported.") instead of returning a pointer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 118 of file FileBuffer.h.

References logger::FATAL, and LOG.

118  {
119  LOG(FATAL) << "Operation not supported.";
120  return nullptr; // satisfy return-type warning
121  }
virtual std::vector<MultiPage> File_Namespace::FileBuffer::getMultiPage ( ) const
inline virtual

Returns vector of MultiPages in the FileBuffer.

Definition at line 137 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and read().

137 { return multiPages_; }

Data_Namespace::MemoryLevel File_Namespace::FileBuffer::getType ( ) const
inline override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 115 of file FileBuffer.h.

References Data_Namespace::DISK_LEVEL.

bool File_Namespace::FileBuffer::isDirty ( ) const
inline override virtual

Returns whether or not the FileBuffer has been modified since the last flush/checkpoint.

Reimplemented from Data_Namespace::AbstractBuffer.

Definition at line 149 of file FileBuffer.h.

size_t File_Namespace::FileBuffer::pageCount ( ) const
inline override virtual

Returns the number of pages in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 124 of file FileBuffer.h.

124 { return multiPages_.size(); }
virtual size_t File_Namespace::FileBuffer::pageDataSize ( ) const
inline virtual

Returns the size in bytes of the data portion of each page in the FileBuffer.

Definition at line 130 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and File_Namespace::readForThread().

130 { return pageDataSize_; }

size_t File_Namespace::FileBuffer::pageSize ( ) const
inline override virtual

Returns the size in bytes of each page in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 127 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), File_Namespace::readForThread(), and writeHeader().

127 { return pageSize_; }

void File_Namespace::FileBuffer::read ( int8_t *const  dst,
const size_t  numBytes = 0,
const size_t  offset = 0,
const MemoryLevel  dstMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 238 of file FileBuffer.cpp.

References CHECK(), Data_Namespace::CPU_LEVEL, logger::FATAL, fm_, getMultiPage(), File_Namespace::FileMgr::getNumReaderThreads(), LOG, File_Namespace::readThreadDS::multiPages, multiPages_, pageDataSize_, File_Namespace::readForThread(), File_Namespace::readThreadDS::t_bytesLeft, File_Namespace::readThreadDS::t_curPtr, File_Namespace::readThreadDS::t_endPage, File_Namespace::readThreadDS::t_fm, File_Namespace::readThreadDS::t_isFirstPage, File_Namespace::readThreadDS::t_startPage, and File_Namespace::readThreadDS::t_startPageOffset.

242  {
243  if (dstBufferType != CPU_LEVEL) {
244  LOG(FATAL) << "Unsupported Buffer type";
245  }
246 
247  // variable declarations
248  size_t startPage = offset / pageDataSize_;
249  size_t startPageOffset = offset % pageDataSize_;
250  size_t numPagesToRead =
251  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
252  /*
253  if (startPage + numPagesToRead > multiPages_.size()) {
254  cout << "Start page: " << startPage << endl;
255  cout << "Num pages to read: " << numPagesToRead << endl;
256  cout << "Num multipages: " << multiPages_.size() << endl;
257  cout << "Offset: " << offset << endl;
258  cout << "Num bytes: " << numBytes << endl;
259  }
260  */
261 
262  CHECK(startPage + numPagesToRead <= multiPages_.size());
263 
264  size_t numPagesPerThread = 0;
265  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
266  size_t bytesRead = 0; // total number of bytes already being read
267  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
268  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
269  size_t numThreads = fm_->getNumReaderThreads();
270  std::vector<readThreadDS>
271  threadDSArr; // array of threadDS, needed to avoid racing conditions
272 
273  if (numPagesToRead > numThreads) {
274  numPagesPerThread = numPagesToRead / numThreads;
275  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
276  } else {
277  numThreads = numPagesToRead;
278  numPagesPerThread = 1;
279  }
280 
281  /* set threadDS for the first thread */
282  readThreadDS threadDS;
283  threadDS.t_fm = fm_;
284  threadDS.t_startPage = offset / pageDataSize_;
285  if (numExtraPages > 0) {
286  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
287  numExtraPages--;
288  } else {
289  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
290  }
291  threadDS.t_curPtr = dst;
292  threadDS.t_startPageOffset = offset % pageDataSize_;
293  threadDS.t_isFirstPage = true;
294 
295  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
296  threadDS.t_startPageOffset),
297  numBytesCurrent);
298  threadDS.t_bytesLeft = bytesLeftForThread;
299  threadDS.multiPages = getMultiPage();
300 
301  if (numThreads == 1) {
302  bytesRead += readForThread(this, threadDS);
303  } else {
304  std::vector<std::future<size_t>> threads;
305 
306  for (size_t i = 0; i < numThreads; i++) {
307  threadDSArr.push_back(threadDS);
308  threads.push_back(
309  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
310 
311  // calculate elements of threadDS
312  threadDS.t_fm = fm_;
313  threadDS.t_isFirstPage = false;
314  threadDS.t_curPtr += bytesLeftForThread;
315  threadDS.t_startPage +=
316  threadDS.t_endPage -
317  threadDS.t_startPage; // based on # of pages read on previous iteration
318  if (numExtraPages > 0) {
319  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
320  numExtraPages--;
321  } else {
322  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
323  }
324  numBytesCurrent -= bytesLeftForThread;
325  bytesLeftForThread = min(
326  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
327  threadDS.t_bytesLeft = bytesLeftForThread;
328  threadDS.multiPages = getMultiPage();
329  }
330 
331  for (auto& p : threads) {
332  p.wait();
333  }
334  for (auto& p : threads) {
335  bytesRead += p.get();
336  }
337  }
338  CHECK(bytesRead == numBytes);
339 }
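
The way read() splits pages among reader threads can be shown in isolation. The sketch below reproduces only the range computation: each thread receives `numPagesToRead / numThreads` pages, and the remainder is handed out one extra page at a time to the first threads. `partitionPages` is a hypothetical name, and the early return for empty input is an addition of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Returns one half-open page range [first, second) per reader thread.
std::vector<std::pair<std::size_t, std::size_t>> partitionPages(
    std::size_t startPage, std::size_t numPagesToRead, std::size_t numThreads) {
  std::vector<std::pair<std::size_t, std::size_t>> ranges;
  if (numPagesToRead == 0 || numThreads == 0) {
    return ranges;  // nothing to read (guard added for this sketch)
  }
  std::size_t pagesPerThread = 1;
  std::size_t extraPages = 0;
  if (numPagesToRead > numThreads) {
    pagesPerThread = numPagesToRead / numThreads;
    extraPages = numPagesToRead - numThreads * pagesPerThread;
  } else {
    numThreads = numPagesToRead;  // never use more threads than pages
  }
  std::size_t begin = startPage;
  for (std::size_t i = 0; i < numThreads; ++i) {
    std::size_t end = begin + pagesPerThread;
    if (extraPages > 0) {  // the first threads absorb the remainder
      ++end;
      --extraPages;
    }
    assert(end > begin);
    ranges.emplace_back(begin, end);
    begin = end;
  }
  return ranges;
}
```

With 10 pages and 4 threads this yields the ranges [0,3), [3,6), [6,8) and [8,10), so no thread reads more than one page beyond any other.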

void File_Namespace::FileBuffer::readMetadata ( const Page &  page)
private

Definition at line 389 of file FileBuffer.cpp.

References CHECK(), Data_Namespace::AbstractBuffer::encoder, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileForFileId(), Data_Namespace::AbstractBuffer::has_encoder, Data_Namespace::AbstractBuffer::initEncoder(), METADATA_PAGE_SIZE, METADATA_VERSION, NUM_METADATA, File_Namespace::Page::pageNum, pageSize_, reservedHeaderSize_, SQLTypeInfoCore< TYPE_FACET_PACK >::set_comp_param(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_dimension(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_notnull(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_scale(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_size(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_type(), Data_Namespace::AbstractBuffer::size_, and Data_Namespace::AbstractBuffer::sql_type.

Referenced by FileBuffer().

389  {
390  FILE* f = fm_->getFileForFileId(page.fileId);
391  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
392  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
393  fread((int8_t*)&size_, sizeof(size_t), 1, f);
394  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
395  // encodingType, encodingBits all as int
396  fread((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
397  int version = typeData[0];
398  CHECK(version == METADATA_VERSION); // add backward compatibility code here
399  has_encoder = static_cast<bool>(typeData[1]);
400  if (has_encoder) {
401  sql_type.set_type(static_cast<SQLTypes>(typeData[2]));
402  sql_type.set_subtype(static_cast<SQLTypes>(typeData[3]));
403  sql_type.set_dimension(typeData[4]);
404  sql_type.set_scale(typeData[5]);
405  sql_type.set_notnull(static_cast<bool>(typeData[6]));
406  sql_type.set_compression(static_cast<EncodingType>(typeData[7]));
407  sql_type.set_comp_param(typeData[8]);
408  sql_type.set_size(typeData[9]);
410  encoder->readMetadata(f);
411  }
412 }
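
The order in which readMetadata() consumes fields (page size, buffer size, then an array of ints whose first element is the version) can be illustrated by parsing the same layout from an in-memory byte buffer. This is a sketch under stated assumptions: `MetadataPrefix` and `parseMetadataPrefix` are hypothetical names, the number of ints is passed in because the value of `NUM_METADATA` is not shown on this page, and the bytes are assumed to be in the host's native layout, as with the `fread` calls above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <vector>

// Fields read at the start of a metadata page, after the reserved header.
struct MetadataPrefix {
  std::size_t pageSize;       // page size of the buffer's data pages
  std::size_t size;           // number of used bytes in the buffer
  std::vector<int> typeData;  // typeData[0] is the metadata version
};

MetadataPrefix parseMetadataPrefix(const std::vector<unsigned char>& bytes,
                                   std::size_t numInts) {
  // The first int is the metadata version, so at least one int is required.
  assert(numInts > 0);
  const std::size_t needed =
      2 * sizeof(std::size_t) + numInts * sizeof(int);
  if (bytes.size() < needed) {
    throw std::runtime_error("metadata buffer is truncated");
  }
  MetadataPrefix out;
  const unsigned char* p = bytes.data();
  std::memcpy(&out.pageSize, p, sizeof(std::size_t));
  p += sizeof(std::size_t);
  std::memcpy(&out.size, p, sizeof(std::size_t));
  p += sizeof(std::size_t);
  out.typeData.resize(numInts);
  std::memcpy(out.typeData.data(), p, numInts * sizeof(int));
  return out;
}
```

Unlike the listing, this sketch checks the input length before reading; the original relies on the file containing a full metadata page.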

void File_Namespace::FileBuffer::reserve ( const size_t  numBytes)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 140 of file FileBuffer.cpp.

References addNewMultiPage(), File_Namespace::FileMgr::epoch(), fm_, multiPages_, pageSize_, and writeHeader().

140  {
141  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
142  size_t numCurrentPages = multiPages_.size();
143  int epoch = fm_->epoch();
144 
145  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
146  Page page = addNewMultiPage(epoch);
147  writeHeader(page, pageNum, epoch);
148  }
149 }
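
reserve() rounds the requested byte count up to whole pages and only allocates the pages that are still missing. A minimal sketch of that calculation follows; `pagesToAdd` is a hypothetical name. Like the listing, it divides by the full page size rather than by the data portion of a page.

```cpp
#include <cassert>
#include <cstddef>

// Number of new logical pages reserve(numBytes) would allocate for a buffer
// that currently has `numCurrentPages` pages.
std::size_t pagesToAdd(std::size_t numBytes,
                       std::size_t pageSize,
                       std::size_t numCurrentPages) {
  assert(pageSize > 0);
  // Ceiling division: bytes requested, rounded up to whole pages.
  const std::size_t numPagesRequested = (numBytes + pageSize - 1) / pageSize;
  if (numPagesRequested <= numCurrentPages) {
    return 0;  // already large enough; reserve() never shrinks the buffer
  }
  return numPagesRequested - numCurrentPages;
}
```

For instance, reserving 10000 bytes with 4096-byte pages on a buffer that has one page adds two pages.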

virtual size_t File_Namespace::FileBuffer::reservedHeaderSize ( ) const
inline virtual

Returns the size in bytes of the reserved header portion of each page in the FileBuffer.

Definition at line 134 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and File_Namespace::readForThread().

134 { return reservedHeaderSize_; }

size_t File_Namespace::FileBuffer::reservedSize ( ) const
inline override virtual

Returns the total number of bytes allocated for the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 142 of file FileBuffer.h.

142 { return multiPages_.size() * pageSize_; }
size_t File_Namespace::FileBuffer::size ( ) const
inline override virtual

Returns the total number of used bytes in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 139 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init().

139 { return size_; }

void File_Namespace::FileBuffer::write ( int8_t *  src,
const size_t  numBytes,
const size_t  offset = 0,
const MemoryLevel  srcMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Writes the contents of source (src) into new versions of the affected logical pages.

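This method writes the contents of source (src) into new versions of the affected logical pages. As the listing below shows, a new page version is created only when the current epoch (in FileMgr) is greater than the epoch of the page's latest version; otherwise the existing current version is written in place.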

Implements Data_Namespace::AbstractBuffer.

Definition at line 502 of file FileBuffer.cpp.

References addNewMultiPage(), CHECK(), copyPage(), Data_Namespace::CPU_LEVEL, File_Namespace::FileMgr::epoch(), logger::FATAL, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), Data_Namespace::AbstractBuffer::is_appended_, Data_Namespace::AbstractBuffer::is_dirty_, Data_Namespace::AbstractBuffer::is_updated_, LOG, multiPages_, pageDataSize_, File_Namespace::Page::pageNum, pageSize_, File_Namespace::FileMgr::requestFreePage(), reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, src, File_Namespace::FileInfo::write(), and writeHeader().

{
  if (srcBufferType != CPU_LEVEL) {
    LOG(FATAL) << "Unsupported Buffer type";
  }
  is_dirty_ = true;
  if (offset < size_) {
    is_updated_ = true;
  }
  bool tempIsAppended = false;

  if (offset + numBytes > size_) {
    tempIsAppended = true;  // because is_appended_ could have already been true - to
                            // avoid rewriting header
    is_appended_ = true;
    size_ = offset + numBytes;
  }

  size_t startPage = offset / pageDataSize_;
  size_t startPageOffset = offset % pageDataSize_;
  size_t numPagesToWrite =
      (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
  size_t bytesLeft = numBytes;
  int8_t* curPtr = src;  // a pointer to the current location in src being read from
  size_t initialNumPages = multiPages_.size();
  int epoch = fm_->epoch();

  if (startPage >
      initialNumPages) {  // means there is a gap we need to allocate pages for
    for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
      Page page = addNewMultiPage(epoch);
      writeHeader(page, pageNum, epoch);
    }
  }
  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
    Page page;
    if (pageNum >= initialNumPages) {
      page = addNewMultiPage(epoch);
      writeHeader(page, pageNum, epoch);
    } else if (multiPages_[pageNum].epochs.back() <
               epoch) {  // need to create a new page because the current one lags the
                         // epoch and we can't overwrite it; also need to copy if we
                         // are on the first or last page
      Page lastPage = multiPages_[pageNum].current();
      page = fm_->requestFreePage(pageSize_, false);
      multiPages_[pageNum].epochs.push_back(epoch);
      multiPages_[pageNum].pageVersions.push_back(page);
      if (pageNum == startPage && startPageOffset > 0) {
        // copyPage takes care of header offset so don't worry
        // about it
        copyPage(lastPage, page, startPageOffset, 0);
      }
      if (pageNum == startPage + numPagesToWrite &&
          bytesLeft > 0) {  // bytesLeft should always be > 0
        copyPage(lastPage,
                 page,
                 pageDataSize_ - bytesLeft,
                 bytesLeft);  // these would be empty if we're appending but we won't
                              // worry about it right now
      }
      writeHeader(page, pageNum, epoch);
    } else {
      // we already have a new page at current
      // epoch for this page - just grab this page
      page = multiPages_[pageNum].current();
    }
    CHECK(page.fileId >= 0);  // make sure page was initialized
    FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
    size_t bytesWritten;
    if (pageNum == startPage) {
      bytesWritten = fileInfo->write(
          page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
          min(pageDataSize_ - startPageOffset, bytesLeft),
          curPtr);
    } else {
      bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
                                     min(pageDataSize_, bytesLeft),
                                     curPtr);
    }
    curPtr += bytesWritten;
    bytesLeft -= bytesWritten;
    if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) {  // if last page
      //@todo below can lead to undefined - we're overwriting num
      // bytes valid at checkpoint
      writeHeader(page, 0, multiPages_[0].epochs.back(), true);
    }
  }
  CHECK(bytesLeft == 0);
}
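
The page arithmetic at the top of write() can be tried in isolation. The following is a minimal sketch, not code from FileBuffer.cpp: the PageSpan struct and the computePageSpan() helper are hypothetical names introduced for illustration, and pageDataSize stands in for the pageDataSize_ member.

```cpp
#include <cstddef>

// Hypothetical helper (not part of FileBuffer): mirrors the index arithmetic
// that write() performs before it starts touching pages.
struct PageSpan {
  std::size_t startPage;        // first logical page touched by the write
  std::size_t startPageOffset;  // byte offset inside that page's data area
  std::size_t numPagesToWrite;  // number of logical pages touched
};

inline PageSpan computePageSpan(std::size_t offset,
                                std::size_t numBytes,
                                std::size_t pageDataSize) {
  PageSpan span;
  span.startPage = offset / pageDataSize;
  span.startPageOffset = offset % pageDataSize;
  // Ceiling division over the bytes counted from the start of the first page.
  span.numPagesToWrite =
      (numBytes + span.startPageOffset + pageDataSize - 1) / pageDataSize;
  return span;
}
```

For example, with a data area of 100 bytes per page, a 100-byte write at offset 250 starts 50 bytes into logical page 2 and spills into page 3, so two pages are touched.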

void File_Namespace::FileBuffer::writeHeader ( Page page,
const int  pageId,
const int  epoch,
const bool  writeMetadata = false 
)
private

Writes the header at the top of the page in the format: header size (in bytes, excluding the size field itself), chunk key, page ID, epoch.

Definition at line 370 of file FileBuffer.cpp.

References chunkKey_, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), METADATA_PAGE_SIZE, File_Namespace::Page::pageNum, pageSize(), pageSize_, and File_Namespace::FileInfo::write().

Referenced by append(), File_Namespace::FileMgr::init(), reserve(), write(), and writeMetadata().

{
  int intHeaderSize = chunkKey_.size() + 3;  // does not include chunkSize
  vector<int> header(intHeaderSize);
  // in addition to chunkkey we need size of header, pageId, version
  header[0] =
      (intHeaderSize - 1) * sizeof(int);  // don't need to include size of headerSize
                                          // value - sizeof(size_t) is for chunkSize
  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
  header[intHeaderSize - 2] = pageId;
  header[intHeaderSize - 1] = epoch;
  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
  // The declaration of the local pageSize is elided in this listing; judging by
  // the references above, it is presumably METADATA_PAGE_SIZE when writeMetadata
  // is set and pageSize_ otherwise.
  fileInfo->write(
      page.pageNum * pageSize, (intHeaderSize) * sizeof(int), (int8_t*)&header[0]);
}
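
The header layout built by writeHeader() can be reproduced outside the class. The following is a minimal sketch under stated assumptions: buildPageHeader() is a hypothetical free function, and the chunk key is assumed to be a sequence of int values, as the std::copy into a vector<int> in the listing suggests.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical free function (not part of FileBuffer): builds the same int
// buffer that writeHeader() writes at the top of a page.
// Layout: [header size in bytes][chunk key ...][page ID][epoch]
inline std::vector<int> buildPageHeader(const std::vector<int>& chunkKey,
                                        int pageId,
                                        int epoch) {
  const std::size_t intHeaderSize = chunkKey.size() + 3;
  std::vector<int> header(intHeaderSize);
  // The size field counts the bytes that follow it, not the field itself.
  header[0] = static_cast<int>((intHeaderSize - 1) * sizeof(int));
  std::copy(chunkKey.begin(), chunkKey.end(), header.begin() + 1);
  header[intHeaderSize - 2] = pageId;
  header[intHeaderSize - 1] = epoch;
  return header;
}
```

With a four-element chunk key the buffer holds seven ints, and the first one records the size of the remaining six.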

void File_Namespace::FileBuffer::writeMetadata ( const int  epoch)
private

Definition at line 414 of file FileBuffer.cpp.

References Data_Namespace::AbstractBuffer::encoder, File_Namespace::MultiPage::epochs, File_Namespace::Page::fileId, fm_, SQLTypeInfoCore< TYPE_FACET_PACK >::get_comp_param(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_dimension(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_notnull(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_scale(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), File_Namespace::FileMgr::getFileForFileId(), Data_Namespace::AbstractBuffer::has_encoder, METADATA_PAGE_SIZE, METADATA_VERSION, metadataPages_, NUM_METADATA, File_Namespace::Page::pageNum, pageSize_, File_Namespace::MultiPage::pageVersions, File_Namespace::FileMgr::requestFreePage(), reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, Data_Namespace::AbstractBuffer::sql_type, and writeHeader().

{
  // Right now stats page is size_ (in bytes), bufferType, encodingType,
  // encodingDataType, numElements
  Page page = fm_->requestFreePage(METADATA_PAGE_SIZE, true);
  writeHeader(page, -1, epoch, true);
  FILE* f = fm_->getFileForFileId(page.fileId);
  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
  vector<int> typeData(NUM_METADATA);  // assumes we will encode hasEncoder, bufferType,
                                       // encodingType, encodingBits all as int
  typeData[0] = METADATA_VERSION;
  typeData[1] = static_cast<int>(has_encoder);
  if (has_encoder) {
    typeData[2] = static_cast<int>(sql_type.get_type());
    typeData[3] = static_cast<int>(sql_type.get_subtype());
    typeData[4] = sql_type.get_dimension();
    typeData[5] = sql_type.get_scale();
    typeData[6] = static_cast<int>(sql_type.get_notnull());
    typeData[7] = static_cast<int>(sql_type.get_compression());
    typeData[8] = sql_type.get_comp_param();
    typeData[9] = sql_type.get_size();
  }
  fwrite((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
  if (has_encoder) {  // redundant
    encoder->writeMetadata(f);
  }
  metadataPages_.epochs.push_back(epoch);
  metadataPages_.pageVersions.push_back(page);
}
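
The typeData array written by writeMetadata() follows a fixed slot order. The following is a minimal sketch of that packing under stated assumptions: TypeFields and packTypeData() are hypothetical names, and kNumMetadata and kMetadataVersion are placeholder values standing in for NUM_METADATA and METADATA_VERSION, whose real values are defined in FileBuffer.h and are not shown on this page.

```cpp
#include <vector>

// Placeholder constants (assumptions): the listing writes slots 0..9, so at
// least 10 slots are needed; the real version number is not shown here.
constexpr int kNumMetadata = 10;
constexpr int kMetadataVersion = 0;

// Hypothetical plain struct standing in for the SQLTypeInfo getters.
struct TypeFields {
  int type;
  int subtype;
  int dimension;
  int scale;
  bool notnull;
  int compression;
  int compParam;
  int size;
};

// Packs the fields in the same slot order as writeMetadata().
inline std::vector<int> packTypeData(bool hasEncoder, const TypeFields& t) {
  std::vector<int> typeData(kNumMetadata);  // value-initialized to zero
  typeData[0] = kMetadataVersion;
  typeData[1] = static_cast<int>(hasEncoder);
  if (hasEncoder) {
    typeData[2] = t.type;
    typeData[3] = t.subtype;
    typeData[4] = t.dimension;
    typeData[5] = t.scale;
    typeData[6] = static_cast<int>(t.notnull);
    typeData[7] = t.compression;
    typeData[8] = t.compParam;
    typeData[9] = t.size;
  }
  return typeData;
}
```

When no encoder is present, slots 2 through 9 stay zero, which matches the listing: the vector is value-initialized and only the first two slots are assigned.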

Friends And Related Function Documentation

friend class FileMgr
friend

Definition at line 56 of file FileBuffer.h.

Member Data Documentation

ChunkKey File_Namespace::FileBuffer::chunkKey_
private

Definition at line 175 of file FileBuffer.h.

Referenced by calcHeaderBuffer(), and writeHeader().

FileMgr* File_Namespace::FileBuffer::fm_
private
size_t File_Namespace::FileBuffer::headerBufferOffset_ = 32
static private

Definition at line 169 of file FileBuffer.h.

Referenced by calcHeaderBuffer().

MultiPage File_Namespace::FileBuffer::metadataPages_
private

Definition at line 170 of file FileBuffer.h.

Referenced by FileBuffer(), freePages(), and writeMetadata().

std::vector<MultiPage> File_Namespace::FileBuffer::multiPages_
private
size_t File_Namespace::FileBuffer::pageDataSize_
private

Definition at line 173 of file FileBuffer.h.

Referenced by append(), copyPage(), FileBuffer(), read(), and write().

size_t File_Namespace::FileBuffer::pageSize_
private
size_t File_Namespace::FileBuffer::reservedHeaderSize_
private

The documentation for this class was generated from the following files: