OmniSciDB  72180abbfe
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FileBuffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <future>
26 #include <map>
27 #include <thread>
28 
30 #include "Shared/File.h"
31 #include "Shared/checked_alloc.h"
32 
33 #define METADATA_PAGE_SIZE 4096
34 
35 using namespace std;
36 
37 namespace File_Namespace {
38 size_t FileBuffer::headerBufferOffset_ = 32;
39 
// Constructs an empty FileBuffer for `chunkKey`, backed by FileMgr `fm`.
// `pageSize` becomes pageSize_, the size of each data page; metadata pages are
// METADATA_PAGE_SIZE bytes. `initialSize` is currently unused: the
// pre-allocation code below is disabled (see the @todo).
40 FileBuffer::FileBuffer(FileMgr* fm,
41  const size_t pageSize,
42  const ChunkKey& chunkKey,
43  const size_t initialSize)
44  : AbstractBuffer(fm->getDeviceId())
45  , fm_(fm)
46  , metadataPages_(METADATA_PAGE_SIZE)
47  , pageSize_(pageSize)
48  , chunkKey_(chunkKey) {
49  // Create a new FileBuffer
// NOTE(review): `fm` is already dereferenced by the AbstractBuffer initializer
// (fm->getDeviceId()) before this CHECK runs, so it cannot catch a null `fm`.
50  CHECK(fm_);
53  //@todo reintroduce initialSize - need to develop easy way of
54  // differentiating these pre-allocated pages from "written-to" pages
55  /*
56  if (initialSize > 0) {
57  // should expand to initialSize bytes
58  size_t initialNumPages = (initialSize + pageSize_ -1) / pageSize_;
59  int epoch = fm_->epoch();
60  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
61  Page page = addNewMultiPage(epoch);
62  writeHeader(page,pageNum,epoch);
63  }
64  }
65  */
66 }
67 
69  const size_t pageSize,
70  const ChunkKey& chunkKey,
71  const SQLTypeInfo sqlType,
72  const size_t initialSize)
73  : AbstractBuffer(fm->getDeviceId(), sqlType)
74  , fm_(fm)
75  , metadataPages_(METADATA_PAGE_SIZE)
76  , pageSize_(pageSize)
77  , chunkKey_(chunkKey) {
// Typed constructor: the same member initializers as the untyped constructor,
// plus `sqlType` forwarded to AbstractBuffer. `initialSize` is not referenced
// in this body.
// NOTE(review): `fm` is dereferenced (fm->getDeviceId()) before CHECK(fm_) runs.
78  CHECK(fm_);
81 }
82 
84  /* const size_t pageSize,*/ const ChunkKey& chunkKey,
85  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
86  const std::vector<HeaderInfo>::const_iterator& headerEndIt)
87  : AbstractBuffer(fm->getDeviceId())
88  , fm_(fm)
89  , metadataPages_(METADATA_PAGE_SIZE)
90  , pageSize_(0)
91  , chunkKey_(chunkKey) {
92  // We are being assigned an existing FileBuffer on disk
93
// NOTE(review): `fm` is dereferenced (fm->getDeviceId()) before this CHECK runs.
94  CHECK(fm_);
96  // MultiPage multiPage(pageSize_); // why was this here?
97  int lastPageId = -1;
98  // Page lastMetadataPage;
// Rebuild the in-memory page table from the headers in [headerStartIt, headerEndIt):
//  - entries with pageId == -1 are versions of the metadata ("stats") page; every
//    such version is recorded in metadataPages_;
//  - other entries must carry pageIds 0, 1, 2, ... (any jump is fatal), and each
//    run of equal pageIds becomes the version list of one MultiPage.
// Presumably the range is sorted by pageId and then by epoch — TODO confirm with caller.
99  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
100  int curPageId = vecIt->pageId;
101
102  // We only want to read last metadata page
103  if (curPageId == -1) { // stats page
104  metadataPages_.epochs.push_back(vecIt->versionEpoch);
105  metadataPages_.pageVersions.push_back(vecIt->page);
106  } else {
107  if (curPageId != lastPageId) {
108  // protect from bad data on disk, and give diagnostics
109  if (curPageId != lastPageId + 1) {
110  LOG(FATAL) << "Failure reading DB file " << showChunk(chunkKey)
111  << " Current page " << curPageId << " last page " << lastPageId
112  << " epoch " << vecIt->versionEpoch;
113  }
114  if (lastPageId == -1) {
115  // If we are on first real page
// NOTE(review): if no pageId == -1 entry precedes the first data page,
// pageVersions is empty and back() is undefined behavior.
116  CHECK(metadataPages_.pageVersions.back().fileId != -1); // was initialized
119  }
// NOTE(review): pageSize_ is initialized to 0 above; confirm it has been loaded
// from the metadata page before this MultiPage is constructed.
120  MultiPage multiPage(pageSize_);
121  multiPages_.push_back(multiPage);
122  lastPageId = curPageId;
123  }
124  multiPages_.back().epochs.push_back(vecIt->versionEpoch);
125  multiPages_.back().pageVersions.push_back(vecIt->page);
126  }
127  if (curPageId == -1) { // meaning there was only a metadata page
130  }
131  }
132  // auto lastHeaderIt = std::prev(headerEndIt);
133  // size_ = lastHeaderIt->chunkSize;
134 }
135 
// Destructor body: intentionally does nothing; page versions held by this buffer
// are not released here.
137  // need to free pages
138  // NOP
139 }
140 
141 void FileBuffer::reserve(const size_t numBytes) {
142  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
143  size_t numCurrentPages = multiPages_.size();
144  int epoch = fm_->epoch();
145 
146  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
147  Page page = addNewMultiPage(epoch);
148  writeHeader(page, pageNum, epoch);
149  }
150 }
151 
// Sizes the per-page header reserved at the start of every page: one int per
// chunk-key element plus three ints (header size, pageId, epoch) — the same
// chunkKey_.size() + 3 ints that writeHeader() emits. The size_t chunkSize field
// mentioned below is no longer part of the formula.
153  // 3 * sizeof(int) is for headerSize, for pageId and versionEpoch
154  // sizeof(size_t) is for chunkSize
155  // reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int) + sizeof(size_t);
156  reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int);
// A non-zero remainder against headerBufferOffset_ (32, defined at the top of this
// file) triggers padding; presumably reservedHeaderSize_ is rounded up to a
// multiple of headerBufferOffset_ inside this branch — confirm.
157  size_t headerMod = reservedHeaderSize_ % headerBufferOffset_;
158  if (headerMod > 0) {
160  }
161  // pageDataSize_ = pageSize_-reservedHeaderSize_;
162 }
163 
// Returns every page version referenced by this buffer to its FileInfo via
// FileInfo::freePage(): first all metadata-page versions, then every version of
// every data MultiPage. metadataPages_ and multiPages_ themselves are not cleared.
// NOTE(review): no header zeroing happens in this body despite the comment below —
// confirm FileInfo::freePage covers it. getFileInfoForFileId() results are used
// without a null check.
165  // Need to zero headers (actually just first four bytes of header)
166
167  // First delete metadata pages
168  for (auto metaPageIt = metadataPages_.pageVersions.begin();
169  metaPageIt != metadataPages_.pageVersions.end();
170  ++metaPageIt) {
171  FileInfo* fileInfo = fm_->getFileInfoForFileId(metaPageIt->fileId);
172  fileInfo->freePage(metaPageIt->pageNum);
173  }
174
175  // Now delete regular pages
176  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
177  ++multiPageIt) {
178  for (auto pageIt = multiPageIt->pageVersions.begin();
179  pageIt != multiPageIt->pageVersions.end();
180  ++pageIt) {
181  FileInfo* fileInfo = fm_->getFileInfoForFileId(pageIt->fileId);
182  fileInfo->freePage(pageIt->pageNum);
183  }
184  }
185 }
186 
187 struct readThreadDS {
188  FileMgr* t_fm; // ptr to FileMgr
189  size_t t_startPage; // start page for the thread
190  size_t t_endPage; // last page for the thread
191  int8_t* t_curPtr; // pointer to the current location of the target for the thread
192  size_t t_bytesLeft; // number of bytes to be read in the thread
193  size_t t_startPageOffset; // offset - used for the first page of the buffer
194  bool t_isFirstPage; // true - for first page of the buffer, false - otherwise
195  std::vector<MultiPage> multiPages; // MultiPages of the FileBuffer passed to the thread
196 };
197 
198 static size_t readForThread(FileBuffer* fileBuffer, const readThreadDS threadDS) {
199  size_t startPage = threadDS.t_startPage; // start reading at startPage, including it
200  size_t endPage = threadDS.t_endPage; // stop reading at endPage, not including it
201  int8_t* curPtr = threadDS.t_curPtr;
202  size_t bytesLeft = threadDS.t_bytesLeft;
203  size_t totalBytesRead = 0;
204  bool isFirstPage = threadDS.t_isFirstPage;
205 
206  // Traverse the logical pages
207  for (size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
208  CHECK(threadDS.multiPages[pageNum].pageSize == fileBuffer->pageSize());
209  Page page = threadDS.multiPages[pageNum].current();
210 
211  FileInfo* fileInfo = threadDS.t_fm->getFileInfoForFileId(page.fileId);
212  CHECK(fileInfo);
213 
214  // Read the page into the destination (dst) buffer at its
215  // current (cur) location
216  size_t bytesRead = 0;
217  if (isFirstPage) {
218  bytesRead = fileInfo->read(
219  page.pageNum * fileBuffer->pageSize() + threadDS.t_startPageOffset +
220  fileBuffer->reservedHeaderSize(),
221  min(fileBuffer->pageDataSize() - threadDS.t_startPageOffset, bytesLeft),
222  curPtr);
223  isFirstPage = false;
224  } else {
225  bytesRead = fileInfo->read(
226  page.pageNum * fileBuffer->pageSize() + fileBuffer->reservedHeaderSize(),
227  min(fileBuffer->pageDataSize(), bytesLeft),
228  curPtr);
229  }
230  curPtr += bytesRead;
231  bytesLeft -= bytesRead;
232  totalBytesRead += bytesRead;
233  }
234  CHECK(bytesLeft == 0);
235 
236  return (totalBytesRead);
237 }
238 
239 void FileBuffer::read(int8_t* const dst,
240  const size_t numBytes,
241  const size_t offset,
242  const MemoryLevel dstBufferType,
243  const int deviceId) {
244  if (dstBufferType != CPU_LEVEL) {
245  LOG(FATAL) << "Unsupported Buffer type";
246  }
247 
248  // variable declarations
249  size_t startPage = offset / pageDataSize_;
250  size_t startPageOffset = offset % pageDataSize_;
251  size_t numPagesToRead =
252  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
253  /*
254  if (startPage + numPagesToRead > multiPages_.size()) {
255  cout << "Start page: " << startPage << endl;
256  cout << "Num pages to read: " << numPagesToRead << endl;
257  cout << "Num multipages: " << multiPages_.size() << endl;
258  cout << "Offset: " << offset << endl;
259  cout << "Num bytes: " << numBytes << endl;
260  }
261  */
262 
263  CHECK(startPage + numPagesToRead <= multiPages_.size());
264 
265  size_t numPagesPerThread = 0;
266  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
267  size_t bytesRead = 0; // total number of bytes already being read
268  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
269  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
270  size_t numThreads = fm_->getNumReaderThreads();
271  std::vector<readThreadDS>
272  threadDSArr; // array of threadDS, needed to avoid racing conditions
273 
274  if (numPagesToRead > numThreads) {
275  numPagesPerThread = numPagesToRead / numThreads;
276  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
277  } else {
278  numThreads = numPagesToRead;
279  numPagesPerThread = 1;
280  }
281 
282  /* set threadDS for the first thread */
283  readThreadDS threadDS;
284  threadDS.t_fm = fm_;
285  threadDS.t_startPage = offset / pageDataSize_;
286  if (numExtraPages > 0) {
287  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
288  numExtraPages--;
289  } else {
290  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
291  }
292  threadDS.t_curPtr = dst;
293  threadDS.t_startPageOffset = offset % pageDataSize_;
294  threadDS.t_isFirstPage = true;
295 
296  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
297  threadDS.t_startPageOffset),
298  numBytesCurrent);
299  threadDS.t_bytesLeft = bytesLeftForThread;
300  threadDS.multiPages = getMultiPage();
301 
302  if (numThreads == 1) {
303  bytesRead += readForThread(this, threadDS);
304  } else {
305  std::vector<std::future<size_t>> threads;
306 
307  for (size_t i = 0; i < numThreads; i++) {
308  threadDSArr.push_back(threadDS);
309  threads.push_back(
310  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
311 
312  // calculate elements of threadDS
313  threadDS.t_fm = fm_;
314  threadDS.t_isFirstPage = false;
315  threadDS.t_curPtr += bytesLeftForThread;
316  threadDS.t_startPage +=
317  threadDS.t_endPage -
318  threadDS.t_startPage; // based on # of pages read on previous iteration
319  if (numExtraPages > 0) {
320  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
321  numExtraPages--;
322  } else {
323  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
324  }
325  numBytesCurrent -= bytesLeftForThread;
326  bytesLeftForThread = min(
327  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
328  threadDS.t_bytesLeft = bytesLeftForThread;
329  threadDS.multiPages = getMultiPage();
330  }
331 
332  for (auto& p : threads) {
333  p.wait();
334  }
335  for (auto& p : threads) {
336  bytesRead += p.get();
337  }
338  }
339  CHECK(bytesRead == numBytes);
340 }
341 
343  Page& destPage,
344  const size_t numBytes,
345  const size_t offset) {
// Copies the data bytes [offset, offset + numBytes) from srcPage to destPage
// through a temporary heap buffer. Offsets are relative to the start of each
// page's data area, i.e. reservedHeaderSize_ bytes into the page.
// NOTE(review): the strict '<' below rejects any range that ends exactly at
// pageDataSize_ (one including the page's last data byte); '<=' looks like the
// intended bound — confirm.
346  // FILE *srcFile = fm_->files_[srcPage.fileId]->f;
347  // FILE *destFile = fm_->files_[destPage.fileId]->f;
348  CHECK(offset + numBytes < pageDataSize_);
349  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
350  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
351
352  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
353  size_t bytesRead = srcFileInfo->read(
354  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
355  CHECK(bytesRead == numBytes);
356  size_t bytesWritten = destFileInfo->write(
357  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
358  CHECK(bytesWritten == numBytes);
359  free(buffer);
360 }
361 
// Appends a new logical page:
//  - requests a free pageSize_-byte page from the FileMgr (isMetadata = false);
//  - records it as the only version of a new MultiPage tagged with `epoch`;
//  - returns the page.
// The page header is not written here; callers in this file follow up with
// writeHeader().
363  Page page = fm_->requestFreePage(pageSize_, false);
364  MultiPage multiPage(pageSize_);
365  multiPage.epochs.push_back(epoch);
366  multiPage.pageVersions.push_back(page);
367  multiPages_.push_back(multiPage);
368  return page;
369 }
370 
372  const int pageId,
373  const int epoch,
374  const bool writeMetadata) {
// Writes the page header at the very start of `page` as consecutive ints:
//   [byte size of the rest of the header, chunk key..., pageId, epoch].
// The file offset is page.pageNum * METADATA_PAGE_SIZE when writeMetadata is
// true, otherwise page.pageNum * pageSize_.
// NOTE(review): the byte count returned by FileInfo::write is not checked here.
375  int intHeaderSize = chunkKey_.size() + 3; // does not include chunkSize
376  vector<int> header(intHeaderSize);
377  // in addition to chunkkey we need size of header, pageId, version
378  header[0] =
379  (intHeaderSize - 1) * sizeof(int); // don't need to include size of headerSize
380  // value - sizeof(size_t) is for chunkSize
381  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
382  header[intHeaderSize - 2] = pageId;
383  header[intHeaderSize - 1] = epoch;
384  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
385  size_t pageSize = writeMetadata ? METADATA_PAGE_SIZE : pageSize_;
386  fileInfo->write(
387  page.pageNum * pageSize, (intHeaderSize) * sizeof(int), (int8_t*)&header[0]);
388 }
389 
// Loads this buffer's metadata from metadata page `page`.
// Layout, starting reservedHeaderSize_ bytes into the METADATA_PAGE_SIZE-byte page:
//   pageSize_ (size_t), size_ (size_t), then NUM_METADATA ints:
//   [0] METADATA_VERSION (CHECKed), [1] has_encoder,
//   [2..9] type, subtype, dimension, scale, notnull, compression, comp_param, size;
// followed by the encoder's own metadata when has_encoder is set.
// NOTE(review): fseek/fread return values are not checked, so a short or failed
// read goes undetected.
390 void FileBuffer::readMetadata(const Page& page) {
391  FILE* f = fm_->getFileForFileId(page.fileId);
392  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
393  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
394  fread((int8_t*)&size_, sizeof(size_t), 1, f);
395  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
396  // encodingType, encodingBits all as int
397  fread((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
398  int version = typeData[0];
399  CHECK(version == METADATA_VERSION); // add backward compatibility code here
400  has_encoder = static_cast<bool>(typeData[1]);
401  if (has_encoder) {
402  sql_type.set_type(static_cast<SQLTypes>(typeData[2]));
403  sql_type.set_subtype(static_cast<SQLTypes>(typeData[3]));
404  sql_type.set_dimension(typeData[4]);
405  sql_type.set_scale(typeData[5]);
406  sql_type.set_notnull(static_cast<bool>(typeData[6]));
407  sql_type.set_compression(static_cast<EncodingType>(typeData[7]));
408  sql_type.set_comp_param(typeData[8]);
409  sql_type.set_size(typeData[9]);
// NOTE(review): `encoder` must be non-null here; confirm it is created from the
// sql_type just read (e.g. via initEncoder(sql_type)) before this call.
411  encoder->readMetadata(f);
412  }
413 }
414 
// Writes a new version of this buffer's metadata for `epoch` and records it
// (epoch, page) in metadataPages_. Steps, in order:
//  1. a page header with pageId -1 (writeHeader with writeMetadata = true);
//  2. at reservedHeaderSize_ into the page: pageSize_ and size_ (size_t each);
//  3. NUM_METADATA ints (version, has_encoder, then the SQL type fields, which
//     stay 0 when there is no encoder);
//  4. the encoder's metadata.
// This matches the layout readMetadata() reads back. The field list in the first
// comment below predates this layout.
// NOTE(review): fseek/fwrite return values are not checked.
415 void FileBuffer::writeMetadata(const int epoch) {
416  // Right now stats page is size_ (in bytes), bufferType, encodingType,
417  // encodingDataType, numElements
419  writeHeader(page, -1, epoch, true);
420  FILE* f = fm_->getFileForFileId(page.fileId);
421  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
422  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
423  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
424  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
425  // encodingType, encodingBits all as int
426  typeData[0] = METADATA_VERSION;
427  typeData[1] = static_cast<int>(has_encoder);
428  if (has_encoder) {
429  typeData[2] = static_cast<int>(sql_type.get_type());
430  typeData[3] = static_cast<int>(sql_type.get_subtype());
431  typeData[4] = sql_type.get_dimension();
432  typeData[5] = sql_type.get_scale();
433  typeData[6] = static_cast<int>(sql_type.get_notnull());
434  typeData[7] = static_cast<int>(sql_type.get_compression());
435  typeData[8] = sql_type.get_comp_param();
436  typeData[9] = sql_type.get_size();
437  }
438  fwrite((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
439  if (has_encoder) { // redundant
440  encoder->writeMetadata(f);
441  }
442  metadataPages_.epochs.push_back(epoch);
443  metadataPages_.pageVersions.push_back(page);
444 }
445 
446 /*
447 void FileBuffer::checkpoint() {
448  if (is_appended_) {
449  Page page = multiPages_[multiPages.size()-1].current();
450  writeHeader(page,0,multiPages_[0].epochs.back());
451  }
452  is_dirty_ = false;
453  is_updated_ = false;
454  is_appended_ = false;
455 }
456 */
457 
458 void FileBuffer::append(int8_t* src,
459  const size_t numBytes,
460  const MemoryLevel srcBufferType,
461  const int deviceId) {
462  is_dirty_ = true;
463  is_appended_ = true;
464 
465  size_t startPage = size_ / pageDataSize_;
466  size_t startPageOffset = size_ % pageDataSize_;
467  size_t numPagesToWrite =
468  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
469  size_t bytesLeft = numBytes;
470  int8_t* curPtr = src; // a pointer to the current location in dst being written to
471  size_t initialNumPages = multiPages_.size();
472  size_ = size_ + numBytes;
473  int epoch = fm_->epoch();
474  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
475  Page page;
476  if (pageNum >= initialNumPages) {
477  page = addNewMultiPage(epoch);
478  writeHeader(page, pageNum, epoch);
479  } else {
480  // we already have a new page at current
481  // epoch for this page - just grab this page
482  page = multiPages_[pageNum].current();
483  }
484  CHECK(page.fileId >= 0); // make sure page was initialized
485  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
486  size_t bytesWritten;
487  if (pageNum == startPage) {
488  bytesWritten = fileInfo->write(
489  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
490  min(pageDataSize_ - startPageOffset, bytesLeft),
491  curPtr);
492  } else {
493  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
494  min(pageDataSize_, bytesLeft),
495  curPtr);
496  }
497  curPtr += bytesWritten;
498  bytesLeft -= bytesWritten;
499  }
500  CHECK(bytesLeft == 0);
501 }
502 
503 void FileBuffer::write(int8_t* src,
504  const size_t numBytes,
505  const size_t offset,
506  const MemoryLevel srcBufferType,
507  const int deviceId) {
508  if (srcBufferType != CPU_LEVEL) {
509  LOG(FATAL) << "Unsupported Buffer type";
510  }
511  is_dirty_ = true;
512  if (offset < size_) {
513  is_updated_ = true;
514  }
515  bool tempIsAppended = false;
516 
517  if (offset + numBytes > size_) {
518  tempIsAppended = true; // because is_appended_ could have already been true - to
519  // avoid rewriting header
520  is_appended_ = true;
521  size_ = offset + numBytes;
522  }
523 
524  size_t startPage = offset / pageDataSize_;
525  size_t startPageOffset = offset % pageDataSize_;
526  size_t numPagesToWrite =
527  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
528  size_t bytesLeft = numBytes;
529  int8_t* curPtr = src; // a pointer to the current location in dst being written to
530  size_t initialNumPages = multiPages_.size();
531  int epoch = fm_->epoch();
532 
533  if (startPage >
534  initialNumPages) { // means there is a gap we need to allocate pages for
535  for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
536  Page page = addNewMultiPage(epoch);
537  writeHeader(page, pageNum, epoch);
538  }
539  }
540  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
541  Page page;
542  if (pageNum >= initialNumPages) {
543  page = addNewMultiPage(epoch);
544  writeHeader(page, pageNum, epoch);
545  } else if (multiPages_[pageNum].epochs.back() <
546  epoch) { // need to create new page b/c this current one lags epoch and we
547  // can't overwrite it also need to copy if we are on first or
548  // last page
549  Page lastPage = multiPages_[pageNum].current();
550  page = fm_->requestFreePage(pageSize_, false);
551  multiPages_[pageNum].epochs.push_back(epoch);
552  multiPages_[pageNum].pageVersions.push_back(page);
553  if (pageNum == startPage && startPageOffset > 0) {
554  // copyPage takes care of header offset so don't worry
555  // about it
556  copyPage(lastPage, page, startPageOffset, 0);
557  }
558  if (pageNum == startPage + numPagesToWrite &&
559  bytesLeft > 0) { // bytesLeft should always > 0
560  copyPage(lastPage,
561  page,
562  pageDataSize_ - bytesLeft,
563  bytesLeft); // these would be empty if we're appending but we won't
564  // worry about it right now
565  }
566  writeHeader(page, pageNum, epoch);
567  } else {
568  // we already have a new page at current
569  // epoch for this page - just grab this page
570  page = multiPages_[pageNum].current();
571  }
572  CHECK(page.fileId >= 0); // make sure page was initialized
573  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
574  size_t bytesWritten;
575  if (pageNum == startPage) {
576  bytesWritten = fileInfo->write(
577  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
578  min(pageDataSize_ - startPageOffset, bytesLeft),
579  curPtr);
580  } else {
581  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
582  min(pageDataSize_, bytesLeft),
583  curPtr);
584  }
585  curPtr += bytesWritten;
586  bytesLeft -= bytesWritten;
587  if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) { // if last page
588  //@todo below can lead to undefined - we're overwriting num
589  // bytes valid at checkpoint
590  writeHeader(page, 0, multiPages_[0].epochs.back(), true);
591  }
592  }
593  CHECK(bytesLeft == 0);
594 }
595 
596 } // namespace File_Namespace
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:137
void initEncoder(const SQLTypeInfo tmp_sql_type)
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:258
void set_compression(EncodingType c)
Definition: sqltypes.h:357
void set_size(int s)
Definition: sqltypes.h:355
Page addNewMultiPage(const int epoch)
Definition: FileBuffer.cpp:362
int64_t * src
std::vector< int > ChunkKey
Definition: types.h:35
FileInfo * getFileInfoForFileId(const int fileId)
Definition: FileMgr.h:155
HOST DEVICE int get_size() const
Definition: sqltypes.h:267
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:813
static size_t headerBufferOffset_
Definition: FileBuffer.h:169
void writeHeader(Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:371
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
Definition: FileBuffer.cpp:198
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:188
HOST DEVICE int get_scale() const
Definition: sqltypes.h:262
#define METADATA_PAGE_SIZE
Definition: FileBuffer.cpp:33
FILE * getFileForFileId(const int fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:930
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:348
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:171
std::vector< MultiPage > multiPages
Definition: FileBuffer.cpp:195
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
Definition: FileBuffer.cpp:503
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:257
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:55
int epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
Definition: FileMgr.h:202
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:59
std::deque< Page > pageVersions
Definition: Page.h:71
CHECK(cgen_state)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:127
void set_scale(int s)
Definition: sqltypes.h:352
An AbstractBuffer is a unit of data management for a data manager.
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
Definition: FileBuffer.cpp:239
void writeMetadata(const int epoch)
Definition: FileBuffer.cpp:415
void freePage(int pageId)
Definition: FileInfo.cpp:202
std::string showChunk(const ChunkKey &key)
Definition: types.h:37
size_t pageNum
index of this page within its owning file
Definition: Page.h:48
void set_comp_param(int p)
Definition: sqltypes.h:358
std::deque< int > epochs
Definition: Page.h:72
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:134
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:265
#define NUM_METADATA
Definition: FileBuffer.h:35
void set_dimension(int d)
Definition: sqltypes.h:349
~FileBuffer() override
Destructor.
Definition: FileBuffer.cpp:136
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:64
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:259
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:266
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
Definition: FileBuffer.cpp:40
void set_notnull(bool n)
Definition: sqltypes.h:354
void reserve(const size_t numBytes) override
Definition: FileBuffer.cpp:141
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:264
void readMetadata(const Page &page)
Definition: FileBuffer.cpp:390
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
Definition: FileBuffer.cpp:342
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:130
std::unique_ptr< Encoder > encoder
#define METADATA_VERSION
Definition: FileBuffer.h:36
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:69
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
Definition: FileBuffer.cpp:458
A selection of helper methods for File I/O.
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
Definition: FileMgr.h:208
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:347