OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileBuffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <future>
26 #include <map>
27 #include <thread>
28 #include <utility> // std::pair
29 
31 #include "Shared/File.h"
32 #include "Shared/checked_alloc.h"
33 
34 using namespace std;
35 
36 namespace File_Namespace {
37 
// Constructs an empty FileBuffer for chunkKey whose pages are pageSize bytes each
// (page header included; data bytes start after the reserved header).
// @param fm          owning FileMgr; also supplies the device id and metadata page size
// @param pageSize    size in bytes of each data page
// @param chunkKey    key of the chunk stored in this buffer
// @param initialSize currently unused: the pre-allocation code below is commented out
//                    (see the @todo)
// NOTE(review): fm is dereferenced in the mem-initializer list (fm->getDeviceId(),
// fm_->getMetadataPageSize()) before CHECK(fm_) runs, so that CHECK cannot catch a null
// fm. Also, unlike the other constructors, metadataPageSize_ is initialized through fm_
// rather than the fm parameter; that is only safe if fm_ is declared before
// metadataPageSize_ in FileBuffer.h — confirm.
38 FileBuffer::FileBuffer(FileMgr* fm,
39  const size_t pageSize,
40  const ChunkKey& chunkKey,
41  const size_t initialSize)
42  : AbstractBuffer(fm->getDeviceId())
43  , fm_(fm)
44  , metadataPageSize_(fm_->getMetadataPageSize())
45  , metadataPages_(metadataPageSize_)
46  , pageSize_(pageSize)
47  , chunkKey_(chunkKey) {
48  // Create a new FileBuffer
49  CHECK(fm_);
53  //@todo reintroduce initialSize - need to develop easy way of
54  // differentiating these pre-allocated pages from "written-to" pages
55  /*
56  if (initalSize > 0) {
57  // should expand to initialSize bytes
58  size_t initialNumPages = (initalSize + pageSize_ -1) / pageSize_;
59  int32_t epoch = fm_->epoch();
60  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
61  Page page = addNewMultiPage(epoch);
62  writeHeader(page,pageNum,epoch);
63  }
64  }
65  */
66 }
67 
69  const size_t pageSize,
70  const ChunkKey& chunkKey,
71  const SQLTypeInfo sqlType,
72  const size_t initialSize)
73  : AbstractBuffer(fm->getDeviceId(), sqlType)
74  , fm_(fm)
75  , metadataPageSize_(fm->getMetadataPageSize())
76  , metadataPages_(metadataPageSize_)
77  , pageSize_(pageSize)
78  , chunkKey_(chunkKey) {
79  CHECK(fm_);
82 }
83 
85  /* const size_t pageSize,*/ const ChunkKey& chunkKey,
86  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
87  const std::vector<HeaderInfo>::const_iterator& headerEndIt)
88  : AbstractBuffer(fm->getDeviceId())
89  , fm_(fm)
90  , metadataPageSize_(fm->getMetadataPageSize())
91  , metadataPages_(metadataPageSize_)
92  , pageSize_(0)
93  , chunkKey_(chunkKey) {
94  // We are being assigned an existing FileBuffer on disk
95 
96  CHECK(fm_);
98  int32_t lastPageId = -1;
99  int32_t curPageId = 0;
100  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
101  curPageId = vecIt->pageId;
102 
103  // We only want to read last metadata page
104  if (curPageId == -1) { // stats page
105  metadataPages_.push(vecIt->page, vecIt->versionEpoch);
106  } else {
107  if (curPageId != lastPageId) {
108  // protect from bad data on disk, and give diagnostics
109  if (fm->failOnReadError()) {
110  if (curPageId != lastPageId + 1) {
111  LOG(FATAL) << "Failure reading DB file " << show_chunk(chunkKey)
112  << " Current page " << curPageId << " last page " << lastPageId
113  << " epoch " << vecIt->versionEpoch;
114  }
115  }
116  if (lastPageId == -1) { // If we are on first real page
118  }
119  MultiPage multiPage(pageSize_);
120  multiPages_.push_back(multiPage);
121  lastPageId = curPageId;
122  }
123  multiPages_.back().push(vecIt->page, vecIt->versionEpoch);
124  }
125  }
126  if (curPageId == -1) { // meaning there was only a metadata page
128  }
129 }
130 
132  // need to free pages
133  // NOP
134 }
135 
136 void FileBuffer::reserve(const size_t numBytes) {
137  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
138  size_t numCurrentPages = multiPages_.size();
139  auto epoch = getFileMgrEpoch();
140  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
141  Page page = addNewMultiPage(epoch);
142  writeHeader(page, pageNum, epoch);
143  }
144 }
145 
147  // 3 * sizeof(int32_t) is for headerSize, for pageId and versionEpoch
148  // sizeof(size_t) is for chunkSize
149  reservedHeaderSize_ = (chunkKey_.size() + 3) * sizeof(int32_t);
150  size_t headerMod = reservedHeaderSize_ % headerBufferOffset_;
151  if (headerMod > 0) {
153  }
154 }
155 
156 void FileBuffer::freePage(const Page& page) {
157  freePage(page, false);
158 }
159 
160 void FileBuffer::freePage(const Page& page, const bool isRolloff) {
161  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
162  fileInfo->freePage(page.pageNum, isRolloff, getFileMgrEpoch());
163 }
164 
166  size_t num_pages_freed = metadataPages_.pageVersions.size();
167  for (auto metaPageIt = metadataPages_.pageVersions.begin();
168  metaPageIt != metadataPages_.pageVersions.end();
169  ++metaPageIt) {
170  freePage(metaPageIt->page, false /* isRolloff */);
171  }
172  while (metadataPages_.pageVersions.size() > 0) {
174  }
175  return num_pages_freed;
176 }
177 
179  size_t num_pages_freed = multiPages_.size();
180  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
181  ++multiPageIt) {
182  for (auto pageIt = multiPageIt->pageVersions.begin();
183  pageIt != multiPageIt->pageVersions.end();
184  ++pageIt) {
185  freePage(pageIt->page, false /* isRolloff */);
186  }
187  }
188  multiPages_.clear();
189  return num_pages_freed;
190 }
191 
193  return freeMetadataPages() + freeChunkPages();
194 }
195 
197  const int32_t targetEpoch,
198  const int32_t currentEpoch) {
199  std::vector<EpochedPage> epochedPagesToFree =
200  multiPage.freePagesBeforeEpoch(targetEpoch, currentEpoch);
201  for (const auto& epochedPageToFree : epochedPagesToFree) {
202  freePage(epochedPageToFree.page, true /* isRolloff */);
203  }
204 }
205 
206 void FileBuffer::freePagesBeforeEpoch(const int32_t targetEpoch) {
207  // This method is only safe to be called within a checkpoint, after the sync and epoch
208  // increment where a failure at any point32_t in the process would lead to a safe
209  // rollback
210  auto currentEpoch = getFileMgrEpoch();
211  CHECK_LE(targetEpoch, currentEpoch);
212  freePagesBeforeEpochForMultiPage(metadataPages_, targetEpoch, currentEpoch);
213  for (auto& multiPage : multiPages_) {
214  freePagesBeforeEpochForMultiPage(multiPage, targetEpoch, currentEpoch);
215  }
216 
217  // Check if all buffer pages can be freed
218  if (size_ == 0) {
219  size_t max_historical_buffer_size{0};
220  for (auto& epoch_page : metadataPages_.pageVersions) {
221  // Create buffer that is used to get the buffer size at the epoch version
222  FileBuffer buffer{fm_, pageSize_, chunkKey_};
223  buffer.readMetadata(epoch_page.page);
224  max_historical_buffer_size = std::max(max_historical_buffer_size, buffer.size());
225  }
226 
227  // Free all chunk pages, if none of the old chunk versions has any data
228  if (max_historical_buffer_size == 0) {
229  freeChunkPages();
230  }
231  }
232 }
233 
234 struct readThreadDS {
235  FileMgr* t_fm; // ptr to FileMgr
236  size_t t_startPage; // start page for the thread
237  size_t t_endPage; // last page for the thread
238  int8_t* t_curPtr; // pointer to the current location of the target for the thread
239  size_t t_bytesLeft; // number of bytes to be read in the thread
240  size_t t_startPageOffset; // offset - used for the first page of the buffer
241  bool t_isFirstPage; // true - for first page of the buffer, false - otherwise
242  std::vector<MultiPage> multiPages; // MultiPages of the FileBuffer passed to the thread
243 };
244 
245 static size_t readForThread(FileBuffer* fileBuffer, const readThreadDS threadDS) {
246  size_t startPage = threadDS.t_startPage; // start reading at startPage, including it
247  size_t endPage = threadDS.t_endPage; // stop reading at endPage, not including it
248  int8_t* curPtr = threadDS.t_curPtr;
249  size_t bytesLeft = threadDS.t_bytesLeft;
250  size_t totalBytesRead = 0;
251  bool isFirstPage = threadDS.t_isFirstPage;
252 
253  // Traverse the logical pages
254  for (size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
255  CHECK(threadDS.multiPages[pageNum].pageSize == fileBuffer->pageSize());
256  Page page = threadDS.multiPages[pageNum].current().page;
257 
258  FileInfo* fileInfo = threadDS.t_fm->getFileInfoForFileId(page.fileId);
259  CHECK(fileInfo);
260 
261  // Read the page into the destination (dst) buffer at its
262  // current (cur) location
263  size_t bytesRead = 0;
264  if (isFirstPage) {
265  bytesRead = fileInfo->read(
266  page.pageNum * fileBuffer->pageSize() + threadDS.t_startPageOffset +
267  fileBuffer->reservedHeaderSize(),
268  min(fileBuffer->pageDataSize() - threadDS.t_startPageOffset, bytesLeft),
269  curPtr);
270  isFirstPage = false;
271  } else {
272  bytesRead = fileInfo->read(
273  page.pageNum * fileBuffer->pageSize() + fileBuffer->reservedHeaderSize(),
274  min(fileBuffer->pageDataSize(), bytesLeft),
275  curPtr);
276  }
277  curPtr += bytesRead;
278  bytesLeft -= bytesRead;
279  totalBytesRead += bytesRead;
280  }
281  CHECK(bytesLeft == 0);
282 
283  return (totalBytesRead);
284 }
285 
286 void FileBuffer::read(int8_t* const dst,
287  const size_t numBytes,
288  const size_t offset,
289  const MemoryLevel dstBufferType,
290  const int32_t deviceId) {
291  if (dstBufferType != CPU_LEVEL) {
292  LOG(FATAL) << "Unsupported Buffer type";
293  }
294 
295  // variable declarations
296  size_t startPage = offset / pageDataSize_;
297  size_t startPageOffset = offset % pageDataSize_;
298  size_t numPagesToRead =
299  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
300  /*
301  if (startPage + numPagesToRead > multiPages_.size()) {
302  cout << "Start page: " << startPage << endl;
303  cout << "Num pages to read: " << numPagesToRead << endl;
304  cout << "Num multipages: " << multiPages_.size() << endl;
305  cout << "Offset: " << offset << endl;
306  cout << "Num bytes: " << numBytes << endl;
307  }
308  */
309 
310  CHECK(startPage + numPagesToRead <= multiPages_.size());
311 
312  size_t numPagesPerThread = 0;
313  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
314  size_t bytesRead = 0; // total number of bytes already being read
315  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
316  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
317  size_t numThreads = fm_->getNumReaderThreads();
318  std::vector<readThreadDS>
319  threadDSArr; // array of threadDS, needed to avoid racing conditions
320 
321  if (numPagesToRead > numThreads) {
322  numPagesPerThread = numPagesToRead / numThreads;
323  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
324  } else {
325  numThreads = numPagesToRead;
326  numPagesPerThread = 1;
327  }
328 
329  /* set threadDS for the first thread */
330  readThreadDS threadDS;
331  threadDS.t_fm = fm_;
332  threadDS.t_startPage = offset / pageDataSize_;
333  if (numExtraPages > 0) {
334  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
335  numExtraPages--;
336  } else {
337  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
338  }
339  threadDS.t_curPtr = dst;
340  threadDS.t_startPageOffset = offset % pageDataSize_;
341  threadDS.t_isFirstPage = true;
342 
343  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
344  threadDS.t_startPageOffset),
345  numBytesCurrent);
346  threadDS.t_bytesLeft = bytesLeftForThread;
347  threadDS.multiPages = getMultiPage();
348 
349  if (numThreads == 1) {
350  bytesRead += readForThread(this, threadDS);
351  } else {
352  std::vector<std::future<size_t>> threads;
353 
354  for (size_t i = 0; i < numThreads; i++) {
355  threadDSArr.push_back(threadDS);
356  threads.push_back(
357  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
358 
359  // calculate elements of threadDS
360  threadDS.t_fm = fm_;
361  threadDS.t_isFirstPage = false;
362  threadDS.t_curPtr += bytesLeftForThread;
363  threadDS.t_startPage +=
364  threadDS.t_endPage -
365  threadDS.t_startPage; // based on # of pages read on previous iteration
366  if (numExtraPages > 0) {
367  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
368  numExtraPages--;
369  } else {
370  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
371  }
372  numBytesCurrent -= bytesLeftForThread;
373  bytesLeftForThread = min(
374  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
375  threadDS.t_bytesLeft = bytesLeftForThread;
376  threadDS.multiPages = getMultiPage();
377  }
378 
379  for (auto& p : threads) {
380  p.wait();
381  }
382  for (auto& p : threads) {
383  bytesRead += p.get();
384  }
385  }
386  CHECK(bytesRead == numBytes);
387 }
388 
390  Page& destPage,
391  const size_t numBytes,
392  const size_t offset) {
393  // FILE *srcFile = fm_->files_[srcPage.fileId]->f;
394  // FILE *destFile = fm_->files_[destPage.fileId]->f;
395  CHECK_LE(offset + numBytes, pageDataSize_);
396  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
397  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
398 
399  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
400  size_t bytesRead = srcFileInfo->read(
401  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
402  CHECK(bytesRead == numBytes);
403  size_t bytesWritten = destFileInfo->write(
404  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
405  CHECK(bytesWritten == numBytes);
406  free(buffer);
407 }
408 
409 Page FileBuffer::addNewMultiPage(const int32_t epoch) {
410  Page page = fm_->requestFreePage(pageSize_, false);
411  MultiPage multiPage(pageSize_);
412  multiPage.push(page, epoch);
413  multiPages_.emplace_back(multiPage);
414  return page;
415 }
416 
418  const int32_t pageId,
419  const int32_t epoch,
420  const bool writeMetadata) {
421  int32_t intHeaderSize = chunkKey_.size() + 3; // does not include chunkSize
422  vector<int32_t> header(intHeaderSize);
423  // in addition to chunkkey we need size of header, pageId, version
424  header[0] =
425  (intHeaderSize - 1) * sizeof(int32_t); // don't need to include size of headerSize
426  // value - sizeof(size_t) is for chunkSize
427  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
428  header[intHeaderSize - 2] = pageId;
429  header[intHeaderSize - 1] = epoch;
430  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
431  size_t pageSize = writeMetadata ? metadataPageSize_ : pageSize_;
432  fileInfo->write(
433  page.pageNum * pageSize, (intHeaderSize) * sizeof(int32_t), (int8_t*)&header[0]);
434 }
435 
// Loads this buffer's metadata from the metadata page `page`, overwriting pageSize_ and
// size_. When the stored has_encoder flag is set, it also overwrites sql_type_ and reads
// the encoder's metadata.
// Layout after the reserved header (written by FileBuffer::writeMetadata):
//   size_t pageSize_, size_t size_, then NUM_METADATA int32_t slots where
//   [0]=version [1]=has_encoder [2]=type [3]=subtype [4]=dimension [5]=scale
//   [6]=notnull [7]=compression [8]=comp_param [9]=size, then encoder metadata.
// NOTE(review): the fseek/fread results are not checked. A short read is only caught
// indirectly if it leaves the version slot != METADATA_VERSION.
436 void FileBuffer::readMetadata(const Page& page) {
437  FILE* f = fm_->getFileForFileId(page.fileId);
// Metadata pages are indexed with metadataPageSize_, not pageSize_.
438  fseek(f, page.pageNum * metadataPageSize_ + reservedHeaderSize_, SEEK_SET);
439  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
440  fread((int8_t*)&size_, sizeof(size_t), 1, f);
441  vector<int32_t> typeData(
442  NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
443  // encodingType, encodingBits all as int
444  fread((int8_t*)&(typeData[0]), sizeof(int32_t), typeData.size(), f);
445  int32_t version = typeData[0];
446  CHECK(version == METADATA_VERSION); // add backward compatibility code here
447  bool has_encoder = static_cast<bool>(typeData[1]);
448  if (has_encoder) {
449  sql_type_.set_type(static_cast<SQLTypes>(typeData[2]));
450  sql_type_.set_subtype(static_cast<SQLTypes>(typeData[3]));
451  sql_type_.set_dimension(typeData[4]);
452  sql_type_.set_scale(typeData[5]);
453  sql_type_.set_notnull(static_cast<bool>(typeData[6]));
454  sql_type_.set_compression(static_cast<EncodingType>(typeData[7]));
455  sql_type_.set_comp_param(typeData[8]);
456  sql_type_.set_size(typeData[9]);
// NOTE(review): encoder_ is dereferenced here; verify it has been created for the
// sql_type_ just loaded before this call.
458  encoder_->readMetadata(f);
459  }
460 }
461 
462 void FileBuffer::writeMetadata(const int32_t epoch) {
463  // Right now stats page is size_ (in bytes), bufferType, encodingType,
464  // encodingDataType, numElements
465  Page page = fm_->requestFreePage(metadataPageSize_, true);
466  writeHeader(page, -1, epoch, true);
467  FILE* f = fm_->getFileForFileId(page.fileId);
468  fseek(f, page.pageNum * metadataPageSize_ + reservedHeaderSize_, SEEK_SET);
469  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
470  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
471  vector<int32_t> typeData(
472  NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
473  // encodingType, encodingBits all as int32_t
474  typeData[0] = METADATA_VERSION;
475  typeData[1] = static_cast<int32_t>(hasEncoder());
476  if (hasEncoder()) {
477  typeData[2] = static_cast<int32_t>(sql_type_.get_type());
478  typeData[3] = static_cast<int32_t>(sql_type_.get_subtype());
479  typeData[4] = sql_type_.get_dimension();
480  typeData[5] = sql_type_.get_scale();
481  typeData[6] = static_cast<int32_t>(sql_type_.get_notnull());
482  typeData[7] = static_cast<int32_t>(sql_type_.get_compression());
483  typeData[8] = sql_type_.get_comp_param();
484  typeData[9] = sql_type_.get_size();
485  }
486  fwrite((int8_t*)&(typeData[0]), sizeof(int32_t), typeData.size(), f);
487  if (hasEncoder()) { // redundant
488  encoder_->writeMetadata(f);
489  }
490  metadataPages_.push(page, epoch);
491 }
492 
493 void FileBuffer::append(int8_t* src,
494  const size_t numBytes,
495  const MemoryLevel srcBufferType,
496  const int32_t deviceId) {
497  setAppended();
498 
499  size_t startPage = size_ / pageDataSize_;
500  size_t startPageOffset = size_ % pageDataSize_;
501  size_t numPagesToWrite =
502  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
503  size_t bytesLeft = numBytes;
504  int8_t* curPtr = src; // a pointer to the current location in dst being written to
505  size_t initialNumPages = multiPages_.size();
506  size_ = size_ + numBytes;
507  auto epoch = getFileMgrEpoch();
508  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
509  Page page;
510  if (pageNum >= initialNumPages) {
511  page = addNewMultiPage(epoch);
512  writeHeader(page, pageNum, epoch);
513  } else {
514  // we already have a new page at current
515  // epoch for this page - just grab this page
516  page = multiPages_[pageNum].current().page;
517  }
518  CHECK(page.fileId >= 0); // make sure page was initialized
519  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
520  size_t bytesWritten;
521  if (pageNum == startPage) {
522  bytesWritten = fileInfo->write(
523  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
524  min(pageDataSize_ - startPageOffset, bytesLeft),
525  curPtr);
526  } else {
527  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
528  min(pageDataSize_, bytesLeft),
529  curPtr);
530  }
531  curPtr += bytesWritten;
532  bytesLeft -= bytesWritten;
533  }
534  CHECK(bytesLeft == 0);
535 }
536 
// Writes numBytes bytes from src at logical byte `offset`, growing size_ if the write
// extends past the current end.
// - Pages between the current last page and the first written page are allocated
//   first (gap fill).
// - A page whose current version is from an older epoch is not overwritten. A new
//   version is pushed instead, and the bytes outside the written range are copied over
//   from the old version on the first and last pages.
// - Pages already versioned at the current epoch are written in place.
// srcBufferType must be CPU_LEVEL (CHECKed); deviceId is not used.
537 void FileBuffer::write(int8_t* src,
538  const size_t numBytes,
539  const size_t offset,
540  const MemoryLevel srcBufferType,
541  const int32_t deviceId) {
542  CHECK(srcBufferType == CPU_LEVEL) << "Unsupported Buffer type";
543
544  bool tempIsAppended = false;
545  setDirty();
546  if (offset < size_) {
547  setUpdated();
548  }
549  if (offset + numBytes > size_) {
550  tempIsAppended = true; // because is_appended_ could have already been true - to
551  // avoid rewriting header
552  setAppended();
553  size_ = offset + numBytes;
554  }
555
556  size_t startPage = offset / pageDataSize_;
557  size_t startPageOffset = offset % pageDataSize_;
558  size_t numPagesToWrite =
559  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
560  size_t bytesLeft = numBytes;
561  int8_t* curPtr = src; // a pointer to the current location in dst being written to
562  size_t initialNumPages = multiPages_.size();
563  auto epoch = getFileMgrEpoch();
564
// After this gap fill, multiPages_.size() == startPage. The loop below still compares
// against the stale initialNumPages, so every pageNum >= startPage appends at exactly
// the next index.
565  if (startPage >
566  initialNumPages) { // means there is a gap we need to allocate pages for
567  for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
568  Page page = addNewMultiPage(epoch);
569  writeHeader(page, pageNum, epoch);
570  }
571  }
572  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
573  Page page;
574  if (pageNum >= initialNumPages) {
575  page = addNewMultiPage(epoch);
576  writeHeader(page, pageNum, epoch);
577  } else if (multiPages_[pageNum].current().epoch <
578  epoch) { // need to create new page b/c this current one lags epoch and we
579  // can't overwrite it also need to copy if we are on first or
580  // last page
581  Page lastPage = multiPages_[pageNum].current().page;
582  page = fm_->requestFreePage(pageSize_, false);
583  multiPages_[pageNum].push(page, epoch);
// Preserve data bytes [0, startPageOffset) of the first page.
584  if (pageNum == startPage && startPageOffset > 0) {
585  // copyPage takes care of header offset so don't worry
586  // about it
587  copyPage(lastPage, page, startPageOffset, 0);
588  }
// Preserve the tail of the last page: copies data bytes [bytesLeft, pageDataSize_).
// When the last page is also the first page, this is a superset of the bytes to keep;
// the extra bytes are either identical to the head copy or overwritten by the data
// write below.
589  if (pageNum == (startPage + numPagesToWrite - 1) &&
590  bytesLeft > 0) { // bytesLeft should always > 0
591  copyPage(lastPage,
592  page,
593  pageDataSize_ - bytesLeft,
594  bytesLeft); // these would be empty if we're appending but we won't
595  // worry about it right now
596  }
597  writeHeader(page, pageNum, epoch);
598  } else {
599  // we already have a new page at current
600  // epoch for this page - just grab this page
601  page = multiPages_[pageNum].current().page;
602  }
603  CHECK(page.fileId >= 0); // make sure page was initialized
604  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
605  size_t bytesWritten;
606  if (pageNum == startPage) {
607  bytesWritten = fileInfo->write(
608  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
609  min(pageDataSize_ - startPageOffset, bytesLeft),
610  curPtr);
611  } else {
612  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
613  min(pageDataSize_, bytesLeft),
614  curPtr);
615  }
616  curPtr += bytesWritten;
617  bytesLeft -= bytesWritten;
// NOTE(review): `page` is a data page here, yet writeMetadata=true makes writeHeader
// position the header using metadataPageSize_ rather than pageSize_. Together with the
// @todo below this looks suspect — confirm before relying on it.
618  if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) { // if last page
619  //@todo below can lead to undefined - we're overwriting num
620  // bytes valid at checkpoint
621  writeHeader(page, 0, multiPages_[0].current().epoch, true);
622  }
623  }
624  CHECK(bytesLeft == 0);
625 }
626 
628  auto [db_id, tb_id] = get_table_prefix(chunkKey_);
629  return fm_->epoch(db_id, tb_id);
630 }
631 
632 std::string FileBuffer::dump() const {
633  std::stringstream ss;
634  ss << "chunk_key = " << show_chunk(chunkKey_) << "\n";
635  ss << "has_encoder = " << (hasEncoder() ? "true\n" : "false\n");
636  ss << "size_ = " << size_ << "\n";
637  return ss.str();
638 }
639 
641  CHECK(metadataPages_.current().page.fileId != -1); // was initialized
644 }
645 
647  // Detect the case where a page is missing by comparing the amount of pages read
648  // with the metadata size.
649  return ((size() + pageDataSize_ - 1) / pageDataSize_ != multiPages_.size());
650 }
651 
653  size_t total_size = 0;
654  for (const auto& multi_page : multiPages_) {
655  total_size += multi_page.pageVersions.size();
656  }
657  return total_size;
658 }
659 
660 } // namespace File_Namespace
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:147
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:405
void set_compression(EncodingType c)
Definition: sqltypes.h:525
void set_size(int s)
Definition: sqltypes.h:522
virtual int32_t epoch(int32_t db_id, int32_t tb_id) const
Returns current value of epoch - should be one greater than recorded at last checkpoint. Because FileMgr only contains buffers from one table we can just return the FileMgr's epoch instead of finding a table-specific epoch.
Definition: FileMgr.h:277
std::vector< int > ChunkKey
Definition: types.h:36
size_t write(const size_t offset, const size_t size, const int8_t *buf)
Definition: FileInfo.cpp:63
void freePagesBeforeEpochForMultiPage(MultiPage &multiPage, const int32_t targetEpoch, const int32_t currentEpoch)
Definition: FileBuffer.cpp:196
HOST DEVICE int get_size() const
Definition: sqltypes.h:414
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:875
void freePagesBeforeEpoch(const int32_t targetEpoch)
Definition: FileBuffer.cpp:206
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
Definition: FileBuffer.cpp:245
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
void pop()
Purges the oldest Page.
Definition: Page.h:110
#define LOG(tag)
Definition: Logger.h:216
size_t numChunkPages() const
Definition: FileBuffer.cpp:652
Page addNewMultiPage(const int32_t epoch)
Definition: FileBuffer.cpp:409
HOST DEVICE int get_scale() const
Definition: sqltypes.h:409
void writeMetadata(const int32_t epoch)
Definition: FileBuffer.cpp:462
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
Definition: FileBuffer.cpp:537
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:515
const size_t metadataPageSize_
Definition: FileBuffer.h:196
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:198
std::vector< MultiPage > multiPages
Definition: FileBuffer.cpp:242
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:404
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:57
#define CHECK_GT(x, y)
Definition: Logger.h:234
void initEncoder(const SQLTypeInfo &tmp_sql_type)
constexpr double f
Definition: Utm.h:31
std::string show_chunk(const ChunkKey &key)
Definition: types.h:98
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
Definition: FileInfo.cpp:185
std::deque< EpochedPage > pageVersions
Definition: Page.h:81
future< Result > async(Fn &&fn, Args &&...args)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
int32_t fileId
Definition: Page.h:47
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:137
string version
Definition: setup.in.py:73
void set_scale(int s)
Definition: sqltypes.h:519
std::string dump() const
Definition: FileBuffer.cpp:632
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:988
An AbstractBuffer is a unit of data management for a data manager.
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Definition: FileBuffer.cpp:493
void set_comp_param(int p)
Definition: sqltypes.h:526
void writeHeader(Page &page, const int32_t pageId, const int32_t epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:417
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:144
#define CHECK_LE(x, y)
Definition: Logger.h:233
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:412
#define NUM_METADATA
Definition: FileBuffer.h:35
void set_dimension(int d)
Definition: sqltypes.h:516
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
Definition: Page.h:102
~FileBuffer() override
Destructor.
Definition: FileBuffer.cpp:131
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:69
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:406
virtual bool failOnReadError() const
True if a read error should cause a fatal error.
Definition: FileMgr.h:363
std::unique_ptr< Encoder > encoder_
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:413
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
Definition: FileBuffer.cpp:38
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:62
void set_notnull(bool n)
Definition: sqltypes.h:521
#define CHECK(condition)
Definition: Logger.h:222
void reserve(const size_t numBytes) override
Definition: FileBuffer.cpp:136
FileInfo * getFileInfoForFileId(const int32_t fileId) const
Definition: FileMgr.h:222
std::vector< EpochedPage > freePagesBeforeEpoch(const int32_t target_epoch, const int32_t current_epoch)
Definition: Page.h:117
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:411
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Definition: FileBuffer.cpp:286
void readMetadata(const Page &page)
Definition: FileBuffer.cpp:436
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
Definition: FileBuffer.cpp:389
EpochedPage current() const
Returns the most recent version of the page (by value, as a copy).
Definition: Page.h:94
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:140
#define METADATA_VERSION
Definition: FileBuffer.h:36
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:79
A selection of helper methods for File I/O.
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
Definition: FileMgr.h:312
void freePage(const Page &page)
Definition: FileBuffer.cpp:156
bool isMissingPages() const
Definition: FileBuffer.cpp:646
static constexpr size_t headerBufferOffset_
Definition: FileBuffer.h:165
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:514