38 const size_t num_bytes) {
39 CHECK(destination_buffer);
46 std::vector<ChunkKey> optional_chunk_keys;
47 std::map<ChunkKey, AbstractBuffer*> optional_buffers;
54 if (foreign_table->foreign_server->data_wrapper_type ==
58 std::set<ChunkKey> optional_chunk_key_set;
61 for (
const auto& key : optional_chunk_key_set) {
63 optional_chunk_keys.emplace_back(key);
67 if (optional_chunk_keys.size()) {
72 std::map<ChunkKey, AbstractBuffer*> required_buffers =
74 CHECK(required_buffers.find(chunk_key) != required_buffers.end());
75 getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
77 if (optional_chunk_keys.size()) {
84 buffer->
copyTo(destination_buffer, num_bytes);
104 const bool evict_cached_entries) {
123 std::vector<ChunkKey> old_chunk_keys =
137 int last_frag_id = 0;
141 for (
const auto& [key, metadata] : cached_metadata) {
150 const std::vector<ChunkKey>& old_chunk_keys) {
160 }
catch (std::runtime_error& e) {
167 const std::vector<ChunkKey>& old_chunk_keys) {
176 }
catch (std::runtime_error& e) {
182 const std::vector<ChunkKey>& old_chunk_keys,
184 int64_t total_time{0};
185 auto fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
187 if (old_chunk_keys.empty()) {
192 std::map<ChunkKey, AbstractBuffer*> optional_buffers;
193 std::vector<ChunkKey> chunk_keys_to_be_cached;
196 std::vector<ChunkKey> chunk_keys_in_fragment;
197 for (
const auto& chunk_key : old_chunk_keys) {
203 if (chunk_keys_in_fragment.size() > 0) {
204 auto required_buffers =
208 chunk_keys_in_fragment.clear();
213 auto current_time = std::chrono::high_resolution_clock::now();
214 total_time += std::chrono::duration_cast<std::chrono::seconds>(
215 current_time - fragment_refresh_start_time)
218 LOG(
WARNING) <<
"Refresh time exceeded for table key: { " << table_key[0]
219 <<
", " << table_key[1] <<
" } after fragment id: " << fragment_id;
222 fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
235 chunk_keys_in_fragment.emplace_back(index_chunk_key);
236 chunk_keys_to_be_cached.emplace_back(index_chunk_key);
238 chunk_keys_in_fragment.emplace_back(chunk_key);
239 chunk_keys_to_be_cached.emplace_back(chunk_key);
243 if (chunk_keys_in_fragment.size() > 0) {
244 auto required_buffers =
246 getDataWrapper(table_key)->populateChunkBuffers(required_buffers, optional_buffers);
248 if (chunk_keys_to_be_cached.size() > 0) {
void getOptionalChunkKeySet(std::set< ChunkKey > &optional_chunk_keys, const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys)
void refreshAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &) const
void clearForTablePrefix(const ChunkKey &)
#define CHUNK_KEY_TABLE_IDX
CachingForeignStorageMgr(ForeignStorageCache *cache)
bool is_table_key(const ChunkKey &key)
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
const std::string wrapper_file_name
void cacheTableChunks(const std::vector< ChunkKey > &chunk_keys)
void refreshTableInCache(const ChunkKey &table_key)
ForeignStorageCache * disk_cache_
AbstractBuffer * getCachedChunkIfExists(const ChunkKey &)
void refreshNonAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
std::string getCacheDirectoryForTablePrefix(const ChunkKey &) const
static SysCatalog & instance()
void createOrRecoverDataWrapperIfNotExists(const ChunkKey &chunk_key)
std::map< ChunkKey, AbstractBuffer * > getChunkBuffersForCaching(const std::vector< ChunkKey > &chunk_keys) const
constexpr int64_t MAX_REFRESH_TIME_IN_SECONDS
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key)
void refreshChunksInCacheByFragment(const std::vector< ChunkKey > &old_chunk_keys, int last_frag_id)
void recoverDataWrapperFromDisk(const ChunkKey &table_key, const ChunkMetadataVector &chunk_metadata)
bool is_varlen_key(const ChunkKey &key)
An AbstractBuffer is a unit of data management for a data manager.
void cacheMetadataWithFragIdGreaterOrEqualTo(const ChunkMetadataVector &metadata_vec, const int frag_id)
bool recoverCacheForTable(ChunkMetadataVector &, const ChunkKey &)
ChunkKey get_table_key(const ChunkKey &key)
void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries) override
void cacheMetadataVec(const ChunkMetadataVector &)
#define CHUNK_KEY_FRAGMENT_IDX
std::vector< ChunkKey > get_keys_vec_from_table(const ChunkKey &destination_chunk_key)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
bool isMetadataCached(const ChunkKey &) const
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
std::set< ChunkKey > get_keys_set_from_table(const ChunkKey &destination_chunk_key)
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
bool is_varlen_data_key(const ChunkKey &key)
#define CHUNK_KEY_COLUMN_IDX
static constexpr char const * CSV
std::vector< int > ChunkKey
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
int getHighestCachedFragId(const ChunkKey &table_key)