OmniSciDB  340b00dbf6
File_Namespace::FileBuffer Class Reference

Represents/provides access to contiguous data stored in the file system.

#include <FileBuffer.h>

Public Member Functions

 FileBuffer (FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
 Constructs a FileBuffer object.
 
 FileBuffer (FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const SQLTypeInfo sqlType, const size_t initialSize=0)
 
 FileBuffer (FileMgr *fm, const ChunkKey &chunkKey, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
 
 ~FileBuffer () override
 Destructor.
 
Page addNewMultiPage (const int epoch)
 
void reserve (const size_t numBytes) override
 
void freePages ()
 
size_t freeChunkPages ()
 
void freeMetadataPages ()
 
void read (int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 
void write (int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 Writes the contents of source (src) into new versions of the affected logical pages.
 
void append (int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
 
void copyPage (Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
 
Data_Namespace::MemoryLevel getType () const override
 
int8_t * getMemoryPtr () override
 Not implemented for FileMgr – throws a runtime_error.
 
size_t pageCount () const override
 Returns the number of pages in the FileBuffer.
 
size_t pageSize () const override
 Returns the size in bytes of each page in the FileBuffer.
 
virtual size_t pageDataSize () const
 Returns the size in bytes of the data portion of each page in the FileBuffer.
 
virtual size_t reservedHeaderSize () const
 
virtual std::vector< MultiPage > getMultiPage () const
 Returns vector of MultiPages in the FileBuffer.
 
size_t reservedSize () const override
 Returns the total number of bytes allocated for the FileBuffer.
 
- Public Member Functions inherited from Data_Namespace::AbstractBuffer
 AbstractBuffer (const int device_id)
 
 AbstractBuffer (const int device_id, const SQLTypeInfo sql_type)
 
virtual ~AbstractBuffer ()
 
virtual int pin ()
 
virtual int unPin ()
 
virtual int getPinCount ()
 
size_t size () const
 
int getDeviceId () const
 
bool isDirty () const
 
bool isAppended () const
 
bool isUpdated () const
 
bool hasEncoder () const
 
SQLTypeInfo getSqlType () const
 
void setSqlType (const SQLTypeInfo &sql_type)
 
Encoder * getEncoder () const
 
void setDirty ()
 
void setUpdated ()
 
void setAppended ()
 
void setSize (const size_t size)
 
void clearDirtyBits ()
 
void initEncoder (const SQLTypeInfo &tmp_sql_type)
 
void syncEncoder (const AbstractBuffer *src_buffer)
 
void copyTo (AbstractBuffer *destination_buffer, const size_t num_bytes=0)
 
void resetToEmpty ()
 

Private Member Functions

void writeHeader (Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
 Writes the header for page, recording its logical page ID (pageId) and version epoch (epoch).
 
void writeMetadata (const int epoch)
 
void readMetadata (const Page &page)
 
void calcHeaderBuffer ()
 

Private Attributes

FileMgr * fm_
 
MultiPage metadataPages_
 
std::vector< MultiPage > multiPages_
 
size_t pageSize_
 
size_t pageDataSize_
 
size_t reservedHeaderSize_
 
ChunkKey chunkKey_
 

Static Private Attributes

static size_t headerBufferOffset_ = 32
 

Friends

class FileMgr
 

Additional Inherited Members

- Protected Attributes inherited from Data_Namespace::AbstractBuffer
std::unique_ptr< Encoder > encoder_
 
SQLTypeInfo sql_type_
 
size_t size_
 
int device_id_
 

Detailed Description

Represents/provides access to contiguous data stored in the file system.

The FileBuffer consists of logical pages, each of which can map to any identically-sized page in any file of the underlying file system. A page's location (file and page number) is stored in a MultiPage object, and each MultiPage records this information for multiple versions of the same logical page, together with the epoch of each version.
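A simplified model of this versioning may help. This sketch is not OmniSciDB's MultiPage class: PageRef and VersionedPage are hypothetical names. Only three elements come from members referenced on this page: an epochs deque paired with a pageVersions deque, current() returning the newest version, and pop() purging the oldest one.

```cpp
#include <cassert>
#include <deque>

// Hypothetical stand-in for a physical page location: which file, which page slot.
struct PageRef {
  int fileId;
  int pageNum;
};

// Simplified model of one logical page: parallel deques holding, oldest first,
// the epoch in which each version was written and where that version lives.
struct VersionedPage {
  std::deque<int> epochs;
  std::deque<PageRef> pageVersions;

  // Record a new version of this logical page written during `epoch`.
  void push(const PageRef& page, int epoch) {
    epochs.push_back(epoch);
    pageVersions.push_back(page);
  }

  // The newest version is the one readers see.
  const PageRef& current() const {
    assert(!pageVersions.empty());
    return pageVersions.back();
  }

  // Purge the oldest version.
  void pop() {
    assert(!pageVersions.empty());
    epochs.pop_front();
    pageVersions.pop_front();
  }
};
```

Two versions of the same logical page, written in epochs 1 and 2, would be held as epochs {1, 2}, with current() returning the epoch-2 page. A subsequent pop() leaves only the epoch-2 entry.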

Note that a "Chunk" is brought into a FileBuffer by the FileMgr.

Note(s): Forbid Copying Idiom 4.1

Definition at line 55 of file FileBuffer.h.

Constructor & Destructor Documentation

File_Namespace::FileBuffer::FileBuffer ( FileMgr *  fm,
const size_t  pageSize,
const ChunkKey &  chunkKey,
const size_t  initialSize = 0 
)

Constructs a FileBuffer object.

Definition at line 40 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK, fm_, pageDataSize_, pageSize_, and reservedHeaderSize_.

44  : AbstractBuffer(fm->getDeviceId())
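45  , fm_(fm)
46  , metadataPages_(METADATA_PAGE_SIZE)
47  , pageSize_(pageSize)
48  , chunkKey_(chunkKey) {
49  // Create a new FileBuffer
50  CHECK(fm_);
51  calcHeaderBuffer();
52  pageDataSize_ = pageSize_ - reservedHeaderSize_;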
53  //@todo reintroduce initialSize - need to develop easy way of
54  // differentiating these pre-allocated pages from "written-to" pages
55  /*
56  if (initalSize > 0) {
57  // should expand to initialSize bytes
58  size_t initialNumPages = (initalSize + pageSize_ -1) / pageSize_;
59  int epoch = fm_->epoch();
60  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
61  Page page = addNewMultiPage(epoch);
62  writeHeader(page,pageNum,epoch);
63  }
64  }
65  */
66 }

File_Namespace::FileBuffer::FileBuffer ( FileMgr *  fm,
const size_t  pageSize,
const ChunkKey &  chunkKey,
const SQLTypeInfo  sqlType,
const size_t  initialSize = 0 
)

Definition at line 68 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK, fm_, pageDataSize_, pageSize_, and reservedHeaderSize_.

73  : AbstractBuffer(fm->getDeviceId(), sqlType)
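74  , fm_(fm)
75  , metadataPages_(METADATA_PAGE_SIZE)
76  , pageSize_(pageSize)
77  , chunkKey_(chunkKey) {
78  CHECK(fm_);
79  calcHeaderBuffer();
80  pageDataSize_ = pageSize_ - reservedHeaderSize_;
81 }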

File_Namespace::FileBuffer::FileBuffer ( FileMgr *  fm,
const ChunkKey &  chunkKey,
const std::vector< HeaderInfo >::const_iterator &  headerStartIt,
const std::vector< HeaderInfo >::const_iterator &  headerEndIt 
)

Definition at line 83 of file FileBuffer.cpp.

References calcHeaderBuffer(), CHECK, File_Namespace::MultiPage::epochs, logger::FATAL, fm_, LOG, metadataPages_, multiPages_, pageDataSize_, pageSize_, File_Namespace::MultiPage::pageVersions, readMetadata(), reservedHeaderSize_, and show_chunk().

87  : AbstractBuffer(fm->getDeviceId())
88  , fm_(fm)
89  , metadataPages_(METADATA_PAGE_SIZE)
90  , pageSize_(0)
91  , chunkKey_(chunkKey) {
92  // We are being assigned an existing FileBuffer on disk
93 
94  CHECK(fm_);
95  calcHeaderBuffer();
96  // MultiPage multiPage(pageSize_); // why was this here?
97  int lastPageId = -1;
98  // Page lastMetadataPage;
99  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
100  int curPageId = vecIt->pageId;
101 
102  // We only want to read last metadata page
103  if (curPageId == -1) { // stats page
104  metadataPages_.epochs.push_back(vecIt->versionEpoch);
105  metadataPages_.pageVersions.push_back(vecIt->page);
106  } else {
107  if (curPageId != lastPageId) {
108  // protect from bad data on disk, and give diagnostics
109  if (curPageId != lastPageId + 1) {
110  LOG(FATAL) << "Failure reading DB file " << show_chunk(chunkKey)
111  << " Current page " << curPageId << " last page " << lastPageId
112  << " epoch " << vecIt->versionEpoch;
113  }
114  if (lastPageId == -1) {
115  // If we are on first real page
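116  CHECK(metadataPages_.pageVersions.back().fileId != -1); // was initialized
117  readMetadata(metadataPages_.pageVersions.back());
118  pageDataSize_ = pageSize_ - reservedHeaderSize_;
119  }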
120  MultiPage multiPage(pageSize_);
121  multiPages_.push_back(multiPage);
122  lastPageId = curPageId;
123  }
124  multiPages_.back().epochs.push_back(vecIt->versionEpoch);
125  multiPages_.back().pageVersions.push_back(vecIt->page);
126  }
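127  if (curPageId == -1) { // meaning there was only a metadata page
128  readMetadata(metadataPages_.pageVersions.back());
129  pageDataSize_ = pageSize_ - reservedHeaderSize_;
130  }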
131  }
132  // auto lastHeaderIt = std::prev(headerEndIt);
133  // size_ = lastHeaderIt->chunkSize;
134 }
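The third constructor's loop groups on-disk headers into logical pages. The sketch below reproduces only that grouping and the contiguity check. Header and Rebuilt are hypothetical types, and Header keeps only the pageId and versionEpoch fields the loop reads from HeaderInfo. Where the original calls LOG(FATAL), the sketch throws. Like the original loop, it expects the headers to be ordered by page ID, with the metadata entries (pageId -1) first.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, reduced page header: logical page ID (-1 marks a metadata
// page) and the epoch in which that version was written.
struct Header {
  int pageId;
  int versionEpoch;
};

// Result of the grouping: epochs of the metadata page versions, and for each
// logical data page the epochs of its versions in the order encountered.
struct Rebuilt {
  std::vector<int> metadataEpochs;
  std::vector<std::vector<int>> pageEpochs;
};

// Group headers (expected ordered by pageId, metadata entries first) into
// logical pages, rejecting gaps in the page IDs.
Rebuilt groupHeaders(const std::vector<Header>& headers) {
  Rebuilt out;
  int lastPageId = -1;
  for (const Header& h : headers) {
    if (h.pageId == -1) {  // metadata ("stats") page version
      out.metadataEpochs.push_back(h.versionEpoch);
      continue;
    }
    if (h.pageId != lastPageId) {
      if (h.pageId != lastPageId + 1) {
        throw std::runtime_error("page " + std::to_string(h.pageId) +
                                 " follows page " + std::to_string(lastPageId));
      }
      // Rough analogue of the CHECK on the first real page: a metadata page
      // must already have been seen.
      if (lastPageId == -1 && out.metadataEpochs.empty()) {
        throw std::runtime_error("data page found before any metadata page");
      }
      out.pageEpochs.emplace_back();  // start a new logical page
      lastPageId = h.pageId;
    }
    out.pageEpochs.back().push_back(h.versionEpoch);
  }
  return out;
}
```

Consecutive headers with the same page ID become successive versions of one logical page. A jump from page 0 to page 2 is rejected, just as the original reports "Failure reading DB file".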

File_Namespace::FileBuffer::~FileBuffer ( )
override

Destructor.

Definition at line 136 of file FileBuffer.cpp.

136  {
137  // need to free pages
138  // NOP
139 }

Member Function Documentation

Page File_Namespace::FileBuffer::addNewMultiPage ( const int  epoch)

Definition at line 371 of file FileBuffer.cpp.

References File_Namespace::MultiPage::epochs, fm_, multiPages_, pageSize_, File_Namespace::MultiPage::pageVersions, and File_Namespace::FileMgr::requestFreePage().

Referenced by append(), reserve(), and write().

371  {
372  Page page = fm_->requestFreePage(pageSize_, false);
373  MultiPage multiPage(pageSize_);
374  multiPage.epochs.push_back(epoch);
375  multiPage.pageVersions.push_back(page);
376  multiPages_.push_back(multiPage);
377  return page;
378 }

void File_Namespace::FileBuffer::append ( int8_t *  src,
const size_t  numBytes,
const MemoryLevel  srcMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 455 of file FileBuffer.cpp.

References addNewMultiPage(), CHECK, File_Namespace::FileMgr::epoch(), File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), multiPages_, pageDataSize_, File_Namespace::Page::pageNum, pageSize_, reservedHeaderSize_, Data_Namespace::AbstractBuffer::setAppended(), Data_Namespace::AbstractBuffer::size_, File_Namespace::FileInfo::write(), and writeHeader().

Referenced by File_Namespace::FileMgr::putBuffer().

458  {
459  setAppended();
460 
461  size_t startPage = size_ / pageDataSize_;
462  size_t startPageOffset = size_ % pageDataSize_;
463  size_t numPagesToWrite =
464  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
465  size_t bytesLeft = numBytes;
466  int8_t* curPtr = src; // a pointer to the current location in dst being written to
467  size_t initialNumPages = multiPages_.size();
468  size_ = size_ + numBytes;
469  int epoch = fm_->epoch();
470  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
471  Page page;
472  if (pageNum >= initialNumPages) {
473  page = addNewMultiPage(epoch);
474  writeHeader(page, pageNum, epoch);
475  } else {
476  // we already have a new page at current
477  // epoch for this page - just grab this page
478  page = multiPages_[pageNum].current();
479  }
480  CHECK(page.fileId >= 0); // make sure page was initialized
481  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
482  size_t bytesWritten;
483  if (pageNum == startPage) {
484  bytesWritten = fileInfo->write(
485  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
486  min(pageDataSize_ - startPageOffset, bytesLeft),
487  curPtr);
488  } else {
489  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
490  min(pageDataSize_, bytesLeft),
491  curPtr);
492  }
493  curPtr += bytesWritten;
494  bytesLeft -= bytesWritten;
495  }
496  CHECK(bytesLeft == 0);
497 }
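The page arithmetic at the top of append() (and of write()) maps a logical byte position onto pages whose data areas each hold pageDataSize_ bytes. On disk, a page's data begins reservedHeaderSize_ bytes into its pageSize_-byte slot. The sketch below isolates that arithmetic. PageSpan, pageSpan, and fileOffset are hypothetical names. For append(), the starting offset is the buffer's current size_.

```cpp
#include <cassert>
#include <cstddef>

// Which logical pages a write of `numBytes` starting at logical byte `offset`
// touches, given `pageDataSize` usable bytes per page.
struct PageSpan {
  std::size_t startPage;        // first logical page touched
  std::size_t startPageOffset;  // byte offset within that page's data area
  std::size_t numPages;         // number of logical pages touched
};

PageSpan pageSpan(std::size_t offset, std::size_t numBytes, std::size_t pageDataSize) {
  assert(pageDataSize > 0);
  PageSpan s;
  s.startPage = offset / pageDataSize;
  s.startPageOffset = offset % pageDataSize;
  // Ceiling division of (numBytes + startPageOffset) by pageDataSize.
  s.numPages = (numBytes + s.startPageOffset + pageDataSize - 1) / pageDataSize;
  return s;
}

// Byte position in the file of `offsetInPage` bytes into the data area of
// physical page `pageNum`, matching the offsets passed to fileInfo->write().
std::size_t fileOffset(std::size_t pageNum, std::size_t pageSize,
                       std::size_t reservedHeaderSize, std::size_t offsetInPage) {
  return pageNum * pageSize + reservedHeaderSize + offsetInPage;
}
```

Consider a hypothetical pageDataSize_ of 100 bytes. Appending 120 bytes to a buffer whose size_ is 250 touches logical pages 2 and 3, and the write starts 50 bytes into page 2.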

void File_Namespace::FileBuffer::calcHeaderBuffer ( )
private

Definition at line 152 of file FileBuffer.cpp.

References chunkKey_, headerBufferOffset_, and reservedHeaderSize_.

Referenced by FileBuffer().

152  {
153  // 3 * sizeof(int) is for headerSize, for pageId and versionEpoch
154  // sizeof(size_t) is for chunkSize
155  // reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int) + sizeof(size_t);
156  reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int);
157  size_t headerMod = reservedHeaderSize_ % headerBufferOffset_;
158  if (headerMod > 0) {
159  reservedHeaderSize_ += headerBufferOffset_ - headerMod;
160  }
161  // pageDataSize_ = pageSize_-reservedHeaderSize_;
162 }
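calcHeaderBuffer() sizes each page's reserved header as (chunkKey_.size() + 3) ints. Per the comment on line 153, the three extra ints hold the header size, page ID, and version epoch. The result is then padded up to a multiple of headerBufferOffset_ (32 bytes). A page's data area is whatever remains of pageSize_, as the commented-out line 161 shows. The function names in this standalone sketch are hypothetical.

```cpp
#include <cassert>
#include <cstddef>

// Reserved header: one int per chunk-key component plus three ints (header
// size, page ID, version epoch), padded up to a multiple of `alignment` bytes.
std::size_t reservedHeaderSizeFor(std::size_t chunkKeyLength, std::size_t alignment = 32) {
  assert(alignment > 0);
  std::size_t size = (chunkKeyLength + 3) * sizeof(int);
  std::size_t remainder = size % alignment;
  if (remainder > 0) {
    size += alignment - remainder;  // round up to the next multiple of `alignment`
  }
  return size;
}

// Usable data bytes per page once the reserved header is subtracted.
std::size_t pageDataSizeFor(std::size_t pageSize, std::size_t chunkKeyLength) {
  std::size_t header = reservedHeaderSizeFor(chunkKeyLength);
  assert(pageSize > header);
  return pageSize - header;
}
```

With a 4-byte int, a four-component chunk key needs 7 * 4 = 28 bytes, which pads to 32. A six-component key needs 36 bytes, which pads to 64.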

void File_Namespace::FileBuffer::copyPage ( Page &  srcPage,
Page &  destPage,
const size_t  numBytes,
const size_t  offset = 0 
)

Definition at line 351 of file FileBuffer.cpp.

References CHECK, checked_malloc(), File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), pageDataSize_, File_Namespace::Page::pageNum, pageSize_, File_Namespace::FileInfo::read(), reservedHeaderSize_, and File_Namespace::FileInfo::write().

Referenced by write().

354  {
355  // FILE *srcFile = fm_->files_[srcPage.fileId]->f;
356  // FILE *destFile = fm_->files_[destPage.fileId]->f;
357  CHECK(offset + numBytes < pageDataSize_);
358  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
359  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
360 
361  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
362  size_t bytesRead = srcFileInfo->read(
363  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
364  CHECK(bytesRead == numBytes);
365  size_t bytesWritten = destFileInfo->write(
366  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
367  CHECK(bytesWritten == numBytes);
368  free(buffer);
369 }

size_t File_Namespace::FileBuffer::freeChunkPages ( )

Definition at line 176 of file FileBuffer.cpp.

References fm_, File_Namespace::FileInfo::freePage(), File_Namespace::FileMgr::getFileInfoForFileId(), and multiPages_.

Referenced by foreign_storage::ForeignStorageCache::eraseChunk(), foreign_storage::ForeignStorageCache::eraseChunkByIterator(), and freePages().

176  {
177  size_t num_pages_freed = multiPages_.size();
178  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
179  ++multiPageIt) {
180  for (auto pageIt = multiPageIt->pageVersions.begin();
181  pageIt != multiPageIt->pageVersions.end();
182  ++pageIt) {
183  FileInfo* fileInfo = fm_->getFileInfoForFileId(pageIt->fileId);
184  fileInfo->freePage(pageIt->pageNum);
185  }
186  }
187  multiPages_.clear();
188  return num_pages_freed;
189 }

void File_Namespace::FileBuffer::freeMetadataPages ( )

Definition at line 164 of file FileBuffer.cpp.

References fm_, File_Namespace::FileInfo::freePage(), File_Namespace::FileMgr::getFileInfoForFileId(), metadataPages_, File_Namespace::MultiPage::pageVersions, and File_Namespace::MultiPage::pop().

Referenced by freePages().

164  {
165  for (auto metaPageIt = metadataPages_.pageVersions.begin();
166  metaPageIt != metadataPages_.pageVersions.end();
167  ++metaPageIt) {
168  FileInfo* fileInfo = fm_->getFileInfoForFileId(metaPageIt->fileId);
169  fileInfo->freePage(metaPageIt->pageNum);
170  }
171  while (metadataPages_.pageVersions.size() > 0) {
172  metadataPages_.pop();
173  }
174 }

void File_Namespace::FileBuffer::freePages ( )

Definition at line 191 of file FileBuffer.cpp.

References freeChunkPages(), and freeMetadataPages().

191  {
192  freeMetadataPages();
193  freeChunkPages();
194 }

int8_t* File_Namespace::FileBuffer::getMemoryPtr ( )
inline override virtual

Not implemented for FileMgr – throws a runtime_error.

Implements Data_Namespace::AbstractBuffer.

Definition at line 120 of file FileBuffer.h.

References logger::FATAL, and LOG.

120  {
121  LOG(FATAL) << "Operation not supported.";
122  return nullptr; // satisfy return-type warning
123  }

virtual std::vector<MultiPage> File_Namespace::FileBuffer::getMultiPage ( ) const
inline virtual

Returns vector of MultiPages in the FileBuffer.

Definition at line 139 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and read().

139 { return multiPages_; }

Data_Namespace::MemoryLevel File_Namespace::FileBuffer::getType ( ) const
inline override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 117 of file FileBuffer.h.

References Data_Namespace::DISK_LEVEL.

size_t File_Namespace::FileBuffer::pageCount ( ) const
inline override virtual

Returns the number of pages in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 126 of file FileBuffer.h.

126 { return multiPages_.size(); }

virtual size_t File_Namespace::FileBuffer::pageDataSize ( ) const
inline virtual

Returns the size in bytes of the data portion of each page in the FileBuffer.

Definition at line 132 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and File_Namespace::readForThread().

132 { return pageDataSize_; }

size_t File_Namespace::FileBuffer::pageSize ( ) const
inline override virtual

Returns the size in bytes of each page in the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 129 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), File_Namespace::readForThread(), and writeHeader().

129 { return pageSize_; }

void File_Namespace::FileBuffer::read ( int8_t *const  dst,
const size_t  numBytes = 0,
const size_t  offset = 0,
const MemoryLevel  dstMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 248 of file FileBuffer.cpp.

References CHECK, Data_Namespace::CPU_LEVEL, logger::FATAL, fm_, getMultiPage(), File_Namespace::FileMgr::getNumReaderThreads(), LOG, File_Namespace::readThreadDS::multiPages, multiPages_, pageDataSize_, File_Namespace::readForThread(), File_Namespace::readThreadDS::t_bytesLeft, File_Namespace::readThreadDS::t_curPtr, File_Namespace::readThreadDS::t_endPage, File_Namespace::readThreadDS::t_fm, File_Namespace::readThreadDS::t_isFirstPage, File_Namespace::readThreadDS::t_startPage, and File_Namespace::readThreadDS::t_startPageOffset.

252  {
253  if (dstBufferType != CPU_LEVEL) {
254  LOG(FATAL) << "Unsupported Buffer type";
255  }
256 
257  // variable declarations
258  size_t startPage = offset / pageDataSize_;
259  size_t startPageOffset = offset % pageDataSize_;
260  size_t numPagesToRead =
261  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
262  /*
263  if (startPage + numPagesToRead > multiPages_.size()) {
264  cout << "Start page: " << startPage << endl;
265  cout << "Num pages to read: " << numPagesToRead << endl;
266  cout << "Num multipages: " << multiPages_.size() << endl;
267  cout << "Offset: " << offset << endl;
268  cout << "Num bytes: " << numBytes << endl;
269  }
270  */
271 
272  CHECK(startPage + numPagesToRead <= multiPages_.size());
273 
274  size_t numPagesPerThread = 0;
275  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
276  size_t bytesRead = 0; // total number of bytes already being read
277  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
278  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
279  size_t numThreads = fm_->getNumReaderThreads();
280  std::vector<readThreadDS>
281  threadDSArr; // array of threadDS, needed to avoid racing conditions
282 
283  if (numPagesToRead > numThreads) {
284  numPagesPerThread = numPagesToRead / numThreads;
285  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
286  } else {
287  numThreads = numPagesToRead;
288  numPagesPerThread = 1;
289  }
290 
291  /* set threadDS for the first thread */
292  readThreadDS threadDS;
293  threadDS.t_fm = fm_;
294  threadDS.t_startPage = offset / pageDataSize_;
295  if (numExtraPages > 0) {
296  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
297  numExtraPages--;
298  } else {
299  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
300  }
301  threadDS.t_curPtr = dst;
302  threadDS.t_startPageOffset = offset % pageDataSize_;
303  threadDS.t_isFirstPage = true;
304 
305  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
306  threadDS.t_startPageOffset),
307  numBytesCurrent);
308  threadDS.t_bytesLeft = bytesLeftForThread;
309  threadDS.multiPages = getMultiPage();
310 
311  if (numThreads == 1) {
312  bytesRead += readForThread(this, threadDS);
313  } else {
314  std::vector<std::future<size_t>> threads;
315 
316  for (size_t i = 0; i < numThreads; i++) {
317  threadDSArr.push_back(threadDS);
318  threads.push_back(
319  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
320 
321  // calculate elements of threadDS
322  threadDS.t_fm = fm_;
323  threadDS.t_isFirstPage = false;
324  threadDS.t_curPtr += bytesLeftForThread;
325  threadDS.t_startPage +=
326  threadDS.t_endPage -
327  threadDS.t_startPage; // based on # of pages read on previous iteration
328  if (numExtraPages > 0) {
329  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
330  numExtraPages--;
331  } else {
332  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
333  }
334  numBytesCurrent -= bytesLeftForThread;
335  bytesLeftForThread = min(
336  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
337  threadDS.t_bytesLeft = bytesLeftForThread;
338  threadDS.multiPages = getMultiPage();
339  }
340 
341  for (auto& p : threads) {
342  p.wait();
343  }
344  for (auto& p : threads) {
345  bytesRead += p.get();
346  }
347  }
348  CHECK(bytesRead == numBytes);
349 }
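read() spreads the requested page range across reader threads as evenly as possible. Each thread gets numPagesToRead / numThreads consecutive pages, and the first numPagesToRead % numThreads threads get one extra page. If there are no more pages than threads, one thread is used per page. The hypothetical helper below computes only those [begin, end) page ranges. It leaves out the per-thread byte counts (t_bytesLeft), the destination pointers, and the std::async dispatch.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Split `numPagesToRead` logical pages starting at `startPage` into contiguous
// [begin, end) ranges, one per reader thread, following the scheme in read().
std::vector<std::pair<std::size_t, std::size_t>> partitionPages(std::size_t startPage,
                                                                std::size_t numPagesToRead,
                                                                std::size_t numThreads) {
  assert(numThreads > 0);
  std::size_t pagesPerThread = 0;
  std::size_t extraPages = 0;  // handed out one per thread to the first threads
  if (numPagesToRead > numThreads) {
    pagesPerThread = numPagesToRead / numThreads;
    extraPages = numPagesToRead - numThreads * pagesPerThread;
  } else {
    numThreads = numPagesToRead;  // never use more threads than pages
    pagesPerThread = 1;
  }
  std::vector<std::pair<std::size_t, std::size_t>> ranges;
  std::size_t begin = startPage;
  for (std::size_t i = 0; i < numThreads; ++i) {
    std::size_t end = begin + pagesPerThread;
    if (extraPages > 0) {
      ++end;
      --extraPages;
    }
    ranges.emplace_back(begin, end);
    begin = end;  // next thread continues where this one stops
  }
  return ranges;
}
```

For example, 10 pages starting at page 3 with 4 reader threads yield the ranges [3, 6), [6, 9), [9, 11), and [11, 13).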

void File_Namespace::FileBuffer::readMetadata ( const Page &  page)
private

Definition at line 399 of file FileBuffer.cpp.

References CHECK, Data_Namespace::AbstractBuffer::encoder_, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileForFileId(), Data_Namespace::AbstractBuffer::initEncoder(), METADATA_PAGE_SIZE, METADATA_VERSION, NUM_METADATA, File_Namespace::Page::pageNum, pageSize_, reservedHeaderSize_, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_dimension(), SQLTypeInfo::set_notnull(), SQLTypeInfo::set_scale(), SQLTypeInfo::set_size(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), Data_Namespace::AbstractBuffer::size_, Data_Namespace::AbstractBuffer::sql_type_, and setup::version.

Referenced by FileBuffer().

399  {
400  FILE* f = fm_->getFileForFileId(page.fileId);
401  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
402  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
403  fread((int8_t*)&size_, sizeof(size_t), 1, f);
404  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
405  // encodingType, encodingBits all as int
406  fread((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
407  int version = typeData[0];
408  CHECK(version == METADATA_VERSION); // add backward compatibility code here
409  bool has_encoder = static_cast<bool>(typeData[1]);
410  if (has_encoder) {
411  sql_type_.set_type(static_cast<SQLTypes>(typeData[2]));
412  sql_type_.set_subtype(static_cast<SQLTypes>(typeData[3]));
413  sql_type_.set_dimension(typeData[4]);
414  sql_type_.set_scale(typeData[5]);
415  sql_type_.set_notnull(static_cast<bool>(typeData[6]));
416  sql_type_.set_compression(static_cast<EncodingType>(typeData[7]));
417  sql_type_.set_comp_param(typeData[8]);
418  sql_type_.set_size(typeData[9]);
419  initEncoder(sql_type_);
420  encoder_->readMetadata(f);
421  }
422 }
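The listing implies a fixed layout after the reserved header: a size_t page size, a size_t buffer size, then an array of ints starting with a format version and a has-encoder flag, followed by the SQL type fields. The sketch below decodes that prefix from memory rather than from a FILE*. It does not interpret the type fields or build an encoder. Two points are assumptions. First, the int count of 10 is only the minimum implied by the reads of typeData[0] through typeData[9]; the real count is NUM_METADATA. Second, like the original fread calls, the sketch assumes the bytes were written with the same sizeof(size_t), sizeof(int), and byte order. MetadataPrefix and decodeMetadataPrefix are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <vector>

// Assumption: 10 ints of type data, the minimum implied by readMetadata()
// reading typeData[0] through typeData[9]. The real count is NUM_METADATA.
constexpr std::size_t kNumTypeInts = 10;

struct MetadataPrefix {
  std::size_t pageSize;       // first size_t after the reserved header
  std::size_t bufferSize;     // second size_t: the buffer's logical size (size_)
  std::vector<int> typeData;  // [0] version, [1] has_encoder, [2] type, [3] subtype,
                              // [4] dimension, [5] scale, [6] notnull,
                              // [7] compression, [8] comp_param, [9] size
};

// Decode the fields in the order readMetadata() freads them, starting at the
// first byte after the reserved page header.
MetadataPrefix decodeMetadataPrefix(const unsigned char* bytes, std::size_t length) {
  const std::size_t needed = 2 * sizeof(std::size_t) + kNumTypeInts * sizeof(int);
  if (length < needed) {
    throw std::invalid_argument("metadata buffer too short");
  }
  MetadataPrefix m;
  std::memcpy(&m.pageSize, bytes, sizeof(std::size_t));
  std::memcpy(&m.bufferSize, bytes + sizeof(std::size_t), sizeof(std::size_t));
  m.typeData.resize(kNumTypeInts);
  std::memcpy(m.typeData.data(), bytes + 2 * sizeof(std::size_t),
              kNumTypeInts * sizeof(int));
  return m;
}
```

Using std::memcpy rather than casting the byte pointer avoids alignment and strict-aliasing problems when the fields sit at arbitrary offsets in a raw buffer.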

void File_Namespace::FileBuffer::reserve ( const size_t  numBytes)
override virtual

Implements Data_Namespace::AbstractBuffer.

Definition at line 141 of file FileBuffer.cpp.

References addNewMultiPage(), File_Namespace::FileMgr::epoch(), fm_, multiPages_, pageSize_, and writeHeader().

141  {
142  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
143  size_t numCurrentPages = multiPages_.size();
144  int epoch = fm_->epoch();
145 
146  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
147  Page page = addNewMultiPage(epoch);
148  writeHeader(page, pageNum, epoch);
149  }
150 }
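reserve() rounds the request up to whole pages of pageSize_ bytes and allocates only the pages beyond those already present. It never shrinks the buffer. The hypothetical helper below counts the pages it would add.

```cpp
#include <cassert>
#include <cstddef>

// Number of new logical pages reserve(numBytes) would add: the request is
// rounded up to whole pages of `pageSize` bytes (header included), and only
// pages beyond `currentPages` are allocated.
std::size_t pagesAddedByReserve(std::size_t numBytes, std::size_t pageSize,
                                std::size_t currentPages) {
  assert(pageSize > 0);
  std::size_t requested = (numBytes + pageSize - 1) / pageSize;
  return requested > currentPages ? requested - currentPages : 0;
}
```

As the listing reads, the rounding divides by pageSize_ rather than pageDataSize_, so the data capacity added can fall short of numBytes. For example, take a hypothetical pageSize_ of 1024 with a 32-byte header. Reserving 5000 bytes on an empty buffer creates 5 pages, which hold 5 * 992 = 4960 bytes of data.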

virtual size_t File_Namespace::FileBuffer::reservedHeaderSize ( ) const
inline virtual

Returns the size in bytes of the reserved header portion of each page in the FileBuffer.

Definition at line 136 of file FileBuffer.h.

Referenced by File_Namespace::FileMgr::init(), and File_Namespace::readForThread().

136 { return reservedHeaderSize_; }

size_t File_Namespace::FileBuffer::reservedSize ( ) const
inline override virtual

Returns the total number of bytes allocated for the FileBuffer.

Implements Data_Namespace::AbstractBuffer.

Definition at line 142 of file FileBuffer.h.

142 { return multiPages_.size() * pageSize_; }

void File_Namespace::FileBuffer::write ( int8_t *  src,
const size_t  numBytes,
const size_t  offset = 0,
const MemoryLevel  srcMemoryLevel = CPU_LEVEL,
const int  deviceId = -1 
)
override virtual

Writes the contents of source (src) into new versions of the affected logical pages.

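This method writes the contents of source (src) into new versions of the affected logical pages. A new version of an existing page is created only if the current epoch (in FileMgr) is greater than the epoch of that page's latest version; otherwise the latest version is overwritten in place. Pages beyond the current end of the buffer are always newly allocated.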
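The three-way choice write() makes for each affected page can be isolated in a small function. PageAction and chooseAction are hypothetical names. latestEpochs stands in for the epoch of each MultiPage's newest version before the write starts. The sketch ignores the gap-filling pages and the partial-page copies that copyPage() performs.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class PageAction {
  AllocateNewPage,  // page index at or past the existing pages: addNewMultiPage()
  AddNewVersion,    // newest version predates the current epoch: write a new version
  OverwriteCurrent  // newest version already belongs to the current epoch
};

// latestEpochs[i] is the epoch of the newest version of logical page i.
PageAction chooseAction(const std::vector<int>& latestEpochs, std::size_t pageNum,
                        int currentEpoch) {
  if (pageNum >= latestEpochs.size()) {
    return PageAction::AllocateNewPage;
  }
  if (latestEpochs[pageNum] < currentEpoch) {
    return PageAction::AddNewVersion;
  }
  return PageAction::OverwriteCurrent;
}
```

This copy-on-write step means that a page last written before the current epoch is never modified in place, so its older version stays on disk.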

Implements Data_Namespace::AbstractBuffer.

Definition at line 499 of file FileBuffer.cpp.

References addNewMultiPage(), CHECK, copyPage(), Data_Namespace::CPU_LEVEL, File_Namespace::FileMgr::epoch(), logger::FATAL, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), LOG, multiPages_, pageDataSize_, File_Namespace::Page::pageNum, pageSize_, File_Namespace::FileMgr::requestFreePage(), reservedHeaderSize_, Data_Namespace::AbstractBuffer::setAppended(), Data_Namespace::AbstractBuffer::setDirty(), Data_Namespace::AbstractBuffer::setUpdated(), Data_Namespace::AbstractBuffer::size_, File_Namespace::FileInfo::write(), and writeHeader().

Referenced by File_Namespace::FileMgr::putBuffer().

503  {
504  if (srcBufferType != CPU_LEVEL) {
505  LOG(FATAL) << "Unsupported Buffer type";
506  }
507 
508  bool tempIsAppended = false;
509  setDirty();
510  if (offset < size_) {
511  setUpdated();
512  }
513  if (offset + numBytes > size_) {
514  tempIsAppended = true; // because is_appended_ could have already been true - to
515  // avoid rewriting header
516  setAppended();
517  size_ = offset + numBytes;
518  }
519 
520  size_t startPage = offset / pageDataSize_;
521  size_t startPageOffset = offset % pageDataSize_;
522  size_t numPagesToWrite =
523  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
524  size_t bytesLeft = numBytes;
525  int8_t* curPtr = src; // a pointer to the current location in dst being written to
526  size_t initialNumPages = multiPages_.size();
527  int epoch = fm_->epoch();
528 
529  if (startPage >
530  initialNumPages) { // means there is a gap we need to allocate pages for
531  for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
532  Page page = addNewMultiPage(epoch);
533  writeHeader(page, pageNum, epoch);
534  }
535  }
536  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
537  Page page;
538  if (pageNum >= initialNumPages) {
539  page = addNewMultiPage(epoch);
540  writeHeader(page, pageNum, epoch);
541  } else if (multiPages_[pageNum].epochs.back() <
542  epoch) { // need to create new page b/c this current one lags epoch and we
543  // can't overwrite it also need to copy if we are on first or
544  // last page
545  Page lastPage = multiPages_[pageNum].current();
546  page = fm_->requestFreePage(pageSize_, false);
547  multiPages_[pageNum].epochs.push_back(epoch);
548  multiPages_[pageNum].pageVersions.push_back(page);
549  if (pageNum == startPage && startPageOffset > 0) {
550  // copyPage takes care of header offset so don't worry
551  // about it
552  copyPage(lastPage, page, startPageOffset, 0);
553  }
554  if (pageNum == startPage + numPagesToWrite &&
555  bytesLeft > 0) { // bytesLeft should always be > 0
556  copyPage(lastPage,
557  page,
558  pageDataSize_ - bytesLeft,
559  bytesLeft); // these would be empty if we're appending but we won't
560  // worry about it right now
561  }
562  writeHeader(page, pageNum, epoch);
563  } else {
564  // we already have a new page at current
565  // epoch for this page - just grab this page
566  page = multiPages_[pageNum].current();
567  }
568  CHECK(page.fileId >= 0); // make sure page was initialized
569  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
570  size_t bytesWritten;
571  if (pageNum == startPage) {
572  bytesWritten = fileInfo->write(
573  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
574  min(pageDataSize_ - startPageOffset, bytesLeft),
575  curPtr);
576  } else {
577  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
578  min(pageDataSize_, bytesLeft),
579  curPtr);
580  }
581  curPtr += bytesWritten;
582  bytesLeft -= bytesWritten;
583  if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) { // if last page
584  //@todo below can lead to undefined - we're overwriting num
585  // bytes valid at checkpoint
586  writeHeader(page, 0, multiPages_[0].epochs.back(), true);
587  }
588  }
589  CHECK(bytesLeft == 0);
590 }
Page addNewMultiPage(const int epoch)
Definition: FileBuffer.cpp:371
FileInfo * getFileInfoForFileId(const int fileId)
Definition: FileMgr.h:155
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:801
void writeHeader(Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
Writes a header at the top of the page.
Definition: FileBuffer.cpp:380
#define LOG(tag)
Definition: Logger.h:188
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:167
int epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
Definition: FileMgr.h:201
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:59
#define CHECK(condition)
Definition: Logger.h:197
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
Definition: FileBuffer.cpp:351
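The page-range arithmetic at the start of the write() listing can be isolated as below. This is a sketch with hypothetical names (WriteSpan, computeWriteSpan, fileOffset), and it makes these assumptions:

- As in the listing, pageDataSize is the number of data bytes each page holds after its reserved header.
- numBytes is greater than 0, so the last-page index cannot underflow.

The last page touched is startPage + numPagesToWrite - 1, which is the index the listing's tempIsAppended check compares against.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical summary of which logical pages a write touches.
struct WriteSpan {
  std::size_t startPage;        // first logical page touched
  std::size_t startPageOffset;  // byte offset inside that page's data region
  std::size_t numPagesToWrite;  // number of logical pages touched
  std::size_t lastPage;         // index of the final logical page touched
};

WriteSpan computeWriteSpan(std::size_t offset, std::size_t numBytes,
                           std::size_t pageDataSize) {
  assert(pageDataSize > 0 && numBytes > 0);
  WriteSpan s;
  s.startPage = offset / pageDataSize;
  s.startPageOffset = offset % pageDataSize;
  // Ceiling division over the bytes spanned from the start of startPage.
  s.numPagesToWrite =
      (numBytes + s.startPageOffset + pageDataSize - 1) / pageDataSize;
  s.lastPage = s.startPage + s.numPagesToWrite - 1;
  return s;
}

// Absolute file offset of a data byte, following the expression passed to
// FileInfo::write(): physical page start + reserved header + in-page offset.
std::size_t fileOffset(std::size_t physicalPageNum, std::size_t pageSize,
                       std::size_t reservedHeaderSize, std::size_t inPageOffset) {
  return physicalPageNum * pageSize + reservedHeaderSize + inPageOffset;
}
```

For example, take pages with 64 data bytes each and a 250-byte write at offset 100. The write starts 36 bytes into logical page 1 and ends on logical page 5, so it touches five pages.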

void File_Namespace::FileBuffer::writeHeader ( Page & page,
const int pageId,
const int epoch,
const bool writeMetadata = false 
)
private

Writes a header at the top of the page.

The header is an array of int laid out as follows:
- the header size in bytes, not counting the size field itself;
- the elements of the chunk key;
- pageId;
- epoch.

Definition at line 380 of file FileBuffer.cpp.

References chunkKey_, File_Namespace::Page::fileId, fm_, File_Namespace::FileMgr::getFileInfoForFileId(), METADATA_PAGE_SIZE, File_Namespace::Page::pageNum, pageSize(), pageSize_, and File_Namespace::FileInfo::write().

Referenced by append(), File_Namespace::FileMgr::init(), reserve(), write(), and writeMetadata().

383  {
384  int intHeaderSize = chunkKey_.size() + 3; // does not include chunkSize
385  vector<int> header(intHeaderSize);
386  // in addition to chunkkey we need size of header, pageId, version
387  header[0] =
388  (intHeaderSize - 1) * sizeof(int); // don't need to include size of headerSize
389  // value - sizeof(size_t) is for chunkSize
390  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
391  header[intHeaderSize - 2] = pageId;
392  header[intHeaderSize - 1] = epoch;
393  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
394  size_t pageSize = writeMetadata ? METADATA_PAGE_SIZE : pageSize_;
395  fileInfo->write(
396  page.pageNum * pageSize, (intHeaderSize) * sizeof(int), (int8_t*)&header[0]);
397 }
FileInfo * getFileInfoForFileId(const int fileId)
Definition: FileMgr.h:155
#define METADATA_PAGE_SIZE
Definition: FileBuffer.cpp:33
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:129
void writeMetadata(const int epoch)
Definition: FileBuffer.cpp:424
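The header layout that writeHeader() produces can be reproduced in isolation. buildPageHeader is a hypothetical helper that mirrors the listing's construction of the header vector. It only builds the ints and does not write them to a file. It also asserts a non-empty chunk key, which is an assumption made for the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Builds the int header that writeHeader() writes at the top of a page:
//   [0]      header size in bytes, excluding this first int
//   [1..n]   chunk key elements
//   [n+1]    pageId (writeMetadata() passes -1 for metadata pages)
//   [n+2]    epoch
std::vector<int> buildPageHeader(const std::vector<int>& chunkKey, int pageId,
                                 int epoch) {
  assert(!chunkKey.empty());  // assumption for this sketch
  const std::size_t intHeaderSize = chunkKey.size() + 3;
  std::vector<int> header(intHeaderSize);
  header[0] = static_cast<int>((intHeaderSize - 1) * sizeof(int));
  for (std::size_t i = 0; i < chunkKey.size(); ++i) {
    header[i + 1] = chunkKey[i];
  }
  header[intHeaderSize - 2] = pageId;
  header[intHeaderSize - 1] = epoch;
  return header;
}
```

For a four-element chunk key, the header holds seven ints, and header[0] records the six that follow it, that is, 6 * sizeof(int) bytes.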

void File_Namespace::FileBuffer::writeMetadata ( const int epoch)
private

Definition at line 424 of file FileBuffer.cpp.

References Data_Namespace::AbstractBuffer::encoder_, File_Namespace::MultiPage::epochs, File_Namespace::Page::fileId, fm_, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), SQLTypeInfo::get_notnull(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), File_Namespace::FileMgr::getFileForFileId(), Data_Namespace::AbstractBuffer::hasEncoder(), METADATA_PAGE_SIZE, METADATA_VERSION, metadataPages_, NUM_METADATA, File_Namespace::Page::pageNum, pageSize_, File_Namespace::MultiPage::pageVersions, File_Namespace::FileMgr::requestFreePage(), reservedHeaderSize_, Data_Namespace::AbstractBuffer::size_, Data_Namespace::AbstractBuffer::sql_type_, and writeHeader().

424  {
425  // Right now stats page is size_ (in bytes), bufferType, encodingType,
426  // encodingDataType, numElements
427  Page page = fm_->requestFreePage(METADATA_PAGE_SIZE, true);
428  writeHeader(page, -1, epoch, true);
429  FILE* f = fm_->getFileForFileId(page.fileId);
430  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
431  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
432  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
433  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
434  // encodingType, encodingBits all as int
435  typeData[0] = METADATA_VERSION;
436  typeData[1] = static_cast<int>(hasEncoder());
437  if (hasEncoder()) {
438  typeData[2] = static_cast<int>(sql_type_.get_type());
439  typeData[3] = static_cast<int>(sql_type_.get_subtype());
440  typeData[4] = sql_type_.get_dimension();
441  typeData[5] = sql_type_.get_scale();
442  typeData[6] = static_cast<int>(sql_type_.get_notnull());
443  typeData[7] = static_cast<int>(sql_type_.get_compression());
444  typeData[8] = sql_type_.get_comp_param();
445  typeData[9] = sql_type_.get_size();
446  }
447  fwrite((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
448  if (hasEncoder()) { // redundant
449  encoder_->writeMetadata(f);
450  }
451  metadataPages_.epochs.push_back(epoch);
452  metadataPages_.pageVersions.push_back(page);
453 }
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:331
HOST DEVICE int get_size() const
Definition: sqltypes.h:340
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:801
void writeHeader(Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
Writes a header at the top of the page.
Definition: FileBuffer.cpp:380
HOST DEVICE int get_scale() const
Definition: sqltypes.h:335
#define METADATA_PAGE_SIZE
Definition: FileBuffer.cpp:33
FILE * getFileForFileId(const int fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:918
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:330
std::deque< Page > pageVersions
Definition: Page.h:71
std::deque< int > epochs
Definition: Page.h:72
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:338
#define NUM_METADATA
Definition: FileBuffer.h:35
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:332
std::unique_ptr< Encoder > encoder_
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:339
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:337
#define METADATA_VERSION
Definition: FileBuffer.h:36
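The fixed-size part of a metadata page, which writeMetadata() writes after the reserved header, can be modeled as a byte buffer. This sketch is illustrative only, and its assumptions are:

- NUM_METADATA is taken to be 10, because the listing fills typeData[0] through typeData[9]; the real value is defined in FileBuffer.h.
- Values are serialized in native byte order, as fwrite() would write them.
- The encoder metadata that encoder_->writeMetadata(f) appends afterwards is not modeled.

serializeMetadataPayload and kNumMetadata are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Assumed value; the listing indexes typeData[0..9].
constexpr std::size_t kNumMetadata = 10;

// Payload order as emitted by writeMetadata():
//   size_t pageSize, size_t size, int typeData[kNumMetadata]
std::vector<std::int8_t> serializeMetadataPayload(std::size_t pageSize,
                                                  std::size_t size,
                                                  const std::vector<int>& typeData) {
  assert(typeData.size() == kNumMetadata);
  std::vector<std::int8_t> out(2 * sizeof(std::size_t) + kNumMetadata * sizeof(int));
  std::size_t pos = 0;
  std::memcpy(out.data() + pos, &pageSize, sizeof(pageSize));
  pos += sizeof(pageSize);
  std::memcpy(out.data() + pos, &size, sizeof(size));
  pos += sizeof(size);
  // typeData[0] is METADATA_VERSION, typeData[1] is hasEncoder(), etc.
  std::memcpy(out.data() + pos, typeData.data(), typeData.size() * sizeof(int));
  return out;
}
```

A reader of this layout would consume the two size_t fields first and then the NUM_METADATA ints, mirroring the order of the fwrite() calls.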

Friends And Related Function Documentation

friend class FileMgr
friend

Definition at line 56 of file FileBuffer.h.

Member Data Documentation

ChunkKey File_Namespace::FileBuffer::chunkKey_
private

Definition at line 171 of file FileBuffer.h.

Referenced by calcHeaderBuffer(), and writeHeader().

FileMgr* File_Namespace::FileBuffer::fm_
private

size_t File_Namespace::FileBuffer::headerBufferOffset_ = 32
static private

Definition at line 165 of file FileBuffer.h.

Referenced by calcHeaderBuffer().
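This page shows only two facts about headerBufferOffset_: its value is 32, and calcHeaderBuffer() uses it together with chunkKey_. One plausible reading, assumed here and not confirmed by this page, is that calcHeaderBuffer() sets reservedHeaderSize_ large enough for the (chunkKey_.size() + 3) ints that writeHeader() writes, rounded up to a multiple of headerBufferOffset_. reservedHeaderSizeFor is a hypothetical helper illustrating that assumption.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical: space for the writeHeader() ints, rounded up to a multiple
// of headerBufferOffset (32 in FileBuffer.h).
std::size_t reservedHeaderSizeFor(std::size_t chunkKeySize,
                                  std::size_t headerBufferOffset = 32) {
  assert(headerBufferOffset > 0);
  const std::size_t raw = (chunkKeySize + 3) * sizeof(int);
  const std::size_t rem = raw % headerBufferOffset;
  return rem == 0 ? raw : raw + (headerBufferOffset - rem);
}
```

Under this assumption, a four-element chunk key (7 ints, 28 bytes with a 4-byte int) would reserve 32 bytes per page for the header.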

MultiPage File_Namespace::FileBuffer::metadataPages_
private

Definition at line 166 of file FileBuffer.h.

Referenced by FileBuffer(), freeMetadataPages(), and writeMetadata().

std::vector<MultiPage> File_Namespace::FileBuffer::multiPages_
private

size_t File_Namespace::FileBuffer::pageDataSize_
private

Definition at line 169 of file FileBuffer.h.

Referenced by append(), copyPage(), FileBuffer(), read(), and write().

size_t File_Namespace::FileBuffer::pageSize_
private

size_t File_Namespace::FileBuffer::reservedHeaderSize_
private

The documentation for this class was generated from the following files: