19 #include <boost/filesystem.hpp>
26 #ifdef ENABLE_IMPORT_PARQUET
31 namespace foreign_storage {
46 CHECK_GT(required_buffers.size(), 0U) <<
"Must populate at least one buffer";
50 chunk_size_validator.validateChunkSizes(required_buffers);
51 chunk_size_validator.validateChunkSizes(optional_buffers);
54 }
catch (
const std::runtime_error& error) {
58 for (
const auto& [chunk_key, buffer] : required_buffers) {
59 if (
auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
60 file_buffer->freeChunkPages();
63 for (
const auto& [chunk_key, buffer] : optional_buffers) {
64 if (
auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
65 file_buffer->freeChunkPages();
84 const size_t num_bytes) {
91 CHECK(destination_buffer);
97 buffer->copyTo(destination_buffer, num_bytes);
112 chunk_key, column_keys, data_wrapper.getCachedParallelismLevel());
116 for (
auto it = optional_set.begin(); it != optional_set.end();) {
118 it = optional_set.erase(it);
126 CHECK(required_buffers.find(chunk_key) != required_buffers.end());
131 buffer->
copyTo(destination_buffer, num_bytes);
152 for (
auto& [key, meta] : chunk_metadata) {
196 auto doc =
getDataWrapper(chunk_key_prefix)->getSerializedDataWrapper();
205 const bool evict_cached_entries) {
209 if (evict_cached_entries) {
224 std::vector<ChunkKey> old_chunk_keys =
229 for (
auto& key : old_chunk_keys) {
261 int last_frag_id = 0;
265 for (
const auto& [key, metadata] : cached_metadata) {
274 const std::vector<ChunkKey>& old_chunk_keys) {
283 }
catch (std::runtime_error& e) {
290 const std::vector<ChunkKey>& old_chunk_keys) {
299 }
catch (std::runtime_error& e) {
305 const std::vector<ChunkKey>& old_chunk_keys,
307 int64_t total_time{0};
308 auto fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
310 if (old_chunk_keys.empty()) {
316 std::set<ChunkKey> chunk_keys_to_be_cached;
319 std::set<ChunkKey> chunk_keys_in_fragment;
320 for (
const auto& chunk_key : old_chunk_keys) {
327 if (chunk_keys_in_fragment.size() > 0) {
328 auto required_buffers =
332 chunk_keys_in_fragment.clear();
337 auto current_time = std::chrono::high_resolution_clock::now();
338 total_time += std::chrono::duration_cast<std::chrono::seconds>(
339 current_time - fragment_refresh_start_time)
342 LOG(
WARNING) <<
"Refresh time exceeded for table key: { " << table_key[0]
343 <<
", " << table_key[1] <<
" } after fragment id: " << fragment_id;
346 fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
359 chunk_keys_in_fragment.emplace(index_chunk_key);
360 chunk_keys_to_be_cached.emplace(index_chunk_key);
362 chunk_keys_in_fragment.emplace(chunk_key);
363 chunk_keys_to_be_cached.emplace(chunk_key);
367 if (chunk_keys_in_fragment.size() > 0) {
368 auto required_buffers =
385 if (boost::filesystem::exists(wrapper_file)) {
409 size_t total_size = 0U;
410 for (
const auto& key : key_set) {
420 std::set<ChunkKey> optional_keys;
425 for (
const auto& keys : {same_fragment_keys, diff_fragment_keys}) {
426 for (
const auto& key : keys) {
428 for (
const auto& column_key : column_keys) {
432 if (total_chunk_size > max_size) {
433 return optional_keys;
435 for (
const auto& column_key : column_keys) {
436 optional_keys.emplace(column_key);
440 return optional_keys;
444 size_t num_bytes = 0;
448 auto metadata = meta.begin()->second;
454 num_bytes = (metadata->sqlType.is_string())
459 num_bytes = metadata->numBytes;
std::lock_guard< T > lock_guard
bool set_comp(const ChunkKey &left, const ChunkKey &right)
void refreshAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
std::vector< int > ChunkKey
bool isMetadataCached(const ChunkKey &) const
size_t getBufferSize(const ChunkKey &key) const
bool is_table_key(const ChunkKey &key)
bool is_varlen_data_key(const ChunkKey &key)
bool hasStoredDataWrapper(int32_t db, int32_t tb) const
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key) const
void validateChunkSize(const AbstractBuffer *buffer) const
size_t getMaxChunkDataSize() const
void clearForTablePrefix(const ChunkKey &)
void storeDataWrapper(const std::string &doc, int32_t db_id, int32_t tb_id)
#define CHUNK_KEY_FRAGMENT_IDX
CachingForeignStorageMgr(ForeignStorageCache *cache)
size_t maxFetchSize(int32_t db_id) const override
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
bool is_append_table_chunk_key(const ChunkKey &chunk_key)
bool hasMaxFetchSize() const override
std::set< ChunkKey > get_column_key_set(const ChunkKey &destination_chunk_key)
void removeTableRelatedDS(const int db_id, const int table_id) override
void refreshTableInCache(const ChunkKey &table_key)
ChunkKey get_table_key(const ChunkKey &key)
ForeignStorageCache * disk_cache_
std::string show_chunk(const ChunkKey &key)
std::set< ChunkKey > getOptionalKeysWithinSizeLimit(const ChunkKey &chunk_key, const std::set< ChunkKey, decltype(set_comp)* > &same_fragment_keys, const std::set< ChunkKey, decltype(set_comp)* > &diff_fragment_keys) const override
bool is_in_memory_system_table
bool is_table_enabled_on_node(const ChunkKey &key)
void eraseDataWrapper(const ChunkKey &key) override
bool is_in_memory_system_table_chunk_key(const ChunkKey &chunk_key)
void getChunkMetadataVecFromDataWrapper(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix)
void clearTable(const ChunkKey &table_key)
This file contains the class specification and related data structures for Catalog.
void refreshNonAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
bool hasStoredDataWrapperMetadata(int32_t db_id, int32_t table_id) const
void updateFragmenterMetadata(const ChunkToBufferMap &) const
constexpr int64_t MAX_REFRESH_TIME_IN_SECONDS
std::set< ChunkKey > getOptionalChunkKeySet(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level) const
void refreshChunksInCacheByFragment(const std::vector< ChunkKey > &old_chunk_keys, int last_frag_id)
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key) override
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
#define CHUNK_KEY_TABLE_IDX
bool has_table_prefix(const ChunkKey &key)
An AbstractBuffer is a unit of data management for a data manager.
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer=nullptr)=0
void populateChunkBuffersSafely(ForeignDataWrapper &data_wrapper, ChunkToBufferMap &required_buffers, ChunkToBufferMap &optional_buffers)
ChunkToBufferMap getChunkBuffersForCaching(const std::set< ChunkKey > &chunk_keys) const
void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries) override
bool fragment_maps_to_leaf(const ChunkKey &key)
bool is_shardable_key(const ChunkKey &key)
std::string getSerializedWrapperPath(int32_t db_id, int32_t tb_id) const
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
void cacheMetadataVec(const ChunkMetadataVector &)
const foreign_storage::ForeignTable & get_foreign_table_for_key(const ChunkKey &key)
size_t get_max_chunk_size(const ChunkKey &key)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
virtual void eraseDataWrapper(const ChunkKey &table_key)
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &) const
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
std::pair< int, int > get_table_prefix(const ChunkKey &key)
void checkpoint(const int32_t db_id, const int32_t tb_id)
void createDataWrapperUnlocked(int32_t db, int32_t tb)
File_Namespace::FileBuffer * getCachedChunkIfExists(const ChunkKey &)
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
#define CHUNK_KEY_COLUMN_IDX
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
void removeTableRelatedDS(const int db_id, const int table_id) override
size_t getRequiredBuffersSize(const ChunkKey &chunk_key) const
ChunkKey get_fragment_key(const ChunkKey &key)
std::shared_mutex data_wrapper_mutex_
bool is_varlen_key(const ChunkKey &key)
int getHighestCachedFragId(const ChunkKey &table_key)