OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FileBuffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <future>
26 #include <map>
27 #include <thread>
28 #include <utility> // std::pair
29 
31 #include "Shared/File.h"
32 #include "Shared/checked_alloc.h"
33 
34 using namespace std;
35 
36 namespace File_Namespace {
37 size_t FileBuffer::headerBufferOffset_ = 32;
38 
// Constructs a FileBuffer for a chunk that has no pages on disk yet.
//
// fm          - owning FileMgr; its device id is forwarded to AbstractBuffer.
// pageSize    - stored in pageSize_.
// chunkKey    - stored in chunkKey_.
// initialSize - not used by the visible code (only by the commented-out block
//               below).
//
// NOTE(review): fm->getDeviceId() is evaluated in the initializer list, i.e.
// before CHECK(fm_) in the body runs, so the CHECK cannot protect against a
// null fm.
39 FileBuffer::FileBuffer(FileMgr* fm,
40  const size_t pageSize,
41  const ChunkKey& chunkKey,
42  const size_t initialSize)
43  : AbstractBuffer(fm->getDeviceId())
44  , fm_(fm)
45  , metadataPages_(METADATA_PAGE_SIZE)
46  , pageSize_(pageSize)
47  , chunkKey_(chunkKey) {
48  // Create a new FileBuffer
49  CHECK(fm_);
52  //@todo reintroduce initialSize - need to develop an easy way of
53  // differentiating these pre-allocated pages from "written-to" pages
54  /*
55  if (initialSize > 0) {
56  // should expand to initialSize bytes
57  size_t initialNumPages = (initialSize + pageSize_ -1) / pageSize_;
58  int32_t epoch = fm_->epoch();
59  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
60  Page page = addNewMultiPage(epoch);
61  writeHeader(page,pageNum,epoch);
62  }
63  }
64  */
65 }
66 
68  const size_t pageSize,
69  const ChunkKey& chunkKey,
70  const SQLTypeInfo sqlType,
71  const size_t initialSize)
72  : AbstractBuffer(fm->getDeviceId(), sqlType)
73  , fm_(fm)
74  , metadataPages_(METADATA_PAGE_SIZE)
75  , pageSize_(pageSize)
76  , chunkKey_(chunkKey) {
77  CHECK(fm_);
80 }
81 
83  /* const size_t pageSize,*/ const ChunkKey& chunkKey,
84  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
85  const std::vector<HeaderInfo>::const_iterator& headerEndIt)
86  : AbstractBuffer(fm->getDeviceId())
87  , fm_(fm)
88  , metadataPages_(METADATA_PAGE_SIZE)
89  , pageSize_(0)
90  , chunkKey_(chunkKey) {
91  // We are being assigned an existing FileBuffer on disk
92 
93  CHECK(fm_);
95  int32_t lastPageId = -1;
96  // Page lastMetadataPage;
97  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
98  int32_t curPageId = vecIt->pageId;
99 
100  // We only want to read last metadata page
101  if (curPageId == -1) { // stats page
102  metadataPages_.push(vecIt->page, vecIt->versionEpoch);
103  } else {
104  if (curPageId != lastPageId) {
105  // protect from bad data on disk, and give diagnostics
106  if (curPageId != lastPageId + 1) {
107  LOG(FATAL) << "Failure reading DB file " << show_chunk(chunkKey)
108  << " Current page " << curPageId << " last page " << lastPageId
109  << " epoch " << vecIt->versionEpoch;
110  }
111  if (lastPageId == -1) {
112  // If we are on first real page
113  CHECK(metadataPages_.current().page.fileId != -1); // was initialized
116  }
117  MultiPage multiPage(pageSize_);
118  multiPages_.push_back(multiPage);
119  lastPageId = curPageId;
120  }
121  multiPages_.back().push(vecIt->page, vecIt->versionEpoch);
122  }
123  if (curPageId == -1) { // meaning there was only a metadata page
126  }
127  }
128 }
129 
131  // need to free pages
132  // NOP
133 }
134 
135 void FileBuffer::reserve(const size_t numBytes) {
136  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
137  size_t numCurrentPages = multiPages_.size();
138  int32_t epoch = fm_->epoch();
139 
140  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
141  Page page = addNewMultiPage(epoch);
142  writeHeader(page, pageNum, epoch);
143  }
144 }
145 
147  // 3 * sizeof(int32_t) is for headerSize, for pageId and versionEpoch
148  // sizeof(size_t) is for chunkSize
149  reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int32_t);
150  size_t headerMod = reservedHeaderSize_ % headerBufferOffset_;
151  if (headerMod > 0) {
153  }
154 }
155 
156 void FileBuffer::freePage(const Page& page, const bool isRolloff) {
157  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
158  fileInfo->freePage(page.pageNum, isRolloff);
159 }
160 
162  for (auto metaPageIt = metadataPages_.pageVersions.begin();
163  metaPageIt != metadataPages_.pageVersions.end();
164  ++metaPageIt) {
165  freePage(metaPageIt->page, false /* isRolloff */);
166  }
167  while (metadataPages_.pageVersions.size() > 0) {
169  }
170 }
171 
173  size_t num_pages_freed = multiPages_.size();
174  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
175  ++multiPageIt) {
176  for (auto pageIt = multiPageIt->pageVersions.begin();
177  pageIt != multiPageIt->pageVersions.end();
178  ++pageIt) {
179  freePage(pageIt->page, false /* isRolloff */);
180  }
181  }
182  multiPages_.clear();
183  return num_pages_freed;
184 }
185 
188  freeChunkPages();
189 }
190 
192  const int32_t targetEpoch,
193  const int32_t currentEpoch) {
194  std::vector<EpochedPage> epochedPagesToFree =
195  multiPage.freePagesBeforeEpoch(targetEpoch, currentEpoch);
196  for (const auto& epochedPageToFree : epochedPagesToFree) {
197  freePage(epochedPageToFree.page, true /* isRolloff */);
198  }
199 }
200 
201 void FileBuffer::freePagesBeforeEpoch(const int32_t targetEpoch) {
202  // This method is only safe to be called within a checkpoint, after the sync and epoch
203  // increment where a failure at any point32_t in the process would lead to a safe
204  // rollback
205  const int32_t currentEpoch = fm_->epoch();
206  CHECK_LE(targetEpoch, currentEpoch);
207  freePagesBeforeEpochForMultiPage(metadataPages_, targetEpoch, currentEpoch);
208  for (auto& multiPage : multiPages_) {
209  freePagesBeforeEpochForMultiPage(multiPage, targetEpoch, currentEpoch);
210  }
211 }
212 
213 struct readThreadDS {
214  FileMgr* t_fm; // ptr to FileMgr
215  size_t t_startPage; // start page for the thread
216  size_t t_endPage; // last page for the thread
217  int8_t* t_curPtr; // pointer to the current location of the target for the thread
218  size_t t_bytesLeft; // number of bytes to be read in the thread
219  size_t t_startPageOffset; // offset - used for the first page of the buffer
220  bool t_isFirstPage; // true - for first page of the buffer, false - otherwise
221  std::vector<MultiPage> multiPages; // MultiPages of the FileBuffer passed to the thread
222 };
223 
224 static size_t readForThread(FileBuffer* fileBuffer, const readThreadDS threadDS) {
225  size_t startPage = threadDS.t_startPage; // start reading at startPage, including it
226  size_t endPage = threadDS.t_endPage; // stop reading at endPage, not including it
227  int8_t* curPtr = threadDS.t_curPtr;
228  size_t bytesLeft = threadDS.t_bytesLeft;
229  size_t totalBytesRead = 0;
230  bool isFirstPage = threadDS.t_isFirstPage;
231 
232  // Traverse the logical pages
233  for (size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
234  CHECK(threadDS.multiPages[pageNum].pageSize == fileBuffer->pageSize());
235  Page page = threadDS.multiPages[pageNum].current().page;
236 
237  FileInfo* fileInfo = threadDS.t_fm->getFileInfoForFileId(page.fileId);
238  CHECK(fileInfo);
239 
240  // Read the page into the destination (dst) buffer at its
241  // current (cur) location
242  size_t bytesRead = 0;
243  if (isFirstPage) {
244  bytesRead = fileInfo->read(
245  page.pageNum * fileBuffer->pageSize() + threadDS.t_startPageOffset +
246  fileBuffer->reservedHeaderSize(),
247  min(fileBuffer->pageDataSize() - threadDS.t_startPageOffset, bytesLeft),
248  curPtr);
249  isFirstPage = false;
250  } else {
251  bytesRead = fileInfo->read(
252  page.pageNum * fileBuffer->pageSize() + fileBuffer->reservedHeaderSize(),
253  min(fileBuffer->pageDataSize(), bytesLeft),
254  curPtr);
255  }
256  curPtr += bytesRead;
257  bytesLeft -= bytesRead;
258  totalBytesRead += bytesRead;
259  }
260  CHECK(bytesLeft == 0);
261 
262  return (totalBytesRead);
263 }
264 
// Reads numBytes bytes, starting at logical byte `offset` of the buffer's data,
// into dst. The affected logical pages are split into contiguous slices that
// are read by readForThread, either inline (one slice) or via std::async.
//
// dst           - destination; must have room for numBytes bytes.
// numBytes      - number of bytes to read.
// offset        - byte offset into the buffer's data (headers excluded).
// dstBufferType - must be CPU_LEVEL, otherwise LOG(FATAL) is raised.
// deviceId      - not used by this function.
//
// CHECK-fails if the requested range lies beyond the existing pages or if the
// total number of bytes read differs from numBytes.
265 void FileBuffer::read(int8_t* const dst,
266  const size_t numBytes,
267  const size_t offset,
268  const MemoryLevel dstBufferType,
269  const int32_t deviceId) {
270  if (dstBufferType != CPU_LEVEL) {
271  LOG(FATAL) << "Unsupported Buffer type";
272  }
273 
274  // variable declarations
  // Offsets are expressed in data bytes, hence pageDataSize_ (not pageSize_).
275  size_t startPage = offset / pageDataSize_;
276  size_t startPageOffset = offset % pageDataSize_;
  // Ceiling division: number of logical pages touched by the request.
277  size_t numPagesToRead =
278  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
279  /*
280  if (startPage + numPagesToRead > multiPages_.size()) {
281  cout << "Start page: " << startPage << endl;
282  cout << "Num pages to read: " << numPagesToRead << endl;
283  cout << "Num multipages: " << multiPages_.size() << endl;
284  cout << "Offset: " << offset << endl;
285  cout << "Num bytes: " << numBytes << endl;
286  }
287  */
288 
289  CHECK(startPage + numPagesToRead <= multiPages_.size());
290 
291  size_t numPagesPerThread = 0;
292  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
293  size_t bytesRead = 0; // total number of bytes already read
294  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
295  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
296  size_t numThreads = fm_->getNumReaderThreads();
297  std::vector<readThreadDS>
298  threadDSArr; // array of threadDS, needed to avoid racing conditions
299 
  // NOTE(review): if getNumReaderThreads() returns 0 while numPagesToRead > 0,
  // the division below divides by zero.
300  if (numPagesToRead > numThreads) {
301  numPagesPerThread = numPagesToRead / numThreads;
302  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
303  } else {
  // Fewer pages than threads: one page per thread, one thread per page.
304  numThreads = numPagesToRead;
305  numPagesPerThread = 1;
306  }
307 
308  /* set threadDS for the first thread */
309  readThreadDS threadDS;
310  threadDS.t_fm = fm_;
311  threadDS.t_startPage = offset / pageDataSize_;
312  if (numExtraPages > 0) {
313  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
314  numExtraPages--;
315  } else {
316  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
317  }
318  threadDS.t_curPtr = dst;
319  threadDS.t_startPageOffset = offset % pageDataSize_;
320  threadDS.t_isFirstPage = true;
321 
  // The first slice starts t_startPageOffset bytes into its first page, so it
  // can deliver that many bytes fewer than its page span.
322  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
323  threadDS.t_startPageOffset),
324  numBytesCurrent);
325  threadDS.t_bytesLeft = bytesLeftForThread;
326  threadDS.multiPages = getMultiPage();
327 
328  if (numThreads == 1) {
  // Single slice: read inline on the calling thread.
329  bytesRead += readForThread(this, threadDS);
330  } else {
331  std::vector<std::future<size_t>> threads;
332 
333  for (size_t i = 0; i < numThreads; i++) {
  // Launch the slice prepared so far, then prepare the next one.
334  threadDSArr.push_back(threadDS);
335  threads.push_back(
336  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
337 
338  // calculate elements of threadDS
339  threadDS.t_fm = fm_;
340  threadDS.t_isFirstPage = false;
341  threadDS.t_curPtr += bytesLeftForThread;
342  threadDS.t_startPage +=
343  threadDS.t_endPage -
344  threadDS.t_startPage; // based on # of pages read on previous iteration
345  if (numExtraPages > 0) {
346  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
347  numExtraPages--;
348  } else {
349  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
350  }
351  numBytesCurrent -= bytesLeftForThread;
352  bytesLeftForThread = min(
353  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
354  threadDS.t_bytesLeft = bytesLeftForThread;
355  threadDS.multiPages = getMultiPage();
356  }
357 
358  for (auto& p : threads) {
359  p.wait();
360  }
  // Accumulate the per-slice byte counts returned by readForThread.
361  for (auto& p : threads) {
362  bytesRead += p.get();
363  }
364  }
365  CHECK(bytesRead == numBytes);
366 }
367 
369  Page& destPage,
370  const size_t numBytes,
371  const size_t offset) {
372  // FILE *srcFile = fm_->files_[srcPage.fileId]->f;
373  // FILE *destFile = fm_->files_[destPage.fileId]->f;
374  CHECK(offset + numBytes < pageDataSize_);
375  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
376  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
377 
378  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
379  size_t bytesRead = srcFileInfo->read(
380  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
381  CHECK(bytesRead == numBytes);
382  size_t bytesWritten = destFileInfo->write(
383  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
384  CHECK(bytesWritten == numBytes);
385  free(buffer);
386 }
387 
388 Page FileBuffer::addNewMultiPage(const int32_t epoch) {
389  Page page = fm_->requestFreePage(pageSize_, false);
390  MultiPage multiPage(pageSize_);
391  multiPage.push(page, epoch);
392  multiPages_.emplace_back(multiPage);
393  return page;
394 }
395 
397  const int32_t pageId,
398  const int32_t epoch,
399  const bool writeMetadata) {
400  int32_t intHeaderSize = chunkKey_.size() + 3; // does not include chunkSize
401  vector<int32_t> header(intHeaderSize);
402  // in addition to chunkkey we need size of header, pageId, version
403  header[0] =
404  (intHeaderSize - 1) * sizeof(int32_t); // don't need to include size of headerSize
405  // value - sizeof(size_t) is for chunkSize
406  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
407  header[intHeaderSize - 2] = pageId;
408  header[intHeaderSize - 1] = epoch;
409  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
410  size_t pageSize = writeMetadata ? METADATA_PAGE_SIZE : pageSize_;
411  fileInfo->write(
412  page.pageNum * pageSize, (intHeaderSize) * sizeof(int32_t), (int8_t*)&header[0]);
413 }
414 
// Loads this buffer's metadata from the given metadata page.
//
// Seeks past the reserved header of the page (addressed in units of
// METADATA_PAGE_SIZE) and reads, in order: pageSize_ (size_t), size_ (size_t)
// and NUM_METADATA int32_t values. typeData[0] is the metadata version and
// must equal METADATA_VERSION; typeData[1] says whether an encoder is present;
// typeData[2..9] are applied to sql_type_ when it is.
//
// NOTE(review): the return values of fseek/fread are not checked, so a short
// read goes unnoticed until the version CHECK.
415 void FileBuffer::readMetadata(const Page& page) {
416  FILE* f = fm_->getFileForFileId(page.fileId);
417  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
418  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
419  fread((int8_t*)&size_, sizeof(size_t), 1, f);
420  vector<int32_t> typeData(
421  NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
422  // encodingType, encodingBits all as int
423  fread((int8_t*)&(typeData[0]), sizeof(int32_t), typeData.size(), f);
424  int32_t version = typeData[0];
425  CHECK(version == METADATA_VERSION); // add backward compatibility code here
426  bool has_encoder = static_cast<bool>(typeData[1]);
427  if (has_encoder) {
  // Field order here must mirror the order used by writeMetadata.
428  sql_type_.set_type(static_cast<SQLTypes>(typeData[2]));
429  sql_type_.set_subtype(static_cast<SQLTypes>(typeData[3]));
430  sql_type_.set_dimension(typeData[4]);
431  sql_type_.set_scale(typeData[5]);
432  sql_type_.set_notnull(static_cast<bool>(typeData[6]));
433  sql_type_.set_compression(static_cast<EncodingType>(typeData[7]));
434  sql_type_.set_comp_param(typeData[8]);
435  sql_type_.set_size(typeData[9]);
  // NOTE(review): encoder_ is dereferenced below; where it is set up is not
  // visible in this listing - confirm it is initialized before this call.
437  encoder_->readMetadata(f);
438  }
439 }
440 
// Writes this buffer's metadata to a metadata page and records that page as
// the newest metadata version for the given epoch.
//
// After the page header (written with pageId -1 and writeMetadata = true) the
// following is stored past the reserved header: pageSize_ (size_t), size_
// (size_t) and NUM_METADATA int32_t values - [0] METADATA_VERSION,
// [1] hasEncoder(), [2..9] the sql_type_ fields when an encoder is present -
// followed by the encoder's own metadata when an encoder is present.
//
// NOTE(review): the declaration of `page` is not visible in this listing.
// NOTE(review): the return values of fseek/fwrite are not checked.
441 void FileBuffer::writeMetadata(const int32_t epoch) {
442  // Right now stats page is size_ (in bytes), bufferType, encodingType,
443  // encodingDataType, numElements
445  writeHeader(page, -1, epoch, true);
446  FILE* f = fm_->getFileForFileId(page.fileId);
447  fseek(f, page.pageNum * METADATA_PAGE_SIZE + reservedHeaderSize_, SEEK_SET);
448  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
449  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
450  vector<int32_t> typeData(
451  NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
452  // encodingType, encodingBits all as int32_t
453  typeData[0] = METADATA_VERSION;
454  typeData[1] = static_cast<int32_t>(hasEncoder());
455  if (hasEncoder()) {
  // Field order here must mirror the order used by readMetadata.
456  typeData[2] = static_cast<int32_t>(sql_type_.get_type());
457  typeData[3] = static_cast<int32_t>(sql_type_.get_subtype());
458  typeData[4] = sql_type_.get_dimension();
459  typeData[5] = sql_type_.get_scale();
460  typeData[6] = static_cast<int32_t>(sql_type_.get_notnull());
461  typeData[7] = static_cast<int32_t>(sql_type_.get_compression());
462  typeData[8] = sql_type_.get_comp_param();
463  typeData[9] = sql_type_.get_size();
464  }
465  fwrite((int8_t*)&(typeData[0]), sizeof(int32_t), typeData.size(), f);
466  if (hasEncoder()) { // redundant
467  encoder_->writeMetadata(f);
468  }
  // Register the freshly written page as the current metadata version.
469  metadataPages_.push(page, epoch);
470 }
471 
472 void FileBuffer::append(int8_t* src,
473  const size_t numBytes,
474  const MemoryLevel srcBufferType,
475  const int32_t deviceId) {
476  setAppended();
477 
478  size_t startPage = size_ / pageDataSize_;
479  size_t startPageOffset = size_ % pageDataSize_;
480  size_t numPagesToWrite =
481  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
482  size_t bytesLeft = numBytes;
483  int8_t* curPtr = src; // a pointer to the current location in dst being written to
484  size_t initialNumPages = multiPages_.size();
485  size_ = size_ + numBytes;
486  int32_t epoch = fm_->epoch();
487  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
488  Page page;
489  if (pageNum >= initialNumPages) {
490  page = addNewMultiPage(epoch);
491  writeHeader(page, pageNum, epoch);
492  } else {
493  // we already have a new page at current
494  // epoch for this page - just grab this page
495  page = multiPages_[pageNum].current().page;
496  }
497  CHECK(page.fileId >= 0); // make sure page was initialized
498  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
499  size_t bytesWritten;
500  if (pageNum == startPage) {
501  bytesWritten = fileInfo->write(
502  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
503  min(pageDataSize_ - startPageOffset, bytesLeft),
504  curPtr);
505  } else {
506  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
507  min(pageDataSize_, bytesLeft),
508  curPtr);
509  }
510  curPtr += bytesWritten;
511  bytesLeft -= bytesWritten;
512  }
513  CHECK(bytesLeft == 0);
514 }
515 
// Writes numBytes bytes from src into the buffer's data at byte `offset`.
//
// The buffer is marked dirty; it is also marked updated when the write starts
// inside existing data, and appended (with size_ grown to offset + numBytes)
// when it extends past the current end. Pages between the current last page
// and the first written page are allocated if missing. A page whose current
// version is older than the current epoch is never overwritten: a fresh page
// is requested and pushed as a new version of that logical page.
//
// src           - source bytes.
// numBytes      - number of bytes to write.
// offset        - byte offset into the buffer's data (headers excluded).
// srcBufferType - must be CPU_LEVEL, otherwise LOG(FATAL) is raised.
// deviceId      - not used by this function.
516 void FileBuffer::write(int8_t* src,
517  const size_t numBytes,
518  const size_t offset,
519  const MemoryLevel srcBufferType,
520  const int32_t deviceId) {
521  if (srcBufferType != CPU_LEVEL) {
522  LOG(FATAL) << "Unsupported Buffer type";
523  }
524 
525  bool tempIsAppended = false;
526  setDirty();
527  if (offset < size_) {
528  setUpdated();
529  }
530  if (offset + numBytes > size_) {
531  tempIsAppended = true; // because is_appended_ could have already been true - to
532  // avoid rewriting header
533  setAppended();
534  size_ = offset + numBytes;
535  }
536 
  // Offsets are expressed in data bytes, hence pageDataSize_ (not pageSize_).
537  size_t startPage = offset / pageDataSize_;
538  size_t startPageOffset = offset % pageDataSize_;
539  size_t numPagesToWrite =
540  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
541  size_t bytesLeft = numBytes;
542  int8_t* curPtr = src; // cursor into src: next source byte to be written out
543  size_t initialNumPages = multiPages_.size();
544  int32_t epoch = fm_->epoch();
545 
546  if (startPage >
547  initialNumPages) { // means there is a gap we need to allocate pages for
548  for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
549  Page page = addNewMultiPage(epoch);
550  writeHeader(page, pageNum, epoch);
551  }
552  }
553  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
554  Page page;
555  if (pageNum >= initialNumPages) {
556  page = addNewMultiPage(epoch);
557  writeHeader(page, pageNum, epoch);
558  } else if (multiPages_[pageNum].current().epoch <
559  epoch) { // need to create new page b/c this current one lags epoch and we
560  // can't overwrite it also need to copy if we are on first or
561  // last page
562  Page lastPage = multiPages_[pageNum].current().page;
563  page = fm_->requestFreePage(pageSize_, false);
564  multiPages_[pageNum].push(page, epoch);
565  if (pageNum == startPage && startPageOffset > 0) {
566  // copyPage takes care of header offset so don't worry
567  // about it
  // Carry over the bytes in front of the write position from the old version.
568  copyPage(lastPage, page, startPageOffset, 0);
569  }
  // NOTE(review): this condition can never be true - the enclosing loop only
  // runs while pageNum < startPage + numPagesToWrite - so the tail of the last
  // page is never copied from the old version. It looks like "- 1" is missing;
  // confirm the intended numBytes/offset arguments before enabling it.
570  if (pageNum == startPage + numPagesToWrite &&
571  bytesLeft > 0) { // bytesLeft should always > 0
572  copyPage(lastPage,
573  page,
574  pageDataSize_ - bytesLeft,
575  bytesLeft); // these would be empty if we're appending but we won't
576  // worry about it right now
577  }
578  writeHeader(page, pageNum, epoch);
579  } else {
580  // we already have a new page at current
581  // epoch for this page - just grab this page
582  page = multiPages_[pageNum].current().page;
583  }
584  CHECK(page.fileId >= 0); // make sure page was initialized
585  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
586  size_t bytesWritten;
  // Only the first page is entered part-way through its data portion.
587  if (pageNum == startPage) {
588  bytesWritten = fileInfo->write(
589  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
590  min(pageDataSize_ - startPageOffset, bytesLeft),
591  curPtr);
592  } else {
593  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
594  min(pageDataSize_, bytesLeft),
595  curPtr);
596  }
597  curPtr += bytesWritten;
598  bytesLeft -= bytesWritten;
599  if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) { // if last page
600  //@todo below can lead to undefined - we're overwriting num
601  // bytes valid at checkpoint
  // Rewrites a header on the last written page, passing pageId 0, the epoch of
  // logical page 0's current version, and writeMetadata = true.
602  writeHeader(page, 0, multiPages_[0].current().epoch, true);
603  }
604  }
605  CHECK(bytesLeft == 0);
606 }
607 
608 } // namespace File_Namespace
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:141
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:312
void set_compression(EncodingType c)
Definition: sqltypes.h:411
void set_size(int s)
Definition: sqltypes.h:409
#define METADATA_PAGE_SIZE
Definition: FileBuffer.h:37
std::vector< int > ChunkKey
Definition: types.h:37
void freePagesBeforeEpochForMultiPage(MultiPage &multiPage, const int32_t targetEpoch, const int32_t currentEpoch)
Definition: FileBuffer.cpp:191
void freePage(const Page &page, const bool isRolloff)
Definition: FileBuffer.cpp:156
HOST DEVICE int get_size() const
Definition: sqltypes.h:321
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:871
void freePagesBeforeEpoch(const int32_t targetEpoch)
Definition: FileBuffer.cpp:201
static size_t headerBufferOffset_
Definition: FileBuffer.h:174
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
Definition: FileBuffer.cpp:224
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
void pop()
Purges the oldest Page.
Definition: Page.h:100
#define LOG(tag)
Definition: Logger.h:188
Page addNewMultiPage(const int32_t epoch)
Definition: FileBuffer.cpp:388
HOST DEVICE int get_scale() const
Definition: sqltypes.h:316
void writeMetadata(const int32_t epoch)
Definition: FileBuffer.cpp:441
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
Definition: FileBuffer.cpp:516
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:402
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:176
std::vector< MultiPage > multiPages
Definition: FileBuffer.cpp:221
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:311
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:56
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::string show_chunk(const ChunkKey &key)
Definition: types.h:73
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:60
std::deque< EpochedPage > pageVersions
Definition: Page.h:74
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
int32_t fileId
Definition: Page.h:47
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:131
string version
Definition: setup.py:56
void set_scale(int s)
Definition: sqltypes.h:406
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:988
An AbstractBuffer is a unit of data management for a data manager.
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Definition: FileBuffer.cpp:472
void freePage(int32_t pageId, const bool isRolloff)
Definition: FileInfo.cpp:199
void set_comp_param(int p)
Definition: sqltypes.h:412
FileInfo * getFileInfoForFileId(const int32_t fileId)
Definition: FileMgr.h:187
void writeHeader(Page &page, const int32_t pageId, const int32_t epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:396
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:138
#define CHECK_LE(x, y)
Definition: Logger.h:208
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:319
#define NUM_METADATA
Definition: FileBuffer.h:35
void set_dimension(int d)
Definition: sqltypes.h:403
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
Definition: Page.h:95
~FileBuffer() override
Destructor.
Definition: FileBuffer.cpp:130
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:66
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:313
std::unique_ptr< Encoder > encoder_
int32_t epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
Definition: FileMgr.h:245
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:320
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
Definition: FileBuffer.cpp:39
void set_notnull(bool n)
Definition: sqltypes.h:408
#define CHECK(condition)
Definition: Logger.h:197
void reserve(const size_t numBytes) override
Definition: FileBuffer.cpp:135
std::vector< EpochedPage > freePagesBeforeEpoch(const int32_t target_epoch, const int32_t current_epoch)
Definition: Page.h:107
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:318
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Definition: FileBuffer.cpp:265
void readMetadata(const Page &page)
Definition: FileBuffer.cpp:415
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
Definition: FileBuffer.cpp:368
EpochedPage current() const
Returns the most recent version of the page (by value, as an EpochedPage).
Definition: Page.h:87
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:134
#define METADATA_VERSION
Definition: FileBuffer.h:36
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:72
A selection of helper methods for File I/O.
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
Definition: FileMgr.h:278
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:401