36 namespace File_Namespace {
37 size_t FileBuffer::headerBufferOffset_ = 32;
40 const size_t pageSize,
42 const size_t initialSize)
47 , chunkKey_(chunkKey) {
68 const size_t pageSize,
71 const size_t initialSize)
76 , chunkKey_(chunkKey) {
84 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
85 const std::vector<HeaderInfo>::const_iterator& headerEndIt)
90 , chunkKey_(chunkKey) {
95 int32_t lastPageId = -1;
96 int32_t curPageId = 0;
98 for (
auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
99 curPageId = vecIt->pageId;
102 if (curPageId == -1) {
105 if (curPageId != lastPageId) {
107 if (curPageId != lastPageId + 1) {
109 <<
" Current page " << curPageId <<
" last page " << lastPageId
110 <<
" epoch " << vecIt->versionEpoch;
112 if (lastPageId == -1) {
120 lastPageId = curPageId;
122 multiPages_.back().push(vecIt->page, vecIt->versionEpoch);
125 if (curPageId == -1) {
141 for (
size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
177 for (
auto pageIt = multiPageIt->pageVersions.begin();
178 pageIt != multiPageIt->pageVersions.end();
184 return num_pages_freed;
193 const int32_t targetEpoch,
194 const int32_t currentEpoch) {
195 std::vector<EpochedPage> epochedPagesToFree =
197 for (
const auto& epochedPageToFree : epochedPagesToFree) {
198 freePage(epochedPageToFree.page,
true );
206 const int32_t currentEpoch =
fm_->
epoch();
207 CHECK_LE(targetEpoch, currentEpoch);
215 size_t max_historical_buffer_size{0};
219 buffer.readMetadata(epoch_page.page);
220 max_historical_buffer_size = std::max(max_historical_buffer_size, buffer.size());
224 if (max_historical_buffer_size == 0) {
246 size_t totalBytesRead = 0;
250 for (
size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
259 size_t bytesRead = 0;
261 bytesRead = fileInfo->
read(
268 bytesRead = fileInfo->
read(
274 bytesLeft -= bytesRead;
275 totalBytesRead += bytesRead;
277 CHECK(bytesLeft == 0);
279 return (totalBytesRead);
283 const size_t numBytes,
286 const int32_t deviceId) {
288 LOG(
FATAL) <<
"Unsupported Buffer type";
294 size_t numPagesToRead =
308 size_t numPagesPerThread = 0;
309 size_t numBytesCurrent = numBytes;
310 size_t bytesRead = 0;
311 size_t bytesLeftForThread = 0;
312 size_t numExtraPages = 0;
314 std::vector<readThreadDS>
317 if (numPagesToRead > numThreads) {
318 numPagesPerThread = numPagesToRead / numThreads;
319 numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
321 numThreads = numPagesToRead;
322 numPagesPerThread = 1;
329 if (numExtraPages > 0) {
345 if (numThreads == 1) {
348 std::vector<std::future<size_t>> threads;
350 for (
size_t i = 0;
i < numThreads;
i++) {
351 threadDSArr.push_back(threadDS);
353 std::async(std::launch::async,
readForThread,
this, threadDSArr[
i]));
358 threadDS.
t_curPtr += bytesLeftForThread;
362 if (numExtraPages > 0) {
368 numBytesCurrent -= bytesLeftForThread;
369 bytesLeftForThread = min(
375 for (
auto& p : threads) {
378 for (
auto& p : threads) {
379 bytesRead += p.get();
382 CHECK(bytesRead == numBytes);
387 const size_t numBytes,
388 const size_t offset) {
395 int8_t* buffer =
reinterpret_cast<int8_t*
>(
checked_malloc(numBytes));
396 size_t bytesRead = srcFileInfo->
read(
398 CHECK(bytesRead == numBytes);
399 size_t bytesWritten = destFileInfo->
write(
401 CHECK(bytesWritten == numBytes);
408 multiPage.
push(page, epoch);
414 const int32_t pageId,
416 const bool writeMetadata) {
417 int32_t intHeaderSize =
chunkKey_.size() + 3;
418 vector<int32_t> header(intHeaderSize);
421 (intHeaderSize - 1) *
sizeof(int32_t);
424 header[intHeaderSize - 2] = pageId;
425 header[intHeaderSize - 1] = epoch;
429 page.
pageNum * pageSize, (intHeaderSize) *
sizeof(int32_t), (int8_t*)&header[0]);
435 fread((int8_t*)&
pageSize_,
sizeof(
size_t), 1, f);
436 fread((int8_t*)&
size_,
sizeof(
size_t), 1, f);
437 vector<int32_t> typeData(
440 fread((int8_t*)&(typeData[0]),
sizeof(int32_t), typeData.size(),
f);
443 bool has_encoder =
static_cast<bool>(typeData[1]);
465 fwrite((int8_t*)&
pageSize_,
sizeof(
size_t), 1, f);
466 fwrite((int8_t*)&
size_,
sizeof(
size_t), 1, f);
467 vector<int32_t> typeData(
471 typeData[1] =
static_cast<int32_t
>(
hasEncoder());
482 fwrite((int8_t*)&(typeData[0]),
sizeof(int32_t), typeData.size(),
f);
490 const size_t numBytes,
492 const int32_t deviceId) {
497 size_t numPagesToWrite =
499 size_t bytesLeft = numBytes;
500 int8_t* curPtr = src;
504 for (
size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
506 if (pageNum >= initialNumPages) {
517 if (pageNum == startPage) {
518 bytesWritten = fileInfo->
write(
527 curPtr += bytesWritten;
528 bytesLeft -= bytesWritten;
530 CHECK(bytesLeft == 0);
534 const size_t numBytes,
537 const int32_t deviceId) {
539 LOG(
FATAL) <<
"Unsupported Buffer type";
542 bool tempIsAppended =
false;
544 if (offset <
size_) {
547 if (offset + numBytes >
size_) {
548 tempIsAppended =
true;
551 size_ = offset + numBytes;
556 size_t numPagesToWrite =
558 size_t bytesLeft = numBytes;
559 int8_t* curPtr = src;
565 for (
size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
570 for (
size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
572 if (pageNum >= initialNumPages) {
582 if (pageNum == startPage && startPageOffset > 0) {
585 copyPage(lastPage, page, startPageOffset, 0);
587 if (pageNum == (startPage + numPagesToWrite - 1) &&
604 if (pageNum == startPage) {
605 bytesWritten = fileInfo->
write(
614 curPtr += bytesWritten;
615 bytesLeft -= bytesWritten;
616 if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) {
622 CHECK(bytesLeft == 0);
626 std::stringstream ss;
628 ss <<
"has_encoder = " << (
hasEncoder() ?
"true\n" :
"false\n");
629 ss <<
"size_ = " <<
size_ <<
"\n";
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
HOST DEVICE SQLTypes get_subtype() const
void set_compression(EncodingType c)
#define METADATA_PAGE_SIZE
std::vector< int > ChunkKey
void freePagesBeforeEpochForMultiPage(MultiPage &multiPage, const int32_t targetEpoch, const int32_t currentEpoch)
void freePage(const Page &page, const bool isRolloff)
HOST DEVICE int get_size() const
size_t reservedHeaderSize_
Page requestFreePage(size_t pagesize, const bool isMetadata)
void freePagesBeforeEpoch(const int32_t targetEpoch)
static size_t headerBufferOffset_
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
A logical page (Page) belongs to a file on disk.
void pop()
Purges the oldest Page.
Page addNewMultiPage(const int32_t epoch)
HOST DEVICE int get_scale() const
void writeMetadata(const int32_t epoch)
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
HOST DEVICE void set_subtype(SQLTypes st)
std::vector< MultiPage > multiPages_
std::vector< MultiPage > multiPages
HOST DEVICE SQLTypes get_type() const
Represents/provides access to contiguous data stored in the file system.
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::string show_chunk(const ChunkKey &key)
size_t write(const size_t offset, const size_t size, int8_t *buf)
std::deque< EpochedPage > pageVersions
void * checked_malloc(const size_t size)
DEVICE auto copy(ARGS &&...args)
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
An AbstractBuffer is a unit of data management for a data manager.
size_t pageNum
unique identifier of the owning file
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
void freePage(int32_t pageId, const bool isRolloff)
void set_comp_param(int p)
FileInfo * getFileInfoForFileId(const int32_t fileId)
void writeHeader(Page &page, const int32_t pageId, const int32_t epoch, const bool writeMetadata=false)
Writes the page header at the top of the page; the header's last two int32 fields are the page id and the epoch.
virtual size_t reservedHeaderSize() const
HOST DEVICE EncodingType get_compression() const
void set_dimension(int d)
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
~FileBuffer() override
Destructor.
size_t read(const size_t offset, const size_t size, int8_t *buf)
HOST DEVICE int get_dimension() const
std::unique_ptr< Encoder > encoder_
int32_t epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
HOST DEVICE int get_comp_param() const
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
void reserve(const size_t numBytes) override
std::vector< EpochedPage > freePagesBeforeEpoch(const int32_t target_epoch, const int32_t current_epoch)
HOST DEVICE bool get_notnull() const
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
void readMetadata(const Page &page)
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
EpochedPage current() const
Returns the most recent version of the page (by value).
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
The MultiPage stores versions of the same logical page in a deque.
A selection of helper methods for File I/O.
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
HOST DEVICE void set_type(SQLTypes t)