OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
HashtableRecycler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "HashtableRecycler.h"

#include <string>
18 
19 extern bool g_is_test_env;
20 
22  QueryPlanHash key,
23  CacheItemType item_type,
24  DeviceIdentifier device_identifier,
25  std::lock_guard<std::mutex>& lock,
26  std::optional<HashtableCacheMetaInfo> meta_info) const {
29  return false;
30  }
31  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
32  // hashtable cache of the *any* device type should be properly initialized
33  CHECK(hashtable_cache);
34  auto candidate_ht = getCachedItem(key, *hashtable_cache);
35  if (candidate_ht) {
36  if (item_type == OVERLAPS_HT) {
37  CHECK(candidate_ht->meta_info && candidate_ht->meta_info->overlaps_meta_info);
38  CHECK(meta_info && meta_info->overlaps_meta_info);
40  *candidate_ht->meta_info->overlaps_meta_info,
41  *meta_info->overlaps_meta_info)) {
42  return true;
43  }
44  } else {
45  return true;
46  }
47  }
48  return false;
49 }
50 
51 std::shared_ptr<HashTable> HashtableRecycler::getItemFromCache(
52  QueryPlanHash key,
53  CacheItemType item_type,
54  DeviceIdentifier device_identifier,
55  std::optional<HashtableCacheMetaInfo> meta_info) const {
58  return nullptr;
59  }
60  std::lock_guard<std::mutex> lock(getCacheLock());
61  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
62  auto candidate_ht = getCachedItem(key, *hashtable_cache);
63  if (candidate_ht) {
64  candidate_ht->item_metric->incRefCount();
65  VLOG(1) << "[" << DataRecyclerUtil::toStringCacheItemType(item_type) << ", "
67  << "] Recycle item in a cache";
68  return candidate_ht->cached_item;
69  }
70  return nullptr;
71 }
72 
74  std::shared_ptr<HashTable> item_ptr,
75  CacheItemType item_type,
76  DeviceIdentifier device_identifier,
77  size_t item_size,
78  size_t compute_time,
79  std::optional<HashtableCacheMetaInfo> meta_info) {
82  return;
83  }
84  std::lock_guard<std::mutex> lock(getCacheLock());
85  if (!hasItemInCache(key, item_type, device_identifier, lock, meta_info)) {
86  // check cache's space availability
87  auto& metric_tracker = getMetricTracker(item_type);
88  auto cache_status = metric_tracker.canAddItem(device_identifier, item_size);
89  if (cache_status == CacheAvailability::UNAVAILABLE) {
90  // hashtable is too large
91  return;
92  } else if (cache_status == CacheAvailability::AVAILABLE_AFTER_CLEANUP) {
93  // we need to cleanup some cached hashtables to make a room to insert this hashtable
94  // here we try to cache the new one anyway since we don't know the importance of
95  // this hashtable yet and if it is not that frequently reused it is removed
96  // in a near future
97  auto required_size = metric_tracker.calculateRequiredSpaceForItemAddition(
98  device_identifier, item_size);
99  cleanupCacheForInsertion(item_type, device_identifier, required_size, lock);
100  }
101  // put hashtable's metric to metric tracker
102  auto new_cache_metric_ptr = metric_tracker.putNewCacheItemMetric(
103  key, device_identifier, item_size, compute_time);
104  CHECK_EQ(item_size, new_cache_metric_ptr->getMemSize());
105  metric_tracker.updateCurrentCacheSize(
106  device_identifier, CacheUpdateAction::ADD, item_size);
107  // put hashtable to cache
108  VLOG(1) << "[" << DataRecyclerUtil::toStringCacheItemType(item_type) << ", "
109  << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
110  << "] Put item to cache";
111  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
112  hashtable_cache->emplace_back(key, item_ptr, new_cache_metric_ptr, meta_info);
113  }
114  // this hashtable is already cached
115  return;
116 }
117 
119  QueryPlanHash key,
120  CacheItemType item_type,
121  DeviceIdentifier device_identifier,
122  std::lock_guard<std::mutex>& lock,
123  std::optional<HashtableCacheMetaInfo> meta_info) {
125  key == EMPTY_HASHED_PLAN_DAG_KEY) {
126  return;
127  }
128  auto& cache_metrics = getMetricTracker(item_type);
129  // remove cached item from the cache
130  auto cache_metric = cache_metrics.getCacheItemMetric(key, device_identifier);
131  CHECK(cache_metric);
132  auto hashtable_size = cache_metric->getMemSize();
133  auto hashtable_container = getCachedItemContainer(item_type, device_identifier);
134  auto filter = [key](auto const& item) { return item.key == key; };
135  auto itr =
136  std::find_if(hashtable_container->cbegin(), hashtable_container->cend(), filter);
137  if (itr == hashtable_container->cend()) {
138  return;
139  } else {
140  hashtable_container->erase(itr);
141  }
142  // remove cache metric
143  cache_metrics.removeCacheItemMetric(key, device_identifier);
144  // update current cache size
145  cache_metrics.updateCurrentCacheSize(
146  device_identifier, CacheUpdateAction::REMOVE, hashtable_size);
147  return;
148 }
149 
151  CacheItemType item_type,
152  DeviceIdentifier device_identifier,
153  size_t required_size,
154  std::lock_guard<std::mutex>& lock,
155  std::optional<HashtableCacheMetaInfo> meta_info) {
156  // sort the vector based on the importance of the cached items (by # referenced, size
157  // and compute time) and then remove unimportant cached items
158  int elimination_target_offset = 0;
159  size_t removed_size = 0;
160  auto& metric_tracker = getMetricTracker(item_type);
161  auto actual_space_to_free = metric_tracker.getTotalCacheSize() / 2;
162  if (!g_is_test_env && required_size < actual_space_to_free) {
163  // remove enough items to avoid too frequent cache cleanup
164  // we do not apply thin to test code since test scenarios are designed to
165  // specific size of items and their caches
166  required_size = actual_space_to_free;
167  }
168  metric_tracker.sortCacheInfoByQueryMetric(device_identifier);
169  auto cached_item_metrics = metric_tracker.getCacheItemMetrics(device_identifier);
170  sortCacheContainerByQueryMetric(item_type, device_identifier);
171 
172  // collect targets to eliminate
173  for (auto& metric : cached_item_metrics) {
174  auto target_size = metric->getMemSize();
175  ++elimination_target_offset;
176  removed_size += target_size;
177  if (removed_size > required_size) {
178  break;
179  }
180  }
181 
182  // eliminate targets in 1) cache container and 2) their metrics
183  removeCachedItemFromBeginning(item_type, device_identifier, elimination_target_offset);
184  metric_tracker.removeMetricFromBeginning(device_identifier, elimination_target_offset);
185 
186  // update the current cache size after this cleanup
187  metric_tracker.updateCurrentCacheSize(
188  device_identifier, CacheUpdateAction::REMOVE, removed_size);
189 }
190 
192  std::lock_guard<std::mutex> lock(getCacheLock());
193  for (auto& item_type : getCacheItemType()) {
195  auto item_cache = getItemCache().find(item_type)->second;
196  for (auto& kv : *item_cache) {
197  kv.second->clear();
198  }
199  }
200 }
201 
202 std::string HashtableRecycler::toString() const {
203  std::ostringstream oss;
204  oss << "A current status of the Hashtable Recycler:\n";
205  for (auto& item_type : getCacheItemType()) {
206  oss << "\t" << DataRecyclerUtil::toStringCacheItemType(item_type);
207  auto& metric_tracker = getMetricTracker(item_type);
208  oss << "\n\t# cached hashtables:\n";
209  auto item_cache = getItemCache().find(item_type)->second;
210  for (auto& cache_container : *item_cache) {
211  oss << "\t\tDevice"
212  << DataRecyclerUtil::getDeviceIdentifierString(cache_container.first)
213  << ", # hashtables: " << cache_container.second->size() << "\n";
214  for (auto& ht : *cache_container.second) {
215  oss << "\t\t\tHT] " << ht.item_metric->toString() << "\n";
216  }
217  }
218  oss << "\t" << metric_tracker.toString() << "\n";
219  }
220  return oss.str();
221 }
222 
224  const OverlapsHashTableMetaInfo& candidate,
225  const OverlapsHashTableMetaInfo& target) const {
226  if (candidate.bucket_sizes.size() != target.bucket_sizes.size()) {
227  return false;
228  }
229  for (size_t i = 0; i < candidate.bucket_sizes.size(); i++) {
230  if (std::abs(target.bucket_sizes[i] - candidate.bucket_sizes[i]) > 1e-4) {
231  return false;
232  }
233  }
234  auto threshold_check =
236  auto hashtable_size_check =
238  return threshold_check && hashtable_size_check;
239 }
240 
242  std::vector<const Analyzer::ColumnVar*>& inner_cols,
243  std::vector<const Analyzer::ColumnVar*>& outer_cols,
244  Executor* executor) {
245  std::ostringstream oss;
246  oss << executor->getQueryPlanDagCache().translateColVarsToInfoString(inner_cols, false);
247  auto hash_table_cols_info = oss.str();
248  oss << "|";
249  oss << executor->getQueryPlanDagCache().translateColVarsToInfoString(outer_cols, false);
250  return oss.str();
251 }
252 
254  const TableIdToNodeMap& table_id_to_node_map,
255  bool need_dict_translation,
256  const int table_id) {
257  // if hashtable is built from subquery's resultset we need to check
258  // 1) whether resulset rows can have inconsistency, e.g., rows can randomly be
259  // permutated per execution and 2) whether it needs dictionary translation for hashtable
260  // building to recycle the hashtable safely
261  auto getNodeByTableId =
262  [&table_id_to_node_map](const int table_id) -> const RelAlgNode* {
263  auto it = table_id_to_node_map.find(table_id);
264  if (it != table_id_to_node_map.end()) {
265  return it->second;
266  }
267  return nullptr;
268  };
269  bool found_sort_node = false;
270  bool found_project_node = false;
271  if (table_id < 0) {
272  auto origin_table_id = table_id * -1;
273  auto inner_node = getNodeByTableId(origin_table_id);
274  if (!inner_node) {
275  // we have to keep the node info of temporary resultset
276  // so in this case we are not safe to recycle the hashtable
277  return false;
278  }
279  // it is not safe to recycle the hashtable when
280  // this resultset may have resultset ordering inconsistency and/or
281  // need dictionary translation for hashtable building
282  auto sort_node = dynamic_cast<const RelSort*>(inner_node);
283  if (sort_node) {
284  found_sort_node = true;
285  } else {
286  auto project_node = dynamic_cast<const RelProject*>(inner_node);
287  if (project_node) {
288  found_project_node = true;
289  }
290  }
291  }
292  return !(found_sort_node || (found_project_node && need_dict_translation));
293 }
294 
295 std::pair<QueryPlan, HashtableCacheMetaInfo> HashtableRecycler::getHashtableKeyString(
296  const std::vector<InnerOuter>& inner_outer_pairs,
297  const SQLOps op_type,
298  const JoinType join_type,
299  const HashTableBuildDagMap& hashtable_build_dag_map,
300  Executor* executor) {
301  std::vector<const Analyzer::ColumnVar*> inner_cols_vec, outer_cols_vec;
302  std::string inner_join_cols_info{""};
303  for (auto& join_col_pair : inner_outer_pairs) {
304  inner_cols_vec.push_back(join_col_pair.first);
305  // extract inner join col's id
306  // b/c when the inner col comes from a subquery's resulset,
307  // table id / rte_index can be different even if we have the same
308  // subquery's semantic, i.e., project col A from table T
309  inner_join_cols_info +=
310  concat(executor->getQueryPlanDagCache().getJoinColumnsInfoString(
311  join_col_pair.first, JoinColumnSide::kDirect, true),
312  "|",
313  ::toString(op_type),
314  "|",
315  ::toString(join_type),
316  "|");
317  auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(join_col_pair.second);
318  if (outer_col_var) {
319  outer_cols_vec.push_back(outer_col_var);
320  if (join_col_pair.first->get_type_info().is_dict_encoded_string()) {
321  // add comp param for dict encoded string
322  inner_join_cols_info += outer_col_var->get_type_info().get_comp_param();
323  }
324  }
325  }
326  auto join_cols_info = getJoinColumnInfoString(inner_cols_vec, outer_cols_vec, executor);
327  QueryPlan hashtable_access_path{EMPTY_QUERY_PLAN};
328  HashtableCacheMetaInfo meta_info;
329  auto it = hashtable_build_dag_map.find(join_cols_info);
330  if (it != hashtable_build_dag_map.end()) {
331  hashtable_access_path = it->second.second;
332  hashtable_access_path += inner_join_cols_info;
333  QueryPlanMetaInfo query_plan_meta_info;
334  query_plan_meta_info.query_plan_dag = it->second.second;
335  query_plan_meta_info.inner_col_info_string = inner_join_cols_info;
336  HashtableCacheMetaInfo meta_info;
337  meta_info.query_plan_meta_info = query_plan_meta_info;
338  VLOG(2) << "Find hashtable access path for the hashjoin qual: " << join_cols_info
339  << " -> " << hashtable_access_path;
340  }
341  return std::make_pair(hashtable_access_path, meta_info);
342 }
343 
344 std::pair<QueryPlanHash, HashtableCacheMetaInfo> HashtableRecycler::getHashtableCacheKey(
345  const std::vector<InnerOuter>& inner_outer_pairs,
346  const SQLOps op_type,
347  const JoinType join_type,
348  const HashTableBuildDagMap& hashtable_build_dag_map,
349  Executor* executor) {
350  auto hashtable_access_path = getHashtableKeyString(
351  inner_outer_pairs, op_type, join_type, hashtable_build_dag_map, executor);
352  return std::make_pair(boost::hash_value(hashtable_access_path.first),
353  hashtable_access_path.second);
354 }
355 
356 std::tuple<QueryPlanHash,
357  std::shared_ptr<HashTable>,
358  std::optional<HashtableCacheMetaInfo>>
360  CacheItemType hash_table_type,
361  DeviceIdentifier device_identifier) {
362  std::lock_guard<std::mutex> lock(getCacheLock());
363  auto hashtable_cache = getCachedItemContainer(hash_table_type, device_identifier);
364  for (auto& ht : *hashtable_cache) {
365  if (!visited.count(ht.key)) {
366  return std::make_tuple(ht.key, ht.cached_item, ht.meta_info);
367  }
368  }
369  return std::make_tuple(EMPTY_HASHED_PLAN_DAG_KEY, nullptr, std::nullopt);
370 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
bool hasItemInCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< HashtableCacheMetaInfo > meta_info=std::nullopt) const override
size_t DeviceIdentifier
Definition: DataRecycler.h:111
static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:134
JoinType
Definition: sqldefs.h:108
void putItemToCache(QueryPlanHash key, std::shared_ptr< HashTable > item_ptr, CacheItemType item_type, DeviceIdentifier device_identifier, size_t item_size, size_t compute_time, std::optional< HashtableCacheMetaInfo > meta_info=std::nullopt) override
static std::string_view toStringCacheItemType(CacheItemType item_type)
Definition: DataRecycler.h:127
CacheMetricTracker & getMetricTracker(CacheItemType item_type)
Definition: DataRecycler.h:546
std::string inner_col_info_string
static bool isSafeToCacheHashtable(const TableIdToNodeMap &table_id_to_node_map, bool need_dict_translation, const int table_id)
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
static std::pair< QueryPlanHash, HashtableCacheMetaInfo > getHashtableCacheKey(const std::vector< InnerOuter > &inner_outer_pairs, const SQLOps op_type, const JoinType join_type, const HashTableBuildDagMap &hashtable_build_dag_map, Executor *executor)
SQLOps
Definition: sqldefs.h:29
std::string concat(Types &&...parms)
std::optional< QueryPlanMetaInfo > query_plan_meta_info
std::shared_ptr< CachedItemContainer > getCachedItemContainer(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:446
bool g_is_test_env
Definition: Execute.cpp:134
bool g_enable_data_recycler
Definition: Execute.cpp:139
static std::string getJoinColumnInfoString(std::vector< const Analyzer::ColumnVar * > &inner_cols, std::vector< const Analyzer::ColumnVar * > &outer_cols, Executor *executor)
void clearCacheMetricTracker()
Definition: DataRecycler.h:273
void cleanupCacheForInsertion(CacheItemType item_type, DeviceIdentifier device_identifier, size_t required_size, std::lock_guard< std::mutex > &lock, std::optional< HashtableCacheMetaInfo > meta_info=std::nullopt) override
static std::pair< QueryPlan, HashtableCacheMetaInfo > getHashtableKeyString(const std::vector< InnerOuter > &inner_outer_pairs, const SQLOps op_type, const JoinType join_type, const HashTableBuildDagMap &hashtable_build_dag_map, Executor *executor)
CacheItemType
Definition: DataRecycler.h:36
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
virtual std::shared_ptr< HashTable > getItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::optional< HashtableCacheMetaInfo > meta_info=std::nullopt) const =0
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:77
void removeItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< HashtableCacheMetaInfo > meta_info=std::nullopt) override
void sortCacheContainerByQueryMetric(CacheItemType item_type, DeviceIdentifier device_identifier)
Definition: DataRecycler.h:524
std::unordered_set< CacheItemType > const & getCacheItemType() const
Definition: DataRecycler.h:556
size_t QueryPlanHash
std::string toString() const override
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
void clearCache() override
bool checkOverlapsHashtableBucketCompatability(const OverlapsHashTableMetaInfo &candidate_bucket_dim, const OverlapsHashTableMetaInfo &target_bucket_dim) const
constexpr char const * EMPTY_QUERY_PLAN
#define CHECK(condition)
Definition: Logger.h:209
std::optional< CachedItem< std::shared_ptr< HashTable >, HashtableCacheMetaInfo > > getCachedItem(QueryPlanHash key, CachedItemContainer &m) const
Definition: DataRecycler.h:460
std::tuple< QueryPlanHash, std::shared_ptr< HashTable >, std::optional< HashtableCacheMetaInfo > > getCachedHashtableWithoutCacheKey(std::set< size_t > &visited, CacheItemType hash_table_type, DeviceIdentifier device_identifier)
std::vector< double > bucket_sizes
void removeCachedItemFromBeginning(CacheItemType item_type, DeviceIdentifier device_identifier, int offset)
Definition: DataRecycler.h:513
std::string QueryPlan
bool g_use_hashtable_cache
Definition: Execute.cpp:140
#define VLOG(n)
Definition: Logger.h:303
std::unordered_map< JoinColumnsInfo, HashTableBuildDag > HashTableBuildDagMap