OmniSciDB  8a228a1076
File_Namespace::FileBuffer Class Reference

Represents/provides access to contiguous data stored in the file system.

#include <FileBuffer.h>


Public Member Functions

 FileBuffer (FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
 Constructs a FileBuffer object.
 
 FileBuffer (FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const SQLTypeInfo sqlType, const size_t initialSize=0)
 
 FileBuffer (FileMgr *fm, const ChunkKey &chunkKey, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
 
 ~FileBuffer () override
 Destructor.
 
Page addNewMultiPage (const int epoch)
 
void reserve (const size_t numBytes) override
 
void freePages ()
 
void freeChunkPages ()
 
void freeMetadataPages ()
 
void read (int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 
void write (int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 Writes the contents of source (src) into new versions of the affected logical pages.
 
void append (int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 
void copyPage (Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
 
Data_Namespace::MemoryLevel getType () const override
 
int8_t * getMemoryPtr () override
 Not implemented for FileBuffer: logs a FATAL error ("Operation not supported.") instead of returning a pointer.
 
size_t pageCount () const override
 Returns the number of pages in the FileBuffer.
 
size_t pageSize () const override
 Returns the size in bytes of each page in the FileBuffer.
 
virtual size_t pageDataSize () const
 Returns the size in bytes of the data portion of each page in the FileBuffer.
 
virtual size_t reservedHeaderSize () const
 
virtual std::vector< MultiPage > getMultiPage () const
 Returns vector of MultiPages in the FileBuffer.
 
size_t size () const override
 
size_t reservedSize () const override
 Returns the total number of bytes allocated for the FileBuffer.
 
bool isDirty () const override
 Returns whether or not the FileBuffer has been modified since the last flush/checkpoint.
 
- Public Member Functions inherited from Data_Namespace::AbstractBuffer
 AbstractBuffer (const int device_id)
 
 AbstractBuffer (const int device_id, const SQLTypeInfo sql_type)
 
virtual ~AbstractBuffer ()
 
virtual int getDeviceId () const
 
virtual int pin ()
 
virtual int unPin ()
 
virtual int getPinCount ()
 
virtual bool isAppended () const
 
virtual bool isUpdated () const
 
virtual void setDirty ()
 
virtual void setUpdated ()
 
virtual void setAppended ()
 
virtual void setSize (const size_t size)
 
void clearDirtyBits ()
 
void initEncoder (const SQLTypeInfo tmp_sql_type)
 
void syncEncoder (const AbstractBuffer *src_buffer)
 

Private Member Functions

void writeHeader (Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
 Writes the header at the top of the page.
 
void writeMetadata (const int epoch)
 
void readMetadata (const Page &page)
 
void calcHeaderBuffer ()
 

Private Attributes

FileMgr * fm_
 
MultiPage metadataPages_
 
std::vector< MultiPage > multiPages_
 
size_t pageSize_
 
size_t pageDataSize_
 
size_t reservedHeaderSize_
 
ChunkKey chunkKey_
 

Static Private Attributes

static size_t headerBufferOffset_ = 32
 

Friends

class FileMgr
 

Additional Inherited Members

- Public Attributes inherited from Data_Namespace::AbstractBuffer
std::unique_ptr< Encoder > encoder
 
bool has_encoder
 
SQLTypeInfo sql_type
 
- Protected Attributes inherited from Data_Namespace::AbstractBuffer
size_t size_
 
bool is_dirty_
 
bool is_appended_
 
bool is_updated_
 
int device_id_
 

Detailed Description

Represents/provides access to contiguous data stored in the file system.

The FileBuffer consists of logical pages, which can map to any identically-sized page in any file of the underlying file system. A page's metadata (file and page number) are stored in MultiPage objects, and each MultiPage includes page metadata for multiple versions of the same page.
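The relationship between a logical page and its versions can be pictured with a minimal sketch. The `Page` and `MultiPage` structs below are simplified stand-ins written for illustration (only the members `pageVersions`, `epochs`, `fileId` and `pageNum` are taken from this page; the `push` helper and everything else is an assumption), not the definitions from Page.h.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Simplified stand-in for File_Namespace::Page: where one physical page lives.
struct Page {
  int fileId;           // file that holds the physical page
  std::size_t pageNum;  // index of the page within that file
};

// Simplified stand-in for File_Namespace::MultiPage: all versions of one
// logical page, oldest first.
struct MultiPage {
  std::deque<Page> pageVersions;  // one physical page per version
  std::deque<int> epochs;         // epoch at which each version was written

  // Record a new version of this logical page (illustrative helper).
  void push(const Page& page, int epoch) {
    pageVersions.push_back(page);
    epochs.push_back(epoch);
  }

  // The most recent version is the one that reads and writes operate on.
  const Page& current() const {
    assert(!pageVersions.empty());
    return pageVersions.back();
  }
};
```

In this sketch a FileBuffer would hold one `MultiPage` per logical page, and each version may live in a different file.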

Note that a "Chunk" is brought into a FileBuffer by the FileMgr.

Note(s): Forbid Copying Idiom 4.1

Definition at line 55 of file FileBuffer.h.

Constructor & Destructor Documentation

◆ FileBuffer() [1/3]

File_Namespace::FileBuffer::FileBuffer ( FileMgr *  fm,
const size_t  pageSize,
const ChunkKey &  chunkKey,
const size_t  initialSize = 0 
)

Constructs a FileBuffer object.

Definition at line 40 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK, fm_, pageDataSize_, pageSize_, and reservedHeaderSize_.

44  : AbstractBuffer(fm->getDeviceId())
45  , fm_(fm)
48  , chunkKey_(chunkKey) {
49  // Create a new FileBuffer
50  CHECK(fm_);
53  //@todo reintroduce initialSize - need to develop easy way of
54  // differentiating these pre-allocated pages from "written-to" pages
55  /*
56  if (initalSize > 0) {
57  // should expand to initialSize bytes
58  size_t initialNumPages = (initalSize + pageSize_ -1) / pageSize_;
59  int epoch = fm_->epoch();
60  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
61  Page page = addNewMultiPage(epoch);
62  writeHeader(page,pageNum,epoch);
63  }
64  }
65  */
66 }

◆ FileBuffer() [2/3]

File_Namespace::FileBuffer::FileBuffer ( FileMgr *  fm,
const size_t  pageSize,
const ChunkKey &  chunkKey,
const SQLTypeInfo  sqlType,
const size_t  initialSize = 0 
)

Definition at line 68 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK, fm_, pageDataSize_, pageSize_, and reservedHeaderSize_.

73  : AbstractBuffer(fm->getDeviceId(), sqlType)
74  , fm_(fm)
77  , chunkKey_(chunkKey) {
78  CHECK(fm_);
81 }

◆ FileBuffer() [3/3]

File_Namespace::FileBuffer::FileBuffer ( FileMgr *  fm,
const ChunkKey &  chunkKey,
const std::vector< HeaderInfo >::const_iterator &  headerStartIt,
const std::vector< HeaderInfo >::const_iterator &  headerEndIt 
)

Definition at line 83 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK, File_Namespace::MultiPage::epochs, logger::FATAL, fm_, LOG, metadataPages_, multiPages_, pageDataSize_, pageSize_, File_Namespace::MultiPage::pageVersions, readMetadata(), reservedHeaderSize_, and showChunk().

87  : AbstractBuffer(fm->getDeviceId())
88  , fm_(fm)
90  , pageSize_(0)
91  , chunkKey_(chunkKey) {
92  // We are being assigned an existing FileBuffer on disk
93 
94  CHECK(fm_);
96  // MultiPage multiPage(pageSize_); // why was this here?
97  int lastPageId = -1;
98  // Page lastMetadataPage;
99  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
100  int curPageId = vecIt->pageId;
101 
102  // We only want to read last metadata page
103  if (curPageId == -1) { // stats page
104  metadataPages_.epochs.push_back(vecIt->versionEpoch);
105  metadataPages_.pageVersions.push_back(vecIt->page);
106  } else {
107  if (curPageId != lastPageId) {
108  // protect from bad data on disk, and give diagnostics
109  if (curPageId != lastPageId + 1) {
110  LOG(FATAL) << "Failure reading DB file " << showChunk(chunkKey)
111  << " Current page " << curPageId << " last page " << lastPageId
112  << " epoch " << vecIt->versionEpoch;
113  }
114  if (lastPageId == -1) {
115  // If we are on first real page
116  CHECK(metadataPages_.pageVersions.back().fileId != -1); // was initialized
119  }
120  MultiPage multiPage(pageSize_);
121  multiPages_.push_back(multiPage);
122  lastPageId = curPageId;
123  }
124  multiPages_.back().epochs.push_back(vecIt->versionEpoch);
125  multiPages_.back().pageVersions.push_back(vecIt->page);
126  }
127  if (curPageId == -1) { // meaning there was only a metadata page
130  }
131  }
132  // auto lastHeaderIt = std::prev(headerEndIt);
133  // size_ = lastHeaderIt->chunkSize;
134 }
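The grouping performed by the loop above can be illustrated in isolation. The following is a rough sketch, assuming the headers arrive sorted by page id and then by epoch; `HeaderSketch`, `GroupedHeaders` and `groupHeaders` are hypothetical names, and the sketch throws an exception where the listing calls `LOG(FATAL)`.

```cpp
#include <cassert>
#include <stdexcept>
#include <vector>

// Hypothetical, trimmed-down header record: only the fields the grouping needs.
struct HeaderSketch {
  int pageId;        // -1 marks a metadata page
  int versionEpoch;  // epoch at which this version was written
};

struct GroupedHeaders {
  std::vector<int> metadataEpochs;           // versions of the metadata page
  std::vector<std::vector<int>> pageEpochs;  // pageEpochs[i]: versions of logical page i
};

// Headers are assumed to be sorted by pageId, then by versionEpoch.
GroupedHeaders groupHeaders(const std::vector<HeaderSketch>& headers) {
  GroupedHeaders out;
  int lastPageId = -1;
  for (const auto& h : headers) {
    if (h.pageId == -1) {  // metadata page
      out.metadataEpochs.push_back(h.versionEpoch);
      continue;
    }
    if (h.pageId != lastPageId) {
      // Logical page ids must be consecutive; a gap indicates bad data on disk.
      if (h.pageId != lastPageId + 1) {
        throw std::runtime_error("non-consecutive page id");
      }
      out.pageEpochs.emplace_back();  // start a new logical page
      lastPageId = h.pageId;
    }
    assert(!out.pageEpochs.empty());
    out.pageEpochs.back().push_back(h.versionEpoch);
  }
  return out;
}
```

Each inner vector corresponds to one MultiPage in the constructor above; the sketch keeps only the epochs to stay short.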

◆ ~FileBuffer()

File_Namespace::FileBuffer::~FileBuffer ( )
override

Destructor.

Definition at line 136 of file FileBuffer.cpp.

136  {
137  // need to free pages
138  // NOP
139 }

Member Function Documentation

◆ addNewMultiPage()

Page File_Namespace::FileBuffer::addNewMultiPage ( const int  epoch)

Definition at line 365 of file FileBuffer.cpp.

References File_Namespace::MultiPage::epochs, fm_, multiPages_, pageSize_, File_Namespace::MultiPage::pageVersions, and File_Namespace::FileMgr::requestFreePage().

Referenced by append(), reserve(), and write().

365  {
366  Page page = fm_->requestFreePage(pageSize_, false);
367  MultiPage multiPage(pageSize_);
368  multiPage.epochs.push_back(epoch);
369  multiPage.pageVersions.push_back(page);
370  multiPages_.push_back(multiPage);
371  return page;
372 }

◆ append()

void File_Namespace::FileBuffer::append ( int8_t *  src,
const size_t  numBytes,
const MemoryLevel  srcMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 461 of file FileBuffer.cpp.

References addNewMultiPage(), CHECK, File_Namespace::FileMgr::epoch(), File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), Data_Namespace::AbstractBuffer::is_appended_, Data_Namespace::AbstractBuffer::is_dirty_, multiPages_, pageDataSize_, File_Namespace::Page::pageNum, pageSize_, reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, src, File_Namespace::FileInfo::write(), and writeHeader().

464  {
465  is_dirty_ = true;
466  is_appended_ = true;
467 
468  size_t startPage = size_ / pageDataSize_;
469  size_t startPageOffset = size_ % pageDataSize_;
470  size_t numPagesToWrite =
471  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
472  size_t bytesLeft = numBytes;
473  int8_t* curPtr = src; // a pointer to the current location in src being read from
474  size_t initialNumPages = multiPages_.size();
475  size_ = size_ + numBytes;
476  int epoch = fm_->epoch();
477  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
478  Page page;
479  if (pageNum >= initialNumPages) {
480  page = addNewMultiPage(epoch);
481  writeHeader(page, pageNum, epoch);
482  } else {
483  // we already have a new page at current
484  // epoch for this page - just grab this page
485  page = multiPages_[pageNum].current();
486  }
487  CHECK(page.fileId >= 0); // make sure page was initialized
488  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
489  size_t bytesWritten;
490  if (pageNum == startPage) {
491  bytesWritten = fileInfo->write(
492  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
493  min(pageDataSize_ - startPageOffset, bytesLeft),
494  curPtr);
495  } else {
496  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
497  min(pageDataSize_, bytesLeft),
498  curPtr);
499  }
500  curPtr += bytesWritten;
501  bytesLeft -= bytesWritten;
502  }
503  CHECK(bytesLeft == 0);
504 }
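The index arithmetic at the top of `append()` can be checked with a small worked example. The helper below is a sketch only: `WriteSpan` and `computeSpan` are illustrative names, and `offset` plays the role of `size_` (the current end of the buffer).

```cpp
#include <cassert>
#include <cstddef>

struct WriteSpan {
  std::size_t startPage;        // first logical page touched
  std::size_t startPageOffset;  // byte offset inside that page's data area
  std::size_t numPages;         // number of logical pages touched
};

// Mirrors the startPage / startPageOffset / numPagesToWrite computation.
WriteSpan computeSpan(std::size_t offset, std::size_t numBytes, std::size_t pageDataSize) {
  assert(pageDataSize > 0);
  WriteSpan s;
  s.startPage = offset / pageDataSize;
  s.startPageOffset = offset % pageDataSize;
  // Ceiling division over the bytes counted from the start of the first page.
  s.numPages = (numBytes + s.startPageOffset + pageDataSize - 1) / pageDataSize;
  return s;
}
```

For instance, with 100 data bytes per page, appending 120 bytes to a buffer that already holds 250 bytes starts at page 2, offset 50, and touches two pages (bytes 250 to 299 on page 2 and 300 to 369 on page 3).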

◆ calcHeaderBuffer()

void File_Namespace::FileBuffer::calcHeaderBuffer ( )
private

Definition at line 152 of file FileBuffer.cpp.

References chunkKey_, headerBufferOffset_, and reservedHeaderSize_.

Referenced by FileBuffer().

152  {
153  // 3 * sizeof(int) is for headerSize, for pageId and versionEpoch
154  // sizeof(size_t) is for chunkSize
155  // reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int) + sizeof(size_t);
156  reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int);
157  size_t headerMod = reservedHeaderSize_ % headerBufferOffset_;
158  if (headerMod > 0) {
160  }
161  // pageDataSize_ = pageSize_-reservedHeaderSize_;
162 }
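The body of the `if (headerMod > 0)` branch is not shown in the listing above. The sketch below assumes it pads the header up to the next multiple of `headerBufferOffset_`; `paddedHeaderSize` is a hypothetical free function written for illustration, not a member of FileBuffer.

```cpp
#include <cassert>
#include <cstddef>

// Raw header: one int per ChunkKey element plus three ints
// (headerSize, pageId, versionEpoch), as in the comment in the listing.
// Assumed behaviour: round the result up to a multiple of `alignment`.
std::size_t paddedHeaderSize(std::size_t chunkKeyLength, std::size_t alignment) {
  assert(alignment > 0);
  std::size_t size = (chunkKeyLength + 3) * sizeof(int);
  std::size_t mod = size % alignment;
  if (mod > 0) {
    size += alignment - mod;  // pad up to the next multiple
  }
  return size;
}
```

Assuming a 4-byte `int` and an alignment of 32, a four-element ChunkKey gives a raw header of 28 bytes that is padded to 32, while a five-element ChunkKey gives exactly 32 bytes and needs no padding.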

◆ copyPage()

void File_Namespace::FileBuffer::copyPage ( Page &  srcPage,
Page &  destPage,
const size_t  numBytes,
const size_t  offset = 0 
)

Definition at line 345 of file FileBuffer.cpp.

References CHECK, checked_malloc(), File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), pageDataSize_, File_Namespace::Page::pageNum, pageSize_, File_Namespace::FileInfo::read(), reservedHeaderSize_, and File_Namespace::FileInfo::write().

Referenced by write().

348  {
349  // FILE *srcFile = fm_->files_[srcPage.fileId]->f;
350  // FILE *destFile = fm_->files_[destPage.fileId]->f;
351  CHECK(offset + numBytes < pageDataSize_);
352  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
353  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
354 
355  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
356  size_t bytesRead = srcFileInfo->read(
357  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
358  CHECK(bytesRead == numBytes);
359  size_t bytesWritten = destFileInfo->write(
360  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
361  CHECK(bytesWritten == numBytes);
362  free(buffer);
363 }

◆ freeChunkPages()

void File_Namespace::FileBuffer::freeChunkPages ( )

Definition at line 173 of file FileBuffer.cpp.

References fm_, File_Namespace::FileInfo::freePage(), File_Namespace::FileMgr::getFileInfoForFileId(), and multiPages_.

Referenced by foreign_storage::ForeignStorageCache::eraseChunk(), and freePages().

173  {
174  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
175  ++multiPageIt) {
176  for (auto pageIt = multiPageIt->pageVersions.begin();
177  pageIt != multiPageIt->pageVersions.end();
178  ++pageIt) {
179  FileInfo* fileInfo = fm_->getFileInfoForFileId(pageIt->fileId);
180  fileInfo->freePage(pageIt->pageNum);
181  }
182  }
183 }

◆ freeMetadataPages()

void File_Namespace::FileBuffer::freeMetadataPages ( )

Definition at line 164 of file FileBuffer.cpp.

References fm_, File_Namespace::FileInfo::freePage(), File_Namespace::FileMgr::getFileInfoForFileId(), metadataPages_, and File_Namespace::MultiPage::pageVersions.

Referenced by freePages().

164  {
165  for (auto metaPageIt = metadataPages_.pageVersions.begin();
166  metaPageIt != metadataPages_.pageVersions.end();
167  ++metaPageIt) {
168  FileInfo* fileInfo = fm_->getFileInfoForFileId(metaPageIt->fileId);
169  fileInfo->freePage(metaPageIt->pageNum);
170  }
171 }

◆ freePages()

void File_Namespace::FileBuffer::freePages ( )

Definition at line 185 of file FileBuffer.cpp.

References freeChunkPages(), and freeMetadataPages().

185  {
187  freeChunkPages();
188 }

◆ getMemoryPtr()

int8_t* File_Namespace::FileBuffer::getMemoryPtr ( )
inline override virtual

Not implemented for FileBuffer: logs a FATAL error ("Operation not supported.") instead of returning a pointer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 120 of file FileBuffer.h.

References logger::FATAL, and LOG.

120  {
121  LOG(FATAL) << "Operation not supported.";
122  return nullptr; // satisfy return-type warning
123  }

◆ getMultiPage()

virtual std::vector<MultiPage> File_Namespace::FileBuffer::getMultiPage ( ) const
inline virtual

Returns vector of MultiPages in the FileBuffer.

Definition at line 139 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and read().

139 { return multiPages_; }

◆ getType()

Data_Namespace::MemoryLevel File_Namespace::FileBuffer::getType ( ) const
inline override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 117 of file FileBuffer.h.

References Data_Namespace::DISK_LEVEL.

◆ isDirty()

bool File_Namespace::FileBuffer::isDirty ( ) const
inline override virtual

Returns whether or not the FileBuffer has been modified since the last flush/checkpoint.

Reimplemented from Data_Namespace::AbstractBuffer.

Definition at line 151 of file FileBuffer.h.

◆ pageCount()

size_t File_Namespace::FileBuffer::pageCount ( ) const
inline override virtual

Returns the number of pages in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 126 of file FileBuffer.h.

126 { return multiPages_.size(); }

◆ pageDataSize()

virtual size_t File_Namespace::FileBuffer::pageDataSize ( ) const
inline virtual

Returns the size in bytes of the data portion of each page in the FileBuffer.

Definition at line 132 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and File_Namespace::readForThread().

132 { return pageDataSize_; }

◆ pageSize()

size_t File_Namespace::FileBuffer::pageSize ( ) const
inline override virtual

Returns the size in bytes of each page in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 129 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), File_Namespace::readForThread(), and writeHeader().

129 { return pageSize_; }

◆ read()

void File_Namespace::FileBuffer::read ( int8_t *const  dst,
const size_t  numBytes = 0,
const size_t  offset = 0,
const MemoryLevel  dstMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 242 of file FileBuffer.cpp.

References CHECK, Data_Namespace::CPU_LEVEL, logger::FATAL, fm_, getMultiPage(), File_Namespace::FileMgr::getNumReaderThreads(), LOG, File_Namespace::readThreadDS::multiPages, multiPages_, pageDataSize_, File_Namespace::readForThread(), File_Namespace::readThreadDS::t_bytesLeft, File_Namespace::readThreadDS::t_curPtr, File_Namespace::readThreadDS::t_endPage, File_Namespace::readThreadDS::t_fm, File_Namespace::readThreadDS::t_isFirstPage, File_Namespace::readThreadDS::t_startPage, and File_Namespace::readThreadDS::t_startPageOffset.

246  {
247  if (dstBufferType != CPU_LEVEL) {
248  LOG(FATAL) << "Unsupported Buffer type";
249  }
250 
251  // variable declarations
252  size_t startPage = offset / pageDataSize_;
253  size_t startPageOffset = offset % pageDataSize_;
254  size_t numPagesToRead =
255  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
256  /*
257  if (startPage + numPagesToRead > multiPages_.size()) {
258  cout << "Start page: " << startPage << endl;
259  cout << "Num pages to read: " << numPagesToRead << endl;
260  cout << "Num multipages: " << multiPages_.size() << endl;
261  cout << "Offset: " << offset << endl;
262  cout << "Num bytes: " << numBytes << endl;
263  }
264  */
265 
266  CHECK(startPage + numPagesToRead <= multiPages_.size());
267 
268  size_t numPagesPerThread = 0;
269  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
270  size_t bytesRead = 0; // total number of bytes already being read
271  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
272  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
273  size_t numThreads = fm_->getNumReaderThreads();
274  std::vector<readThreadDS>
275  threadDSArr; // array of threadDS, needed to avoid race conditions
276 
277  if (numPagesToRead > numThreads) {
278  numPagesPerThread = numPagesToRead / numThreads;
279  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
280  } else {
281  numThreads = numPagesToRead;
282  numPagesPerThread = 1;
283  }
284 
285  /* set threadDS for the first thread */
286  readThreadDS threadDS;
287  threadDS.t_fm = fm_;
288  threadDS.t_startPage = offset / pageDataSize_;
289  if (numExtraPages > 0) {
290  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
291  numExtraPages--;
292  } else {
293  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
294  }
295  threadDS.t_curPtr = dst;
296  threadDS.t_startPageOffset = offset % pageDataSize_;
297  threadDS.t_isFirstPage = true;
298 
299  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
300  threadDS.t_startPageOffset),
301  numBytesCurrent);
302  threadDS.t_bytesLeft = bytesLeftForThread;
303  threadDS.multiPages = getMultiPage();
304 
305  if (numThreads == 1) {
306  bytesRead += readForThread(this, threadDS);
307  } else {
308  std::vector<std::future<size_t>> threads;
309 
310  for (size_t i = 0; i < numThreads; i++) {
311  threadDSArr.push_back(threadDS);
312  threads.push_back(
313  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
314 
315  // calculate elements of threadDS
316  threadDS.t_fm = fm_;
317  threadDS.t_isFirstPage = false;
318  threadDS.t_curPtr += bytesLeftForThread;
319  threadDS.t_startPage +=
320  threadDS.t_endPage -
321  threadDS.t_startPage; // based on # of pages read on previous iteration
322  if (numExtraPages > 0) {
323  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
324  numExtraPages--;
325  } else {
326  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
327  }
328  numBytesCurrent -= bytesLeftForThread;
329  bytesLeftForThread = min(
330  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
331  threadDS.t_bytesLeft = bytesLeftForThread;
332  threadDS.multiPages = getMultiPage();
333  }
334 
335  for (auto& p : threads) {
336  p.wait();
337  }
338  for (auto& p : threads) {
339  bytesRead += p.get();
340  }
341  }
342  CHECK(bytesRead == numBytes);
343 }
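The way `read()` divides pages among reader threads can be sketched on its own. `splitPages` below is a hypothetical helper that reproduces only the page-range bookkeeping (`numPagesPerThread`, `numExtraPages`); it does no I/O and starts no threads, and treating each range as half-open is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Returns half-open [startPage, endPage) ranges, one per reader thread.
std::vector<std::pair<std::size_t, std::size_t>> splitPages(std::size_t startPage,
                                                           std::size_t numPagesToRead,
                                                           std::size_t numThreads) {
  assert(numThreads > 0);
  std::size_t pagesPerThread = 1;
  std::size_t extraPages = 0;
  if (numPagesToRead > numThreads) {
    pagesPerThread = numPagesToRead / numThreads;
    extraPages = numPagesToRead - numThreads * pagesPerThread;
  } else {
    numThreads = numPagesToRead;  // never use more threads than pages
  }

  std::vector<std::pair<std::size_t, std::size_t>> ranges;
  std::size_t begin = startPage;
  for (std::size_t i = 0; i < numThreads; ++i) {
    std::size_t end = begin + pagesPerThread;
    if (extraPages > 0) {  // hand out leftover pages one per thread
      ++end;
      --extraPages;
    }
    ranges.emplace_back(begin, end);
    begin = end;
  }
  return ranges;
}
```

With 10 pages and 4 threads, the first two threads receive three pages each and the last two receive two pages each, so the leftover pages go to the earliest threads.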

◆ readMetadata()

void File_Namespace::FileBuffer::readMetadata ( const Page &  page)
private

Definition at line 393 of file FileBuffer.cpp.

References CHECK, Data_Namespace::AbstractBuffer::encoder, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileForFileId(), Data_Namespace::AbstractBuffer::has_encoder, Data_Namespace::AbstractBuffer::initEncoder(), METADATA_PAGE_SIZE, METADATA_VERSION, NUM_METADATA, File_Namespace::Page::pageNum, pageSize_, reservedHeaderSize_, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_dimension(), SQLTypeInfo::set_notnull(), SQLTypeInfo::set_scale(), SQLTypeInfo::set_size(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), Data_Namespace::AbstractBuffer::size_, and Data_Namespace::AbstractBuffer::sql_type.

Referenced by FileBuffer().

393  {
394  FILE* f = fm_->getFileForFileId(page.fileId);
395  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
396  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
397  fread((int8_t*)&size_, sizeof(size_t), 1, f);
398  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
399  // encodingType, encodingBits all as int
400  fread((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
401  int version = typeData[0];
402  CHECK(version == METADATA_VERSION); // add backward compatibility code here
403  has_encoder = static_cast<bool>(typeData[1]);
404  if (has_encoder) {
405  sql_type.set_type(static_cast<SQLTypes>(typeData[2]));
406  sql_type.set_subtype(static_cast<SQLTypes>(typeData[3]));
407  sql_type.set_dimension(typeData[4]);
408  sql_type.set_scale(typeData[5]);
409  sql_type.set_notnull(static_cast<bool>(typeData[6]));
410  sql_type.set_compression(static_cast<EncodingType>(typeData[7]));
411  sql_type.set_comp_param(typeData[8]);
412  sql_type.set_size(typeData[9]);
414  encoder->readMetadata(f);
415  }
416 }
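The layout of the `typeData` array read above can be summarized with a small decoder. This is a sketch under the assumption that the array holds at least ten ints in the order used by the listing; `TypeFields` and `decodeTypeData` are hypothetical names, and the real code fills an `SQLTypeInfo` rather than a plain struct.

```cpp
#include <cassert>
#include <vector>

// Hypothetical plain struct standing in for the fields set on sql_type.
struct TypeFields {
  int version;
  bool hasEncoder;
  int type;
  int subtype;
  int dimension;
  int scale;
  bool notNull;
  int compression;
  int compParam;
  int size;
};

// Decodes the int array in the index order used by readMetadata().
TypeFields decodeTypeData(const std::vector<int>& typeData) {
  assert(typeData.size() >= 10);
  TypeFields f{};  // all fields zero / false by default
  f.version = typeData[0];
  f.hasEncoder = typeData[1] != 0;
  if (f.hasEncoder) {  // type details are only meaningful with an encoder
    f.type = typeData[2];
    f.subtype = typeData[3];
    f.dimension = typeData[4];
    f.scale = typeData[5];
    f.notNull = typeData[6] != 0;
    f.compression = typeData[7];
    f.compParam = typeData[8];
    f.size = typeData[9];
  }
  return f;
}
```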

◆ reserve()

void File_Namespace::FileBuffer::reserve ( const size_t  numBytes)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 141 of file FileBuffer.cpp.

References addNewMultiPage(), File_Namespace::FileMgr::epoch(), fm_, multiPages_, pageSize_, and writeHeader().

141  {
142  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
143  size_t numCurrentPages = multiPages_.size();
144  int epoch = fm_->epoch();
145 
146  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
147  Page page = addNewMultiPage(epoch);
148  writeHeader(page, pageNum, epoch);
149  }
150 }

◆ reservedHeaderSize()

virtual size_t File_Namespace::FileBuffer::reservedHeaderSize ( ) const
inline virtual

Returns the size in bytes of the reserved header portion of each page in the FileBuffer.

Definition at line 136 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and File_Namespace::readForThread().

136 { return reservedHeaderSize_; }

◆ reservedSize()

size_t File_Namespace::FileBuffer::reservedSize ( ) const
inline override virtual

Returns the total number of bytes allocated for the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 144 of file FileBuffer.h.

144 { return multiPages_.size() * pageSize_; }

◆ size()

size_t File_Namespace::FileBuffer::size ( ) const
inline override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 141 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init().

141 { return size_; }

◆ write()

void File_Namespace::FileBuffer::write ( int8_t *  src,
const size_t  numBytes,
const size_t  offset = 0,
const MemoryLevel  srcMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Writes the contents of source (src) into new versions of the affected logical pages.

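This method writes the contents of source (src) into new versions of the affected logical pages. A new version of an existing page is only appended if the value of epoch (in FileMgr) is greater than the epoch of that page's latest version; otherwise the latest version is written in place.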
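The epoch rule for existing pages can be reduced to a few lines. The sketch below is illustrative only: `VersionedPage`, `needsNewVersion` and `touchForWrite` are hypothetical names, and only the epoch of each version is tracked, not the physical page.

```cpp
#include <cassert>
#include <deque>

// Hypothetical stand-in for a logical page: the epoch of each version, oldest first.
struct VersionedPage {
  std::deque<int> epochs;
};

// True if a write at `currentEpoch` must create a new physical version
// instead of overwriting the latest one in place.
bool needsNewVersion(const VersionedPage& page, int currentEpoch) {
  assert(!page.epochs.empty());
  return page.epochs.back() < currentEpoch;
}

// Applies the rule: append a version only when the latest one lags the epoch.
void touchForWrite(VersionedPage& page, int currentEpoch) {
  if (needsNewVersion(page, currentEpoch)) {
    page.epochs.push_back(currentEpoch);
  }
}
```

Repeated writes within the same epoch therefore reuse one version, while the first write after the epoch advances leaves the older version untouched.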

Implements Data_Namespace::AbstractBuffer.

Definition at line 506 of file FileBuffer.cpp.

References addNewMultiPage(), CHECK, copyPage(), Data_Namespace::CPU_LEVEL, File_Namespace::FileMgr::epoch(), logger::FATAL, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), Data_Namespace::AbstractBuffer::is_appended_, Data_Namespace::AbstractBuffer::is_dirty_, Data_Namespace::AbstractBuffer::is_updated_, LOG, multiPages_, pageDataSize_, File_Namespace::Page::pageNum, pageSize_, File_Namespace::FileMgr::requestFreePage(), reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, src, File_Namespace::FileInfo::write(), and writeHeader().

510  {
511  if (srcBufferType != CPU_LEVEL) {
512  LOG(FATAL) << "Unsupported Buffer type";
513  }
514  is_dirty_ = true;
515  if (offset < size_) {
516  is_updated_ = true;
517  }
518  bool tempIsAppended = false;
519 
520  if (offset + numBytes > size_) {
521  tempIsAppended = true; // because is_appended_ could have already been true - to
522  // avoid rewriting header
523  is_appended_ = true;
524  size_ = offset + numBytes;
525  }
526 
527  size_t startPage = offset / pageDataSize_;
528  size_t startPageOffset = offset % pageDataSize_;
529  size_t numPagesToWrite =
530  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
531  size_t bytesLeft = numBytes;
532  int8_t* curPtr = src; // a pointer to the current location in dst being written to
533  size_t initialNumPages = multiPages_.size();
534  int epoch = fm_->epoch();
535 
536  if (startPage >
537  initialNumPages) { // means there is a gap we need to allocate pages for
538  for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
539  Page page = addNewMultiPage(epoch);
540  writeHeader(page, pageNum, epoch);
541  }
542  }
543  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
544  Page page;
545  if (pageNum >= initialNumPages) {
546  page = addNewMultiPage(epoch);
547  writeHeader(page, pageNum, epoch);
548  } else if (multiPages_[pageNum].epochs.back() <
549  epoch) { // need to create new page b/c this current one lags epoch and we
550  // can't overwrite it also need to copy if we are on first or
551  // last page
552  Page lastPage = multiPages_[pageNum].current();
553  page = fm_->requestFreePage(pageSize_, false);
554  multiPages_[pageNum].epochs.push_back(epoch);
555  multiPages_[pageNum].pageVersions.push_back(page);
556  if (pageNum == startPage && startPageOffset > 0) {
557  // copyPage takes care of header offset so don't worry
558  // about it
559  copyPage(lastPage, page, startPageOffset, 0);
560  }
561  if (pageNum == startPage + numPagesToWrite &&
562  bytesLeft > 0) { // bytesLeft should always > 0
563  copyPage(lastPage,
564  page,
565  pageDataSize_ - bytesLeft,
566  bytesLeft); // these would be empty if we're appending but we won't
567  // worry about it right now
568  }
569  writeHeader(page, pageNum, epoch);
570  } else {
571  // we already have a new page at current
572  // epoch for this page - just grab this page
573  page = multiPages_[pageNum].current();
574  }
575  CHECK(page.fileId >= 0); // make sure page was initialized
576  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
577  size_t bytesWritten;
578  if (pageNum == startPage) {
579  bytesWritten = fileInfo->write(
580  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
581  min(pageDataSize_ - startPageOffset, bytesLeft),
582  curPtr);
583  } else {
584  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
585  min(pageDataSize_, bytesLeft),
586  curPtr);
587  }
588  curPtr += bytesWritten;
589  bytesLeft -= bytesWritten;
590  if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) { // if last page
591  //@todo below can lead to undefined - we're overwriting num
592  // bytes valid at checkpoint
593  writeHeader(page, 0, multiPages_[0].epochs.back(), true);
594  }
595  }
596  CHECK(bytesLeft == 0);
597 }
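The page arithmetic in the listing above (startPage, startPageOffset, numPagesToWrite and the per-page byte counts) can be checked separately from the file I/O. The following is a minimal sketch under that reading; PageSlice and splitWrite are hypothetical names that do not exist in FileBuffer, and pageDataSize stands in for the pageDataSize_ member.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical helper type: the part of one write that lands on one logical page.
struct PageSlice {
  std::size_t pageNum;       // logical page index
  std::size_t offsetInPage;  // byte offset inside the page's data area
  std::size_t numBytes;      // number of bytes written to this page
};

// Splits a write of `numBytes` at byte `offset` into per-page slices, using
// the same integer arithmetic as the listing. Returns an empty vector for a
// zero-length write or a zero page size.
std::vector<PageSlice> splitWrite(std::size_t offset,
                                  std::size_t numBytes,
                                  std::size_t pageDataSize) {
  std::vector<PageSlice> slices;
  if (numBytes == 0 || pageDataSize == 0) {
    return slices;
  }
  const std::size_t startPage = offset / pageDataSize;
  const std::size_t startPageOffset = offset % pageDataSize;
  // Round up so that a partially filled last page is counted.
  const std::size_t numPagesToWrite =
      (numBytes + startPageOffset + pageDataSize - 1) / pageDataSize;
  std::size_t bytesLeft = numBytes;
  for (std::size_t pageNum = startPage; pageNum < startPage + numPagesToWrite;
       ++pageNum) {
    // Only the first page starts at a non-zero offset.
    const std::size_t inPage = (pageNum == startPage) ? startPageOffset : 0;
    const std::size_t n = std::min(pageDataSize - inPage, bytesLeft);
    slices.push_back({pageNum, inPage, n});
    bytesLeft -= n;
  }
  return slices;
}
```

For example, splitWrite(150, 200, 100) yields three slices: 50 bytes at offset 50 of page 1, 100 bytes on page 2, and 50 bytes on page 3.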

◆ writeHeader()

void File_Namespace::FileBuffer::writeHeader ( Page page,
const int  pageId,
const int  epoch,
const bool  writeMetadata = false 
)
private

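Writes the header at the top of the page. As the listing below shows, the header is a sequence of ints: the size in bytes of the remainder of the header, the chunk key, the page id, and the epoch.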

Definition at line 374 of file FileBuffer.cpp.

References chunkKey_, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), METADATA_PAGE_SIZE, File_Namespace::Page::pageNum, pageSize(), pageSize_, and File_Namespace::FileInfo::write().

Referenced by append(), File_Namespace::FileMgr::init(), reserve(), write(), and writeMetadata().

377  {
378  int intHeaderSize = chunkKey_.size() + 3; // does not include chunkSize
379  vector<int> header(intHeaderSize);
380  // in addition to chunkkey we need size of header, pageId, version
381  header[0] =
382  (intHeaderSize - 1) * sizeof(int); // don't need to include size of headerSize
383  // value - sizeof(size_t) is for chunkSize
384  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
385  header[intHeaderSize - 2] = pageId;
386  header[intHeaderSize - 1] = epoch;
387  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
389  fileInfo->write(
390  page.pageNum * pageSize, (intHeaderSize) * sizeof(int), (int8_t*)&header[0]);
391 }
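The header layout built in the listing above can be reproduced without any file access. This is a sketch only: buildPageHeader is a hypothetical free function, and it assumes that ChunkKey behaves like a std::vector<int>, which the listing suggests but this page does not state.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Builds the int header in the order used by the listing:
//   [0]        size in bytes of everything after this first int
//   [1..k]     the chunk key (k ints)
//   [k + 1]    page id
//   [k + 2]    epoch
// chunkKey is assumed to be a vector of ints for this illustration.
std::vector<int> buildPageHeader(const std::vector<int>& chunkKey,
                                 int pageId,
                                 int epoch) {
  const std::size_t intHeaderSize = chunkKey.size() + 3;
  std::vector<int> header(intHeaderSize);
  // The size field does not count itself.
  header[0] = static_cast<int>((intHeaderSize - 1) * sizeof(int));
  std::copy(chunkKey.begin(), chunkKey.end(), header.begin() + 1);
  header[intHeaderSize - 2] = pageId;
  header[intHeaderSize - 1] = epoch;
  return header;
}
```

With a four-element chunk key the header holds seven ints, and the first int records the byte size of the six ints that follow it.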

◆ writeMetadata()

void File_Namespace::FileBuffer::writeMetadata ( const int  epoch)
private

Definition at line 418 of file FileBuffer.cpp.

References Data_Namespace::AbstractBuffer::encoder, File_Namespace::MultiPage::epochs, File_Namespace::Page::fileId, fm_, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), SQLTypeInfo::get_notnull(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), File_Namespace::FileMgr::getFileForFileId(), Data_Namespace::AbstractBuffer::has_encoder, METADATA_PAGE_SIZE, METADATA_VERSION, metadataPages_, NUM_METADATA, File_Namespace::Page::pageNum, pageSize_, File_Namespace::MultiPage::pageVersions, File_Namespace::FileMgr::requestFreePage(), reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, Data_Namespace::AbstractBuffer::sql_type, and writeHeader().

418  {
419  // Right now stats page is size_ (in bytes), bufferType, encodingType,
420  // encodingDataType, numElements
421  Page page = fm_->requestFreePage(METADATA_PAGE_SIZE, true);
422  writeHeader(page, -1, epoch, true);
423  FILE* f = fm_->getFileForFileId(page.fileId);
424  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
425  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
426  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
427  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
428  // encodingType, encodingBits all as int
429  typeData[0] = METADATA_VERSION;
430  typeData[1] = static_cast<int>(has_encoder);
431  if (has_encoder) {
432  typeData[2] = static_cast<int>(sql_type.get_type());
433  typeData[3] = static_cast<int>(sql_type.get_subtype());
434  typeData[4] = sql_type.get_dimension();
435  typeData[5] = sql_type.get_scale();
436  typeData[6] = static_cast<int>(sql_type.get_notnull());
437  typeData[7] = static_cast<int>(sql_type.get_compression());
438  typeData[8] = sql_type.get_comp_param();
439  typeData[9] = sql_type.get_size();
440  }
441  fwrite((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
442  if (has_encoder) { // redundant
443  encoder->writeMetadata(f);
444  }
445  metadataPages_.epochs.push_back(epoch);
446  metadataPages_.pageVersions.push_back(page);
447 }

Friends And Related Function Documentation

◆ FileMgr

friend class FileMgr
friend

Definition at line 56 of file FileBuffer.h.

Member Data Documentation

◆ chunkKey_

ChunkKey File_Namespace::FileBuffer::chunkKey_
private

Definition at line 177 of file FileBuffer.h.

Referenced by calcHeaderBuffer(), and writeHeader().

◆ fm_

FileMgr* File_Namespace::FileBuffer::fm_
private

◆ headerBufferOffset_

size_t File_Namespace::FileBuffer::headerBufferOffset_ = 32
static private

Definition at line 171 of file FileBuffer.h.

Referenced by calcHeaderBuffer().

◆ metadataPages_

MultiPage File_Namespace::FileBuffer::metadataPages_
private

Definition at line 172 of file FileBuffer.h.

Referenced by FileBuffer(), freeMetadataPages(), and writeMetadata().

◆ multiPages_

std::vector<MultiPage> File_Namespace::FileBuffer::multiPages_
private

◆ pageDataSize_

size_t File_Namespace::FileBuffer::pageDataSize_
private

Definition at line 175 of file FileBuffer.h.

Referenced by append(), copyPage(), FileBuffer(), read(), and write().

◆ pageSize_

size_t File_Namespace::FileBuffer::pageSize_
private

◆ reservedHeaderSize_

size_t File_Namespace::FileBuffer::reservedHeaderSize_
private

The documentation for this class was generated from the following files: