36 namespace File_Namespace {
39 const size_t pageSize,
41 const size_t initialSize)
44 , metadataPageSize_(fm_->getMetadataPageSize())
45 , metadataPages_(metadataPageSize_)
47 , chunkKey_(chunkKey) {
69 const size_t pageSize,
72 const size_t initialSize)
75 , metadataPageSize_(fm->getMetadataPageSize())
76 , metadataPages_(metadataPageSize_)
78 , chunkKey_(chunkKey) {
86 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
87 const std::vector<HeaderInfo>::const_iterator& headerEndIt)
90 , metadataPageSize_(fm->getMetadataPageSize())
91 , metadataPages_(metadataPageSize_)
93 , chunkKey_(chunkKey) {
98 int32_t lastPageId = -1;
99 int32_t curPageId = 0;
100 for (
auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
101 curPageId = vecIt->pageId;
104 if (curPageId == -1) {
107 if (curPageId != lastPageId) {
110 if (curPageId != lastPageId + 1) {
112 <<
" Current page " << curPageId <<
" last page " << lastPageId
113 <<
" epoch " << vecIt->versionEpoch;
116 if (lastPageId == -1) {
121 lastPageId = curPageId;
123 multiPages_.back().push(vecIt->page, vecIt->versionEpoch);
126 if (curPageId == -1) {
140 for (
size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
175 return num_pages_freed;
182 for (
auto pageIt = multiPageIt->pageVersions.begin();
183 pageIt != multiPageIt->pageVersions.end();
189 return num_pages_freed;
197 const int32_t targetEpoch,
198 const int32_t currentEpoch) {
199 std::vector<EpochedPage> epochedPagesToFree =
201 for (
const auto& epochedPageToFree : epochedPagesToFree) {
202 freePage(epochedPageToFree.page,
true );
211 CHECK_LE(targetEpoch, currentEpoch);
219 size_t max_historical_buffer_size{0};
223 buffer.readMetadata(epoch_page.page);
224 max_historical_buffer_size = std::max(max_historical_buffer_size, buffer.size());
228 if (max_historical_buffer_size == 0) {
250 size_t totalBytesRead = 0;
254 for (
size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
263 size_t bytesRead = 0;
265 bytesRead = fileInfo->
read(
272 bytesRead = fileInfo->
read(
278 bytesLeft -= bytesRead;
279 totalBytesRead += bytesRead;
281 CHECK(bytesLeft == 0);
283 return (totalBytesRead);
287 const size_t numBytes,
290 const int32_t deviceId) {
292 LOG(
FATAL) <<
"Unsupported Buffer type";
298 size_t numPagesToRead =
312 size_t numPagesPerThread = 0;
313 size_t numBytesCurrent = numBytes;
314 size_t bytesRead = 0;
315 size_t bytesLeftForThread = 0;
316 size_t numExtraPages = 0;
318 std::vector<readThreadDS>
321 if (numPagesToRead > numThreads) {
322 numPagesPerThread = numPagesToRead / numThreads;
323 numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
325 numThreads = numPagesToRead;
326 numPagesPerThread = 1;
333 if (numExtraPages > 0) {
349 if (numThreads == 1) {
352 std::vector<std::future<size_t>> threads;
354 for (
size_t i = 0; i < numThreads; i++) {
355 threadDSArr.push_back(threadDS);
362 threadDS.
t_curPtr += bytesLeftForThread;
366 if (numExtraPages > 0) {
372 numBytesCurrent -= bytesLeftForThread;
373 bytesLeftForThread = min(
379 for (
auto& p : threads) {
382 for (
auto& p : threads) {
383 bytesRead += p.get();
386 CHECK(bytesRead == numBytes);
391 const size_t numBytes,
392 const size_t offset) {
399 int8_t* buffer =
reinterpret_cast<int8_t*
>(
checked_malloc(numBytes));
400 size_t bytesRead = srcFileInfo->
read(
402 CHECK(bytesRead == numBytes);
403 size_t bytesWritten = destFileInfo->
write(
405 CHECK(bytesWritten == numBytes);
412 multiPage.
push(page, epoch);
418 const int32_t pageId,
420 const bool writeMetadata) {
421 int32_t intHeaderSize =
chunkKey_.size() + 3;
422 vector<int32_t> header(intHeaderSize);
425 (intHeaderSize - 1) *
sizeof(int32_t);
428 header[intHeaderSize - 2] = pageId;
429 header[intHeaderSize - 1] = epoch;
433 page.
pageNum * pageSize, (intHeaderSize) *
sizeof(int32_t), (int8_t*)&header[0]);
439 fread((int8_t*)&
pageSize_,
sizeof(
size_t), 1, f);
440 fread((int8_t*)&
size_,
sizeof(
size_t), 1, f);
441 vector<int32_t> typeData(
444 fread((int8_t*)&(typeData[0]),
sizeof(int32_t), typeData.size(),
f);
447 bool has_encoder =
static_cast<bool>(typeData[1]);
469 fwrite((int8_t*)&
pageSize_,
sizeof(
size_t), 1, f);
470 fwrite((int8_t*)&
size_,
sizeof(
size_t), 1, f);
471 vector<int32_t> typeData(
475 typeData[1] =
static_cast<int32_t
>(
hasEncoder());
486 fwrite((int8_t*)&(typeData[0]),
sizeof(int32_t), typeData.size(),
f);
494 const size_t numBytes,
496 const int32_t deviceId) {
501 size_t numPagesToWrite =
503 size_t bytesLeft = numBytes;
504 int8_t* curPtr = src;
508 for (
size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
510 if (pageNum >= initialNumPages) {
521 if (pageNum == startPage) {
522 bytesWritten = fileInfo->
write(
531 curPtr += bytesWritten;
532 bytesLeft -= bytesWritten;
534 CHECK(bytesLeft == 0);
538 const size_t numBytes,
541 const int32_t deviceId) {
544 bool tempIsAppended =
false;
546 if (offset <
size_) {
549 if (offset + numBytes >
size_) {
550 tempIsAppended =
true;
553 size_ = offset + numBytes;
558 size_t numPagesToWrite =
560 size_t bytesLeft = numBytes;
561 int8_t* curPtr = src;
567 for (
size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
572 for (
size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
574 if (pageNum >= initialNumPages) {
584 if (pageNum == startPage && startPageOffset > 0) {
587 copyPage(lastPage, page, startPageOffset, 0);
589 if (pageNum == (startPage + numPagesToWrite - 1) &&
606 if (pageNum == startPage) {
607 bytesWritten = fileInfo->
write(
616 curPtr += bytesWritten;
617 bytesLeft -= bytesWritten;
618 if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) {
624 CHECK(bytesLeft == 0);
633 std::stringstream ss;
635 ss <<
"has_encoder = " << (
hasEncoder() ?
"true\n" :
"false\n");
636 ss <<
"size_ = " <<
size_ <<
"\n";
653 size_t total_size = 0;
655 total_size += multi_page.pageVersions.size();
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
HOST DEVICE SQLTypes get_subtype() const
void set_compression(EncodingType c)
virtual int32_t epoch(int32_t db_id, int32_t tb_id) const
Returns current value of epoch - should be one greater than recorded at last checkpoint. Because FileMgr only contains buffers from one table we can just return the FileMgr's epoch instead of finding a table-specific epoch.
std::vector< int > ChunkKey
size_t write(const size_t offset, const size_t size, const int8_t *buf)
void freePagesBeforeEpochForMultiPage(MultiPage &multiPage, const int32_t targetEpoch, const int32_t currentEpoch)
HOST DEVICE int get_size() const
size_t reservedHeaderSize_
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
void freePagesBeforeEpoch(const int32_t targetEpoch)
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
A logical page (Page) belongs to a file on disk.
void pop()
Purges the oldest Page.
size_t numChunkPages() const
Page addNewMultiPage(const int32_t epoch)
HOST DEVICE int get_scale() const
void writeMetadata(const int32_t epoch)
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
HOST DEVICE void set_subtype(SQLTypes st)
const size_t metadataPageSize_
std::vector< MultiPage > multiPages_
std::vector< MultiPage > multiPages
HOST DEVICE SQLTypes get_type() const
Represents/provides access to contiguous data stored in the file system.
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::string show_chunk(const ChunkKey &key)
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
void initMetadataAndPageDataSize()
std::deque< EpochedPage > pageVersions
future< Result > async(Fn &&fn, Args &&...args)
void * checked_malloc(const size_t size)
DEVICE auto copy(ARGS &&...args)
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
An AbstractBuffer is a unit of data management for a data manager.
size_t pageNum
unique identifier of the owning file
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
void set_comp_param(int p)
void writeHeader(Page &page, const int32_t pageId, const int32_t epoch, const bool writeMetadata=false)
Writes the page header at the top of the page; the header's last two fields are the page id and the epoch.
virtual size_t reservedHeaderSize() const
HOST DEVICE EncodingType get_compression() const
void set_dimension(int d)
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
~FileBuffer() override
Destructor.
size_t read(const size_t offset, const size_t size, int8_t *buf)
HOST DEVICE int get_dimension() const
virtual bool failOnReadError() const
True if a read error should cause a fatal error.
std::unique_ptr< Encoder > encoder_
size_t freeMetadataPages()
HOST DEVICE int get_comp_param() const
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
std::pair< int, int > get_table_prefix(const ChunkKey &key)
void reserve(const size_t numBytes) override
FileInfo * getFileInfoForFileId(const int32_t fileId) const
int32_t getFileMgrEpoch()
std::vector< EpochedPage > freePagesBeforeEpoch(const int32_t target_epoch, const int32_t current_epoch)
HOST DEVICE bool get_notnull() const
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
void readMetadata(const Page &page)
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
EpochedPage current() const
Returns the most recent version of the page.
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
The MultiPage stores versions of the same logical page in a deque.
A selection of helper methods for File I/O.
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
void freePage(const Page &page)
bool isMissingPages() const
static constexpr size_t headerBufferOffset_
HOST DEVICE void set_type(SQLTypes t)