bool HashtableRecycler::hasItemInCache(QueryPlanHash key,
                                       CacheItemType item_type,
                                       DeviceIdentifier device_identifier,
                                       std::lock_guard<std::mutex>& lock,
                                       std::optional<HashtableCacheMetaInfo> meta_info) const {
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  CHECK(hashtable_cache);
  auto candidate_ht_it = std::find_if(
      hashtable_cache->begin(), hashtable_cache->end(),
      [&key](const auto& cached_item) { return cached_item.key == key; });
  if (candidate_ht_it != hashtable_cache->end()) {
    if (item_type == OVERLAPS_HT) {
      // an overlaps join hashtable is reusable only when its bucket dimension matches
      CHECK(candidate_ht_it->meta_info && candidate_ht_it->meta_info->overlaps_meta_info);
      CHECK(meta_info && meta_info->overlaps_meta_info);
      if (checkOverlapsHashtableBucketCompatability(
              *candidate_ht_it->meta_info->overlaps_meta_info,
              *meta_info->overlaps_meta_info)) {
        return true;
      }
    } else {
      return true;
    }
  }
  return false;
}

std::shared_ptr<HashTable> HashtableRecycler::getItemFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::optional<HashtableCacheMetaInfo> meta_info) {
  if (!g_enable_data_recycler || !g_use_hashtable_cache ||
      key == EMPTY_HASHED_PLAN_DAG_KEY) {
    return nullptr;
  }
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  auto candidate_ht = getCachedItemWithoutConsideringMetaInfo(
      key, item_type, device_identifier, *hashtable_cache, lock);
  if (candidate_ht) {
    bool can_return_cached_item = false;
    if (item_type == OVERLAPS_HT) {
      // we have to check the hashtable's meta info for an overlaps join hashtable
      CHECK(candidate_ht->meta_info && candidate_ht->meta_info->overlaps_meta_info);
      CHECK(meta_info && meta_info->overlaps_meta_info);
      if (checkOverlapsHashtableBucketCompatability(
              *candidate_ht->meta_info->overlaps_meta_info,
              *meta_info->overlaps_meta_info)) {
        can_return_cached_item = true;
      }
    } else {
      can_return_cached_item = true;
    }
    if (can_return_cached_item) {
      CHECK(!candidate_ht->isDirty());
      candidate_ht->item_metric->incRefCount();
      VLOG(1) << "[" << item_type << ", "
              << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
              << "] Recycle item in a cache (key: " << key << ")";
      return candidate_ht->cached_item;
    }
  }
  return nullptr;
}
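
// --- Illustrative sketch, not part of HashtableRecycler: the lookup pattern used
// --- above (find by key, reject dirty entries, bump the ref count on a hit), reduced
// --- to a self-contained unit. CachedItem and Metric are simplified stand-ins for
// --- the real cache types; the payload stands in for std::shared_ptr<HashTable>.
#include <algorithm>
#include <cstddef>
#include <memory>
#include <vector>

namespace example {
struct Metric {
  size_t ref_count{0};
  void incRefCount() { ++ref_count; }
};
struct CachedItem {
  size_t key;
  std::shared_ptr<int> payload;
  Metric metric;
  bool dirty{false};
};

// find a cached item by key; recycle it only when it is clean
std::shared_ptr<int> get_item(std::vector<CachedItem>& cache, size_t key) {
  auto it = std::find_if(cache.begin(), cache.end(),
                         [key](const auto& item) { return item.key == key; });
  if (it == cache.end() || it->dirty) {
    return nullptr;  // miss, or invalidated by an update to an input table
  }
  it->metric.incRefCount();  // recycling counts as one more reference
  return it->payload;
}
}  // namespace example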
void HashtableRecycler::putItemToCache(QueryPlanHash key,
                                       std::shared_ptr<HashTable> item_ptr,
                                       CacheItemType item_type,
                                       DeviceIdentifier device_identifier,
                                       size_t item_size,
                                       size_t compute_time,
                                       std::optional<HashtableCacheMetaInfo> meta_info) {
  if (!g_enable_data_recycler || !g_use_hashtable_cache ||
      key == EMPTY_HASHED_PLAN_DAG_KEY) {
    return;
  }
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto has_cached_ht = hasItemInCache(key, item_type, device_identifier, lock, meta_info);
  if (has_cached_ht) {
    // check whether the cached one is in a dirty status
    auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
    auto candidate_it =
        std::find_if(hashtable_cache->begin(),
                     hashtable_cache->end(),
                     [&key](const auto& cached_item) { return cached_item.key == key; });
    bool found_candidate = false;
    if (candidate_it != hashtable_cache->end()) {
      if (item_type == OVERLAPS_HT) {
        // we have to check the hashtable's meta info for an overlaps join hashtable
        CHECK(candidate_it->meta_info && candidate_it->meta_info->overlaps_meta_info);
        CHECK(meta_info && meta_info->overlaps_meta_info);
        if (checkOverlapsHashtableBucketCompatability(
                *candidate_it->meta_info->overlaps_meta_info,
                *meta_info->overlaps_meta_info)) {
          found_candidate = true;
        }
      } else {
        found_candidate = true;
      }
      if (found_candidate && candidate_it->isDirty()) {
        // remove the dirty item to make room for the incoming one
        removeItemFromCache(
            key, item_type, device_identifier, lock, candidate_it->meta_info);
        has_cached_ht = false;
      }
    }
  }
  if (!has_cached_ht) {
    auto& metric_tracker = getMetricTracker(item_type);
    auto cache_status = metric_tracker.canAddItem(device_identifier, item_size);
    if (cache_status == CacheAvailability::UNAVAILABLE) {
      return;  // the hashtable is too large to keep in the cache
    } else if (cache_status == CacheAvailability::AVAILABLE_AFTER_CLEANUP) {
      // evict some cached hashtables to make room for this one
      auto required_size = metric_tracker.calculateRequiredSpaceForItemAddition(
          device_identifier, item_size);
      cleanupCacheForInsertion(item_type, device_identifier, required_size, lock, meta_info);
    }
    // register the new item's metric, then put the hashtable into the cache
    auto new_cache_metric_ptr = metric_tracker.putNewCacheItemMetric(
        key, device_identifier, item_size, compute_time);
    CHECK_EQ(item_size, new_cache_metric_ptr->getMemSize());
    auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
    VLOG(1) << "[" << item_type << ", "
            << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
            << "] Put item to cache (key: " << key << ")";
    hashtable_cache->emplace_back(key, item_ptr, new_cache_metric_ptr, meta_info);
  }
}
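
// --- Illustrative sketch, not part of HashtableRecycler: the put-path admission
// --- decision above as a standalone unit. CacheAvailability and Tracker are
// --- simplified stand-ins; the real tracker also records compute time per item.
#include <cstddef>

namespace example {
enum class CacheAvailability { AVAILABLE, AVAILABLE_AFTER_CLEANUP, UNAVAILABLE };

struct Tracker {
  size_t capacity;
  size_t used;
  CacheAvailability canAddItem(size_t item_size) const {
    if (item_size > capacity) {
      return CacheAvailability::UNAVAILABLE;  // can never fit, even when empty
    }
    return used + item_size <= capacity ? CacheAvailability::AVAILABLE
                                        : CacheAvailability::AVAILABLE_AFTER_CLEANUP;
  }
};

// returns true if the item was admitted to the cache
bool put_item(Tracker& tracker, size_t item_size) {
  auto status = tracker.canAddItem(item_size);
  if (status == CacheAvailability::UNAVAILABLE) {
    return false;  // too large for the cache; skip caching entirely
  }
  if (status == CacheAvailability::AVAILABLE_AFTER_CLEANUP) {
    // evict until at least (used + item_size - capacity) bytes are freed;
    // this single assignment stands in for cleanupCacheForInsertion(...)
    tracker.used = 0;
  }
  tracker.used += item_size;
  return true;
}
}  // namespace example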
void HashtableRecycler::removeItemFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::lock_guard<std::mutex>& lock,
    std::optional<HashtableCacheMetaInfo> meta_info) {
  auto& cache_metrics = getMetricTracker(item_type);
  // look up the existing metric to learn the item's size before erasing it
  auto cache_metric = cache_metrics.getCacheItemMetric(key, device_identifier);
  CHECK(cache_metric);
  auto hashtable_size = cache_metric->getMemSize();
  auto hashtable_container = getCachedItemContainer(item_type, device_identifier);
  auto filter = [key](auto const& item) { return item.key == key; };
  auto itr =
      std::find_if(hashtable_container->cbegin(), hashtable_container->cend(), filter);
  if (itr == hashtable_container->cend()) {
    return;
  } else {
    VLOG(1) << "[" << item_type << ", "
            << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
            << "] remove cached item from cache (key: " << key << ")";
    hashtable_container->erase(itr);
  }
  // remove the cache metric and reflect the freed space in the current cache size
  cache_metrics.removeCacheItemMetric(key, device_identifier);
  cache_metrics.updateCurrentCacheSize(
      device_identifier, CacheUpdateAction::REMOVE, hashtable_size);
}
void HashtableRecycler::cleanupCacheForInsertion(
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    size_t required_size,
    std::lock_guard<std::mutex>& lock,
    std::optional<HashtableCacheMetaInfo> meta_info) {
  // sort the cached items by their query metric (# references, size, compute time)
  // and then evict unimportant items until enough space is freed
  int elimination_target_offset = 0;
  size_t removed_size = 0;
  auto& metric_tracker = getMetricTracker(item_type);
  auto actual_space_to_free = metric_tracker.getTotalCacheSize() / 2;
  if (!g_is_test_env && required_size < actual_space_to_free) {
    // free extra space to avoid too-frequent cleanups; tests rely on exact cache
    // sizes, so this widening is skipped in a test environment
    required_size = actual_space_to_free;
  }
  metric_tracker.sortCacheInfoByQueryMetric(device_identifier);
  auto cached_item_metrics = metric_tracker.getCacheItemMetrics(device_identifier);
  sortCacheContainerByQueryMetric(item_type, device_identifier);
  // collect eviction targets from the front of the sorted container
  for (auto& metric : cached_item_metrics) {
    auto target_size = metric->getMemSize();
    ++elimination_target_offset;
    removed_size += target_size;
    if (removed_size > required_size) {
      break;
    }
  }
  // evict the targets from both the cache container and the metric tracker,
  // then update the current cache size to reflect the eviction
  removeCachedItemFromBeginning(item_type, device_identifier, elimination_target_offset);
  metric_tracker.removeMetricFromBeginning(device_identifier, elimination_target_offset);
  metric_tracker.updateCurrentCacheSize(
      device_identifier, CacheUpdateAction::REMOVE, removed_size);
}
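
// --- Illustrative sketch, not part of HashtableRecycler: eviction by query metric.
// --- The real tracker orders items by (# references, memory size, compute time);
// --- here a single "importance" score stands in for that ordering.
#include <algorithm>
#include <cstddef>
#include <vector>

namespace example {
struct ItemMetric {
  size_t mem_size;
  size_t importance;  // lower = evicted first
};

// sort so the least important items sit at the front, then drop items from the
// beginning until at least `required_size` bytes are freed; returns bytes freed
size_t evict_for_insertion(std::vector<ItemMetric>& metrics, size_t required_size) {
  std::sort(metrics.begin(), metrics.end(), [](const auto& lhs, const auto& rhs) {
    return lhs.importance < rhs.importance;
  });
  size_t removed_size = 0;
  size_t offset = 0;
  for (const auto& metric : metrics) {
    removed_size += metric.mem_size;
    ++offset;
    if (removed_size > required_size) {
      break;
    }
  }
  metrics.erase(metrics.begin(), metrics.begin() + offset);
  return removed_size;
}
}  // namespace example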
void HashtableRecycler::clearCache() {
  std::lock_guard<std::mutex> lock(getCacheLock());
  for (auto& item_type : getCacheItemType()) {
    getMetricTracker(item_type).clearCacheMetricTracker();
    auto item_cache = getItemCache().find(item_type)->second;
    for (auto& kv : *item_cache) {
      if (!kv.second->empty()) {
        VLOG(1) << "[" << item_type << ", "
                << DataRecyclerUtil::getDeviceIdentifierString(kv.first)
                << "] clear cache (# items: " << kv.second->size() << ")";
        kv.second->clear();
      }
    }
  }
  table_key_to_query_plan_dag_map_.clear();
}
void HashtableRecycler::markCachedItemAsDirty(
    size_t table_key, std::unordered_set<QueryPlanHash>& key_set,
    CacheItemType item_type, DeviceIdentifier device_identifier) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  for (auto key : key_set) {
    markCachedItemAsDirtyImpl(key, *hashtable_cache);
  }
  // drop the table_key -> query plan dag mapping; already-marked items need no revisit
  removeTableKeyInfoFromQueryPlanDagMap(table_key);
}

void HashtableRecycler::removeCachedHashtableBuiltFromSyntheticTable(
    CacheItemType item_type, DeviceIdentifier device_identifier,
    std::lock_guard<std::mutex>& lock) {
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  CHECK(hashtable_cache);
  auto unitary_table_key = DataRecyclerUtil::getUnitaryTableKey();
  auto key_set_it = table_key_to_query_plan_dag_map_.find(unitary_table_key);
  if (key_set_it != table_key_to_query_plan_dag_map_.end()) {
    auto& key_set = key_set_it->second;
    for (auto key : key_set) {
      removeItemFromCache(key, item_type, device_identifier, lock);
    }
    removeTableKeyInfoFromQueryPlanDagMap(unitary_table_key);
  }
}
std::string HashtableRecycler::toString() const {
  std::ostringstream oss;
  oss << "A current status of the Hashtable Recycler:\n";
  for (auto& item_type : getCacheItemType()) {
    oss << "\t" << item_type;
    auto& metric_tracker = getMetricTracker(item_type);
    oss << "\n\t# cached hashtables:\n";
    auto item_cache = getItemCache().find(item_type)->second;
    for (auto& cache_container : *item_cache) {
      oss << "\t\t" << DataRecyclerUtil::getDeviceIdentifierString(cache_container.first)
          << ", # hashtables: " << cache_container.second->size() << "\n";
      for (auto& ht : *cache_container.second) {
        oss << "\t\t\tHT] " << ht.item_metric->toString() << "\n";
      }
    }
    oss << "\t" << metric_tracker.toString() << "\n";
  }
  return oss.str();
}
bool HashtableRecycler::checkOverlapsHashtableBucketCompatability(
    const OverlapsHashTableMetaInfo& candidate,
    const OverlapsHashTableMetaInfo& target) const {
  if (candidate.bucket_sizes.size() != target.bucket_sizes.size()) {
    return false;
  }
  for (size_t i = 0; i < candidate.bucket_sizes.size(); i++) {
    if (candidate.bucket_sizes[i] != target.bucket_sizes[i]) {
      return false;
    }
  }
  auto threshold_check =
      candidate.overlaps_bucket_threshold == target.overlaps_bucket_threshold;
  auto hashtable_size_check =
      candidate.overlaps_max_table_size_bytes == target.overlaps_max_table_size_bytes;
  return threshold_check && hashtable_size_check;
}
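
// --- Illustrative sketch, not part of HashtableRecycler: why bucket dimensions gate
// --- reuse. Two overlaps hashtables are interchangeable only if every per-dimension
// --- bucket size matches, along with the tuning threshold and the size cap.
// --- BucketDims is a simplified stand-in for OverlapsHashTableMetaInfo.
#include <cstddef>
#include <vector>

namespace example {
struct BucketDims {
  std::vector<double> bucket_sizes;
  double bucket_threshold;
  size_t max_table_size_bytes;
};

bool compatible(const BucketDims& a, const BucketDims& b) {
  // a differing dimension count (or any differing size) means the cached table
  // partitions space differently and cannot answer the new probe
  return a.bucket_sizes == b.bucket_sizes &&
         a.bucket_threshold == b.bucket_threshold &&
         a.max_table_size_bytes == b.max_table_size_bytes;
}
}  // namespace example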
size_t HashtableRecycler::getJoinColumnInfoHash(
    std::vector<const Analyzer::ColumnVar*>& inner_cols,
    std::vector<const Analyzer::ColumnVar*>& outer_cols,
    Executor* executor) {
  auto hashed_join_col_info = EMPTY_HASHED_PLAN_DAG_KEY;
  boost::hash_combine(
      hashed_join_col_info,
      executor->getQueryPlanDagCache().translateColVarsToInfoHash(inner_cols, false));
  boost::hash_combine(
      hashed_join_col_info,
      executor->getQueryPlanDagCache().translateColVarsToInfoHash(outer_cols, false));
  return hashed_join_col_info;
}
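
// --- Illustrative sketch, not part of HashtableRecycler: building a composite cache
// --- key with boost::hash_combine, the technique used above and below. Compiles as a
// --- standalone unit and requires only Boost headers.
#include <boost/functional/hash.hpp>
#include <cstddef>
#include <string>

namespace example {
// fold several independent properties of a join qual into one hash key;
// the order of hash_combine calls matters, so keep it stable across versions
size_t make_join_qual_key(size_t inner_cols_hash,
                          size_t outer_cols_hash,
                          int op_type,
                          const std::string& type_info) {
  size_t key = 0;  // stands in for EMPTY_HASHED_PLAN_DAG_KEY as the seed
  boost::hash_combine(key, inner_cols_hash);
  boost::hash_combine(key, outer_cols_hash);
  boost::hash_combine(key, op_type);
  boost::hash_combine(key, type_info);
  return key;
}
}  // namespace example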
bool HashtableRecycler::isSafeToCacheHashtable(
    const TableIdToNodeMap& table_id_to_node_map,
    bool need_dict_translation,
    const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_info_pairs,
    const int table_id) {
  // when the hashtable is built from a subquery's resultset, we can cache it only
  // when the resultset does not come from a non-deterministic query plan (e.g., sort)
  // and no dictionary translation is required
  auto getNodeByTableId =
      [&table_id_to_node_map](const int table_id) -> const RelAlgNode* {
    auto it = table_id_to_node_map.find(table_id);
    if (it != table_id_to_node_map.end()) {
      return it->second;
    }
    return nullptr;
  };
  bool found_sort_node = false;
  bool found_project_node = false;
  if (table_id < 0) {
    // a negative table id means the inner table is a temporary resultset
    auto origin_table_id = table_id * -1;
    auto inner_node = getNodeByTableId(origin_table_id);
    if (!inner_node) {
      return false;  // no node info for the temporary resultset; not safe to cache
    }
    auto sort_node = dynamic_cast<const RelSort*>(inner_node);
    if (sort_node) {
      found_sort_node = true;
    } else {
      auto project_node = dynamic_cast<const RelProject*>(inner_node);
      if (project_node) {
        found_project_node = true;
      }
    }
  }
  return !(found_sort_node || (found_project_node && need_dict_translation));
}
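
// --- Illustrative sketch, not part of HashtableRecycler: deciding cache safety by
// --- probing a node's dynamic type, as isSafeToCacheHashtable() does with RelSort
// --- and RelProject. Node, SortNode, and ProjectNode are simplified stand-ins.
namespace example {
struct Node { virtual ~Node() = default; };
struct SortNode : Node {};     // non-deterministic row order => unsafe to cache
struct ProjectNode : Node {};  // unsafe only when dict translation is required

bool is_safe_to_cache(const Node* inner_node, bool need_dict_translation) {
  if (!inner_node) {
    return false;  // no node info for the temporary resultset
  }
  if (dynamic_cast<const SortNode*>(inner_node)) {
    return false;
  }
  if (dynamic_cast<const ProjectNode*>(inner_node) && need_dict_translation) {
    return false;
  }
  return true;
}
}  // namespace example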
bool HashtableRecycler::isInvalidHashTableCacheKey(
    const std::vector<QueryPlanHash>& cache_keys) {
  return cache_keys.empty() ||
         std::any_of(cache_keys.cbegin(), cache_keys.cend(), [](QueryPlanHash key) {
           return key == EMPTY_HASHED_PLAN_DAG_KEY;
         });
}
HashtableAccessPathInfo HashtableRecycler::getHashtableAccessPathInfo(
    const std::vector<InnerOuter>& inner_outer_pairs,
    const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs,
    const SQLOps op_type,
    const JoinType join_type,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    int device_count,
    int shard_count,
    const std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>>& frags_for_device,
    Executor* executor) {
  std::vector<const Analyzer::ColumnVar*> inner_cols_vec, outer_cols_vec;
  size_t join_qual_info = EMPTY_HASHED_PLAN_DAG_KEY;
  for (auto& join_col_pair : inner_outer_pairs) {
    inner_cols_vec.push_back(join_col_pair.first);
    // hash the inner join column's access path instead of its physical ids, since
    // table id / rte_index can differ for the same subquery semantics
    boost::hash_combine(join_qual_info,
                        executor->getQueryPlanDagCache().getJoinColumnsInfoHash(
                            join_col_pair.first, JoinColumnSide::kDirect, true));
    boost::hash_combine(join_qual_info, op_type);
    boost::hash_combine(join_qual_info, join_type);
    auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(join_col_pair.second);
    boost::hash_combine(join_qual_info, join_col_pair.first->get_type_info().toString());
    if (outer_col_var) {
      outer_cols_vec.push_back(outer_col_var);
      if (join_col_pair.first->get_type_info().is_dict_encoded_string()) {
        // add the comparison column's info to distinguish hashtables for string joins
        boost::hash_combine(join_qual_info,
                            executor->getQueryPlanDagCache().getJoinColumnsInfoHash(
                                outer_col_var, JoinColumnSide::kDirect, true));
        boost::hash_combine(join_qual_info, outer_col_var->get_type_info().toString());
      }
    }
  }
  if (inner_outer_string_op_infos_pairs.size()) {
    boost::hash_combine(join_qual_info, ::toString(inner_outer_string_op_infos_pairs));
  }
  auto join_cols_info = getJoinColumnInfoHash(inner_cols_vec, outer_cols_vec, executor);
  HashtableAccessPathInfo access_path_info(device_count);
  auto it = hashtable_build_dag_map.find(join_cols_info);
  if (it != hashtable_build_dag_map.end()) {
    size_t hashtable_access_path = EMPTY_HASHED_PLAN_DAG_KEY;
    boost::hash_combine(hashtable_access_path, it->second.inner_cols_access_path);
    boost::hash_combine(hashtable_access_path, join_qual_info);
    if (inner_cols_vec.front()->get_type_info().is_dict_encoded_string()) {
      boost::hash_combine(hashtable_access_path, it->second.outer_cols_access_path);
    }
    boost::hash_combine(hashtable_access_path, shard_count);
    if (!shard_count) {
      // all devices share the same fragment list, so they share a single cache key
      const auto frag_list = HashJoin::collectFragmentIds(frags_for_device[0]);
      auto cache_key_for_device = hashtable_access_path;
      boost::hash_combine(cache_key_for_device, frag_list);
      for (int i = 0; i < device_count; ++i) {
        access_path_info.hashed_query_plan_dag[i] = cache_key_for_device;
      }
    } else {
      // a sharded table has a device-specific fragment list, so keys differ per device
      for (int i = 0; i < device_count; ++i) {
        const auto frag_list_for_device =
            HashJoin::collectFragmentIds(frags_for_device[i]);
        auto cache_key_for_device = hashtable_access_path;
        boost::hash_combine(cache_key_for_device, frag_list_for_device);
        access_path_info.hashed_query_plan_dag[i] = cache_key_for_device;
      }
    }
    access_path_info.table_keys = it->second.inputTableKeys;
  }
  return access_path_info;
}
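
// --- Illustrative sketch, not part of HashtableRecycler: deriving per-device cache
// --- keys from one shared access path plus a device-specific fragment list, mirroring
// --- the sharded branch above. Requires only Boost headers.
#include <boost/functional/hash.hpp>
#include <cstddef>
#include <vector>

namespace example {
std::vector<size_t> make_per_device_keys(
    size_t hashtable_access_path,
    const std::vector<std::vector<int>>& frag_ids_per_device) {
  std::vector<size_t> keys;
  keys.reserve(frag_ids_per_device.size());
  for (const auto& frag_ids : frag_ids_per_device) {
    // each device hashes its own fragment list on top of the shared access path, so
    // the same hashtable shape built over different fragments gets distinct keys
    auto key = hashtable_access_path;
    boost::hash_combine(key, frag_ids);  // boost::hash supports std::vector<int>
    keys.push_back(key);
  }
  return keys;
}
}  // namespace example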
std::tuple<QueryPlanHash,
           std::shared_ptr<HashTable>,
           std::optional<HashtableCacheMetaInfo>>
HashtableRecycler::getCachedHashtableWithoutCacheKey(std::set<size_t>& visited,
                                                     CacheItemType hash_table_type,
                                                     DeviceIdentifier device_identifier) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto hashtable_cache = getCachedItemContainer(hash_table_type, device_identifier);
  for (auto& ht : *hashtable_cache) {
    if (!visited.count(ht.key)) {
      return std::make_tuple(ht.key, ht.cached_item, ht.meta_info);
    }
  }
  return std::make_tuple(EMPTY_HASHED_PLAN_DAG_KEY, nullptr, std::nullopt);
}
void HashtableRecycler::addQueryPlanDagForTableKeys(
    size_t hashed_query_plan_dag,
    const std::unordered_set<size_t>& table_keys) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  for (auto table_key : table_keys) {
    auto itr = table_key_to_query_plan_dag_map_.try_emplace(table_key).first;
    itr->second.insert(hashed_query_plan_dag);
  }
}
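
// --- Illustrative sketch, not part of HashtableRecycler: the reverse index kept in
// --- table_key_to_query_plan_dag_map_, i.e. table key -> set of cache keys, so an
// --- update to one table can dirty every hashtable built from it.
#include <cstddef>
#include <unordered_map>
#include <unordered_set>

namespace example {
using ReverseIndex = std::unordered_map<size_t, std::unordered_set<size_t>>;

void add_mapping(ReverseIndex& index,
                 size_t cache_key,
                 const std::unordered_set<size_t>& table_keys) {
  for (auto table_key : table_keys) {
    // try_emplace creates an empty set on the first mapping for this table key
    index.try_emplace(table_key).first->second.insert(cache_key);
  }
}

// on a table update: every cache key built from the table must be invalidated
std::unordered_set<size_t> keys_to_dirty(const ReverseIndex& index, size_t table_key) {
  auto it = index.find(table_key);
  return it != index.end() ? it->second : std::unordered_set<size_t>{};
}
}  // namespace example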
std::optional<std::unordered_set<size_t>>
HashtableRecycler::getMappedQueryPlanDagsWithTableKey(size_t table_key) const {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto it = table_key_to_query_plan_dag_map_.find(table_key);
  return it != table_key_to_query_plan_dag_map_.end() ? std::make_optional(it->second)
                                                      : std::nullopt;
}
// Referenced declarations:
static std::vector<int> collectFragmentIds(const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments)
std::mutex& getCacheLock() const
bool hasItemInCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard<std::mutex>& lock, std::optional<HashtableCacheMetaInfo> meta_info = std::nullopt) const override
static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier)
void putItemToCache(QueryPlanHash key, std::shared_ptr<HashTable> item_ptr, CacheItemType item_type, DeviceIdentifier device_identifier, size_t item_size, size_t compute_time, std::optional<HashtableCacheMetaInfo> meta_info = std::nullopt) override
static bool isInvalidHashTableCacheKey(const std::vector<QueryPlanHash>& cache_keys)
std::optional<CachedItem<std::shared_ptr<HashTable>, HashtableCacheMetaInfo>> getCachedItemWithoutConsideringMetaInfo(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, CachedItemContainer& m, std::lock_guard<std::mutex>& lock)
CacheMetricTracker& getMetricTracker(CacheItemType item_type)
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
static size_t getJoinColumnInfoHash(std::vector<const Analyzer::ColumnVar*>& inner_cols, std::vector<const Analyzer::ColumnVar*>& outer_cols, Executor* executor)
std::shared_ptr<CachedItemContainer> getCachedItemContainer(CacheItemType item_type, DeviceIdentifier device_identifier) const
void markCachedItemAsDirtyImpl(QueryPlanHash key, CachedItemContainer& m) const
void addQueryPlanDagForTableKeys(size_t hashed_query_plan_dag, const std::unordered_set<size_t>& table_keys)
bool g_enable_data_recycler
void clearCacheMetricTracker()
void removeCachedHashtableBuiltFromSyntheticTable(CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard<std::mutex>& lock)
void cleanupCacheForInsertion(CacheItemType item_type, DeviceIdentifier device_identifier, size_t required_size, std::lock_guard<std::mutex>& lock, std::optional<HashtableCacheMetaInfo> meta_info = std::nullopt) override
PerTypeCacheItemContainer const& getItemCache() const
std::unordered_set<size_t> table_keys
std::vector<QueryPlanHash> hashed_query_plan_dag
std::unordered_map<size_t, HashTableBuildDag> HashTableBuildDagMap
std::unordered_map<size_t, std::unordered_set<size_t>> table_key_to_query_plan_dag_map_
std::unordered_map<int, const RelAlgNode*> TableIdToNodeMap
static QueryPlanHash getUnitaryTableKey()
void removeItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard<std::mutex>& lock, std::optional<HashtableCacheMetaInfo> meta_info = std::nullopt) override
void sortCacheContainerByQueryMetric(CacheItemType item_type, DeviceIdentifier device_identifier)
std::unordered_set<CacheItemType> const& getCacheItemType() const
std::optional<std::unordered_set<size_t>> getMappedQueryPlanDagsWithTableKey(size_t table_key) const
std::string toString() const override
void markCachedItemAsDirty(size_t table_key, std::unordered_set<QueryPlanHash>& key_set, CacheItemType item_type, DeviceIdentifier device_identifier) override
void clearCache() override
bool checkOverlapsHashtableBucketCompatability(const OverlapsHashTableMetaInfo& candidate_bucket_dim, const OverlapsHashTableMetaInfo& target_bucket_dim) const
void removeTableKeyInfoFromQueryPlanDagMap(size_t table_key)
std::tuple<QueryPlanHash, std::shared_ptr<HashTable>, std::optional<HashtableCacheMetaInfo>> getCachedHashtableWithoutCacheKey(std::set<size_t>& visited, CacheItemType hash_table_type, DeviceIdentifier device_identifier)
virtual std::shared_ptr<HashTable> getItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::optional<HashtableCacheMetaInfo> meta_info = std::nullopt) = 0
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
void removeCachedItemFromBeginning(CacheItemType item_type, DeviceIdentifier device_identifier, int offset)
static HashtableAccessPathInfo getHashtableAccessPathInfo(const std::vector<InnerOuter>& inner_outer_pairs, const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs, const SQLOps op_type, const JoinType join_type, const HashTableBuildDagMap& hashtable_build_dag_map, int device_count, int shard_count, const std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>>& frags_for_device, Executor* executor)
bool g_use_hashtable_cache
static bool isSafeToCacheHashtable(const TableIdToNodeMap& table_id_to_node_map, bool need_dict_translation, const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_info_pairs, const int table_id)