35 namespace File_Namespace {
38 const size_t pageSize,
40 const size_t initialSize)
43 , metadataPageSize_(fm_->getMetadataPageSize())
44 , metadataPages_(metadataPageSize_)
46 , chunkKey_(chunkKey) {
68 const size_t pageSize,
71 const size_t initialSize)
74 , metadataPageSize_(fm->getMetadataPageSize())
75 , metadataPages_(metadataPageSize_)
77 , chunkKey_(chunkKey) {
85 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
86 const std::vector<HeaderInfo>::const_iterator& headerEndIt)
89 , metadataPageSize_(fm->getMetadataPageSize())
90 , metadataPages_(metadataPageSize_)
92 , chunkKey_(chunkKey) {
97 int32_t lastPageId = -1;
98 int32_t curPageId = 0;
99 for (
auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
100 curPageId = vecIt->pageId;
103 if (curPageId == -1) {
106 if (curPageId != lastPageId) {
109 if (curPageId != lastPageId + 1) {
111 <<
" Current page " << curPageId <<
" last page " << lastPageId
112 <<
" epoch " << vecIt->versionEpoch;
115 if (lastPageId == -1) {
120 lastPageId = curPageId;
122 multiPages_.back().push(vecIt->page, vecIt->versionEpoch);
125 if (curPageId == -1) {
139 for (
size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
175 return num_pages_freed;
182 for (
auto pageIt = multiPageIt->pageVersions.begin();
183 pageIt != multiPageIt->pageVersions.end();
189 return num_pages_freed;
197 const int32_t targetEpoch,
198 const int32_t currentEpoch) {
199 std::vector<EpochedPage> epochedPagesToFree =
201 for (
const auto& epochedPageToFree : epochedPagesToFree) {
202 freePage(epochedPageToFree.page,
true );
211 CHECK_LE(targetEpoch, currentEpoch);
219 size_t max_historical_buffer_size{0};
223 buffer.readMetadata(epoch_page.page);
224 max_historical_buffer_size = std::max(max_historical_buffer_size, buffer.size());
228 if (max_historical_buffer_size == 0) {
250 size_t totalBytesRead = 0;
254 for (
size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
263 size_t bytesRead = 0;
265 bytesRead = fileInfo->
read(
272 bytesRead = fileInfo->
read(
278 bytesLeft -= bytesRead;
279 totalBytesRead += bytesRead;
281 CHECK(bytesLeft == 0);
283 return (totalBytesRead);
287 const size_t numBytes,
290 const int32_t deviceId) {
292 LOG(
FATAL) <<
"Unsupported Buffer type";
298 size_t numPagesToRead =
311 <<
"Requested page out of bounds";
313 size_t numPagesPerThread = 0;
314 size_t numBytesCurrent = numBytes;
315 size_t bytesRead = 0;
316 size_t bytesLeftForThread = 0;
317 size_t numExtraPages = 0;
319 std::vector<readThreadDS>
322 if (numPagesToRead > numThreads) {
323 numPagesPerThread = numPagesToRead / numThreads;
324 numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
326 numThreads = numPagesToRead;
327 numPagesPerThread = 1;
334 if (numExtraPages > 0) {
350 if (numThreads == 1) {
353 std::vector<std::future<size_t>> threads;
355 for (
size_t i = 0; i < numThreads; i++) {
356 threadDSArr.push_back(threadDS);
363 threadDS.
t_curPtr += bytesLeftForThread;
367 if (numExtraPages > 0) {
373 numBytesCurrent -= bytesLeftForThread;
374 bytesLeftForThread = min(
380 for (
auto& p : threads) {
383 for (
auto& p : threads) {
384 bytesRead += p.get();
387 CHECK(bytesRead == numBytes);
392 const size_t numBytes,
393 const size_t offset) {
398 int8_t* buffer =
reinterpret_cast<int8_t*
>(
checked_malloc(numBytes));
399 ScopeGuard guard = [&buffer] { free(buffer); };
400 size_t bytesRead = srcFileInfo->
read(
402 CHECK(bytesRead == numBytes);
403 size_t bytesWritten = destFileInfo->
write(
405 CHECK(bytesWritten == numBytes);
411 multiPage.
push(page, epoch);
417 const int32_t pageId,
419 const bool writeMetadata) {
420 int32_t intHeaderSize =
chunkKey_.size() + 3;
421 vector<int32_t>
header(intHeaderSize);
424 (intHeaderSize - 1) *
sizeof(int32_t);
427 header[intHeaderSize - 2] = pageId;
428 header[intHeaderSize - 1] = epoch;
432 page.
pageNum * pageSize, (intHeaderSize) *
sizeof(int32_t), (int8_t*)&header[0]);
438 fread((int8_t*)&
pageSize_,
sizeof(
size_t), 1, f);
439 fread((int8_t*)&
size_,
sizeof(
size_t), 1, f);
440 vector<int32_t> typeData(
443 fread((int8_t*)&(typeData[0]),
sizeof(int32_t), typeData.size(),
f);
446 bool has_encoder =
static_cast<bool>(typeData[1]);
468 fwrite((int8_t*)&
pageSize_,
sizeof(
size_t), 1, f);
469 fwrite((int8_t*)&
size_,
sizeof(
size_t), 1, f);
470 vector<int32_t> typeData(
474 typeData[1] =
static_cast<int32_t
>(
hasEncoder());
485 fwrite((int8_t*)&(typeData[0]),
sizeof(int32_t), typeData.size(),
f);
493 const size_t numBytes,
495 const int32_t deviceId) {
500 size_t numPagesToWrite =
502 size_t bytesLeft = numBytes;
503 int8_t* curPtr = src;
507 for (
size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
509 if (pageNum >= initialNumPages) {
520 if (pageNum == startPage) {
521 bytesWritten = fileInfo->
write(
530 curPtr += bytesWritten;
531 bytesLeft -= bytesWritten;
533 CHECK(bytesLeft == 0);
537 const size_t numBytes,
540 const int32_t deviceId) {
543 bool tempIsAppended =
false;
545 if (offset <
size_) {
548 if (offset + numBytes >
size_) {
549 tempIsAppended =
true;
552 size_ = offset + numBytes;
557 size_t numPagesToWrite =
559 size_t bytesLeft = numBytes;
560 int8_t* curPtr = src;
566 for (
size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
571 for (
size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
573 if (pageNum >= initialNumPages) {
583 if (pageNum == startPage && startPageOffset > 0) {
586 copyPage(lastPage, page, startPageOffset, 0);
588 if (pageNum == (startPage + numPagesToWrite - 1) &&
605 if (pageNum == startPage) {
606 bytesWritten = fileInfo->
write(
615 curPtr += bytesWritten;
616 bytesLeft -= bytesWritten;
617 if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) {
623 CHECK(bytesLeft == 0);
632 std::stringstream ss;
634 ss <<
"has_encoder = " << (
hasEncoder() ?
"true\n" :
"false\n");
635 ss <<
"size_ = " <<
size_ <<
"\n";
652 size_t total_size = 0;
654 total_size += multi_page.pageVersions.size();
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
HOST DEVICE SQLTypes get_subtype() const
void set_compression(EncodingType c)
virtual int32_t epoch(int32_t db_id, int32_t tb_id) const
Returns current value of epoch - should be one greater than recorded at last checkpoint. Because FileMgr only contains buffers from one table we can just return the FileMgr's epoch instead of finding a table-specific epoch.
std::vector< int > ChunkKey
size_t write(const size_t offset, const size_t size, const int8_t *buf)
void freePagesBeforeEpochForMultiPage(MultiPage &multiPage, const int32_t targetEpoch, const int32_t currentEpoch)
HOST DEVICE int get_size() const
size_t reservedHeaderSize_
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
void freePagesBeforeEpoch(const int32_t targetEpoch)
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
A logical page (Page) belongs to a file on disk.
void pop()
Purges the oldest Page.
size_t numChunkPages() const
Page addNewMultiPage(const int32_t epoch)
HOST DEVICE int get_scale() const
void writeMetadata(const int32_t epoch)
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
HOST DEVICE void set_subtype(SQLTypes st)
const size_t metadataPageSize_
std::vector< MultiPage > multiPages_
std::vector< MultiPage > multiPages
HOST DEVICE SQLTypes get_type() const
Represents/provides access to contiguous data stored in the file system.
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::string show_chunk(const ChunkKey &key)
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
void initMetadataAndPageDataSize()
std::deque< EpochedPage > pageVersions
future< Result > async(Fn &&fn, Args &&...args)
void * checked_malloc(const size_t size)
DEVICE auto copy(ARGS &&...args)
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
An AbstractBuffer is a unit of data management for a data manager.
size_t pageNum
unique identifier of the owning file
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
void set_comp_param(int p)
void writeHeader(Page &page, const int32_t pageId, const int32_t epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
virtual size_t reservedHeaderSize() const
HOST DEVICE EncodingType get_compression() const
void set_dimension(int d)
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
~FileBuffer() override
Destructor.
size_t read(const size_t offset, const size_t size, int8_t *buf)
HOST DEVICE int get_dimension() const
virtual bool failOnReadError() const
True if a read error should cause a fatal error.
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
std::unique_ptr< Encoder > encoder_
size_t freeMetadataPages()
HOST DEVICE int get_comp_param() const
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
std::pair< int, int > get_table_prefix(const ChunkKey &key)
void reserve(const size_t numBytes) override
FileInfo * getFileInfoForFileId(const int32_t fileId) const
int32_t getFileMgrEpoch()
std::vector< EpochedPage > freePagesBeforeEpoch(const int32_t target_epoch, const int32_t current_epoch)
HOST DEVICE bool get_notnull() const
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
void readMetadata(const Page &page)
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
EpochedPage current() const
Returns a reference to the most recent version of the page.
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
The MultiPage stores versions of the same logical page in a deque.
A selection of helper methods for File I/O.
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
void freePage(const Page &page)
bool isMissingPages() const
static constexpr size_t headerBufferOffset_
HOST DEVICE void set_type(SQLTypes t)