OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DataRecycler.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "Analyzer/Analyzer.h"
25 #include "QueryEngine/ResultSet.h"
27 #include "Shared/misc.h"
28 
29 #include <boost/functional/hash.hpp>
30 
31 #include <algorithm>
32 #include <ostream>
33 #include <unordered_map>
34 
// Empty placeholder used when a recycler needs no per-item metadata.
struct EMPTY_META_INFO {};

// Item type that we try to recycle
enum CacheItemType {
  PERFECT_HT = 0,             // Perfect hashtable
  BASELINE_HT,                // Baseline hashtable
  OVERLAPS_HT,                // Overlaps hashtable
  HT_HASHING_SCHEME,          // Hashtable layout
  BASELINE_HT_APPROX_CARD,    // Approximated cardinality for baseline hashtable
  OVERLAPS_AUTO_TUNER_PARAM,  // Hashtable auto tuner's params for overlaps join
  QUERY_RESULTSET,            // query resultset
  CHUNK_METADATA,             // query resultset's chunk metadata
  // TODO (yoonmin): support the following items for recycling
  // COUNTALL_CARD_EST, Cardinality of query result
  // NDV_CARD_EST, # Non-distinct value
  // FILTER_SEL Selectivity of (push-downed) filter node
  NUM_CACHE_ITEM_TYPE
};

// Pretty-printer for CacheItemType; used by the logging statements below.
inline std::ostream& operator<<(std::ostream& os, CacheItemType const item_type) {
  constexpr char const* cache_item_type_str[]{
      "Perfect Join Hashtable",
      "Baseline Join Hashtable",
      "Overlaps Join Hashtable",
      "Hashing Scheme for Join Hashtable",
      "Baseline Join Hashtable's Approximated Cardinality",
      "Overlaps Join Hashtable's Auto Tuner's Parameters",
      "Query ResultSet",
      "Chunk Metadata"};
  // Keep the string table in sync with the enum above.
  static_assert(sizeof(cache_item_type_str) / sizeof(*cache_item_type_str) ==
                CacheItemType::NUM_CACHE_ITEM_TYPE);
  return os << cache_item_type_str[item_type];
}
68 
// given item to be cached, it represents whether the item can be cached when considering
// various size limitation
enum CacheAvailability {
  AVAILABLE,                // item can be cached as is
  AVAILABLE_AFTER_CLEANUP,  // item can be cached after removing already cached items
  UNAVAILABLE               // item cannot be cached due to size limitation
};

// Direction of a cache-size adjustment (see CacheMetricTracker::updateCurrentCacheSize).
enum CacheUpdateAction { ADD, REMOVE };

// the order of enum values affects how we remove cached items when
// new item wants to be cached but there is not enough space to keep them
// regarding `REF_COUNT`, it represents how many times a cached item is referenced during
// its lifetime to numerically estimate the usefulness of this cached item
// (not to measure exact # reference count at time T as std::shared_ptr does)
enum CacheMetricType { REF_COUNT, MEM_SIZE, COMPUTE_TIME, NUM_METRIC_TYPE };
86 // per query plan DAG metric
88  public:
89  CacheItemMetric(QueryPlanHash query_plan_hash, size_t compute_time, size_t mem_size)
90  : query_plan_hash_(query_plan_hash), metrics_({0, mem_size, compute_time}) {}
91 
92  QueryPlanHash getQueryPlanHash() const { return query_plan_hash_; }
93 
94  void incRefCount() { ++metrics_[CacheMetricType::REF_COUNT]; }
95 
96  size_t getRefCount() const { return metrics_[CacheMetricType::REF_COUNT]; }
97 
98  size_t getComputeTime() const { return metrics_[CacheMetricType::COMPUTE_TIME]; }
99 
100  size_t getMemSize() const { return metrics_[CacheMetricType::MEM_SIZE]; }
101 
102  const std::array<size_t, CacheMetricType::NUM_METRIC_TYPE>& getMetrics() const {
103  return metrics_;
104  }
105 
106  void setComputeTime(size_t compute_time) {
107  metrics_[CacheMetricType::COMPUTE_TIME] = compute_time;
108  }
109 
110  void setMemSize(const size_t mem_size) {
112  }
113 
114  std::string toString() const {
115  std::ostringstream oss;
116  oss << "Query plan hash: " << query_plan_hash_
117  << ", compute_time: " << metrics_[CacheMetricType::COMPUTE_TIME]
118  << ", mem_size: " << metrics_[CacheMetricType::MEM_SIZE]
119  << ", ref_count: " << metrics_[CacheMetricType::REF_COUNT];
120  return oss.str();
121  }
122 
123  private:
124  const QueryPlanHash query_plan_hash_;
125  std::array<size_t, CacheMetricType::NUM_METRIC_TYPE> metrics_;
126 };
127 
128 // 0 = CPU, 1 ~ N : GPU-1 ~ GPU-N
129 using DeviceIdentifier = size_t;
130 using CacheSizeMap = std::unordered_map<DeviceIdentifier, size_t>;
131 using CacheMetricInfoMap =
132  std::unordered_map<DeviceIdentifier, std::vector<std::shared_ptr<CacheItemMetric>>>;
133 
135  public:
137 
138  static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier) {
139  std::string device_type = device_identifier == CPU_DEVICE_IDENTIFIER ? "CPU" : "GPU-";
140  return device_identifier != CPU_DEVICE_IDENTIFIER
141  ? device_type.append(std::to_string(device_identifier))
142  : device_type;
143  }
144 
146  // C++20 will support constexpr vector
147  // Before we have C++20, let's use a pre-computed constant which is retrieved from
148  // std::vector<int> unitary_table_identifier = {-1, -1};
149  // UNITARY_TABLE_ID_HASH_VALUE = boost::hash_value(unitary_table_identifier);
150  constexpr QueryPlanHash UNITARY_TABLE_ID_HASH_VALUE = 1703092966009212028;
151  return UNITARY_TABLE_ID_HASH_VALUE;
152  }
153 
154  static std::unordered_set<size_t> getAlternativeTableKeys(
155  const std::vector<ChunkKey>& chunk_keys,
156  int db_id,
157  int inner_table_id) {
158  std::unordered_set<size_t> alternative_table_keys;
159  if (!chunk_keys.empty() && chunk_keys.front().size() > 2 &&
160  chunk_keys.front()[1] > 0) {
161  auto& chunk_key = chunk_keys.front();
162  // the actual chunks fetched per device can be different but they constitute the
163  // same table in the same db, so we can exploit this to create an alternative table
164  // key
165  std::vector<int> alternative_table_key{chunk_key[0], chunk_key[1]};
166  alternative_table_keys.insert(boost::hash_value(alternative_table_key));
167  } else if (inner_table_id > 0) {
168  // use this path if chunk_keys is empty
169  std::vector<int> alternative_table_key{db_id, inner_table_id};
170  alternative_table_keys.insert(boost::hash_value(alternative_table_key));
171  } else {
172  // this can happen when we use synthetic table generated by table function such as
173  // generate_series, i.e., SELECT ... FROM table(generate_series(...)) ...
174  // then we try to manage them via predefined static chunk key
175  // and remove them "all" when necessary
176  alternative_table_keys.insert(DataRecyclerUtil::getUnitaryTableKey());
177  }
178  return alternative_table_keys;
179  }
180 };
181 
182 // contain information regarding 1) per-cache item metric: perfect ht-1, perfect ht-2,
183 // baseline ht-1, ... and 2) per-type size in current: perfect-ht cache size, baseline-ht
184 // cache size, overlaps-ht cache size, ...
186  public:
188  size_t total_cache_size,
189  size_t max_cache_item_size,
190  int num_gpus = 0)
191  : item_type_(cache_item_type)
192  , total_cache_size_(total_cache_size)
193  , max_cache_item_size_(max_cache_item_size) {
194  // initialize cache metrics for each device: CPU, GPU0, GPU1, ...
195  // Currently we only consider maintaining our cache in CPU-memory
196  for (int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
197  --gpu_device_identifier) {
198  cache_metrics_.emplace(gpu_device_identifier,
199  std::vector<std::shared_ptr<CacheItemMetric>>());
200  current_cache_size_in_bytes_.emplace(gpu_device_identifier, 0);
201  }
203  std::vector<std::shared_ptr<CacheItemMetric>>());
205 
206  if (total_cache_size_ < 1024 * 1024 * 256) {
207  LOG(INFO) << "The total cache size of " << cache_item_type
208  << " is set too low, so we suggest raising it larger than 256MB";
209  }
210 
211  if (max_cache_item_size < 1024 * 1024 * 10) {
212  LOG(INFO)
213  << "The maximum item size of " << cache_item_type
214  << " that can be cached is set too low, we suggest raising it larger than 10MB";
215  }
216  if (max_cache_item_size > total_cache_size_) {
217  LOG(INFO) << "The maximum item size of " << cache_item_type
218  << " is set larger than its total cache size, so we force to set the "
219  "maximum item size as equal to the total cache size";
220  max_cache_item_size = total_cache_size_;
221  }
222  }
223 
224  static inline CacheMetricInfoMap::mapped_type::const_iterator getCacheItemMetricItr(
225  QueryPlanHash key,
226  CacheMetricInfoMap::mapped_type const& metrics) {
227  auto same_hash = [key](auto itr) { return itr->getQueryPlanHash() == key; };
228  return std::find_if(metrics.cbegin(), metrics.cend(), same_hash);
229  }
230 
231  static inline std::shared_ptr<CacheItemMetric> getCacheItemMetricImpl(
232  QueryPlanHash key,
233  CacheMetricInfoMap::mapped_type const& metrics) {
234  auto itr = getCacheItemMetricItr(key, metrics);
235  return itr == metrics.cend() ? nullptr : *itr;
236  }
237 
238  std::vector<std::shared_ptr<CacheItemMetric>>& getCacheItemMetrics(
239  DeviceIdentifier device_identifier) {
240  auto itr = cache_metrics_.find(device_identifier);
241  CHECK(itr != cache_metrics_.end());
242  return itr->second;
243  }
244 
245  std::shared_ptr<CacheItemMetric> getCacheItemMetric(
246  QueryPlanHash key,
247  DeviceIdentifier device_identifier) const {
248  auto itr = cache_metrics_.find(device_identifier);
249  return itr == cache_metrics_.cend() ? nullptr
250  : getCacheItemMetricImpl(key, itr->second);
251  }
252 
253  void setCurrentCacheSize(DeviceIdentifier device_identifier, size_t bytes) {
254  if (bytes > total_cache_size_) {
255  return;
256  }
257  auto itr = current_cache_size_in_bytes_.find(device_identifier);
258  CHECK(itr != current_cache_size_in_bytes_.end());
259  itr->second = bytes;
260  }
261 
262  std::optional<size_t> getCurrentCacheSize(DeviceIdentifier key) const {
263  auto same_hash = [key](auto itr) { return itr.first == key; };
264  auto itr = std::find_if(current_cache_size_in_bytes_.cbegin(),
266  same_hash);
267  return itr == current_cache_size_in_bytes_.cend() ? std::nullopt
268  : std::make_optional(itr->second);
269  }
270 
271  std::shared_ptr<CacheItemMetric> putNewCacheItemMetric(
272  QueryPlanHash key,
273  DeviceIdentifier device_identifier,
274  size_t mem_size,
275  size_t compute_time) {
276  auto itr = cache_metrics_.find(device_identifier);
277  CHECK(itr != cache_metrics_.end());
278  if (auto cached_metric = getCacheItemMetricImpl(key, itr->second)) {
279  if (cached_metric->getMemSize() != mem_size) {
281  device_identifier, CacheUpdateAction::REMOVE, cached_metric->getMemSize());
282  removeCacheItemMetric(key, device_identifier);
283  } else {
284  cached_metric->incRefCount();
285  return cached_metric;
286  }
287  }
288  auto cache_metric = std::make_shared<CacheItemMetric>(key, compute_time, mem_size);
289  updateCurrentCacheSize(device_identifier, CacheUpdateAction::ADD, mem_size);
290  // we add the item to cache after we create it during query runtime
291  // so it is used at least once
292  cache_metric->incRefCount();
293  return itr->second.emplace_back(std::move(cache_metric));
294  }
295 
296  void removeCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier) {
297  auto& cache_metrics = getCacheItemMetrics(device_identifier);
298  auto itr = getCacheItemMetricItr(key, cache_metrics);
299  if (itr != cache_metrics.cend()) {
300  cache_metrics.erase(itr);
301  }
302  }
303 
304  void removeMetricFromBeginning(DeviceIdentifier device_identifier, int offset) {
305  auto metrics = getCacheItemMetrics(device_identifier);
306  metrics.erase(metrics.begin(), metrics.begin() + offset);
307  }
308 
310  size_t item_size) const {
311  auto it = current_cache_size_in_bytes_.find(device_identifier);
312  CHECK(it != current_cache_size_in_bytes_.end());
313  CHECK_LE(item_size, total_cache_size_);
314  const auto current_cache_size = it->second;
315  long rem = total_cache_size_ - current_cache_size;
316  return rem < 0 ? item_size : item_size - rem;
317  }
318 
320  for (auto& kv : current_cache_size_in_bytes_) {
321  auto cache_item_metrics = getCacheItemMetrics(kv.first);
322  if (kv.first > 0) {
323  VLOG(1) << "[" << item_type_ << "]"
324  << "] clear cache metrics (# items: " << kv.first << ", " << kv.second
325  << " bytes)";
326  }
328  CHECK_EQ(getCurrentCacheSize(kv.first).value(), 0u);
329  }
330  for (auto& kv : cache_metrics_) {
331  kv.second.clear();
332  }
333  }
334 
336  size_t item_size) const {
337  if (item_size > max_cache_item_size_ || item_size > total_cache_size_) {
339  }
340  // now we know that a cache can hold the new item since its size is less than
341  // per-item maximum size limit
342  // check if we need to remove some (or all) of cached item to make a room
343  // for the new item
344  auto current_cache_size = getCurrentCacheSize(device_identifier);
345  CHECK(current_cache_size.has_value());
346  auto cache_size_after_addition = *current_cache_size + item_size;
347  if (cache_size_after_addition > total_cache_size_) {
348  // if so, we need to remove the item to hold the new one within the cache
350  }
351  // cache has a sufficient space to hold the new item
352  // thus, there is no need to remove cached item
354  }
355 
356  void updateCurrentCacheSize(DeviceIdentifier device_identifier,
358  size_t size) {
359  auto current_cache_size = getCurrentCacheSize(device_identifier);
360  CHECK(current_cache_size.has_value());
361  if (action == CacheUpdateAction::ADD) {
362  setCurrentCacheSize(device_identifier, current_cache_size.value() + size);
363  } else {
365  CHECK_LE(size, *current_cache_size);
366  setCurrentCacheSize(device_identifier, current_cache_size.value() - size);
367  }
368  }
369 
371  auto& metric_cache = getCacheItemMetrics(device_identifier);
372  std::sort(metric_cache.begin(),
373  metric_cache.end(),
374  [](const std::shared_ptr<CacheItemMetric>& left,
375  const std::shared_ptr<CacheItemMetric>& right) {
376  auto& elem1_metrics = left->getMetrics();
377  auto& elem2_metrics = right->getMetrics();
378  for (size_t i = 0; i < CacheMetricType::NUM_METRIC_TYPE; ++i) {
379  if (elem1_metrics[i] != elem2_metrics[i]) {
380  return elem1_metrics[i] < elem2_metrics[i];
381  }
382  }
383  return false;
384  });
385  }
386 
387  std::string toString() const {
388  std::ostringstream oss;
389  oss << "Current memory consumption of caches for each device:\n";
390  for (auto& kv : current_cache_size_in_bytes_) {
391  oss << "\t\tDevice " << kv.first << " : " << kv.second << " bytes\n";
392  }
393  return oss.str();
394  }
395 
396  size_t getTotalCacheSize() const { return total_cache_size_; }
397  size_t getMaxCacheItemSize() const { return max_cache_item_size_; }
398  void setTotalCacheSize(size_t new_total_cache_size) {
399  if (new_total_cache_size > 0) {
400  total_cache_size_ = new_total_cache_size;
401  }
402  }
403  void setMaxCacheItemSize(size_t new_max_cache_item_size) {
404  if (new_max_cache_item_size > 0) {
405  max_cache_item_size_ = new_max_cache_item_size;
406  }
407  }
408 
409  private:
413  // metadata of cached item that belongs to a cache of a specific device
414  // 1) ref_count: how many times this cached item is recycled
415  // 2) memory_usage: the size of cached item in bytes
416  // 3) compute_time: an elapsed time to generate this cached item
418 
419  // the total amount of currently cached data per device
421 };
422 
423 template <typename CACHED_ITEM_TYPE, typename META_INFO_TYPE>
424 struct CachedItem {
426  CACHED_ITEM_TYPE item,
427  std::shared_ptr<CacheItemMetric> item_metric_ptr,
428  std::optional<META_INFO_TYPE> metadata = std::nullopt)
429  : key(hashed_plan)
430  , cached_item(item)
431  , item_metric(item_metric_ptr)
432  , meta_info(metadata)
433  , dirty(false) {}
434 
435  void setDirty() { dirty = true; }
436  bool isDirty() const { return dirty; }
437 
439  CACHED_ITEM_TYPE cached_item;
440  std::shared_ptr<CacheItemMetric> item_metric;
441  std::optional<META_INFO_TYPE> meta_info;
442  bool dirty;
443 };
444 
445 // A main class of data recycler
446 // note that some tests which directly accesses APIs for update/modify/delete
447 // (meta)data may need to disable data recycler explicitly before running test suites
448 // to make test scenarios as expected
449 // i.e., UpdelStorageTest that calls fragmenter's updateColumn API
450 template <typename CACHED_ITEM_TYPE, typename META_INFO_TYPE>
452  public:
453  using CachedItemContainer = std::vector<CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>>;
455  std::unordered_map<DeviceIdentifier, std::shared_ptr<CachedItemContainer>>;
457  std::unordered_map<CacheItemType, std::shared_ptr<PerDeviceCacheItemContainer>>;
458  using PerTypeCacheMetricTracker = std::unordered_map<CacheItemType, CacheMetricTracker>;
459 
460  DataRecycler(const std::vector<CacheItemType>& item_types,
461  size_t total_cache_size,
462  size_t max_item_size,
463  int num_gpus) {
464  for (auto& item_type : item_types) {
465  cache_item_types_.insert(item_type);
466  metric_tracker_.emplace(
467  item_type,
468  CacheMetricTracker(item_type, total_cache_size, max_item_size, num_gpus));
469  auto item_container = std::make_shared<PerDeviceCacheItemContainer>();
470  for (int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
471  --gpu_device_identifier) {
472  item_container->emplace(gpu_device_identifier,
473  std::make_shared<CachedItemContainer>());
474  }
475  item_container->emplace(DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
476  std::make_shared<CachedItemContainer>());
477  cached_items_container_.emplace(item_type, item_container);
478  }
479  }
480 
481  virtual ~DataRecycler() = default;
482 
483  virtual CACHED_ITEM_TYPE getItemFromCache(
484  QueryPlanHash key,
485  CacheItemType item_type,
486  DeviceIdentifier device_identifier,
487  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
488 
489  virtual void putItemToCache(QueryPlanHash key,
490  CACHED_ITEM_TYPE item_ptr,
491  CacheItemType item_type,
492  DeviceIdentifier device_identifier,
493  size_t item_size,
494  size_t compute_time,
495  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
496 
497  virtual void initCache() = 0;
498 
499  virtual void clearCache() = 0;
500 
501  virtual void markCachedItemAsDirty(size_t table_key,
502  std::unordered_set<QueryPlanHash>& key_set,
503  CacheItemType item_type,
504  DeviceIdentifier device_identifier) = 0;
505 
507  auto candidate_it = std::find_if(
508  m.begin(),
509  m.end(),
510  [&key](const CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>& cached_item) {
511  return cached_item.key == key;
512  });
513  if (candidate_it != m.end()) {
514  candidate_it->setDirty();
515  }
516  }
517 
519  auto candidate_it = std::find_if(
520  m.begin(),
521  m.end(),
522  [&key](const CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>& cached_item) {
523  return cached_item.key == key;
524  });
525  return candidate_it != m.end() && candidate_it->isDirty();
526  }
527 
528  virtual std::string toString() const = 0;
529 
530  std::shared_ptr<CachedItemContainer> getCachedItemContainer(
531  CacheItemType item_type,
532  DeviceIdentifier device_identifier) const {
533  auto item_type_container_itr = cached_items_container_.find(item_type);
534  if (item_type_container_itr != cached_items_container_.end()) {
535  auto device_type_container_itr =
536  item_type_container_itr->second->find(device_identifier);
537  return device_type_container_itr != item_type_container_itr->second->end()
538  ? device_type_container_itr->second
539  : nullptr;
540  }
541  return nullptr;
542  }
543 
544  std::optional<CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>>
546  CacheItemType item_type,
547  DeviceIdentifier device_identifier,
549  std::lock_guard<std::mutex>& lock) {
550  auto candidate_it = std::find_if(
551  m.begin(),
552  m.end(),
553  [&key](const CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>& cached_item) {
554  return cached_item.key == key;
555  });
556  if (candidate_it != m.end()) {
557  if (candidate_it->isDirty()) {
559  key, item_type, device_identifier, lock, candidate_it->meta_info);
560  return std::nullopt;
561  }
562  return *candidate_it;
563  }
564  return std::nullopt;
565  }
566 
568  DeviceIdentifier device_identifier) const {
569  std::lock_guard<std::mutex> lock(cache_lock_);
570  auto container = getCachedItemContainer(item_type, device_identifier);
571  return container ? container->size() : 0;
572  }
573 
575  DeviceIdentifier device_identifier) const {
576  std::lock_guard<std::mutex> lock(cache_lock_);
577  auto container = getCachedItemContainer(item_type, device_identifier);
578  return std::count_if(container->begin(),
579  container->end(),
580  [](const auto& cached_item) { return cached_item.isDirty(); });
581  }
582 
584  DeviceIdentifier device_identifier) const {
585  std::lock_guard<std::mutex> lock(cache_lock_);
586  auto container = getCachedItemContainer(item_type, device_identifier);
587  return std::count_if(container->begin(),
588  container->end(),
589  [](const auto& cached_item) { return !cached_item.isDirty(); });
590  }
591 
593  DeviceIdentifier device_identifier) const {
594  std::lock_guard<std::mutex> lock(cache_lock_);
595  auto metric_tracker = getMetricTracker(item_type);
596  auto current_size_opt = metric_tracker.getCurrentCacheSize(device_identifier);
597  return current_size_opt ? current_size_opt.value() : 0;
598  }
599 
600  std::shared_ptr<CacheItemMetric> getCachedItemMetric(CacheItemType item_type,
601  DeviceIdentifier device_identifier,
602  QueryPlanHash key) const {
603  std::lock_guard<std::mutex> lock(cache_lock_);
604  auto cache_metric_tracker = getMetricTracker(item_type);
605  return cache_metric_tracker.getCacheItemMetric(key, device_identifier);
606  }
607 
608  void setTotalCacheSize(CacheItemType item_type, size_t new_total_cache_size) {
609  if (new_total_cache_size > 0) {
610  std::lock_guard<std::mutex> lock(cache_lock_);
611  getMetricTracker(item_type).setTotalCacheSize(new_total_cache_size);
612  }
613  }
614 
615  void setMaxCacheItemSize(CacheItemType item_type, size_t new_max_cache_item_size) {
616  if (new_max_cache_item_size > 0) {
617  std::lock_guard<std::mutex> lock(cache_lock_);
618  getMetricTracker(item_type).setMaxCacheItemSize(new_max_cache_item_size);
619  }
620  }
621 
622  protected:
624  DeviceIdentifier device_identifier,
625  int offset) {
626  // it removes cached items located from `idx 0` to `offset`
627  // so, call this function after sorting the cached items container vec
628  // and we should call this function under the proper locking scheme
629  auto container = getCachedItemContainer(item_type, device_identifier);
630  CHECK(container);
631  container->erase(container->begin(), container->begin() + offset);
632  }
633 
635  DeviceIdentifier device_identifier) {
636  // should call this function under the proper locking scheme
637  auto container = getCachedItemContainer(item_type, device_identifier);
638  CHECK(container);
639  std::sort(container->begin(),
640  container->end(),
643  auto& left_metrics = left.item_metric->getMetrics();
644  auto& right_metrics = right.item_metric->getMetrics();
645  for (size_t i = 0; i < CacheMetricType::NUM_METRIC_TYPE; ++i) {
646  if (left_metrics[i] != right_metrics[i]) {
647  return left_metrics[i] < right_metrics[i];
648  }
649  }
650  return false;
651  });
652  }
653 
654  std::mutex& getCacheLock() const { return cache_lock_; }
655 
657  auto metric_iter = metric_tracker_.find(item_type);
658  CHECK(metric_iter != metric_tracker_.end());
659  return metric_iter->second;
660  }
661 
663  return const_cast<DataRecycler*>(this)->getMetricTracker(item_type);
664  }
665 
666  std::unordered_set<CacheItemType> const& getCacheItemType() const {
667  return cache_item_types_;
668  }
669 
672  }
673 
674  private:
675  // internally called under the proper locking scheme
676  virtual bool hasItemInCache(
677  QueryPlanHash key,
678  CacheItemType item_type,
679  DeviceIdentifier device_identifier,
680  std::lock_guard<std::mutex>& lock,
681  std::optional<META_INFO_TYPE> meta_info = std::nullopt) const = 0;
682 
683  // internally called under the proper locking scheme
684  virtual void removeItemFromCache(
685  QueryPlanHash key,
686  CacheItemType item_type,
687  DeviceIdentifier device_identifier,
688  std::lock_guard<std::mutex>& lock,
689  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
690 
691  // internally called under the proper locking scheme
692  virtual void cleanupCacheForInsertion(
693  CacheItemType item_type,
694  DeviceIdentifier device_identifier,
695  size_t required_size,
696  std::lock_guard<std::mutex>& lock,
697  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
698 
699  // a set of cache item type that this recycler supports
700  std::unordered_set<CacheItemType> cache_item_types_;
701 
702  // cache metric tracker
704 
705  // per-device cached item containers for each cached item type
707 
708  mutable std::mutex cache_lock_;
709 };
Defines data structures for the semantic analysis phase of query processing.
CACHED_ITEM_TYPE cached_item
Definition: DataRecycler.h:439
std::mutex & getCacheLock() const
Definition: DataRecycler.h:654
std::unordered_map< CacheItemType, std::shared_ptr< PerDeviceCacheItemContainer >> PerTypeCacheItemContainer
Definition: DataRecycler.h:457
#define CHECK_EQ(x, y)
Definition: Logger.h:230
CacheUpdateAction
Definition: DataRecycler.h:77
size_t DeviceIdentifier
Definition: DataRecycler.h:129
static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:138
virtual std::string toString() const =0
size_t calculateRequiredSpaceForItemAddition(DeviceIdentifier device_identifier, size_t item_size) const
Definition: DataRecycler.h:309
std::shared_ptr< CacheItemMetric > putNewCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier, size_t mem_size, size_t compute_time)
Definition: DataRecycler.h:271
std::optional< CachedItem< CACHED_ITEM_TYPE, META_INFO_TYPE > > getCachedItemWithoutConsideringMetaInfo(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, CachedItemContainer &m, std::lock_guard< std::mutex > &lock)
Definition: DataRecycler.h:545
size_t getCurrentNumCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:567
CacheMetricTracker & getMetricTracker(CacheItemType item_type)
Definition: DataRecycler.h:656
CacheItemType item_type_
Definition: DataRecycler.h:410
std::unordered_map< DeviceIdentifier, size_t > CacheSizeMap
Definition: DataRecycler.h:130
DataRecycler(const std::vector< CacheItemType > &item_types, size_t total_cache_size, size_t max_item_size, int num_gpus)
Definition: DataRecycler.h:460
bool isDirty() const
Definition: DataRecycler.h:436
#define LOG(tag)
Definition: Logger.h:216
std::vector< CachedItem< std::optional< HashType >, EMPTY_META_INFO >> CachedItemContainer
Definition: DataRecycler.h:453
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:57
CacheMetricTracker const & getMetricTracker(CacheItemType item_type) const
Definition: DataRecycler.h:662
void setDirty()
Definition: DataRecycler.h:435
CacheItemMetric(QueryPlanHash query_plan_hash, size_t compute_time, size_t mem_size)
Definition: DataRecycler.h:89
std::unordered_set< CacheItemType > cache_item_types_
Definition: DataRecycler.h:700
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
QueryPlanHash key
Definition: DataRecycler.h:438
std::optional< size_t > getCurrentCacheSize(DeviceIdentifier key) const
Definition: DataRecycler.h:262
std::string toString(const QueryDescriptionType &type)
Definition: Types.h:64
size_t getCurrentNumCleanCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:583
std::shared_ptr< CachedItemContainer > getCachedItemContainer(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:530
void setMaxCacheItemSize(size_t new_max_cache_item_size)
Definition: DataRecycler.h:403
void markCachedItemAsDirtyImpl(QueryPlanHash key, CachedItemContainer &m) const
Definition: DataRecycler.h:506
void setMaxCacheItemSize(CacheItemType item_type, size_t new_max_cache_item_size)
Definition: DataRecycler.h:615
CacheAvailability canAddItem(DeviceIdentifier device_identifier, size_t item_size) const
Definition: DataRecycler.h:335
void clearCacheMetricTracker()
Definition: DataRecycler.h:319
std::string to_string(char const *&&v)
PerTypeCacheItemContainer const & getItemCache() const
Definition: DataRecycler.h:670
CacheAvailability
Definition: DataRecycler.h:71
PerTypeCacheMetricTracker metric_tracker_
Definition: DataRecycler.h:703
size_t getCurrentCacheSizeForDevice(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:592
std::mutex cache_lock_
Definition: DataRecycler.h:708
void setCurrentCacheSize(DeviceIdentifier device_identifier, size_t bytes)
Definition: DataRecycler.h:253
size_t getMaxCacheItemSize() const
Definition: DataRecycler.h:397
std::vector< std::shared_ptr< CacheItemMetric > > & getCacheItemMetrics(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:238
void removeCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier)
Definition: DataRecycler.h:296
CacheItemType
Definition: DataRecycler.h:38
std::optional< META_INFO_TYPE > meta_info
Definition: DataRecycler.h:441
virtual void initCache()=0
size_t getTotalCacheSize() const
Definition: DataRecycler.h:396
bool isCachedItemDirty(QueryPlanHash key, CachedItemContainer &m) const
Definition: DataRecycler.h:518
std::shared_ptr< CacheItemMetric > getCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:245
std::string toString() const
Definition: DataRecycler.h:387
PerTypeCacheItemContainer cached_items_container_
Definition: DataRecycler.h:706
size_t max_cache_item_size_
Definition: DataRecycler.h:412
CacheSizeMap current_cache_size_in_bytes_
Definition: DataRecycler.h:420
void updateCurrentCacheSize(DeviceIdentifier device_identifier, CacheUpdateAction action, size_t size)
Definition: DataRecycler.h:356
virtual void putItemToCache(QueryPlanHash key, CACHED_ITEM_TYPE item_ptr, CacheItemType item_type, DeviceIdentifier device_identifier, size_t item_size, size_t compute_time, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::unordered_map< CacheItemType, CacheMetricTracker > PerTypeCacheMetricTracker
Definition: DataRecycler.h:458
std::shared_ptr< CacheItemMetric > getCachedItemMetric(CacheItemType item_type, DeviceIdentifier device_identifier, QueryPlanHash key) const
Definition: DataRecycler.h:600
static QueryPlanHash getUnitaryTableKey()
Definition: DataRecycler.h:145
void setTotalCacheSize(size_t new_total_cache_size)
Definition: DataRecycler.h:398
void setTotalCacheSize(CacheItemType item_type, size_t new_total_cache_size)
Definition: DataRecycler.h:608
static std::shared_ptr< CacheItemMetric > getCacheItemMetricImpl(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
Definition: DataRecycler.h:231
size_t getCurrentNumDirtyCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:574
std::unordered_map< DeviceIdentifier, std::shared_ptr< CachedItemContainer >> PerDeviceCacheItemContainer
Definition: DataRecycler.h:455
CacheMetricType
Definition: DataRecycler.h:84
void sortCacheContainerByQueryMetric(CacheItemType item_type, DeviceIdentifier device_identifier)
Definition: DataRecycler.h:634
#define CHECK_LE(x, y)
Definition: Logger.h:233
void sortCacheInfoByQueryMetric(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:370
static std::unordered_set< size_t > getAlternativeTableKeys(const std::vector< ChunkKey > &chunk_keys, int db_id, int inner_table_id)
Definition: DataRecycler.h:154
std::unordered_set< CacheItemType > const & getCacheItemType() const
Definition: DataRecycler.h:666
CacheMetricTracker(CacheItemType cache_item_type, size_t total_cache_size, size_t max_cache_item_size, int num_gpus=0)
Definition: DataRecycler.h:187
virtual ~DataRecycler()=default
size_t QueryPlanHash
std::unordered_map< DeviceIdentifier, std::vector< std::shared_ptr< CacheItemMetric >>> CacheMetricInfoMap
Definition: DataRecycler.h:132
static CacheMetricInfoMap::mapped_type::const_iterator getCacheItemMetricItr(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
Definition: DataRecycler.h:224
CachedItem(QueryPlanHash hashed_plan, CACHED_ITEM_TYPE item, std::shared_ptr< CacheItemMetric > item_metric_ptr, std::optional< META_INFO_TYPE > metadata=std::nullopt)
Definition: DataRecycler.h:425
virtual bool hasItemInCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt) const =0
bool g_enable_watchdog false
Definition: Execute.cpp:79
CacheMetricInfoMap cache_metrics_
Definition: DataRecycler.h:417
#define CHECK(condition)
Definition: Logger.h:222
virtual void clearCache()=0
virtual void removeItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
Basic constructors and methods of the row set interface.
virtual CACHED_ITEM_TYPE getItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
virtual void markCachedItemAsDirty(size_t table_key, std::unordered_set< QueryPlanHash > &key_set, CacheItemType item_type, DeviceIdentifier device_identifier)=0
void removeMetricFromBeginning(DeviceIdentifier device_identifier, int offset)
Definition: DataRecycler.h:304
Execution unit for relational algebra. It&#39;s a low-level description of any relational algebra operati...
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
void removeCachedItemFromBeginning(CacheItemType item_type, DeviceIdentifier device_identifier, int offset)
Definition: DataRecycler.h:623
virtual void cleanupCacheForInsertion(CacheItemType item_type, DeviceIdentifier device_identifier, size_t required_size, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::shared_ptr< CacheItemMetric > item_metric
Definition: DataRecycler.h:440
#define VLOG(n)
Definition: Logger.h:316
std::array< size_t, CacheMetricType::NUM_METRIC_TYPE > metrics_
Definition: DataRecycler.h:90