bool HashtableRecycler::hasItemInCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::lock_guard<std::mutex>& lock,
    std::optional<HashtableCacheMetaInfo> meta_info) const {
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  CHECK(hashtable_cache);
  auto candidate_ht_it = std::find_if(
      hashtable_cache->begin(), hashtable_cache->end(), [&key](const auto& cached_item) {
        return cached_item.key == key;
      });
  if (candidate_ht_it != hashtable_cache->end()) {
    if (item_type == CacheItemType::OVERLAPS_HT) {
      // an overlaps join hashtable is reusable only if its bucket configuration
      // is compatible with the requested one, so a key match alone is not enough
      CHECK(candidate_ht_it->meta_info && candidate_ht_it->meta_info->overlaps_meta_info);
      CHECK(meta_info && meta_info->overlaps_meta_info);
      if (checkOverlapsHashtableBucketCompatability(
              *candidate_ht_it->meta_info->overlaps_meta_info,
              *meta_info->overlaps_meta_info)) {
        return true;
      }
    } else {
      return true;
    }
  }
  return false;
}
std::shared_ptr<HashTable> HashtableRecycler::getItemFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::optional<HashtableCacheMetaInfo> meta_info) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  auto candidate_ht = getCachedItemWithoutConsideringMetaInfo(
      key, item_type, device_identifier, *hashtable_cache, lock);
  if (candidate_ht) {
    bool can_return_cached_item = false;
    if (item_type == CacheItemType::OVERLAPS_HT) {
      // check the hashtable metainfo of the overlaps join hashtable
      CHECK(candidate_ht->meta_info && candidate_ht->meta_info->overlaps_meta_info);
      CHECK(meta_info && meta_info->overlaps_meta_info);
      if (checkOverlapsHashtableBucketCompatability(
              *candidate_ht->meta_info->overlaps_meta_info,
              *meta_info->overlaps_meta_info)) {
        can_return_cached_item = true;
      }
    } else {
      can_return_cached_item = true;
    }
    if (can_return_cached_item) {
      CHECK(!candidate_ht->isDirty());
      candidate_ht->item_metric->incRefCount();
      VLOG(1) << "[" << item_type << ", "
              << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
              << "] Recycle item in a cache (key: " << key << ")";
      return candidate_ht->cached_item;
    }
  }
  return nullptr;
}
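// Illustrative usage sketch (not from the original source; `recycler`,
// `build_hashtable`, and the cache-key/enum names are assumed): a hash join
// builder first consults the recycler and only builds on a cache miss.
//
//   auto cached_ht = recycler->getItemFromCache(
//       hashtable_cache_key, CacheItemType::PERFECT_HT, device_identifier, meta_info);
//   if (!cached_ht) {
//     cached_ht = build_hashtable(/* ... */);  // cache miss: build from scratch
//     recycler->putItemToCache(hashtable_cache_key,
//                              cached_ht,
//                              CacheItemType::PERFECT_HT,
//                              device_identifier,
//                              hashtable_size,
//                              build_time,
//                              meta_info);
//   }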
void HashtableRecycler::putItemToCache(QueryPlanHash key,
                                       std::shared_ptr<HashTable> item_ptr,
                                       CacheItemType item_type,
                                       DeviceIdentifier device_identifier,
                                       size_t item_size,
                                       size_t compute_time,
                                       std::optional<HashtableCacheMetaInfo> meta_info) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto has_cached_ht = hasItemInCache(key, item_type, device_identifier, lock, meta_info);
  if (has_cached_ht) {
    // check whether the cached item is in a dirty status
    auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
    auto candidate_it =
        std::find_if(hashtable_cache->begin(),
                     hashtable_cache->end(),
                     [&key](const auto& cached_item) { return cached_item.key == key; });
    bool found_candidate = false;
    if (candidate_it != hashtable_cache->end()) {
      if (item_type == CacheItemType::OVERLAPS_HT) {
        CHECK(candidate_it->meta_info && candidate_it->meta_info->overlaps_meta_info);
        CHECK(meta_info && meta_info->overlaps_meta_info);
        if (checkOverlapsHashtableBucketCompatability(
                *candidate_it->meta_info->overlaps_meta_info,
                *meta_info->overlaps_meta_info)) {
          found_candidate = true;
        }
      } else {
        found_candidate = true;
      }
      if (found_candidate && candidate_it->isDirty()) {
        // remove the dirty item from the cache to make room for the new one
        removeItemFromCache(
            key, item_type, device_identifier, lock, candidate_it->meta_info);
        has_cached_ht = false;
      }
    }
  }

  if (!has_cached_ht) {
    // check the cache's space availability before inserting the new item
    auto& metric_tracker = getMetricTracker(item_type);
    auto cache_status = metric_tracker.canAddItem(device_identifier, item_size);
    if (cache_status == CacheAvailability::UNAVAILABLE) {
      // the hashtable is too large to keep in the cache
      return;
    }
    if (cache_status == CacheAvailability::AVAILABLE_AFTER_CLEANUP) {
      // evict cached hashtables until there is enough room for the new one
      auto required_size = metric_tracker.calculateRequiredSpaceForItemAddition(
          device_identifier, item_size);
      cleanupCacheForInsertion(
          item_type, device_identifier, required_size, lock, meta_info);
    }
    // register the new item's metric, then put the hashtable into the cache
    auto new_cache_metric_ptr = metric_tracker.putNewCacheItemMetric(
        key, device_identifier, item_size, compute_time);
    CHECK_EQ(item_size, new_cache_metric_ptr->getMemSize());
    auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
    VLOG(1) << "[" << item_type << ", "
            << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
            << "] Put item to cache (key: " << key << ")";
    hashtable_cache->emplace_back(key, item_ptr, new_cache_metric_ptr, meta_info);
  }
}
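// A minimal, self-contained sketch of the admission decision that
// CacheMetricTracker::canAddItem feeds into putItemToCache above. The three
// outcomes mirror the branches taken above; the threshold logic here is an
// assumed simplification, not the production implementation.
namespace {
enum class ExampleCacheAvailability { AVAILABLE, AVAILABLE_AFTER_CLEANUP, UNAVAILABLE };

ExampleCacheAvailability exampleCanAddItem(size_t item_size,
                                           size_t current_cache_size,
                                           size_t total_cache_size) {
  if (item_size > total_cache_size) {
    return ExampleCacheAvailability::UNAVAILABLE;  // the item can never fit
  }
  if (current_cache_size + item_size > total_cache_size) {
    return ExampleCacheAvailability::AVAILABLE_AFTER_CLEANUP;  // fits after eviction
  }
  return ExampleCacheAvailability::AVAILABLE;  // fits as-is
}
}  // namespace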
void HashtableRecycler::removeItemFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::lock_guard<std::mutex>& lock,
    std::optional<HashtableCacheMetaInfo> meta_info) {
  auto& cache_metrics = getMetricTracker(item_type);
  auto cache_metric = cache_metrics.getCacheItemMetric(key, device_identifier);
  CHECK(cache_metric);
  // keep the item's size before removing its metric
  auto hashtable_size = cache_metric->getMemSize();
  auto hashtable_container = getCachedItemContainer(item_type, device_identifier);
  auto filter = [key](auto const& item) { return item.key == key; };
  auto itr =
      std::find_if(hashtable_container->cbegin(), hashtable_container->cend(), filter);
  if (itr == hashtable_container->cend()) {
    return;
  }
  VLOG(1) << "[" << item_type << ", "
          << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
          << "] remove cached item from cache (key: " << key << ")";
  hashtable_container->erase(itr);
  // update the cache metrics
  cache_metrics.removeCacheItemMetric(key, device_identifier);
  cache_metrics.updateCurrentCacheSize(
      device_identifier, CacheUpdateAction::REMOVE, hashtable_size);
}
void HashtableRecycler::cleanupCacheForInsertion(
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    size_t required_size,
    std::lock_guard<std::mutex>& lock,
    std::optional<HashtableCacheMetaInfo> meta_info) {
  int elimination_target_offset = 0;
  size_t removed_size = 0;
  auto& metric_tracker = getMetricTracker(item_type);
  auto actual_space_to_free = metric_tracker.getTotalCacheSize() / 2;
  if (!g_is_test_env && required_size < actual_space_to_free) {
    // free at least half of the total cache size to avoid too frequent cache
    // cleanup; test code is exempt since test scenarios depend on exact item
    // and cache sizes
    required_size = actual_space_to_free;
  }
  // evict the least important items first
  metric_tracker.sortCacheInfoByQueryMetric(device_identifier);
  auto cached_item_metrics = metric_tracker.getCacheItemMetrics(device_identifier);
  sortCacheContainerByQueryMetric(item_type, device_identifier);

  // count how many items from the front must go to free the required space
  for (auto& metric : cached_item_metrics) {
    auto target_size = metric->getMemSize();
    ++elimination_target_offset;
    removed_size += target_size;
    if (removed_size > required_size) {
      break;
    }
  }

  removeCachedItemFromBeginning(item_type, device_identifier, elimination_target_offset);
  metric_tracker.removeMetricFromBeginning(device_identifier, elimination_target_offset);
  // update the current cache size to reflect this cleanup
  metric_tracker.updateCurrentCacheSize(
      device_identifier, CacheUpdateAction::REMOVE, removed_size);
}
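// Self-contained sketch of the eviction sizing above (requires <vector> and
// <algorithm>; values are hypothetical). Given item sizes already sorted
// least-important-first, it returns how many items from the front must be
// dropped so that their cumulative size exceeds required_size; requests below
// half of the total cache size are rounded up first, as in
// cleanupCacheForInsertion outside of test environments.
namespace {
int exampleCountEvictionTargets(const std::vector<size_t>& sorted_item_sizes,
                                size_t required_size,
                                size_t total_cache_size) {
  required_size = std::max(required_size, total_cache_size / 2);
  int offset = 0;
  size_t removed = 0;
  for (auto item_size : sorted_item_sizes) {
    ++offset;
    removed += item_size;
    if (removed > required_size) {
      break;
    }
  }
  return offset;  // number of items to drop from the front of the container
}
}  // namespace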
void HashtableRecycler::clearCache() {
  std::lock_guard<std::mutex> lock(getCacheLock());
  clearCacheMetricTracker();
  for (auto& item_type : getCacheItemType()) {
    auto item_cache = getItemCache().find(item_type)->second;
    for (auto& kv : *item_cache) {
      if (!kv.second->empty()) {
        VLOG(1) << "[" << item_type << ", "
                << DataRecyclerUtil::getDeviceIdentifierString(kv.first)
                << "] clear cache (# items: " << kv.second->size() << ")";
        kv.second->clear();
      }
    }
  }
  table_key_to_query_plan_dag_map_.clear();
}
void HashtableRecycler::markCachedItemAsDirty(size_t table_key,
                                              std::unordered_set<QueryPlanHash>& key_set,
                                              CacheItemType item_type,
                                              DeviceIdentifier device_identifier) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  for (auto key : key_set) {
    markCachedItemAsDirtyImpl(key, *hashtable_cache);
  }
}
void HashtableRecycler::removeCachedHashtableBuiltFromSyntheticTable(
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::lock_guard<std::mutex>& lock) {
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  CHECK(hashtable_cache);
  auto unitary_table_key = getUnitaryTableKey();
  auto key_set_it = table_key_to_query_plan_dag_map_.find(unitary_table_key);
  if (key_set_it != table_key_to_query_plan_dag_map_.end()) {
    auto& key_set = key_set_it->second;
    for (auto key : key_set) {
      removeItemFromCache(key, item_type, device_identifier, lock);
    }
    removeTableKeyInfoFromQueryPlanDagMap(unitary_table_key);
  }
}
std::string HashtableRecycler::toString() const {
  std::ostringstream oss;
  oss << "A current status of the Hashtable Recycler:\n";
  for (auto& item_type : getCacheItemType()) {
    oss << "\t" << item_type;
    auto& metric_tracker = getMetricTracker(item_type);
    oss << "\n\t# cached hashtables:\n";
    auto item_cache = getItemCache().find(item_type)->second;
    for (auto& cache_container : *item_cache) {
      oss << "\t\t" << DataRecyclerUtil::getDeviceIdentifierString(cache_container.first)
          << ", # hashtables: " << cache_container.second->size() << "\n";
      for (auto& ht : *cache_container.second) {
        oss << "\t\t\tHT] " << ht.item_metric->toString() << "\n";
      }
    }
    oss << "\t" << metric_tracker.toString() << "\n";
  }
  return oss.str();
}
bool HashtableRecycler::checkOverlapsHashtableBucketCompatability(
    const OverlapsHashTableMetaInfo& candidate,
    const OverlapsHashTableMetaInfo& target) const {
  if (candidate.bucket_sizes.size() != target.bucket_sizes.size()) {
    return false;
  }
  for (size_t i = 0; i < candidate.bucket_sizes.size(); i++) {
    if (candidate.bucket_sizes[i] != target.bucket_sizes[i]) {
      return false;
    }
  }
  auto threshold_check =
      candidate.overlaps_bucket_threshold == target.overlaps_bucket_threshold;
  auto hashtable_size_check =
      candidate.overlaps_max_table_size_bytes == target.overlaps_max_table_size_bytes;
  return threshold_check && hashtable_size_check;
}
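// Worked example (hypothetical numbers) of the compatibility rule above: a
// cached overlaps hashtable bucketed with sizes {0.1, 0.1} can only serve a
// request asking for exactly {0.1, 0.1} with the same bucket threshold and max
// table size; {0.1, 0.2} or a different threshold forces a rebuild, since the
// bucket geometry determines the hashtable's contents. A minimal equality
// check over the bucket dimensions (requires <vector> and <algorithm>):
namespace {
bool exampleBucketsEqual(const std::vector<double>& a, const std::vector<double>& b) {
  return a.size() == b.size() && std::equal(a.cbegin(), a.cend(), b.cbegin());
}
}  // namespace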
size_t HashtableRecycler::getJoinColumnInfoHash(
    std::vector<const Analyzer::ColumnVar*>& inner_cols,
    std::vector<const Analyzer::ColumnVar*>& outer_cols,
    Executor* executor) {
  auto hashed_join_col_info = EMPTY_HASHED_PLAN_DAG_KEY;
  boost::hash_combine(
      hashed_join_col_info,
      executor->getQueryPlanDagCache().translateColVarsToInfoHash(inner_cols, false));
  boost::hash_combine(
      hashed_join_col_info,
      executor->getQueryPlanDagCache().translateColVarsToInfoHash(outer_cols, false));
  return hashed_join_col_info;
}
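// Minimal demonstration (illustrative only, requires <boost/functional/hash.hpp>)
// of why the combining order above matters: boost::hash_combine folds each
// value into a running seed, so swapping the inner/outer order yields a
// different key.
namespace {
size_t exampleCombineTwo(size_t inner_hash, size_t outer_hash) {
  size_t seed = 0;
  boost::hash_combine(seed, inner_hash);  // fold in the inner cols hash first
  boost::hash_combine(seed, outer_hash);  // then the outer cols hash
  return seed;  // exampleCombineTwo(a, b) != exampleCombineTwo(b, a) in general
}
}  // namespace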
bool HashtableRecycler::isSafeToCacheHashtable(
    const TableIdToNodeMap& table_id_to_node_map,
    bool need_dict_translation,
    const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_info_pairs,
    const shared::TableKey& table_key) {
  // a hashtable built from a subquery's resultset is safe to cache only if the
  // resultset is deterministic across executions
  auto getNodeByTableId =
      [&table_id_to_node_map](
          const shared::TableKey& table_key_param) -> const RelAlgNode* {
    auto it = table_id_to_node_map.find(table_key_param);
    if (it != table_id_to_node_map.end()) {
      return it->second;
    }
    return nullptr;
  };
  bool found_sort_node = false;
  bool found_project_node = false;
  if (table_key.table_id < 0) {
    // a negative table id means the inner side is a temporary (subquery) table
    const auto origin_table_id = table_key.table_id * -1;
    const auto inner_node = getNodeByTableId({table_key.db_id, origin_table_id});
    if (!inner_node) {
      return false;
    }
    auto sort_node = dynamic_cast<const RelSort*>(inner_node);
    if (sort_node) {
      found_sort_node = true;
    } else {
      auto project_node = dynamic_cast<const RelProject*>(inner_node);
      if (project_node) {
        found_project_node = true;
      }
    }
  }
  return !(found_sort_node || (found_project_node && need_dict_translation));
}
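// Illustrative case (hypothetical query) for the rule above: in
//   SELECT ... FROM t JOIN (SELECT x FROM s ORDER BY y LIMIT 10) sub ON t.k = sub.x;
// the join's inner side is a subquery resultset (a negative table id). Its
// node resolves to a RelSort, so found_sort_node becomes true and the
// hashtable is not cached; the same applies to a RelProject inner node when a
// string dictionary translation is required.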
bool HashtableRecycler::isInvalidHashTableCacheKey(
    const std::vector<QueryPlanHash>& cache_keys) {
  return cache_keys.empty() ||
         std::any_of(cache_keys.cbegin(), cache_keys.cend(), [](QueryPlanHash key) {
           return key == EMPTY_HASHED_PLAN_DAG_KEY;
         });
}
HashtableAccessPathInfo HashtableRecycler::getHashtableAccessPathInfo(
    const std::vector<InnerOuter>& inner_outer_pairs,
    const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs,
    const SQLOps op_type,
    const JoinType join_type,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    int device_count,
    int shard_count,
    const std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>>& frags_for_device,
    Executor* executor) {
  CHECK_GT(device_count, 0);
  std::vector<const Analyzer::ColumnVar*> inner_cols_vec, outer_cols_vec;
  size_t join_qual_info = EMPTY_HASHED_PLAN_DAG_KEY;
  for (auto& join_col_pair : inner_outer_pairs) {
    inner_cols_vec.push_back(join_col_pair.first);
    // hash the inner join column together with the join qual's operator and type
    boost::hash_combine(join_qual_info,
                        executor->getQueryPlanDagCache().getJoinColumnsInfoHash(
                            join_col_pair.first, JoinColumnSide::kDirect, true));
    boost::hash_combine(join_qual_info, op_type);
    boost::hash_combine(join_qual_info, join_type);
    boost::hash_combine(join_qual_info, join_col_pair.first->get_type_info().toString());
    auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(join_col_pair.second);
    if (outer_col_var) {
      outer_cols_vec.push_back(outer_col_var);
      if (join_col_pair.first->get_type_info().is_dict_encoded_string()) {
        // also fold in the outer column so that dictionary-encoded string
        // columns with different dictionaries get distinct keys
        boost::hash_combine(join_qual_info,
                            executor->getQueryPlanDagCache().getJoinColumnsInfoHash(
                                outer_col_var, JoinColumnSide::kDirect, true));
        boost::hash_combine(join_qual_info, outer_col_var->get_type_info().toString());
      }
    }
  }
  if (inner_outer_string_op_infos_pairs.size()) {
    boost::hash_combine(join_qual_info, ::toString(inner_outer_string_op_infos_pairs));
  }
  auto join_cols_info = getJoinColumnInfoHash(inner_cols_vec, outer_cols_vec, executor);
  HashtableAccessPathInfo access_path_info(device_count);
  auto it = hashtable_build_dag_map.find(join_cols_info);
  if (it != hashtable_build_dag_map.end()) {
    size_t hashtable_access_path = EMPTY_HASHED_PLAN_DAG_KEY;
    boost::hash_combine(hashtable_access_path, it->second.inner_cols_access_path);
    boost::hash_combine(hashtable_access_path, join_qual_info);
    if (inner_cols_vec.front()->get_type_info().is_dict_encoded_string()) {
      boost::hash_combine(hashtable_access_path, it->second.outer_cols_access_path);
    }
    boost::hash_combine(hashtable_access_path, shard_count);
    if (!shard_count) {
      // all devices share the same fragment list, so they share one cache key
      const auto frag_list = collectFragmentIds(frags_for_device[0]);
      auto cache_key_for_device = hashtable_access_path;
      boost::hash_combine(cache_key_for_device, frag_list);
      for (int i = 0; i < device_count; ++i) {
        access_path_info.hashed_query_plan_dag[i] = cache_key_for_device;
      }
    } else {
      // a sharded join builds a per-device hashtable over per-device fragments
      for (int i = 0; i < device_count; ++i) {
        const auto frag_list_for_device = collectFragmentIds(frags_for_device[i]);
        auto cache_key_for_device = hashtable_access_path;
        boost::hash_combine(cache_key_for_device, frag_list_for_device);
        access_path_info.hashed_query_plan_dag[i] = cache_key_for_device;
      }
    }
    access_path_info.table_keys = it->second.inputTableKeys;
  }
  return access_path_info;
}
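// Self-contained sketch of the per-device key derivation above: every device
// starts from the same access-path hash and folds in its own fragment-id list
// (plain ints stand in for collectFragmentIds output here). Requires <vector>
// and <boost/functional/hash.hpp>.
namespace {
std::vector<size_t> examplePerDeviceKeys(
    size_t hashtable_access_path,
    const std::vector<std::vector<int>>& frag_ids_per_device) {
  std::vector<size_t> keys;
  for (const auto& frag_list : frag_ids_per_device) {
    auto cache_key_for_device = hashtable_access_path;    // shared base key
    boost::hash_combine(cache_key_for_device, frag_list);  // device-local frags
    keys.push_back(cache_key_for_device);
  }
  return keys;
}
}  // namespace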
std::tuple<QueryPlanHash,
           std::shared_ptr<HashTable>,
           std::optional<HashtableCacheMetaInfo>>
HashtableRecycler::getCachedHashtableWithoutCacheKey(std::set<size_t>& visited,
                                                     CacheItemType hash_table_type,
                                                     DeviceIdentifier device_identifier) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto hashtable_cache = getCachedItemContainer(hash_table_type, device_identifier);
  for (auto& ht : *hashtable_cache) {
    if (!visited.count(ht.key)) {
      // return the first cached hashtable we have not visited yet
      return std::make_tuple(ht.key, ht.cached_item, ht.meta_info);
    }
  }
  return std::make_tuple(EMPTY_HASHED_PLAN_DAG_KEY, nullptr, std::nullopt);
}
void HashtableRecycler::addQueryPlanDagForTableKeys(
    size_t hashed_query_plan_dag,
    const std::unordered_set<size_t>& table_keys) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  for (auto table_key : table_keys) {
    auto itr = table_key_to_query_plan_dag_map_.try_emplace(table_key).first;
    itr->second.insert(hashed_query_plan_dag);
  }
}
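// Self-contained sketch of the reverse mapping maintained above: each table
// key maps to every cached plan-DAG hash that reads the table, so a table drop
// or update can invalidate exactly the affected hashtables (see
// markCachedItemAsDirty). Requires <unordered_map>, <unordered_set>, <cstddef>.
namespace {
using ExampleTableKeyToDagMap = std::unordered_map<size_t, std::unordered_set<size_t>>;

void exampleRegisterDag(ExampleTableKeyToDagMap& map,
                        size_t hashed_query_plan_dag,
                        const std::unordered_set<size_t>& table_keys) {
  for (auto table_key : table_keys) {
    // operator[] default-constructs an empty set on first use
    map[table_key].insert(hashed_query_plan_dag);
  }
}
}  // namespace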
std::optional<std::unordered_set<size_t>>
HashtableRecycler::getMappedQueryPlanDagsWithTableKey(size_t table_key) const {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto it = table_key_to_query_plan_dag_map_.find(table_key);
  return it != table_key_to_query_plan_dag_map_.end() ? std::make_optional(it->second)
                                                      : std::nullopt;
}