OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FileBuffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <future>
26 #include <map>
27 #include <thread>
28 
30 #include "Shared/File.h"
31 
32 #define METADATA_PAGE_SIZE 4096
33 
34 using namespace std;
35 
36 namespace File_Namespace {
37 size_t FileBuffer::headerBufferOffset_ = 32;  // reservedHeaderSize_ is taken modulo this value when the header size is computed; presumably the header is padded to a multiple of it - TODO confirm
38 
39 FileBuffer::FileBuffer(FileMgr* fm,
40  const size_t pageSize,
41  const ChunkKey& chunkKey,
42  const size_t initialSize)
43  : AbstractBuffer(fm->getDeviceId())
44  , fm_(fm)
45  , metadataPages_(METADATA_PAGE_SIZE)
46  , pageSize_(pageSize)
47  , chunkKey_(chunkKey) {
48  // Create a new FileBuffer
49  CHECK(fm_);
52  //@todo reintroduce initialSize - need to develop easy way of
53  // differentiating these pre-allocated pages from "written-to" pages
54  /*
55  if (initalSize > 0) {
56  // should expand to initialSize bytes
57  size_t initialNumPages = (initalSize + pageSize_ -1) / pageSize_;
58  int epoch = fm_->epoch();
59  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
60  Page page = addNewMultiPage(epoch);
61  writeHeader(page,pageNum,epoch);
62  }
63  }
64  */
65 }
66 
68  const size_t pageSize,
69  const ChunkKey& chunkKey,
70  const SQLTypeInfo sqlType,
71  const size_t initialSize)
72  : AbstractBuffer(fm->getDeviceId(), sqlType)
73  , fm_(fm)
74  , metadataPages_(METADATA_PAGE_SIZE)
75  , pageSize_(pageSize)
76  , chunkKey_(chunkKey) {
77  CHECK(fm_);
80 }
81 
83  /* const size_t pageSize,*/ const ChunkKey& chunkKey,
84  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
85  const std::vector<HeaderInfo>::const_iterator& headerEndIt)
86  : AbstractBuffer(fm->getDeviceId())
87  , fm_(fm)
88  , metadataPages_(METADATA_PAGE_SIZE)
89  , pageSize_(0)
90  , chunkKey_(chunkKey) {
91  // We are being assigned an existing FileBuffer on disk
92 
93  CHECK(fm_);
95  // MultiPage multiPage(pageSize_); // why was this here?
96  int lastPageId = -1;
97  // Page lastMetadataPage;
98  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
99  int curPageId = vecIt->pageId;
100 
101  // We only want to read last metadata page
102  if (curPageId == -1) { // stats page
103  metadataPages_.epochs.push_back(vecIt->versionEpoch);
104  metadataPages_.pageVersions.push_back(vecIt->page);
105  } else {
106  if (curPageId != lastPageId) {
107  // protect from bad data on disk, and give diagnostics
108  if (curPageId != lastPageId + 1) {
109  LOG(FATAL) << "Failure reading DB file " << showChunk(chunkKey)
110  << " Current page " << curPageId << " last page " << lastPageId
111  << " epoch " << vecIt->versionEpoch;
112  }
113  if (lastPageId == -1) {
114  // If we are on first real page
115  CHECK(metadataPages_.pageVersions.back().fileId != -1); // was initialized
118  }
119  MultiPage multiPage(pageSize_);
120  multiPages_.push_back(multiPage);
121  lastPageId = curPageId;
122  }
123  multiPages_.back().epochs.push_back(vecIt->versionEpoch);
124  multiPages_.back().pageVersions.push_back(vecIt->page);
125  }
126  if (curPageId == -1) { // meaning there was only a metadata page
129  }
130  }
131  // auto lastHeaderIt = std::prev(headerEndIt);
132  // size_ = lastHeaderIt->chunkSize;
133 }
134 
136  // need to free pages
137  // NOP
138 }
139 
140 void FileBuffer::reserve(const size_t numBytes) {
141  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
142  size_t numCurrentPages = multiPages_.size();
143  int epoch = fm_->epoch();
144 
145  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
146  Page page = addNewMultiPage(epoch);
147  writeHeader(page, pageNum, epoch);
148  }
149 }
150 
152  // 3 * sizeof(int) is for headerSize, for pageId and versionEpoch
153  // sizeof(size_t) is for chunkSize
154  // reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int) + sizeof(size_t);
155  reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int);
156  size_t headerMod = reservedHeaderSize_ % headerBufferOffset_;
157  if (headerMod > 0) {
159  }
160  // pageDataSize_ = pageSize_-reservedHeaderSize_;
161 }
162 
164  // Need to zero headers (actually just first four bytes of header)
165 
166  // First delete metadata pages
167  for (auto metaPageIt = metadataPages_.pageVersions.begin();
168  metaPageIt != metadataPages_.pageVersions.end();
169  ++metaPageIt) {
170  FileInfo* fileInfo = fm_->getFileInfoForFileId(metaPageIt->fileId);
171  fileInfo->freePage(metaPageIt->pageNum);
172  }
173 
174  // Now delete regular pages
175  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
176  ++multiPageIt) {
177  for (auto pageIt = multiPageIt->pageVersions.begin();
178  pageIt != multiPageIt->pageVersions.end();
179  ++pageIt) {
180  FileInfo* fileInfo = fm_->getFileInfoForFileId(pageIt->fileId);
181  fileInfo->freePage(pageIt->pageNum);
182  }
183  }
184 }
185 
186 struct readThreadDS {
187  FileMgr* t_fm; // ptr to FileMgr
188  size_t t_startPage; // start page for the thread
189  size_t t_endPage; // last page for the thread
190  int8_t* t_curPtr; // pointer to the current location of the target for the thread
191  size_t t_bytesLeft; // number of bytes to be read in the thread
192  size_t t_startPageOffset; // offset - used for the first page of the buffer
193  bool t_isFirstPage; // true - for first page of the buffer, false - otherwise
194  std::vector<MultiPage> multiPages; // MultiPages of the FileBuffer passed to the thread
195 };
196 
197 static size_t readForThread(FileBuffer* fileBuffer, const readThreadDS threadDS) {
198  size_t startPage = threadDS.t_startPage; // start reading at startPage, including it
199  size_t endPage = threadDS.t_endPage; // stop reading at endPage, not including it
200  int8_t* curPtr = threadDS.t_curPtr;
201  size_t bytesLeft = threadDS.t_bytesLeft;
202  size_t totalBytesRead = 0;
203  bool isFirstPage = threadDS.t_isFirstPage;
204 
205  // Traverse the logical pages
206  for (size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
207  CHECK(threadDS.multiPages[pageNum].pageSize == fileBuffer->pageSize());
208  Page page = threadDS.multiPages[pageNum].current();
209 
210  FileInfo* fileInfo = threadDS.t_fm->getFileInfoForFileId(page.fileId);
211  CHECK(fileInfo);
212 
213  // Read the page into the destination (dst) buffer at its
214  // current (cur) location
215  size_t bytesRead = 0;
216  if (isFirstPage) {
217  bytesRead = fileInfo->read(
218  page.pageNum * fileBuffer->pageSize() + threadDS.t_startPageOffset +
219  fileBuffer->reservedHeaderSize(),
220  min(fileBuffer->pageDataSize() - threadDS.t_startPageOffset, bytesLeft),
221  curPtr);
222  isFirstPage = false;
223  } else {
224  bytesRead = fileInfo->read(
225  page.pageNum * fileBuffer->pageSize() + fileBuffer->reservedHeaderSize(),
226  min(fileBuffer->pageDataSize(), bytesLeft),
227  curPtr);
228  }
229  curPtr += bytesRead;
230  bytesLeft -= bytesRead;
231  totalBytesRead += bytesRead;
232  }
233  CHECK(bytesLeft == 0);
234 
235  return (totalBytesRead);
236 }
237 
238 void FileBuffer::read(int8_t* const dst,
239  const size_t numBytes,
240  const size_t offset,
241  const MemoryLevel dstBufferType,
242  const int deviceId) {
243  if (dstBufferType != CPU_LEVEL) {
244  LOG(FATAL) << "Unsupported Buffer type";
245  }
246 
247  // variable declarations
248  size_t startPage = offset / pageDataSize_;
249  size_t startPageOffset = offset % pageDataSize_;
250  size_t numPagesToRead =
251  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
252  /*
253  if (startPage + numPagesToRead > multiPages_.size()) {
254  cout << "Start page: " << startPage << endl;
255  cout << "Num pages to read: " << numPagesToRead << endl;
256  cout << "Num multipages: " << multiPages_.size() << endl;
257  cout << "Offset: " << offset << endl;
258  cout << "Num bytes: " << numBytes << endl;
259  }
260  */
261 
262  CHECK(startPage + numPagesToRead <= multiPages_.size());
263 
264  size_t numPagesPerThread = 0;
265  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
266  size_t bytesRead = 0; // total number of bytes already being read
267  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
268  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
269  size_t numThreads = fm_->getNumReaderThreads();
270  std::vector<readThreadDS>
271  threadDSArr; // array of threadDS, needed to avoid racing conditions
272 
273  if (numPagesToRead > numThreads) {
274  numPagesPerThread = numPagesToRead / numThreads;
275  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
276  } else {
277  numThreads = numPagesToRead;
278  numPagesPerThread = 1;
279  }
280 
281  /* set threadDS for the first thread */
282  readThreadDS threadDS;
283  threadDS.t_fm = fm_;
284  threadDS.t_startPage = offset / pageDataSize_;
285  if (numExtraPages > 0) {
286  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
287  numExtraPages--;
288  } else {
289  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
290  }
291  threadDS.t_curPtr = dst;
292  threadDS.t_startPageOffset = offset % pageDataSize_;
293  threadDS.t_isFirstPage = true;
294 
295  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
296  threadDS.t_startPageOffset),
297  numBytesCurrent);
298  threadDS.t_bytesLeft = bytesLeftForThread;
299  threadDS.multiPages = getMultiPage();
300 
301  if (numThreads == 1) {
302  bytesRead += readForThread(this, threadDS);
303  } else {
304  std::vector<std::future<size_t>> threads;
305 
306  for (size_t i = 0; i < numThreads; i++) {
307  threadDSArr.push_back(threadDS);
308  threads.push_back(
309  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
310 
311  // calculate elements of threadDS
312  threadDS.t_fm = fm_;
313  threadDS.t_isFirstPage = false;
314  threadDS.t_curPtr += bytesLeftForThread;
315  threadDS.t_startPage +=
316  threadDS.t_endPage -
317  threadDS.t_startPage; // based on # of pages read on previous iteration
318  if (numExtraPages > 0) {
319  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
320  numExtraPages--;
321  } else {
322  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
323  }
324  numBytesCurrent -= bytesLeftForThread;
325  bytesLeftForThread = min(
326  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
327  threadDS.t_bytesLeft = bytesLeftForThread;
328  threadDS.multiPages = getMultiPage();
329  }
330 
331  for (auto& p : threads) {
332  p.wait();
333  }
334  for (auto& p : threads) {
335  bytesRead += p.get();
336  }
337  }
338  CHECK(bytesRead == numBytes);
339 }
340 
342  Page& destPage,
343  const size_t numBytes,
344  const size_t offset) {
345  // FILE *srcFile = fm_->files_[srcPage.fileId]->f;
346  // FILE *destFile = fm_->files_[destPage.fileId]->f;
347  CHECK(offset + numBytes < pageDataSize_);
348  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
349  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
350 
351  int8_t* buffer = new int8_t[numBytes];
352  size_t bytesRead = srcFileInfo->read(
353  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
354  CHECK(bytesRead == numBytes);
355  size_t bytesWritten = destFileInfo->write(
356  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
357  CHECK(bytesWritten == numBytes);
358  delete[] buffer;
359 }
360 
362  Page page = fm_->requestFreePage(pageSize_, false);
363  MultiPage multiPage(pageSize_);
364  multiPage.epochs.push_back(epoch);
365  multiPage.pageVersions.push_back(page);
366  multiPages_.push_back(multiPage);
367  return page;
368 }
369 
371  const int pageId,
372  const int epoch,
373  const bool writeMetadata) {
374  int intHeaderSize = chunkKey_.size() + 3; // does not include chunkSize
375  vector<int> header(intHeaderSize);
376  // in addition to chunkkey we need size of header, pageId, version
377  header[0] =
378  (intHeaderSize - 1) * sizeof(int); // don't need to include size of headerSize
379  // value - sizeof(size_t) is for chunkSize
380  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
381  header[intHeaderSize - 2] = pageId;
382  header[intHeaderSize - 1] = epoch;
383  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
384  size_t pageSize = writeMetadata ? METADATA_PAGE_SIZE : pageSize_;
385  fileInfo->write(
386  page.pageNum * pageSize, (intHeaderSize) * sizeof(int), (int8_t*)&header[0]);
387 }
388 
389 void FileBuffer::readMetadata(const Page& page) {
390  FILE* f = fm_->getFileForFileId(page.fileId);
391  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
392  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
393  fread((int8_t*)&size_, sizeof(size_t), 1, f);
394  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
395  // encodingType, encodingBits all as int
396  fread((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
397  int version = typeData[0];
398  CHECK(version == METADATA_VERSION); // add backward compatibility code here
399  has_encoder = static_cast<bool>(typeData[1]);
400  if (has_encoder) {
401  sql_type.set_type(static_cast<SQLTypes>(typeData[2]));
402  sql_type.set_subtype(static_cast<SQLTypes>(typeData[3]));
403  sql_type.set_dimension(typeData[4]);
404  sql_type.set_scale(typeData[5]);
405  sql_type.set_notnull(static_cast<bool>(typeData[6]));
406  sql_type.set_compression(static_cast<EncodingType>(typeData[7]));
407  sql_type.set_comp_param(typeData[8]);
408  sql_type.set_size(typeData[9]);
410  encoder->readMetadata(f);
411  }
412 }
413 
414 void FileBuffer::writeMetadata(const int epoch) {
415  // Right now stats page is size_ (in bytes), bufferType, encodingType,
416  // encodingDataType, numElements
418  writeHeader(page, -1, epoch, true);
419  FILE* f = fm_->getFileForFileId(page.fileId);
420  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
421  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
422  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
423  vector<int> typeData(NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
424  // encodingType, encodingBits all as int
425  typeData[0] = METADATA_VERSION;
426  typeData[1] = static_cast<int>(has_encoder);
427  if (has_encoder) {
428  typeData[2] = static_cast<int>(sql_type.get_type());
429  typeData[3] = static_cast<int>(sql_type.get_subtype());
430  typeData[4] = sql_type.get_dimension();
431  typeData[5] = sql_type.get_scale();
432  typeData[6] = static_cast<int>(sql_type.get_notnull());
433  typeData[7] = static_cast<int>(sql_type.get_compression());
434  typeData[8] = sql_type.get_comp_param();
435  typeData[9] = sql_type.get_size();
436  }
437  fwrite((int8_t*)&(typeData[0]), sizeof(int), typeData.size(), f);
438  if (has_encoder) { // redundant
439  encoder->writeMetadata(f);
440  }
441  metadataPages_.epochs.push_back(epoch);
442  metadataPages_.pageVersions.push_back(page);
443 }
444 
445 /*
446 void FileBuffer::checkpoint() {
447  if (is_appended_) {
448  Page page = multiPages_[multiPages.size()-1].current();
449  writeHeader(page,0,multiPages_[0].epochs.back());
450  }
451  is_dirty_ = false;
452  is_updated_ = false;
453  is_appended_ = false;
454 }
455 */
456 
457 void FileBuffer::append(int8_t* src,
458  const size_t numBytes,
459  const MemoryLevel srcBufferType,
460  const int deviceId) {
461  is_dirty_ = true;
462  is_appended_ = true;
463 
464  size_t startPage = size_ / pageDataSize_;
465  size_t startPageOffset = size_ % pageDataSize_;
466  size_t numPagesToWrite =
467  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
468  size_t bytesLeft = numBytes;
469  int8_t* curPtr = src; // a pointer to the current location in dst being written to
470  size_t initialNumPages = multiPages_.size();
471  size_ = size_ + numBytes;
472  int epoch = fm_->epoch();
473  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
474  Page page;
475  if (pageNum >= initialNumPages) {
476  page = addNewMultiPage(epoch);
477  writeHeader(page, pageNum, epoch);
478  } else {
479  // we already have a new page at current
480  // epoch for this page - just grab this page
481  page = multiPages_[pageNum].current();
482  }
483  CHECK(page.fileId >= 0); // make sure page was initialized
484  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
485  size_t bytesWritten;
486  if (pageNum == startPage) {
487  bytesWritten = fileInfo->write(
488  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
489  min(pageDataSize_ - startPageOffset, bytesLeft),
490  curPtr);
491  } else {
492  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
493  min(pageDataSize_, bytesLeft),
494  curPtr);
495  }
496  curPtr += bytesWritten;
497  bytesLeft -= bytesWritten;
498  }
499  CHECK(bytesLeft == 0);
500 }
501 
502 void FileBuffer::write(int8_t* src,
503  const size_t numBytes,
504  const size_t offset,
505  const MemoryLevel srcBufferType,
506  const int deviceId) {
507  if (srcBufferType != CPU_LEVEL) {
508  LOG(FATAL) << "Unsupported Buffer type";
509  }
510  is_dirty_ = true;
511  if (offset < size_) {
512  is_updated_ = true;
513  }
514  bool tempIsAppended = false;
515 
516  if (offset + numBytes > size_) {
517  tempIsAppended = true; // because is_appended_ could have already been true - to
518  // avoid rewriting header
519  is_appended_ = true;
520  size_ = offset + numBytes;
521  }
522 
523  size_t startPage = offset / pageDataSize_;
524  size_t startPageOffset = offset % pageDataSize_;
525  size_t numPagesToWrite =
526  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
527  size_t bytesLeft = numBytes;
528  int8_t* curPtr = src; // a pointer to the current location in dst being written to
529  size_t initialNumPages = multiPages_.size();
530  int epoch = fm_->epoch();
531 
532  if (startPage >
533  initialNumPages) { // means there is a gap we need to allocate pages for
534  for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
535  Page page = addNewMultiPage(epoch);
536  writeHeader(page, pageNum, epoch);
537  }
538  }
539  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
540  Page page;
541  if (pageNum >= initialNumPages) {
542  page = addNewMultiPage(epoch);
543  writeHeader(page, pageNum, epoch);
544  } else if (multiPages_[pageNum].epochs.back() <
545  epoch) { // need to create new page b/c this current one lags epoch and we
546  // can't overwrite it also need to copy if we are on first or
547  // last page
548  Page lastPage = multiPages_[pageNum].current();
549  page = fm_->requestFreePage(pageSize_, false);
550  multiPages_[pageNum].epochs.push_back(epoch);
551  multiPages_[pageNum].pageVersions.push_back(page);
552  if (pageNum == startPage && startPageOffset > 0) {
553  // copyPage takes care of header offset so don't worry
554  // about it
555  copyPage(lastPage, page, startPageOffset, 0);
556  }
557  if (pageNum == startPage + numPagesToWrite &&
558  bytesLeft > 0) { // bytesLeft should always > 0
559  copyPage(lastPage,
560  page,
561  pageDataSize_ - bytesLeft,
562  bytesLeft); // these would be empty if we're appending but we won't
563  // worry about it right now
564  }
565  writeHeader(page, pageNum, epoch);
566  } else {
567  // we already have a new page at current
568  // epoch for this page - just grab this page
569  page = multiPages_[pageNum].current();
570  }
571  CHECK(page.fileId >= 0); // make sure page was initialized
572  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
573  size_t bytesWritten;
574  if (pageNum == startPage) {
575  bytesWritten = fileInfo->write(
576  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
577  min(pageDataSize_ - startPageOffset, bytesLeft),
578  curPtr);
579  } else {
580  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
581  min(pageDataSize_, bytesLeft),
582  curPtr);
583  }
584  curPtr += bytesWritten;
585  bytesLeft -= bytesWritten;
586  if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) { // if last page
587  //@todo below can lead to undefined - we're overwriting num
588  // bytes valid at checkpoint
589  writeHeader(page, 0, multiPages_[0].epochs.back(), true);
590  }
591  }
592  CHECK(bytesLeft == 0);
593 }
594 
595 } // namespace File_Namespace
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:137
void initEncoder(const SQLTypeInfo tmp_sql_type)
Page addNewMultiPage(const int epoch)
Definition: FileBuffer.cpp:361
std::vector< int > ChunkKey
Definition: types.h:35
FileInfo * getFileInfoForFileId(const int fileId)
Definition: FileMgr.h:155
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:799
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:334
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:335
static size_t headerBufferOffset_
Definition: FileBuffer.h:169
void writeHeader(Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:370
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
Definition: FileBuffer.cpp:197
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:185
#define METADATA_PAGE_SIZE
Definition: FileBuffer.cpp:32
void set_size(int s)
Definition: sqltypes.h:424
HOST DEVICE int get_scale() const
Definition: sqltypes.h:331
FILE * getFileForFileId(const int fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:916
int64_t * src
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:171
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:416
void set_dimension(int d)
Definition: sqltypes.h:418
std::vector< MultiPage > multiPages
Definition: FileBuffer.cpp:194
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
Definition: FileBuffer.cpp:502
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:55
void set_scale(int s)
Definition: sqltypes.h:421
void set_compression(EncodingType c)
Definition: sqltypes.h:426
void set_notnull(bool n)
Definition: sqltypes.h:423
int epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
Definition: FileMgr.h:204
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:59
std::deque< Page > pageVersions
Definition: Page.h:71
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:417
CHECK(cgen_state)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:127
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:333
An AbstractBuffer is a unit of data management for a data manager.
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
Definition: FileBuffer.cpp:238
void writeMetadata(const int epoch)
Definition: FileBuffer.cpp:414
void freePage(int pageId)
Definition: FileInfo.cpp:202
std::string showChunk(const ChunkKey &key)
Definition: types.h:37
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
std::deque< int > epochs
Definition: Page.h:72
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:134
#define NUM_METADATA
Definition: FileBuffer.h:35
~FileBuffer() override
Destructor.
Definition: FileBuffer.cpp:135
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:64
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:328
void set_comp_param(int p)
Definition: sqltypes.h:427
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
Definition: FileBuffer.cpp:39
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:327
void reserve(const size_t numBytes) override
Definition: FileBuffer.cpp:140
void readMetadata(const Page &page)
Definition: FileBuffer.cpp:389
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
Definition: FileBuffer.cpp:341
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:130
std::unique_ptr< Encoder > encoder
#define METADATA_VERSION
Definition: FileBuffer.h:36
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:69
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int deviceId=-1) override
Definition: FileBuffer.cpp:457
A selection of helper methods for File I/O.
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
Definition: FileMgr.h:210