OmniSciDB  04ee39c94c
File_Namespace::FileBuffer Class Reference

Represents/provides access to contiguous data stored in the file system.

#include <FileBuffer.h>


Public Member Functions

 FileBuffer (FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
 Constructs a FileBuffer object.
 
 FileBuffer (FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const SQLTypeInfo sqlType, const size_t initialSize=0)
 
 FileBuffer (FileMgr *fm, const ChunkKey &chunkKey, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
 
 ~FileBuffer () override
 Destructor.
 
Page addNewMultiPage (const int epoch)
 
void reserve (const size_t numBytes) override
 
void freePages ()
 
void read (int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 
void write (int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 Writes the contents of source (src) into new versions of the affected logical pages.
 
void append (int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 
void copyPage (Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
 
Data_Namespace::MemoryLevel getType () const override
 
int8_t * getMemoryPtr () override
 Not implemented for FileMgr – throws a runtime_error.
 
size_t pageCount () const override
 Returns the number of pages in the FileBuffer.
 
size_t pageSize () const override
 Returns the size in bytes of each page in the FileBuffer.
 
virtual size_t pageDataSize () const
 Returns the size in bytes of the data portion of each page in the FileBuffer.
 
virtual size_t reservedHeaderSize () const
 Returns the size in bytes of the reserved header portion of each page in the FileBuffer.
 
virtual std::vector< MultiPage > getMultiPage () const
 Returns a copy of the vector of MultiPages in the FileBuffer.
 
size_t size () const override
 Returns the total number of used bytes in the FileBuffer.
 
size_t reservedSize () const override
 Returns the total number of bytes allocated for the FileBuffer.
 
bool isDirty () const override
 Returns whether or not the FileBuffer has been modified since the last flush/checkpoint.
 
- Public Member Functions inherited from Data_Namespace::AbstractBuffer
 AbstractBuffer (const int deviceId)
 
 AbstractBuffer (const int deviceId, const SQLTypeInfo sqlType)
 
virtual ~AbstractBuffer ()
 
virtual int getDeviceId () const
 
virtual int pin ()
 
virtual int unPin ()
 
virtual int getPinCount ()
 
virtual bool isAppended () const
 
virtual bool isUpdated () const
 
virtual void setDirty ()
 
virtual void setUpdated ()
 
virtual void setAppended ()
 
void setSize (const size_t size)
 
void clearDirtyBits ()
 
void initEncoder (const SQLTypeInfo tmpSqlType)
 
void syncEncoder (const AbstractBuffer *srcBuffer)
 

Private Member Functions

void writeHeader (Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
 Writes a header at the top of the page.
 
void writeMetadata (const int epoch)
 
void readMetadata (const Page &page)
 
void calcHeaderBuffer ()
 

Private Attributes

FileMgr * fm_
 
MultiPage metadataPages_
 
std::vector< MultiPage > multiPages_
 
size_t pageSize_
 
size_t pageDataSize_
 
size_t reservedHeaderSize_
 
ChunkKey chunkKey_
 

Static Private Attributes

static size_t headerBufferOffset_ = 32
 

Friends

class FileMgr
 

Additional Inherited Members

- Public Attributes inherited from Data_Namespace::AbstractBuffer
std::unique_ptr< Encoder > encoder
 
bool hasEncoder
 
SQLTypeInfo sqlType
 
- Protected Attributes inherited from Data_Namespace::AbstractBuffer
size_t size_
 
bool isDirty_
 
bool isAppended_
 
bool isUpdated_
 
int deviceId_
 

Detailed Description

Represents/provides access to contiguous data stored in the file system.

The FileBuffer consists of logical pages, each of which can map to any identically-sized page in any file of the underlying file system. A page's metadata (file and page number) is stored in MultiPage objects, and each MultiPage holds the page metadata for multiple versions of the same logical page.
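The versioning scheme can be pictured with a small standalone model. This is a sketch, not OmniSciDB code: SketchPage and SketchMultiPage are hypothetical stand-ins for Page and MultiPage (declared in Page.h) that keep only the members this page shows, namely fileId, pageNum, epochs, pageVersions and current(). The ordering assert in push() is the sketch's own assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for File_Namespace::Page: a physical page location.
struct SketchPage {
  int fileId = -1;     // -1 marks an uninitialized page, as the CHECKs below assume
  size_t pageNum = 0;  // page index within that file
};

// Hypothetical stand-in for File_Namespace::MultiPage: every physical version
// of one logical page, paired with the epoch in which it was written.
struct SketchMultiPage {
  std::deque<SketchPage> pageVersions;
  std::deque<int> epochs;

  // The newest version is the one readers see.
  const SketchPage& current() const {
    assert(!pageVersions.empty());
    return pageVersions.back();
  }

  // Records a new physical version written in `epoch`.
  // Sketch assumption: versions are appended in non-decreasing epoch order.
  void push(int epoch, const SketchPage& page) {
    assert(epochs.empty() || epochs.back() <= epoch);
    epochs.push_back(epoch);
    pageVersions.push_back(page);
  }
};
```

Older versions stay in the deque, which is why freePages() below walks every entry of pageVersions rather than only current().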

Note that a "Chunk" is brought into a FileBuffer by the FileMgr.

Note(s): Forbid Copying Idiom 4.1

Definition at line 55 of file FileBuffer.h.

Constructor & Destructor Documentation

◆ FileBuffer() [1/3]

File_Namespace::FileBuffer::FileBuffer ( FileMgr * fm,
const size_t pageSize,
const ChunkKey & chunkKey,
const size_t initialSize = 0
)

Constructs a FileBuffer object.

Definition at line 37 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK, fm_, pageDataSize_, pageSize_, and reservedHeaderSize_.

41  : AbstractBuffer(fm->getDeviceId())
42  , fm_(fm)
45  , chunkKey_(chunkKey) {
46  // Create a new FileBuffer
47  CHECK(fm_);
50  //@todo reintroduce initialSize - need to develop easy way of
51  // differentiating these pre-allocated pages from "written-to" pages
52  /*
53  if (initialSize > 0) {
54  // should expand to initialSize bytes
55  size_t initialNumPages = (initialSize + pageSize_ -1) / pageSize_;
56  int epoch = fm_->epoch();
57  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
58  Page page = addNewMultiPage(epoch);
59  writeHeader(page,pageNum,epoch);
60  }
61  }
62  */
63 }

◆ FileBuffer() [2/3]

File_Namespace::FileBuffer::FileBuffer ( FileMgr * fm,
const size_t pageSize,
const ChunkKey & chunkKey,
const SQLTypeInfo sqlType,
const size_t initialSize = 0
)

Definition at line 65 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK, fm_, pageDataSize_, pageSize_, and reservedHeaderSize_.

70  : AbstractBuffer(fm->getDeviceId(), sqlType)
71  , fm_(fm)
74  , chunkKey_(chunkKey) {
75  CHECK(fm_);
78 }

◆ FileBuffer() [3/3]

File_Namespace::FileBuffer::FileBuffer ( FileMgr * fm,
const ChunkKey & chunkKey,
const std::vector< HeaderInfo >::const_iterator & headerStartIt,
const std::vector< HeaderInfo >::const_iterator & headerEndIt
)

Definition at line 80 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK, File_Namespace::MultiPage::epochs, logger::FATAL, fm_, LOG, metadataPages_, multiPages_, pageDataSize_, pageSize_, File_Namespace::MultiPage::pageVersions, readMetadata(), reservedHeaderSize_, and showChunk().

84  : AbstractBuffer(fm->getDeviceId())
85  , fm_(fm)
87  , pageSize_(0)
88  , chunkKey_(chunkKey) {
89  // We are being assigned an existing FileBuffer on disk
90 
91  CHECK(fm_);
93  // MultiPage multiPage(pageSize_); // why was this here?
94  int lastPageId = -1;
95  // Page lastMetadataPage;
96  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
97  int curPageId = vecIt->pageId;
98 
99  // We only want to read last metadata page
100  if (curPageId == -1) { // stats page
101  metadataPages_.epochs.push_back(vecIt->versionEpoch);
102  metadataPages_.pageVersions.push_back(vecIt->page);
103  } else {
104  if (curPageId != lastPageId) {
105  // protect from bad data on disk, and give diagnostics
106  if (curPageId != lastPageId + 1) {
107  LOG(FATAL) << "Failure reading DB file " << showChunk(chunkKey)
108  << " Current page " << curPageId << " last page " << lastPageId
109  << " epoch " << vecIt->versionEpoch;
110  }
111  if (lastPageId == -1) {
112  // If we are on first real page
113  CHECK(metadataPages_.pageVersions.back().fileId != -1); // was initialized
116  }
117  MultiPage multiPage(pageSize_);
118  multiPages_.push_back(multiPage);
119  lastPageId = curPageId;
120  }
121  multiPages_.back().epochs.push_back(vecIt->versionEpoch);
122  multiPages_.back().pageVersions.push_back(vecIt->page);
123  }
124  if (curPageId == -1) { // meaning there was only a metadata page
127  }
128  }
129  // auto lastHeaderIt = std::prev(headerEndIt);
130  // size_ = lastHeaderIt->chunkSize;
131 }
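The header-grouping loop in this constructor can be isolated as follows. This is a simplified sketch, not the real code: HeaderSketch, GroupedPages and groupHeaders are hypothetical names, only pageId and versionEpoch are kept from HeaderInfo, and the sketch throws std::runtime_error where the listing uses LOG(FATAL). As in the listing, headers with pageId -1 are metadata pages, and data page ids must form the consecutive sequence 0, 1, 2, ...

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, simplified header record: only the fields the loop above reads.
struct HeaderSketch {
  int pageId;        // -1 for metadata pages, 0.. for data pages
  int versionEpoch;  // epoch in which this version was written
};

struct GroupedPages {
  std::vector<int> metadataEpochs;               // epochs of metadata page versions
  std::vector<std::vector<int>> dataPageEpochs;  // one entry per logical data page
};

// Groups headers (assumed sorted by pageId, then epoch) into logical pages and
// rejects a gap in the page-id sequence, mirroring the check above.
GroupedPages groupHeaders(const std::vector<HeaderSketch>& headers) {
  GroupedPages out;
  int lastPageId = -1;
  for (const auto& h : headers) {
    if (h.pageId == -1) {  // metadata ("stats") page version
      out.metadataEpochs.push_back(h.versionEpoch);
      continue;
    }
    if (h.pageId != lastPageId) {  // first version of a new logical page
      if (h.pageId != lastPageId + 1) {
        throw std::runtime_error("page " + std::to_string(h.pageId) +
                                 " follows page " + std::to_string(lastPageId));
      }
      out.dataPageEpochs.emplace_back();
      lastPageId = h.pageId;
    }
    assert(!out.dataPageEpochs.empty());
    out.dataPageEpochs.back().push_back(h.versionEpoch);
  }
  return out;
}
```

Each inner vector plays the role of one MultiPage's epochs deque; the real constructor also pushes the matching Page into pageVersions.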

◆ ~FileBuffer()

File_Namespace::FileBuffer::~FileBuffer ( )
override

Destructor.

Definition at line 133 of file FileBuffer.cpp.

133  {
134  // need to free pages
135  // NOP
136 }

Member Function Documentation

◆ addNewMultiPage()

Page File_Namespace::FileBuffer::addNewMultiPage ( const int  epoch)

Definition at line 359 of file FileBuffer.cpp.

References File_Namespace::MultiPage::epochs, fm_, multiPages_, pageSize_, File_Namespace::MultiPage::pageVersions, and File_Namespace::FileMgr::requestFreePage().

Referenced by append(), reserve(), and write().

359  {
360  Page page = fm_->requestFreePage(pageSize_, false);
361  MultiPage multiPage(pageSize_);
362  multiPage.epochs.push_back(epoch);
363  multiPage.pageVersions.push_back(page);
364  multiPages_.push_back(multiPage);
365  return page;
366 }

◆ append()

void File_Namespace::FileBuffer::append ( int8_t *  src,
const size_t  numBytes,
const MemoryLevel  srcMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 455 of file FileBuffer.cpp.

References addNewMultiPage(), CHECK, File_Namespace::FileMgr::epoch(), File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), Data_Namespace::AbstractBuffer::isAppended_, Data_Namespace::AbstractBuffer::isDirty_, multiPages_, pageDataSize_, File_Namespace::Page::pageNum, pageSize_, reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, src, File_Namespace::FileInfo::write(), and writeHeader().

458  {
459  isDirty_ = true;
460  isAppended_ = true;
461 
462  size_t startPage = size_ / pageDataSize_;
463  size_t startPageOffset = size_ % pageDataSize_;
464  size_t numPagesToWrite =
465  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
466  size_t bytesLeft = numBytes;
467  int8_t* curPtr = src; // a pointer to the current location in src being read from
468  size_t initialNumPages = multiPages_.size();
469  size_ = size_ + numBytes;
470  int epoch = fm_->epoch();
471  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
472  Page page;
473  if (pageNum >= initialNumPages) {
474  page = addNewMultiPage(epoch);
475  writeHeader(page, pageNum, epoch);
476  } else {
477  // we already have a new page at current
478  // epoch for this page - just grab this page
479  page = multiPages_[pageNum].current();
480  }
481  CHECK(page.fileId >= 0); // make sure page was initialized
482  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
483  size_t bytesWritten;
484  if (pageNum == startPage) {
485  bytesWritten = fileInfo->write(
486  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
487  min(pageDataSize_ - startPageOffset, bytesLeft),
488  curPtr);
489  } else {
490  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
491  min(pageDataSize_, bytesLeft),
492  curPtr);
493  }
494  curPtr += bytesWritten;
495  bytesLeft -= bytesWritten;
496  }
497  CHECK(bytesLeft == 0);
498 }
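The page arithmetic shared by append() and write() can be checked on its own. The sketch below is illustrative, not OmniSciDB code: Segment, splitIntoPages and fileOffset are hypothetical names. splitIntoPages reuses the startPage / startPageOffset / numPagesToWrite formulas from the listing, and fileOffset reproduces the position expression passed to FileInfo::write (whole pages, then the reserved header, then the in-page data offset).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// One contiguous piece of a write, confined to a single logical page.
struct Segment {
  size_t logicalPage;   // index into multiPages_
  size_t offsetInData;  // byte offset inside the page's data area
  size_t length;        // bytes written to this page
};

// Splits a write of numBytes starting at logical byte `offset` into per-page
// segments, where pageDataSize = pageSize_ - reservedHeaderSize_.
std::vector<Segment> splitIntoPages(size_t offset, size_t numBytes, size_t pageDataSize) {
  assert(pageDataSize > 0);
  const size_t startPage = offset / pageDataSize;
  const size_t startPageOffset = offset % pageDataSize;
  const size_t numPagesToWrite =
      (numBytes + startPageOffset + pageDataSize - 1) / pageDataSize;
  std::vector<Segment> segments;
  size_t bytesLeft = numBytes;
  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
    // Only the first page starts mid-page; later pages start at data offset 0.
    const size_t inPage = (pageNum == startPage) ? startPageOffset : 0;
    const size_t len = std::min(pageDataSize - inPage, bytesLeft);
    segments.push_back({pageNum, inPage, len});
    bytesLeft -= len;
  }
  assert(bytesLeft == 0);
  return segments;
}

// Byte position in the backing file for data written to physical page
// `physicalPageNum` at `offsetInData` within its data area.
size_t fileOffset(size_t physicalPageNum, size_t pageSize, size_t reservedHeaderSize,
                  size_t offsetInData) {
  return physicalPageNum * pageSize + reservedHeaderSize + offsetInData;
}
```

For example, with a 96-byte data area, appending 200 bytes to a buffer whose size_ is 90 touches four logical pages: 6 bytes on page 0, 96 on pages 1 and 2, and 2 on page 3.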

◆ calcHeaderBuffer()

void File_Namespace::FileBuffer::calcHeaderBuffer ( )
private

Definition at line 149 of file FileBuffer.cpp.

References chunkKey_, headerBufferOffset_, and reservedHeaderSize_.

Referenced by FileBuffer().

149  {
150  // 3 * sizeof(int) is for headerSize, for pageId and versionEpoch
151  // sizeof(size_t) is for chunkSize
152  // reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int) + sizeof(size_t);
153  reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int);
154  size_t headerMod = reservedHeaderSize_ % headerBufferOffset_;
155  if (headerMod > 0) {
157  }
158  // pageDataSize_ = pageSize_-reservedHeaderSize_;
159 }
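The header-size computation can be reproduced as a small function. This is a hedged sketch: reservedHeaderSizeSketch is a hypothetical name, and the rounding step is an assumption, because line 156 (the body of the headerMod > 0 branch) is missing from the listing above; rounding up to the next multiple of headerBufferOffset_ is what the modulo test suggests. The data area of each page is then pageSize_ - reservedHeaderSize_, as the commented-out line 158 indicates.

```cpp
#include <cassert>
#include <cstddef>

// Header size before padding: one int per ChunkKey element plus three ints
// (headerSize, pageId, versionEpoch), per the comment in calcHeaderBuffer().
// ASSUMPTION: the elided line rounds up to the next multiple of
// headerBufferOffset (32 by default, from headerBufferOffset_).
size_t reservedHeaderSizeSketch(size_t chunkKeyLength, size_t headerBufferOffset = 32) {
  assert(headerBufferOffset > 0);
  size_t size = (chunkKeyLength + 3) * sizeof(int);
  const size_t headerMod = size % headerBufferOffset;
  if (headerMod > 0) {
    size += headerBufferOffset - headerMod;  // assumed rounding step
  }
  return size;
}
```

With a four-element ChunkKey (chosen for illustration) and a 4-byte int, the unpadded header is 28 bytes, which this sketch pads to 32.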

◆ copyPage()

void File_Namespace::FileBuffer::copyPage ( Page & srcPage,
Page & destPage,
const size_t  numBytes,
const size_t  offset = 0 
)

Definition at line 339 of file FileBuffer.cpp.

References CHECK, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), pageDataSize_, File_Namespace::Page::pageNum, pageSize_, File_Namespace::FileInfo::read(), reservedHeaderSize_, and File_Namespace::FileInfo::write().

Referenced by write().

342  {
343  // FILE *srcFile = fm_->files_[srcPage.fileId]->f;
344  // FILE *destFile = fm_->files_[destPage.fileId]->f;
345  CHECK(offset + numBytes <= pageDataSize_);
346  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
347  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
348 
349  int8_t* buffer = new int8_t[numBytes];
350  size_t bytesRead = srcFileInfo->read(
351  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
352  CHECK(bytesRead == numBytes);
353  size_t bytesWritten = destFileInfo->write(
354  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
355  CHECK(bytesWritten == numBytes);
356  delete[] buffer;
357 }

◆ freePages()

void File_Namespace::FileBuffer::freePages ( )

Definition at line 161 of file FileBuffer.cpp.

References fm_, File_Namespace::FileInfo::freePage(), File_Namespace::FileMgr::getFileInfoForFileId(), metadataPages_, multiPages_, and File_Namespace::MultiPage::pageVersions.

161  {
162  // Need to zero headers (actually just first four bytes of header)
163 
164  // First delete metadata pages
165  for (auto metaPageIt = metadataPages_.pageVersions.begin();
166  metaPageIt != metadataPages_.pageVersions.end();
167  ++metaPageIt) {
168  FileInfo* fileInfo = fm_->getFileInfoForFileId(metaPageIt->fileId);
169  fileInfo->freePage(metaPageIt->pageNum);
170  }
171 
172  // Now delete regular pages
173  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
174  ++multiPageIt) {
175  for (auto pageIt = multiPageIt->pageVersions.begin();
176  pageIt != multiPageIt->pageVersions.end();
177  ++pageIt) {
178  FileInfo* fileInfo = fm_->getFileInfoForFileId(pageIt->fileId);
179  fileInfo->freePage(pageIt->pageNum);
180  }
181  }
182 }

◆ getMemoryPtr()

int8_t* File_Namespace::FileBuffer::getMemoryPtr ( )
inline override virtual

Not implemented for FileMgr – throws a runtime_error.

Implements Data_Namespace::AbstractBuffer.

Definition at line 118 of file FileBuffer.h.

References logger::FATAL, and LOG.

118  {
119  LOG(FATAL) << "Operation not supported.";
120  return nullptr; // satisfy return-type warning
121  }

◆ getMultiPage()

virtual std::vector<MultiPage> File_Namespace::FileBuffer::getMultiPage ( ) const
inline virtual

Returns a copy of the vector of MultiPages in the FileBuffer.

Definition at line 137 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and read().

137 { return multiPages_; }

◆ getType()

Data_Namespace::MemoryLevel File_Namespace::FileBuffer::getType ( ) const
inline override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 115 of file FileBuffer.h.

References Data_Namespace::DISK_LEVEL.

◆ isDirty()

bool File_Namespace::FileBuffer::isDirty ( ) const
inline override virtual

Returns whether or not the FileBuffer has been modified since the last flush/checkpoint.

Reimplemented from Data_Namespace::AbstractBuffer.

Definition at line 149 of file FileBuffer.h.

◆ pageCount()

size_t File_Namespace::FileBuffer::pageCount ( ) const
inline override virtual

Returns the number of pages in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 124 of file FileBuffer.h.

124 { return multiPages_.size(); }

◆ pageDataSize()

virtual size_t File_Namespace::FileBuffer::pageDataSize ( ) const
inline virtual

Returns the size in bytes of the data portion of each page in the FileBuffer.

Definition at line 130 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and File_Namespace::readForThread().

130 { return pageDataSize_; }

◆ pageSize()

size_t File_Namespace::FileBuffer::pageSize ( ) const
inline override virtual

Returns the size in bytes of each page in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 127 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), File_Namespace::readForThread(), and writeHeader().

127 { return pageSize_; }

◆ read()

void File_Namespace::FileBuffer::read ( int8_t *const  dst,
const size_t  numBytes = 0,
const size_t  offset = 0,
const MemoryLevel  dstMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 236 of file FileBuffer.cpp.

References CHECK, Data_Namespace::CPU_LEVEL, logger::FATAL, fm_, getMultiPage(), File_Namespace::FileMgr::getNumReaderThreads(), LOG, File_Namespace::readThreadDS::multiPages, multiPages_, pageDataSize_, File_Namespace::readForThread(), File_Namespace::readThreadDS::t_bytesLeft, File_Namespace::readThreadDS::t_curPtr, File_Namespace::readThreadDS::t_endPage, File_Namespace::readThreadDS::t_fm, File_Namespace::readThreadDS::t_isFirstPage, File_Namespace::readThreadDS::t_startPage, and File_Namespace::readThreadDS::t_startPageOffset.

240  {
241  if (dstMemoryLevel != CPU_LEVEL) {
242  LOG(FATAL) << "Unsupported Buffer type";
243  }
244 
245  // variable declarations
246  size_t startPage = offset / pageDataSize_;
247  size_t startPageOffset = offset % pageDataSize_;
248  size_t numPagesToRead =
249  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
250  /*
251  if (startPage + numPagesToRead > multiPages_.size()) {
252  cout << "Start page: " << startPage << endl;
253  cout << "Num pages to read: " << numPagesToRead << endl;
254  cout << "Num multipages: " << multiPages_.size() << endl;
255  cout << "Offset: " << offset << endl;
256  cout << "Num bytes: " << numBytes << endl;
257  }
258  */
259 
260  CHECK(startPage + numPagesToRead <= multiPages_.size());
261 
262  size_t numPagesPerThread = 0;
263  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
264  size_t bytesRead = 0; // total number of bytes already read
265  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
266  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
267  size_t numThreads = fm_->getNumReaderThreads();
268  std::vector<readThreadDS>
269  threadDSArr; // array of threadDS, needed to avoid race conditions
270 
271  if (numPagesToRead > numThreads) {
272  numPagesPerThread = numPagesToRead / numThreads;
273  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
274  } else {
275  numThreads = numPagesToRead;
276  numPagesPerThread = 1;
277  }
278 
279  /* set threadDS for the first thread */
280  readThreadDS threadDS;
281  threadDS.t_fm = fm_;
282  threadDS.t_startPage = offset / pageDataSize_;
283  if (numExtraPages > 0) {
284  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
285  numExtraPages--;
286  } else {
287  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
288  }
289  threadDS.t_curPtr = dst;
290  threadDS.t_startPageOffset = offset % pageDataSize_;
291  threadDS.t_isFirstPage = true;
292 
293  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
294  threadDS.t_startPageOffset),
295  numBytesCurrent);
296  threadDS.t_bytesLeft = bytesLeftForThread;
297  threadDS.multiPages = getMultiPage();
298 
299  if (numThreads == 1) {
300  bytesRead += readForThread(this, threadDS);
301  } else {
302  std::vector<std::future<size_t>> threads;
303 
304  for (size_t i = 0; i < numThreads; i++) {
305  threadDSArr.push_back(threadDS);
306  threads.push_back(
307  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
308 
309  // calculate elements of threadDS
310  threadDS.t_fm = fm_;
311  threadDS.t_isFirstPage = false;
312  threadDS.t_curPtr += bytesLeftForThread;
313  threadDS.t_startPage +=
314  threadDS.t_endPage -
315  threadDS.t_startPage; // based on # of pages read on previous iteration
316  if (numExtraPages > 0) {
317  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
318  numExtraPages--;
319  } else {
320  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
321  }
322  numBytesCurrent -= bytesLeftForThread;
323  bytesLeftForThread = min(
324  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
325  threadDS.t_bytesLeft = bytesLeftForThread;
326  threadDS.multiPages = getMultiPage();
327  }
328 
329  for (auto& p : threads) {
330  p.wait();
331  }
332  for (auto& p : threads) {
333  bytesRead += p.get();
334  }
335  }
336  CHECK(bytesRead == numBytes);
337 }
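The way read() spreads pages over reader threads can be reproduced without threads. This sketch is illustrative: partitionPages is a hypothetical name, it returns only the [t_startPage, t_endPage) range of each readThreadDS, and it omits the per-thread byte accounting (t_bytesLeft, t_startPageOffset) and the std::async launch.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Returns [startPage, endPage) for each reader thread, following read() above:
// each thread gets numPagesToRead / numThreads pages, and the remainder is
// handed out one extra page at a time to the first threads.
std::vector<std::pair<size_t, size_t>> partitionPages(size_t startPage,
                                                      size_t numPagesToRead,
                                                      size_t numThreads) {
  assert(numThreads > 0);
  size_t numPagesPerThread = 0;
  size_t numExtraPages = 0;
  if (numPagesToRead > numThreads) {
    numPagesPerThread = numPagesToRead / numThreads;
    numExtraPages = numPagesToRead - numThreads * numPagesPerThread;
  } else {
    numThreads = numPagesToRead;  // at most one page per thread
    numPagesPerThread = 1;
  }
  std::vector<std::pair<size_t, size_t>> ranges;
  size_t begin = startPage;
  for (size_t i = 0; i < numThreads; ++i) {
    size_t end = begin + numPagesPerThread;
    if (numExtraPages > 0) {  // early threads absorb the remainder
      ++end;
      --numExtraPages;
    }
    ranges.emplace_back(begin, end);
    begin = end;  // next thread starts where this one stopped
  }
  return ranges;
}
```

For 10 pages and 4 threads this yields the ranges [0,3), [3,6), [6,8) and [8,10), so page loads differ by at most one page between threads.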

◆ readMetadata()

void File_Namespace::FileBuffer::readMetadata ( const Page & page)
private

Definition at line 387 of file FileBuffer.cpp.

References CHECK, Data_Namespace::AbstractBuffer::encoder, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileForFileId(), Data_Namespace::AbstractBuffer::hasEncoder, Data_Namespace::AbstractBuffer::initEncoder(), METADATA_PAGE_SIZE, METADATA_VERSION, NUM_METADATA, File_Namespace::Page::pageNum, pageSize_, reservedHeaderSize_, SQLTypeInfoCore< TYPE_FACET_PACK >::set_comp_param(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_dimension(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_notnull(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_scale(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_size(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_type(), Data_Namespace::AbstractBuffer::size_, and Data_Namespace::AbstractBuffer::sqlType.

Referenced by FileBuffer().

387  {
388  FILE* f = fm_->getFileForFileId(page.fileId);
389  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
390  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
391  fread((int8_t*)&size_, sizeof(size_t), 1, f);
392  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
393  // encodingType, encodingBits all as int
394  fread((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
395  int version = typeData[0];
396  CHECK(version == METADATA_VERSION); // add backward compatibility code here
397  hasEncoder = static_cast<bool>(typeData[1]);
398  if (hasEncoder) {
399  sqlType.set_type(static_cast<SQLTypes>(typeData[2]));
400  sqlType.set_subtype(static_cast<SQLTypes>(typeData[3]));
401  sqlType.set_dimension(typeData[4]);
402  sqlType.set_scale(typeData[5]);
403  sqlType.set_notnull(static_cast<bool>(typeData[6]));
404  sqlType.set_compression(static_cast<EncodingType>(typeData[7]));
405  sqlType.set_comp_param(typeData[8]);
406  sqlType.set_size(typeData[9]);
408  encoder->readMetadata(f);
409  }
410 }
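The metadata page layout read above is: pageSize_ (size_t), size_ (size_t), then NUM_METADATA ints, where typeData[0] is the metadata version, typeData[1] is hasEncoder and typeData[2..9] hold the SQL type fields; encoder-specific metadata follows. The sketch below decodes that layout from an in-memory buffer instead of a FILE*. MetadataSketch, parseMetadata and kNumMetadataSketch are hypothetical names, and NUM_METADATA = 10 is an assumption based on the indices read (the real value is defined in FileBuffer.h and not shown here).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// ASSUMPTION: NUM_METADATA is 10, since readMetadata() reads typeData[0..9].
constexpr size_t kNumMetadataSketch = 10;

struct MetadataSketch {
  size_t pageSize;
  size_t size;
  std::vector<int> typeData;  // [0] version, [1] hasEncoder, [2..9] SQL type fields
};

// Decodes the fields in the order readMetadata() freads them, starting at the
// first byte after the reserved header. Like the fread calls, it assumes the
// host's native integer sizes and byte order.
MetadataSketch parseMetadata(const std::vector<int8_t>& bytes, size_t start) {
  const size_t needed = 2 * sizeof(size_t) + kNumMetadataSketch * sizeof(int);
  assert(start + needed <= bytes.size());
  MetadataSketch m;
  m.typeData.resize(kNumMetadataSketch);
  const int8_t* p = bytes.data() + start;
  std::memcpy(&m.pageSize, p, sizeof(size_t));
  p += sizeof(size_t);
  std::memcpy(&m.size, p, sizeof(size_t));
  p += sizeof(size_t);
  std::memcpy(m.typeData.data(), p, kNumMetadataSketch * sizeof(int));
  return m;
}
```

When typeData[1] is zero the listing skips the type fields entirely, so their contents only matter for buffers that carry an encoder.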

◆ reserve()

void File_Namespace::FileBuffer::reserve ( const size_t  numBytes)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 138 of file FileBuffer.cpp.

References addNewMultiPage(), File_Namespace::FileMgr::epoch(), fm_, multiPages_, pageSize_, and writeHeader().

138  {
139  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
140  size_t numCurrentPages = multiPages_.size();
141  int epoch = fm_->epoch();
142 
143  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
144  Page page = addNewMultiPage(epoch);
145  writeHeader(page, pageNum, epoch);
146  }
147 }
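reserve() rounds numBytes up to whole pages of pageSize_, and reservedSize() likewise reports multiPages_.size() * pageSize_; both count the reserved header of every page. The usable data capacity of those pages is therefore smaller than the byte count reserved. The helper names below are hypothetical and the sizes are example values.

```cpp
#include <cassert>
#include <cstddef>

// Page count that reserve() grows the buffer to: ceil(numBytes / pageSize).
size_t pagesForReserve(size_t numBytes, size_t pageSize) {
  assert(pageSize > 0);
  return (numBytes + pageSize - 1) / pageSize;
}

// Bytes of chunk data those pages can hold once each page's reserved header
// is excluded (pageDataSize = pageSize - reservedHeaderSize).
size_t dataCapacity(size_t numPages, size_t pageSize, size_t reservedHeaderSize) {
  assert(reservedHeaderSize < pageSize);
  return numPages * (pageSize - reservedHeaderSize);
}
```

With 2048-byte pages and a 32-byte header, reserve(4096) yields 2 pages: reservedSize() reports 4096 bytes, but only 4032 bytes of data fit before append() or write() must add another page.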

◆ reservedHeaderSize()

virtual size_t File_Namespace::FileBuffer::reservedHeaderSize ( ) const
inline virtual

Returns the size in bytes of the reserved header portion of each page in the FileBuffer.

Definition at line 134 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and File_Namespace::readForThread().

134 { return reservedHeaderSize_; }

◆ reservedSize()

size_t File_Namespace::FileBuffer::reservedSize ( ) const
inline override virtual

Returns the total number of bytes allocated for the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 142 of file FileBuffer.h.

142 { return multiPages_.size() * pageSize_; }

◆ size()

size_t File_Namespace::FileBuffer::size ( ) const
inline override virtual

Returns the total number of used bytes in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 139 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init().

139 { return size_; }

◆ write()

void File_Namespace::FileBuffer::write ( int8_t *  src,
const size_t  numBytes,
const size_t  offset = 0,
const MemoryLevel  srcMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Writes the contents of source (src) into new versions of the affected logical pages.

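This method writes the contents of source (src) into new versions of the affected logical pages. A new version of a page is created only if the page's latest version predates the current epoch (FileMgr::epoch()); a page that already has a version in the current epoch is overwritten in place, and pages past the current end of the buffer are allocated as needed.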

Implements Data_Namespace::AbstractBuffer.

Definition at line 500 of file FileBuffer.cpp.

References addNewMultiPage(), CHECK, copyPage(), Data_Namespace::CPU_LEVEL, File_Namespace::FileMgr::epoch(), logger::FATAL, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), Data_Namespace::AbstractBuffer::isAppended_, Data_Namespace::AbstractBuffer::isDirty_, Data_Namespace::AbstractBuffer::isUpdated_, LOG, multiPages_, pageDataSize_, File_Namespace::Page::pageNum, pageSize_, File_Namespace::FileMgr::requestFreePage(), reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, src, File_Namespace::FileInfo::write(), and writeHeader().

504  {
505  if (srcMemoryLevel != CPU_LEVEL) {
506  LOG(FATAL) << "Unsupported Buffer type";
507  }
508  isDirty_ = true;
509  if (offset < size_) {
510  isUpdated_ = true;
511  }
512  bool tempIsAppended = false;
513 
514  if (offset + numBytes > size_) {
515  tempIsAppended = true; // because isAppended_ could have already been true - to avoid
516  // rewriting header
517  isAppended_ = true;
518  size_ = offset + numBytes;
519  }
520 
521  size_t startPage = offset / pageDataSize_;
522  size_t startPageOffset = offset % pageDataSize_;
523  size_t numPagesToWrite =
524  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
525  size_t bytesLeft = numBytes;
526  int8_t* curPtr = src; // a pointer to the current location in src being read from
527  size_t initialNumPages = multiPages_.size();
528  int epoch = fm_->epoch();
529 
530  if (startPage >
531  initialNumPages) { // means there is a gap we need to allocate pages for
532  for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
533  Page page = addNewMultiPage(epoch);
534  writeHeader(page, pageNum, epoch);
535  }
536  }
537  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
538  Page page;
539  if (pageNum >= initialNumPages) {
540  page = addNewMultiPage(epoch);
541  writeHeader(page, pageNum, epoch);
542  } else if (multiPages_[pageNum].epochs.back() <
543  epoch) { // the current version lags the epoch and can't be overwritten, so
544  // create a new page version; also copy over the untouched bytes if this
545  // is the first or last page of the write
546  Page lastPage = multiPages_[pageNum].current();
547  page = fm_->requestFreePage(pageSize_, false);
548  multiPages_[pageNum].epochs.push_back(epoch);
549  multiPages_[pageNum].pageVersions.push_back(page);
550  if (pageNum == startPage && startPageOffset > 0) {
551  // copyPage takes care of header offset so don't worry
552  // about it
553  copyPage(lastPage, page, startPageOffset, 0);
554  }
555  if (pageNum == startPage + numPagesToWrite &&
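556  bytesLeft > 0) { // bytesLeft should always be > 0 (note: the loop condition keeps pageNum below startPage + numPagesToWrite, so this branch never runs)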
557  copyPage(lastPage,
558  page,
559  pageDataSize_ - bytesLeft,
560  bytesLeft); // these would be empty if we're appending but we won't
561  // worry about it right now
562  }
563  writeHeader(page, pageNum, epoch);
564  } else {
565  // we already have a new page at current
566  // epoch for this page - just grab this page
567  page = multiPages_[pageNum].current();
568  }
569  CHECK(page.fileId >= 0); // make sure page was initialized
570  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
571  size_t bytesWritten;
572  if (pageNum == startPage) {
573  bytesWritten = fileInfo->write(
574  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
575  min(pageDataSize_ - startPageOffset, bytesLeft),
576  curPtr);
577  } else {
578  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
579  min(pageDataSize_, bytesLeft),
580  curPtr);
581  }
582  curPtr += bytesWritten;
583  bytesLeft -= bytesWritten;
584  if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) { // if last page
585  //@todo below can lead to an undefined state - we're overwriting the number of
586  // bytes valid at checkpoint
587  writeHeader(page, 0, multiPages_[0].epochs.back(), true);
588  }
589  }
590  CHECK(bytesLeft == 0);
591 }
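Lines 521-524 translate a logical byte range into logical page indices. The following is a minimal C++ sketch of that arithmetic, assuming each page carries pageDataSize_ payload bytes; WriteSpan and computeWriteSpan are hypothetical names used only for illustration and are not part of FileBuffer.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of FileBuffer) that mirrors the page arithmetic
// in FileBuffer::write(): which logical pages a write touches and how many
// payload bytes land in each of them.
struct WriteSpan {
  std::size_t startPage = 0;              // first logical page touched
  std::size_t startPageOffset = 0;        // offset within that page's data area
  std::size_t numPagesToWrite = 0;        // number of logical pages touched
  std::vector<std::size_t> bytesPerPage;  // payload bytes written to each page
};

WriteSpan computeWriteSpan(std::size_t offset,
                           std::size_t numBytes,
                           std::size_t pageDataSize) {
  assert(pageDataSize > 0);
  WriteSpan span;
  span.startPage = offset / pageDataSize;
  span.startPageOffset = offset % pageDataSize;
  // Ceiling of (startPageOffset + numBytes) / pageDataSize.
  span.numPagesToWrite =
      (numBytes + span.startPageOffset + pageDataSize - 1) / pageDataSize;
  std::size_t bytesLeft = numBytes;
  for (std::size_t i = 0; i < span.numPagesToWrite; ++i) {
    // Only the first page can start mid-page; later pages start at offset 0
    // of their data area.
    const std::size_t room =
        (i == 0) ? pageDataSize - span.startPageOffset : pageDataSize;
    const std::size_t n = std::min(room, bytesLeft);
    span.bytesPerPage.push_back(n);
    bytesLeft -= n;
  }
  return span;
}
```

For example, computeWriteSpan(100, 250, 64) yields startPage 1, startPageOffset 36, and five pages receiving 28, 64, 64, 64 and 30 bytes, matching the min(...) expressions on lines 575 and 579. The sketch ignores allocation: when the write starts beyond the last allocated page, write() also creates the gap pages first (lines 530-536).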

◆ writeHeader()

void File_Namespace::FileBuffer::writeHeader (Page & page, const int pageId, const int epoch, const bool writeMetadata = false)
private

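Writes a header at the top of the page. The header is a sequence of ints: the header size in bytes (not counting this leading size field), the chunk key, the page ID, and the epoch.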

Definition at line 368 of file FileBuffer.cpp.

References chunkKey_, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), METADATA_PAGE_SIZE, File_Namespace::Page::pageNum, pageSize(), pageSize_, and File_Namespace::FileInfo::write().

Referenced by append(), File_Namespace::FileMgr::init(), reserve(), write(), and writeMetadata().

371  {
372  int intHeaderSize = chunkKey_.size() + 3; // does not include chunkSize
373  vector<int> header(intHeaderSize);
374  // in addition to chunkkey we need size of header, pageId, version
375  header[0] =
376  (intHeaderSize - 1) * sizeof(int); // header size in bytes, excluding the
377  // headerSize field itself
378  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
379  header[intHeaderSize - 2] = pageId;
380  header[intHeaderSize - 1] = epoch;
381  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
382  size_t pageSize = writeMetadata ? METADATA_PAGE_SIZE : pageSize_;
383  fileInfo->write(
384  page.pageNum * pageSize, (intHeaderSize) * sizeof(int), (int8_t*)&header[0]);
385 }
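The header written by writeHeader() is a flat array of ints. A minimal sketch of that layout follows, assuming ChunkKey is a std::vector<int> (as the std::copy on line 378 suggests). buildPageHeader, ParsedHeader and parsePageHeader are hypothetical names for illustration; the parser is not the way FileMgr reads headers back, only an inverse that makes the layout explicit.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

using ChunkKey = std::vector<int>;  // assumption: ChunkKey is a vector of ints

// Hypothetical helper reproducing the layout built by FileBuffer::writeHeader():
//   [headerSizeInBytes, chunkKey..., pageId, epoch]
std::vector<int> buildPageHeader(const ChunkKey& chunkKey, int pageId, int epoch) {
  const std::size_t intHeaderSize = chunkKey.size() + 3;
  std::vector<int> header(intHeaderSize);
  // The leading size field counts every int after itself, not itself.
  header[0] = static_cast<int>((intHeaderSize - 1) * sizeof(int));
  std::copy(chunkKey.begin(), chunkKey.end(), header.begin() + 1);
  header[intHeaderSize - 2] = pageId;
  header[intHeaderSize - 1] = epoch;
  return header;
}

struct ParsedHeader {
  ChunkKey chunkKey;
  int pageId;
  int epoch;
};

// Hypothetical inverse, for illustration only: recovers the fields and checks
// that the stored size agrees with the number of ints that follow it.
ParsedHeader parsePageHeader(const std::vector<int>& header) {
  assert(header.size() >= 3);
  assert(static_cast<std::size_t>(header[0]) == (header.size() - 1) * sizeof(int));
  ParsedHeader parsed;
  parsed.chunkKey.assign(header.begin() + 1, header.end() - 2);
  parsed.pageId = header[header.size() - 2];
  parsed.epoch = header.back();
  return parsed;
}
```

Note that writeMetadata() calls writeHeader(page, -1, epoch, true) on line 416, so the header of a metadata page carries -1 in the page-ID slot.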

◆ writeMetadata()

void File_Namespace::FileBuffer::writeMetadata ( const int  epoch)
private

Definition at line 412 of file FileBuffer.cpp.

References Data_Namespace::AbstractBuffer::encoder, File_Namespace::MultiPage::epochs, File_Namespace::Page::fileId, fm_, SQLTypeInfoCore< TYPE_FACET_PACK >::get_comp_param(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_dimension(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_notnull(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_scale(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), File_Namespace::FileMgr::getFileForFileId(), Data_Namespace::AbstractBuffer::hasEncoder, METADATA_PAGE_SIZE, METADATA_VERSION, metadataPages_, NUM_METADATA, File_Namespace::Page::pageNum, pageSize_, File_Namespace::MultiPage::pageVersions, File_Namespace::FileMgr::requestFreePage(), reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, Data_Namespace::AbstractBuffer::sqlType, and writeHeader().

412  {
413  // The metadata page holds pageSize_, size_ (in bytes), NUM_METADATA ints of
414  // type information, and then any encoder-specific metadata
415  Page page = fm_->requestFreePage(METADATA_PAGE_SIZE, true);
416  writeHeader(page, -1, epoch, true);
417  FILE* f = fm_->getFileForFileId(page.fileId);
418  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
419  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
420  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
421  vector<int> typeData(NUM_METADATA); // version, hasEncoder and the SQL type
422  // attributes, each encoded as an int
423  typeData[0] = METADATA_VERSION;
424  typeData[1] = static_cast<int>(hasEncoder);
425  if (hasEncoder) {
426  typeData[2] = static_cast<int>(sqlType.get_type());
427  typeData[3] = static_cast<int>(sqlType.get_subtype());
428  typeData[4] = sqlType.get_dimension();
429  typeData[5] = sqlType.get_scale();
430  typeData[6] = static_cast<int>(sqlType.get_notnull());
431  typeData[7] = static_cast<int>(sqlType.get_compression());
432  typeData[8] = sqlType.get_comp_param();
433  typeData[9] = sqlType.get_size();
434  }
435  fwrite((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
436  if (hasEncoder) { // redundant
437  encoder->writeMetadata(f);
438  }
439  metadataPages_.epochs.push_back(epoch);
440  metadataPages_.pageVersions.push_back(page);
441 }
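After the reserved header, writeMetadata() writes pageSize_ and size_ as two size_t values, then the typeData ints, then whatever the encoder appends. The sketch below serializes the fixed-size part into an in-memory buffer instead of writing through the FILE* returned by getFileForFileId(); serializeMetadataPrefix is a hypothetical name, and the encoder-specific tail is not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical in-memory counterpart of the fixed-size part of the payload that
// FileBuffer::writeMetadata() writes after the reserved header:
//   pageSize (size_t), size (size_t), typeData (typeData.size() ints).
std::vector<std::int8_t> serializeMetadataPrefix(std::size_t pageSize,
                                                 std::size_t size,
                                                 const std::vector<int>& typeData) {
  // typeData[0] (version) and typeData[1] (hasEncoder) are always written.
  assert(typeData.size() >= 2);
  std::vector<std::int8_t> buf(2 * sizeof(std::size_t) + typeData.size() * sizeof(int));
  std::int8_t* out = buf.data();
  std::memcpy(out, &pageSize, sizeof(std::size_t));
  out += sizeof(std::size_t);
  std::memcpy(out, &size, sizeof(std::size_t));
  out += sizeof(std::size_t);
  std::memcpy(out, typeData.data(), typeData.size() * sizeof(int));
  return buf;
}
```

Assuming NUM_METADATA is 10 (the listing assigns indices 0 through 9; the macro's value is not shown here) and a typical 64-bit platform with 8-byte size_t and 4-byte int, this prefix is 56 bytes. When hasEncoder is false, entries 2 through 9 stay zero because vector<int>(NUM_METADATA) value-initializes its elements.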

Friends And Related Function Documentation

◆ FileMgr

friend class FileMgr
friend

Definition at line 56 of file FileBuffer.h.

Member Data Documentation

◆ chunkKey_

ChunkKey File_Namespace::FileBuffer::chunkKey_
private

Definition at line 175 of file FileBuffer.h.

Referenced by calcHeaderBuffer(), and writeHeader().

◆ fm_

FileMgr* File_Namespace::FileBuffer::fm_
private

◆ headerBufferOffset_

size_t File_Namespace::FileBuffer::headerBufferOffset_ = 32
static private

Definition at line 169 of file FileBuffer.h.

Referenced by calcHeaderBuffer().

◆ metadataPages_

MultiPage File_Namespace::FileBuffer::metadataPages_
private

Definition at line 170 of file FileBuffer.h.

Referenced by FileBuffer(), freePages(), and writeMetadata().

◆ multiPages_

std::vector<MultiPage> File_Namespace::FileBuffer::multiPages_
private
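multiPages_ holds one MultiPage per logical page, and each MultiPage keeps parallel deques of page versions and the epochs that created them. The branch on lines 542-568 of write() uses them for copy-on-write across epochs. The sketch below models that decision with simplified stand-ins for Page and MultiPage that keep only the fields used here (the real types are defined in Page.h). pageForWrite is hypothetical; unlike write(), it receives the free page as an argument instead of calling FileMgr::requestFreePage() only when needed, and it does not copy the untouched bytes of the old version.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Simplified stand-ins (assumption: only the fields used below are modeled).
struct Page {
  int fileId = -1;
  std::size_t pageNum = 0;
};

struct MultiPage {
  std::deque<Page> pageVersions;  // one physical page per epoch that modified it
  std::deque<int> epochs;         // epoch in which each version was created
  Page current() const { return pageVersions.back(); }
};

// Hypothetical helper following the branch in FileBuffer::write(): a logical
// page whose newest version predates the current epoch is not overwritten;
// freePage is appended as a new version instead. Returns true if a new
// version was created, false if the current version is reused in place.
bool pageForWrite(MultiPage& mp, int epoch, const Page& freePage, Page& out) {
  assert(!mp.epochs.empty() && mp.epochs.size() == mp.pageVersions.size());
  if (mp.epochs.back() < epoch) {
    mp.epochs.push_back(epoch);
    mp.pageVersions.push_back(freePage);
    out = freePage;
    return true;
  }
  out = mp.current();  // already versioned in this epoch: write in place
  return false;
}
```

Keeping old versions rather than overwriting them is what lets the page contents valid at the last checkpoint survive until the next one.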

◆ pageDataSize_

size_t File_Namespace::FileBuffer::pageDataSize_
private

Definition at line 173 of file FileBuffer.h.

Referenced by append(), copyPage(), FileBuffer(), read(), and write().

◆ pageSize_

size_t File_Namespace::FileBuffer::pageSize_
private

◆ reservedHeaderSize_

size_t File_Namespace::FileBuffer::reservedHeaderSize_
private
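Every page reserves reservedHeaderSize_ bytes at its start for the header, so write() places payload at page.pageNum * pageSize_ + reservedHeaderSize_, plus startPageOffset on the first page of a write (lines 574 and 578). Here page.pageNum is the page's physical position within its data file, not the logical page index used by the loop. A minimal sketch follows; payloadFileOffset is a hypothetical name, and its assertion assumes pageDataSize_ does not exceed pageSize_ - reservedHeaderSize_, which this page does not state explicitly.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: byte offset inside a data file at which
// FileBuffer::write() places payload for a physical page. The first
// reservedHeaderSize bytes of every page hold the header.
std::size_t payloadFileOffset(std::size_t physicalPageNum,
                              std::size_t pageSize,
                              std::size_t reservedHeaderSize,
                              std::size_t offsetInPageData) {
  // Assumption: the payload area ends at the end of the page.
  assert(reservedHeaderSize + offsetInPageData < pageSize);
  return physicalPageNum * pageSize + reservedHeaderSize + offsetInPageData;
}
```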

The documentation for this class was generated from the following files: