24 namespace foreign_storage {
41 }
catch (
const std::runtime_error& error) {
45 for (
const auto& [chunk_key, buffer] : required_buffers) {
46 if (
auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
47 file_buffer->freeChunkPages();
50 for (
const auto& [chunk_key, buffer] : optional_buffers) {
51 if (
auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
52 file_buffer->freeChunkPages();
62 const size_t num_bytes) {
63 CHECK(destination_buffer);
70 std::vector<ChunkKey> optional_keys;
75 std::set<ChunkKey> optional_set;
79 data_wrapper.getCachedParallelismLevel());
80 for (
const auto& key : optional_set) {
82 optional_keys.emplace_back(key);
86 if (optional_keys.size()) {
91 CHECK(required_buffers.find(chunk_key) != required_buffers.end());
94 if (optional_keys.size()) {
101 buffer->
copyTo(destination_buffer, num_bytes);
123 const bool evict_cached_entries) {
142 std::vector<ChunkKey> old_chunk_keys =
156 int last_frag_id = 0;
160 for (
const auto& [key, metadata] : cached_metadata) {
169 const std::vector<ChunkKey>& old_chunk_keys) {
179 }
catch (std::runtime_error& e) {
186 const std::vector<ChunkKey>& old_chunk_keys) {
195 }
catch (std::runtime_error& e) {
201 const std::vector<ChunkKey>& old_chunk_keys,
203 int64_t total_time{0};
204 auto fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
206 if (old_chunk_keys.empty()) {
212 std::vector<ChunkKey> chunk_keys_to_be_cached;
215 std::vector<ChunkKey> chunk_keys_in_fragment;
216 for (
const auto& chunk_key : old_chunk_keys) {
222 if (chunk_keys_in_fragment.size() > 0) {
223 auto required_buffers =
227 chunk_keys_in_fragment.clear();
232 auto current_time = std::chrono::high_resolution_clock::now();
233 total_time += std::chrono::duration_cast<std::chrono::seconds>(
234 current_time - fragment_refresh_start_time)
237 LOG(
WARNING) <<
"Refresh time exceeded for table key: { " << table_key[0]
238 <<
", " << table_key[1] <<
" } after fragment id: " << fragment_id;
241 fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
254 chunk_keys_in_fragment.emplace_back(index_chunk_key);
255 chunk_keys_to_be_cached.emplace_back(index_chunk_key);
257 chunk_keys_in_fragment.emplace_back(chunk_key);
258 chunk_keys_to_be_cached.emplace_back(chunk_key);
262 if (chunk_keys_in_fragment.size() > 0) {
263 auto required_buffers =
268 if (chunk_keys_to_be_cached.size() > 0) {
std::vector< ChunkKey > get_keys_vec_from_table(const ChunkKey &destination_chunk_key)
void refreshAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
std::vector< int > ChunkKey
bool isMetadataCached(const ChunkKey &) const
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key)
std::set< ChunkKey > get_keys_set_from_table(const ChunkKey &destination_chunk_key)
bool is_table_key(const ChunkKey &key)
bool is_varlen_data_key(const ChunkKey &key)
void clearForTablePrefix(const ChunkKey &)
#define CHUNK_KEY_FRAGMENT_IDX
CachingForeignStorageMgr(ForeignStorageCache *cache)
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void cacheTableChunks(const std::vector< ChunkKey > &chunk_keys)
void refreshTableInCache(const ChunkKey &table_key)
ChunkKey get_table_key(const ChunkKey &key)
ForeignStorageCache * disk_cache_
AbstractBuffer * getCachedChunkIfExists(const ChunkKey &)
void refreshNonAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
static SysCatalog & instance()
void createOrRecoverDataWrapperIfNotExists(const ChunkKey &chunk_key)
void getOptionalChunkKeySet(std::set< ChunkKey > &optional_chunk_keys, const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level)
constexpr int64_t MAX_REFRESH_TIME_IN_SECONDS
void refreshChunksInCacheByFragment(const std::vector< ChunkKey > &old_chunk_keys, int last_frag_id)
void recoverDataWrapperFromDisk(const ChunkKey &table_key, const ChunkMetadataVector &chunk_metadata)
const std::string wrapper_file_name
#define CHUNK_KEY_TABLE_IDX
An AbstractBuffer is a unit of data management for a data manager.
void cacheMetadataWithFragIdGreaterOrEqualTo(const ChunkMetadataVector &metadata_vec, const int frag_id)
bool recoverCacheForTable(ChunkMetadataVector &, const ChunkKey &)
void populateChunkBuffersSafely(ForeignDataWrapper &data_wrapper, ChunkToBufferMap &required_buffers, ChunkToBufferMap &optional_buffers)
void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries) override
void cacheMetadataVec(const ChunkMetadataVector &)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
std::string getCacheDirectoryForTable(int db_id, int tb_id) const
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers)=0
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &) const
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
std::pair< int, int > get_table_prefix(const ChunkKey &key)
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
ChunkToBufferMap getChunkBuffersForCaching(const std::vector< ChunkKey > &chunk_keys) const
#define CHUNK_KEY_COLUMN_IDX
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
bool is_varlen_key(const ChunkKey &key)
int getHighestCachedFragId(const ChunkKey &table_key)