OmniSciDB  04ee39c94c
FileBuffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "FileBuffer.h"
24 #include <future>
25 #include <map>
26 #include <thread>
27 #include "../../Shared/File.h"
28 #include "FileMgr.h"
29 
30 #define METADATA_PAGE_SIZE 4096
31 
32 using namespace std;
33 
34 namespace File_Namespace {
35 size_t FileBuffer::headerBufferOffset_ = 32;
36 
37 FileBuffer::FileBuffer(FileMgr* fm,
38  const size_t pageSize,
39  const ChunkKey& chunkKey,
40  const size_t initialSize)
41  : AbstractBuffer(fm->getDeviceId())
42  , fm_(fm)
43  , metadataPages_(METADATA_PAGE_SIZE)
44  , pageSize_(pageSize)
45  , chunkKey_(chunkKey) {
46  // Create a new FileBuffer
47  CHECK(fm_);
50  //@todo reintroduce initialSize - need to develop easy way of
51  // differentiating these pre-allocated pages from "written-to" pages
52  /*
53  if (initalSize > 0) {
54  // should expand to initialSize bytes
55  size_t initialNumPages = (initalSize + pageSize_ -1) / pageSize_;
56  int epoch = fm_->epoch();
57  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
58  Page page = addNewMultiPage(epoch);
59  writeHeader(page,pageNum,epoch);
60  }
61  }
62  */
63 }
64 
66  const size_t pageSize,
67  const ChunkKey& chunkKey,
68  const SQLTypeInfo sqlType,
69  const size_t initialSize)
70  : AbstractBuffer(fm->getDeviceId(), sqlType)
71  , fm_(fm)
73  , pageSize_(pageSize)
74  , chunkKey_(chunkKey) {
75  CHECK(fm_);
78 }
79 
81  /* const size_t pageSize,*/ const ChunkKey& chunkKey,
82  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
83  const std::vector<HeaderInfo>::const_iterator& headerEndIt)
85  , fm_(fm)
87  , pageSize_(0)
88  , chunkKey_(chunkKey) {
89  // We are being assigned an existing FileBuffer on disk
90 
91  CHECK(fm_);
93  // MultiPage multiPage(pageSize_); // why was this here?
94  int lastPageId = -1;
95  // Page lastMetadataPage;
96  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
97  int curPageId = vecIt->pageId;
98 
99  // We only want to read last metadata page
100  if (curPageId == -1) { // stats page
101  metadataPages_.epochs.push_back(vecIt->versionEpoch);
102  metadataPages_.pageVersions.push_back(vecIt->page);
103  } else {
104  if (curPageId != lastPageId) {
105  // protect from bad data on disk, and give diagnostics
106  if (curPageId != lastPageId + 1) {
107  LOG(FATAL) << "Failure reading DB file " << showChunk(chunkKey)
108  << " Current page " << curPageId << " last page " << lastPageId
109  << " epoch " << vecIt->versionEpoch;
110  }
111  if (lastPageId == -1) {
112  // If we are on first real page
113  CHECK(metadataPages_.pageVersions.back().fileId != -1); // was initialized
116  }
117  MultiPage multiPage(pageSize_);
118  multiPages_.push_back(multiPage);
119  lastPageId = curPageId;
120  }
121  multiPages_.back().epochs.push_back(vecIt->versionEpoch);
122  multiPages_.back().pageVersions.push_back(vecIt->page);
123  }
124  if (curPageId == -1) { // meaning there was only a metadata page
127  }
128  }
129  // auto lastHeaderIt = std::prev(headerEndIt);
130  // size_ = lastHeaderIt->chunkSize;
131 }
132 
134  // need to free pages
135  // NOP
136 }
137 
138 void FileBuffer::reserve(const size_t numBytes) {
139  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
140  size_t numCurrentPages = multiPages_.size();
141  int epoch = fm_->epoch();
142 
143  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
144  Page page = addNewMultiPage(epoch);
145  writeHeader(page, pageNum, epoch);
146  }
147 }
148 
150  // 3 * sizeof(int) is for headerSize, for pageId and versionEpoch
151  // sizeof(size_t) is for chunkSize
152  // reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int) + sizeof(size_t);
153  reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int);
154  size_t headerMod = reservedHeaderSize_ % headerBufferOffset_;
155  if (headerMod > 0) {
157  }
158  // pageDataSize_ = pageSize_-reservedHeaderSize_;
159 }
160 
162  // Need to zero headers (actually just first four bytes of header)
163 
164  // First delete metadata pages
165  for (auto metaPageIt = metadataPages_.pageVersions.begin();
166  metaPageIt != metadataPages_.pageVersions.end();
167  ++metaPageIt) {
168  FileInfo* fileInfo = fm_->getFileInfoForFileId(metaPageIt->fileId);
169  fileInfo->freePage(metaPageIt->pageNum);
170  }
171 
172  // Now delete regular pages
173  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
174  ++multiPageIt) {
175  for (auto pageIt = multiPageIt->pageVersions.begin();
176  pageIt != multiPageIt->pageVersions.end();
177  ++pageIt) {
178  FileInfo* fileInfo = fm_->getFileInfoForFileId(pageIt->fileId);
179  fileInfo->freePage(pageIt->pageNum);
180  }
181  }
182 }
183 
184 struct readThreadDS {
185  FileMgr* t_fm; // ptr to FileMgr
186  size_t t_startPage; // start page for the thread
187  size_t t_endPage; // last page for the thread
188  int8_t* t_curPtr; // pointer to the current location of the target for the thread
189  size_t t_bytesLeft; // number of bytes to be read in the thread
190  size_t t_startPageOffset; // offset - used for the first page of the buffer
191  bool t_isFirstPage; // true - for first page of the buffer, false - otherwise
192  std::vector<MultiPage> multiPages; // MultiPages of the FileBuffer passed to the thread
193 };
194 
195 static size_t readForThread(FileBuffer* fileBuffer, const readThreadDS threadDS) {
196  size_t startPage = threadDS.t_startPage; // start reading at startPage, including it
197  size_t endPage = threadDS.t_endPage; // stop reading at endPage, not including it
198  int8_t* curPtr = threadDS.t_curPtr;
199  size_t bytesLeft = threadDS.t_bytesLeft;
200  size_t totalBytesRead = 0;
201  bool isFirstPage = threadDS.t_isFirstPage;
202 
203  // Traverse the logical pages
204  for (size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
205  CHECK(threadDS.multiPages[pageNum].pageSize == fileBuffer->pageSize());
206  Page page = threadDS.multiPages[pageNum].current();
207 
208  FileInfo* fileInfo = threadDS.t_fm->getFileInfoForFileId(page.fileId);
209  CHECK(fileInfo);
210 
211  // Read the page into the destination (dst) buffer at its
212  // current (cur) location
213  size_t bytesRead = 0;
214  if (isFirstPage) {
215  bytesRead = fileInfo->read(
216  page.pageNum * fileBuffer->pageSize() + threadDS.t_startPageOffset +
217  fileBuffer->reservedHeaderSize(),
218  min(fileBuffer->pageDataSize() - threadDS.t_startPageOffset, bytesLeft),
219  curPtr);
220  isFirstPage = false;
221  } else {
222  bytesRead = fileInfo->read(
223  page.pageNum * fileBuffer->pageSize() + fileBuffer->reservedHeaderSize(),
224  min(fileBuffer->pageDataSize(), bytesLeft),
225  curPtr);
226  }
227  curPtr += bytesRead;
228  bytesLeft -= bytesRead;
229  totalBytesRead += bytesRead;
230  }
231  CHECK(bytesLeft == 0);
232 
233  return (totalBytesRead);
234 }
235 
236 void FileBuffer::read(int8_t* const dst,
237  const size_t numBytes,
238  const size_t offset,
239  const MemoryLevel dstBufferType,
240  const int deviceId) {
241  if (dstBufferType != CPU_LEVEL) {
242  LOG(FATAL) << "Unsupported Buffer type";
243  }
244 
245  // variable declarations
246  size_t startPage = offset / pageDataSize_;
247  size_t startPageOffset = offset % pageDataSize_;
248  size_t numPagesToRead =
249  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
250  /*
251  if (startPage + numPagesToRead > multiPages_.size()) {
252  cout << "Start page: " << startPage << endl;
253  cout << "Num pages to read: " << numPagesToRead << endl;
254  cout << "Num multipages: " << multiPages_.size() << endl;
255  cout << "Offset: " << offset << endl;
256  cout << "Num bytes: " << numBytes << endl;
257  }
258  */
259 
260  CHECK(startPage + numPagesToRead <= multiPages_.size());
261 
262  size_t numPagesPerThread = 0;
263  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
264  size_t bytesRead = 0; // total number of bytes already being read
265  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
266  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
267  size_t numThreads = fm_->getNumReaderThreads();
268  std::vector<readThreadDS>
269  threadDSArr; // array of threadDS, needed to avoid racing conditions
270 
271  if (numPagesToRead > numThreads) {
272  numPagesPerThread = numPagesToRead / numThreads;
273  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
274  } else {
275  numThreads = numPagesToRead;
276  numPagesPerThread = 1;
277  }
278 
279  /* set threadDS for the first thread */
280  readThreadDS threadDS;
281  threadDS.t_fm = fm_;
282  threadDS.t_startPage = offset / pageDataSize_;
283  if (numExtraPages > 0) {
284  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
285  numExtraPages--;
286  } else {
287  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
288  }
289  threadDS.t_curPtr = dst;
290  threadDS.t_startPageOffset = offset % pageDataSize_;
291  threadDS.t_isFirstPage = true;
292 
293  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
294  threadDS.t_startPageOffset),
295  numBytesCurrent);
296  threadDS.t_bytesLeft = bytesLeftForThread;
297  threadDS.multiPages = getMultiPage();
298 
299  if (numThreads == 1) {
300  bytesRead += readForThread(this, threadDS);
301  } else {
302  std::vector<std::future<size_t>> threads;
303 
304  for (size_t i = 0; i < numThreads; i++) {
305  threadDSArr.push_back(threadDS);
306  threads.push_back(
307  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
308 
309  // calculate elements of threadDS
310  threadDS.t_fm = fm_;
311  threadDS.t_isFirstPage = false;
312  threadDS.t_curPtr += bytesLeftForThread;
313  threadDS.t_startPage +=
314  threadDS.t_endPage -
315  threadDS.t_startPage; // based on # of pages read on previous iteration
316  if (numExtraPages > 0) {
317  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
318  numExtraPages--;
319  } else {
320  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
321  }
322  numBytesCurrent -= bytesLeftForThread;
323  bytesLeftForThread = min(
324  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
325  threadDS.t_bytesLeft = bytesLeftForThread;
326  threadDS.multiPages = getMultiPage();
327  }
328 
329  for (auto& p : threads) {
330  p.wait();
331  }
332  for (auto& p : threads) {
333  bytesRead += p.get();
334  }
335  }
336  CHECK(bytesRead == numBytes);
337 }
338 
340  Page& destPage,
341  const size_t numBytes,
342  const size_t offset) {
343  // FILE *srcFile = fm_->files_[srcPage.fileId]->f;
344  // FILE *destFile = fm_->files_[destPage.fileId]->f;
345  CHECK(offset + numBytes < pageDataSize_);
346  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
347  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
348 
349  int8_t* buffer = new int8_t[numBytes];
350  size_t bytesRead = srcFileInfo->read(
351  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
352  CHECK(bytesRead == numBytes);
353  size_t bytesWritten = destFileInfo->write(
354  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
355  CHECK(bytesWritten == numBytes);
356  delete[] buffer;
357 }
358 
360  Page page = fm_->requestFreePage(pageSize_, false);
361  MultiPage multiPage(pageSize_);
362  multiPage.epochs.push_back(epoch);
363  multiPage.pageVersions.push_back(page);
364  multiPages_.push_back(multiPage);
365  return page;
366 }
367 
369  const int pageId,
370  const int epoch,
371  const bool writeMetadata) {
372  int intHeaderSize = chunkKey_.size() + 3; // does not include chunkSize
373  vector<int> header(intHeaderSize);
374  // in addition to chunkkey we need size of header, pageId, version
375  header[0] =
376  (intHeaderSize - 1) * sizeof(int); // don't need to include size of headerSize
377  // value - sizeof(size_t) is for chunkSize
378  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
379  header[intHeaderSize - 2] = pageId;
380  header[intHeaderSize - 1] = epoch;
381  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
382  size_t pageSize = writeMetadata ? METADATA_PAGE_SIZE : pageSize_;
383  fileInfo->write(
384  page.pageNum * pageSize, (intHeaderSize) * sizeof(int), (int8_t*)&header[0]);
385 }
386 
387 void FileBuffer::readMetadata(const Page& page) {
388  FILE* f = fm_->getFileForFileId(page.fileId);
389  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
390  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
391  fread((int8_t*)&size_, sizeof(size_t), 1, f);
392  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
393  // encodingType, encodingBits all as int
394  fread((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
395  int version = typeData[0];
396  CHECK(version == METADATA_VERSION); // add backward compatibility code here
397  hasEncoder = static_cast<bool>(typeData[1]);
398  if (hasEncoder) {
399  sqlType.set_type(static_cast<SQLTypes>(typeData[2]));
400  sqlType.set_subtype(static_cast<SQLTypes>(typeData[3]));
401  sqlType.set_dimension(typeData[4]);
402  sqlType.set_scale(typeData[5]);
403  sqlType.set_notnull(static_cast<bool>(typeData[6]));
404  sqlType.set_compression(static_cast<EncodingType>(typeData[7]));
405  sqlType.set_comp_param(typeData[8]);
406  sqlType.set_size(typeData[9]);
408  encoder->readMetadata(f);
409  }
410 }
411 
412 void FileBuffer::writeMetadata(const int epoch) {
413  // Right now stats page is size_ (in bytes), bufferType, encodingType,
414  // encodingDataType, numElements
416  writeHeader(page, -1, epoch, true);
417  FILE* f = fm_->getFileForFileId(page.fileId);
418  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
419  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
420  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
421  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
422  // encodingType, encodingBits all as int
423  typeData[0] = METADATA_VERSION;
424  typeData[1] = static_cast<int>(hasEncoder);
425  if (hasEncoder) {
426  typeData[2] = static_cast<int>(sqlType.get_type());
427  typeData[3] = static_cast<int>(sqlType.get_subtype());
428  typeData[4] = sqlType.get_dimension();
429  typeData[5] = sqlType.get_scale();
430  typeData[6] = static_cast<int>(sqlType.get_notnull());
431  typeData[7] = static_cast<int>(sqlType.get_compression());
432  typeData[8] = sqlType.get_comp_param();
433  typeData[9] = sqlType.get_size();
434  }
435  fwrite((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
436  if (hasEncoder) { // redundant
437  encoder->writeMetadata(f);
438  }
439  metadataPages_.epochs.push_back(epoch);
440  metadataPages_.pageVersions.push_back(page);
441 }
442 
443 /*
444 void FileBuffer::checkpoint() {
445  if (isAppended_) {
446  Page page = multiPages_[multiPages.size()-1].current();
447  writeHeader(page,0,multiPages_[0].epochs.back());
448  }
449  isDirty_ = false;
450  isUpdated_ = false;
451  isAppended_ = false;
452 }
453 */
454 
455 void FileBuffer::append(int8_t* src,
456  const size_t numBytes,
457  const MemoryLevel srcBufferType,
458  const int deviceId) {
459  isDirty_ = true;
460  isAppended_ = true;
461 
462  size_t startPage = size_ / pageDataSize_;
463  size_t startPageOffset = size_ % pageDataSize_;
464  size_t numPagesToWrite =
465  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
466  size_t bytesLeft = numBytes;
467  int8_t* curPtr = src; // a pointer to the current location in dst being written to
468  size_t initialNumPages = multiPages_.size();
469  size_ = size_ + numBytes;
470  int epoch = fm_->epoch();
471  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
472  Page page;
473  if (pageNum >= initialNumPages) {
474  page = addNewMultiPage(epoch);
475  writeHeader(page, pageNum, epoch);
476  } else {
477  // we already have a new page at current
478  // epoch for this page - just grab this page
479  page = multiPages_[pageNum].current();
480  }
481  CHECK(page.fileId >= 0); // make sure page was initialized
482  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
483  size_t bytesWritten;
484  if (pageNum == startPage) {
485  bytesWritten = fileInfo->write(
486  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
487  min(pageDataSize_ - startPageOffset, bytesLeft),
488  curPtr);
489  } else {
490  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
491  min(pageDataSize_, bytesLeft),
492  curPtr);
493  }
494  curPtr += bytesWritten;
495  bytesLeft -= bytesWritten;
496  }
497  CHECK(bytesLeft == 0);
498 }
499 
500 void FileBuffer::write(int8_t* src,
501  const size_t numBytes,
502  const size_t offset,
503  const MemoryLevel srcBufferType,
504  const int deviceId) {
505  if (srcBufferType != CPU_LEVEL) {
506  LOG(FATAL) << "Unsupported Buffer type";
507  }
508  isDirty_ = true;
509  if (offset < size_) {
510  isUpdated_ = true;
511  }
512  bool tempIsAppended = false;
513 
514  if (offset + numBytes > size_) {
515  tempIsAppended = true; // because isAppended_ could have already been true - to avoid
516  // rewriting header
517  isAppended_ = true;
518  size_ = offset + numBytes;
519  }
520 
521  size_t startPage = offset / pageDataSize_;
522  size_t startPageOffset = offset % pageDataSize_;
523  size_t numPagesToWrite =
524  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
525  size_t bytesLeft = numBytes;
526  int8_t* curPtr = src; // a pointer to the current location in dst being written to
527  size_t initialNumPages = multiPages_.size();
528  int epoch = fm_->epoch();
529 
530  if (startPage >
531  initialNumPages) { // means there is a gap we need to allocate pages for
532  for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
533  Page page = addNewMultiPage(epoch);
534  writeHeader(page, pageNum, epoch);
535  }
536  }
537  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
538  Page page;
539  if (pageNum >= initialNumPages) {
540  page = addNewMultiPage(epoch);
541  writeHeader(page, pageNum, epoch);
542  } else if (multiPages_[pageNum].epochs.back() <
543  epoch) { // need to create new page b/c this current one lags epoch and we
544  // can't overwrite it also need to copy if we are on first or
545  // last page
546  Page lastPage = multiPages_[pageNum].current();
547  page = fm_->requestFreePage(pageSize_, false);
548  multiPages_[pageNum].epochs.push_back(epoch);
549  multiPages_[pageNum].pageVersions.push_back(page);
550  if (pageNum == startPage && startPageOffset > 0) {
551  // copyPage takes care of header offset so don't worry
552  // about it
553  copyPage(lastPage, page, startPageOffset, 0);
554  }
555  if (pageNum == startPage + numPagesToWrite &&
556  bytesLeft > 0) { // bytesLeft should always > 0
557  copyPage(lastPage,
558  page,
559  pageDataSize_ - bytesLeft,
560  bytesLeft); // these would be empty if we're appending but we won't
561  // worry about it right now
562  }
563  writeHeader(page, pageNum, epoch);
564  } else {
565  // we already have a new page at current
566  // epoch for this page - just grab this page
567  page = multiPages_[pageNum].current();
568  }
569  CHECK(page.fileId >= 0); // make sure page was initialized
570  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
571  size_t bytesWritten;
572  if (pageNum == startPage) {
573  bytesWritten = fileInfo->write(
574  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
575  min(pageDataSize_ - startPageOffset, bytesLeft),
576  curPtr);
577  } else {
578  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
579  min(pageDataSize_, bytesLeft),
580  curPtr);
581  }
582  curPtr += bytesWritten;
583  bytesLeft -= bytesWritten;
584  if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) { // if last page
585  //@todo below can lead to undefined - we're overwriting num
586  // bytes valid at checkpoint
587  writeHeader(page, 0, multiPages_[0].epochs.back(), true);
588  }
589  }
590  CHECK(bytesLeft == 0);
591 }
592 
593 } // namespace File_Namespace
Page addNewMultiPage(const int epoch)
Definition: FileBuffer.cpp:359
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:130
FileInfo * getFileInfoForFileId(const int fileId)
Definition: FileMgr.h:157
HOST DEVICE int get_size() const
Definition: sqltypes.h:333
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:325
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:801
static size_t headerBufferOffset_
Definition: FileBuffer.h:169
void initEncoder(const SQLTypeInfo tmpSqlType)
void writeHeader(Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:368
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
Definition: FileBuffer.cpp:195
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:182
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:330
#define METADATA_PAGE_SIZE
Definition: FileBuffer.cpp:30
void set_size(int s)
Definition: sqltypes.h:421
FILE * getFileForFileId(const int fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:918
HOST DEVICE int get_scale() const
Definition: sqltypes.h:328
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:323
int64_t * src
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:171
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:413
std::string showChunk(const ChunkKey &key)
Definition: types.h:37
void set_dimension(int d)
Definition: sqltypes.h:415
std::vector< MultiPage > multiPages
Definition: FileBuffer.cpp:192
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
Definition: FileBuffer.cpp:500
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:55
void set_scale(int s)
Definition: sqltypes.h:418
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:331
void set_compression(EncodingType c)
Definition: sqltypes.h:423
void set_notnull(bool n)
Definition: sqltypes.h:420
int epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
Definition: FileMgr.h:206
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:59
std::deque< Page > pageVersions
Definition: Page.h:71
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:414
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:127
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:324
An AbstractBuffer is a unit of data management for a data manager.
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
Definition: FileBuffer.cpp:236
void writeMetadata(const int epoch)
Definition: FileBuffer.cpp:412
void freePage(int pageId)
Definition: FileInfo.cpp:202
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:137
std::deque< int > epochs
Definition: Page.h:72
#define NUM_METADATA
Definition: FileBuffer.h:35
~FileBuffer() override
Destructor.
Definition: FileBuffer.cpp:133
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:64
virtual int getDeviceId() const
void set_comp_param(int p)
Definition: sqltypes.h:424
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:332
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
Definition: FileBuffer.cpp:37
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > ChunkKey
Definition: types.h:35
void reserve(const size_t numBytes) override
Definition: FileBuffer.cpp:138
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:134
void readMetadata(const Page &page)
Definition: FileBuffer.cpp:387
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
Definition: FileBuffer.cpp:339
std::unique_ptr< Encoder > encoder
#define METADATA_VERSION
Definition: FileBuffer.h:36
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:69
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
Definition: FileBuffer.cpp:455
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
Definition: FileMgr.h:212