OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DataRecycler.h
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "Analyzer/Analyzer.h"
25 #include "QueryEngine/ResultSet.h"
27 #include "Shared/misc.h"
28 
29 #include <boost/functional/hash.hpp>
30 
31 #include <algorithm>
32 #include <ostream>
33 #include <unordered_map>
34 
// Placeholder metadata type: used as META_INFO_TYPE for cached items that
// carry no extra per-item meta information.
struct EMPTY_META_INFO {};
36 
// Item type that we try to recycle
enum CacheItemType {
  PERFECT_HT = 0,             // Perfect hashtable
  BASELINE_HT,                // Baseline hashtable
  OVERLAPS_HT,                // Overlaps hashtable
  HT_HASHING_SCHEME,          // Hashtable layout
  BASELINE_HT_APPROX_CARD,    // Approximated cardinality for baseline hashtable
  OVERLAPS_AUTO_TUNER_PARAM,  // Hashtable auto tuner's params for overlaps join
  QUERY_RESULTSET,            // query resultset
  CHUNK_METADATA,             // query resultset's chunk metadata
  // TODO (yoonmin): support the following items for recycling
  // COUNTALL_CARD_EST, Cardinality of query result
  // NDV_CARD_EST, # Non-distinct value
  // FILTER_SEL Selectivity of (push-downed) filter node
  NUM_CACHE_ITEM_TYPE  // sentinel: number of supported item types (keep last)
};

// Pretty-printer for logging; the string array must stay in sync with the
// enumerators of `CacheItemType` (enforced by the static_assert below).
inline std::ostream& operator<<(std::ostream& os, CacheItemType const item_type) {
  constexpr char const* cache_item_type_str[]{
      "Perfect Join Hashtable",
      "Baseline Join Hashtable",
      "Overlaps Join Hashtable",
      "Hashing Scheme for Join Hashtable",
      "Baseline Join Hashtable's Approximated Cardinality",
      "Overlaps Join Hashtable's Auto Tuner's Parameters",
      "Query ResultSet",
      "Chunk Metadata"};
  static_assert(sizeof(cache_item_type_str) / sizeof(*cache_item_type_str) ==
                CacheItemType::NUM_CACHE_ITEM_TYPE);
  return os << cache_item_type_str[item_type];
}
68 
// given item to be cached, it represents whether the item can be cached when considering
// various size limitation
enum CacheAvailability {
  AVAILABLE,                // item can be cached as is
  AVAILABLE_AFTER_CLEANUP,  // item can be cached after removing already cached items
  UNAVAILABLE               // item cannot be cached due to size limitation
};
76 
// whether a cache-size bookkeeping update adds or removes bytes
enum CacheUpdateAction { ADD, REMOVE };

// the order of enum values affects how we remove cached items when
// new item wants to be cached but there is not enough space to keep them
// regarding `REF_COUNT`, it represents how many times a cached item is referenced during
// its lifetime to numerically estimate the usefulness of this cached item
// (not to measure exact # reference count at time T as std::shared_ptr does)
enum CacheMetricType { REF_COUNT = 0, MEM_SIZE, COMPUTE_TIME, NUM_METRIC_TYPE };
86 // per query plan DAG metric
88  public:
89  CacheItemMetric(QueryPlanHash query_plan_hash, size_t compute_time, size_t mem_size)
90  : query_plan_hash_(query_plan_hash), metrics_({0, mem_size, compute_time}) {}
91 
92  QueryPlanHash getQueryPlanHash() const { return query_plan_hash_; }
93 
94  void incRefCount() { ++metrics_[CacheMetricType::REF_COUNT]; }
95 
96  size_t getRefCount() const { return metrics_[CacheMetricType::REF_COUNT]; }
97 
98  size_t getComputeTime() const { return metrics_[CacheMetricType::COMPUTE_TIME]; }
99 
100  size_t getMemSize() const { return metrics_[CacheMetricType::MEM_SIZE]; }
101 
102  const std::array<size_t, CacheMetricType::NUM_METRIC_TYPE>& getMetrics() const {
103  return metrics_;
104  }
105 
106  void setComputeTime(size_t compute_time) {
107  metrics_[CacheMetricType::COMPUTE_TIME] = compute_time;
108  }
109 
110  void setMemSize(const size_t mem_size) {
112  }
113 
114  std::string toString() const {
115  std::ostringstream oss;
116  oss << "Query plan hash: " << query_plan_hash_
117  << ", compute_time: " << metrics_[CacheMetricType::COMPUTE_TIME]
118  << ", mem_size: " << metrics_[CacheMetricType::MEM_SIZE]
119  << ", ref_count: " << metrics_[CacheMetricType::REF_COUNT];
120  return oss.str();
121  }
122 
123  private:
124  const QueryPlanHash query_plan_hash_;
125  std::array<size_t, CacheMetricType::NUM_METRIC_TYPE> metrics_;
126 };
127 
// forward declaration so the aliases below are self-contained
class CacheItemMetric;

// 0 = CPU, 1 ~ N : GPU-1 ~ GPU-N
using DeviceIdentifier = size_t;
using CacheSizeMap = std::unordered_map<DeviceIdentifier, size_t>;
using CacheMetricInfoMap =
    std::unordered_map<DeviceIdentifier, std::vector<std::shared_ptr<CacheItemMetric>>>;

class DataRecyclerUtil {
 public:
  // device id 0 is reserved for the CPU cache (see comment above)
  static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER = 0;

  // "CPU" for the CPU device, "GPU-<id>" otherwise
  static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier) {
    std::string device_type = device_identifier == CPU_DEVICE_IDENTIFIER ? "CPU" : "GPU-";
    return device_identifier != CPU_DEVICE_IDENTIFIER
               ? device_type.append(std::to_string(device_identifier))
               : device_type;
  }
};
145 
146 // contain information regarding 1) per-cache item metric: perfect ht-1, perfect ht-2,
147 // baseline ht-1, ... and 2) per-type size in current: perfect-ht cache size, baseline-ht
148 // cache size, overlaps-ht cache size, ...
150  public:
152  size_t total_cache_size,
153  size_t max_cache_item_size,
154  int num_gpus = 0)
155  : item_type_(cache_item_type)
156  , total_cache_size_(total_cache_size)
157  , max_cache_item_size_(max_cache_item_size) {
158  // initialize cache metrics for each device: CPU, GPU0, GPU1, ...
159  // Currently we only consider maintaining our cache in CPU-memory
160  for (int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
161  --gpu_device_identifier) {
162  cache_metrics_.emplace(gpu_device_identifier,
163  std::vector<std::shared_ptr<CacheItemMetric>>());
164  current_cache_size_in_bytes_.emplace(gpu_device_identifier, 0);
165  }
167  std::vector<std::shared_ptr<CacheItemMetric>>());
169 
170  if (total_cache_size_ < 1024 * 1024 * 256) {
171  LOG(INFO) << "The total cache size of " << cache_item_type
172  << " is set too low, so we suggest raising it larger than 256MB";
173  }
174 
175  if (max_cache_item_size < 1024 * 1024 * 10) {
176  LOG(INFO)
177  << "The maximum item size of " << cache_item_type
178  << " that can be cached is set too low, we suggest raising it larger than 10MB";
179  }
180  if (max_cache_item_size > total_cache_size_) {
181  LOG(INFO) << "The maximum item size of " << cache_item_type
182  << " is set larger than its total cache size, so we force to set the "
183  "maximum item size as equal to the total cache size";
184  max_cache_item_size = total_cache_size_;
185  }
186  }
187 
188  static inline CacheMetricInfoMap::mapped_type::const_iterator getCacheItemMetricItr(
189  QueryPlanHash key,
190  CacheMetricInfoMap::mapped_type const& metrics) {
191  auto same_hash = [key](auto itr) { return itr->getQueryPlanHash() == key; };
192  return std::find_if(metrics.cbegin(), metrics.cend(), same_hash);
193  }
194 
195  static inline std::shared_ptr<CacheItemMetric> getCacheItemMetricImpl(
196  QueryPlanHash key,
197  CacheMetricInfoMap::mapped_type const& metrics) {
198  auto itr = getCacheItemMetricItr(key, metrics);
199  return itr == metrics.cend() ? nullptr : *itr;
200  }
201 
202  std::vector<std::shared_ptr<CacheItemMetric>>& getCacheItemMetrics(
203  DeviceIdentifier device_identifier) {
204  auto itr = cache_metrics_.find(device_identifier);
205  CHECK(itr != cache_metrics_.end());
206  return itr->second;
207  }
208 
209  std::shared_ptr<CacheItemMetric> getCacheItemMetric(
210  QueryPlanHash key,
211  DeviceIdentifier device_identifier) const {
212  auto itr = cache_metrics_.find(device_identifier);
213  return itr == cache_metrics_.cend() ? nullptr
214  : getCacheItemMetricImpl(key, itr->second);
215  }
216 
217  void setCurrentCacheSize(DeviceIdentifier device_identifier, size_t bytes) {
218  if (bytes > total_cache_size_) {
219  return;
220  }
221  auto itr = current_cache_size_in_bytes_.find(device_identifier);
222  CHECK(itr != current_cache_size_in_bytes_.end());
223  itr->second = bytes;
224  }
225 
226  std::optional<size_t> getCurrentCacheSize(DeviceIdentifier key) const {
227  auto same_hash = [key](auto itr) { return itr.first == key; };
228  auto itr = std::find_if(current_cache_size_in_bytes_.cbegin(),
230  same_hash);
231  return itr == current_cache_size_in_bytes_.cend() ? std::nullopt
232  : std::make_optional(itr->second);
233  }
234 
235  std::shared_ptr<CacheItemMetric> putNewCacheItemMetric(
236  QueryPlanHash key,
237  DeviceIdentifier device_identifier,
238  size_t mem_size,
239  size_t compute_time) {
240  auto itr = cache_metrics_.find(device_identifier);
241  CHECK(itr != cache_metrics_.end());
242  if (auto cached_metric = getCacheItemMetricImpl(key, itr->second)) {
243  if (cached_metric->getMemSize() != mem_size) {
245  device_identifier, CacheUpdateAction::REMOVE, cached_metric->getMemSize());
246  removeCacheItemMetric(key, device_identifier);
247  } else {
248  cached_metric->incRefCount();
249  return cached_metric;
250  }
251  }
252  auto cache_metric = std::make_shared<CacheItemMetric>(key, compute_time, mem_size);
253  updateCurrentCacheSize(device_identifier, CacheUpdateAction::ADD, mem_size);
254  // we add the item to cache after we create it during query runtime
255  // so it is used at least once
256  cache_metric->incRefCount();
257  return itr->second.emplace_back(std::move(cache_metric));
258  }
259 
260  void removeCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier) {
261  auto& cache_metrics = getCacheItemMetrics(device_identifier);
262  auto itr = getCacheItemMetricItr(key, cache_metrics);
263  if (itr != cache_metrics.cend()) {
264  cache_metrics.erase(itr);
265  }
266  }
267 
268  void removeMetricFromBeginning(DeviceIdentifier device_identifier, int offset) {
269  auto metrics = getCacheItemMetrics(device_identifier);
270  metrics.erase(metrics.begin(), metrics.begin() + offset);
271  }
272 
274  size_t item_size) const {
275  auto it = current_cache_size_in_bytes_.find(device_identifier);
276  CHECK(it != current_cache_size_in_bytes_.end());
277  CHECK_LE(item_size, total_cache_size_);
278  const auto current_cache_size = it->second;
279  long rem = total_cache_size_ - current_cache_size;
280  return rem < 0 ? item_size : item_size - rem;
281  }
282 
284  for (auto& kv : current_cache_size_in_bytes_) {
285  auto cache_item_metrics = getCacheItemMetrics(kv.first);
286  VLOG(1) << "Clear cache of " << item_type_ << " from device [" << kv.first
287  << "] (# cached items: " << cache_item_metrics.size() << ", " << kv.second
288  << " bytes)";
290  CHECK_EQ(getCurrentCacheSize(kv.first).value(), 0u);
291  }
292  for (auto& kv : cache_metrics_) {
293  kv.second.clear();
294  }
295  }
296 
298  size_t item_size) const {
299  if (item_size > max_cache_item_size_ || item_size > total_cache_size_) {
301  }
302  // now we know that a cache can hold the new item since its size is less than
303  // per-item maximum size limit
304  // check if we need to remove some (or all) of cached item to make a room
305  // for the new item
306  auto current_cache_size = getCurrentCacheSize(device_identifier);
307  CHECK(current_cache_size.has_value());
308  auto cache_size_after_addition = *current_cache_size + item_size;
309  if (cache_size_after_addition > total_cache_size_) {
310  // if so, we need to remove the item to hold the new one within the cache
312  }
313  // cache has a sufficient space to hold the new item
314  // thus, there is no need to remove cached item
316  }
317 
318  void updateCurrentCacheSize(DeviceIdentifier device_identifier,
320  size_t size) {
321  auto current_cache_size = getCurrentCacheSize(device_identifier);
322  CHECK(current_cache_size.has_value());
323  if (action == CacheUpdateAction::ADD) {
324  setCurrentCacheSize(device_identifier, current_cache_size.value() + size);
325  } else {
327  CHECK_LE(size, *current_cache_size);
328  setCurrentCacheSize(device_identifier, current_cache_size.value() - size);
329  }
330  }
331 
333  auto& metric_cache = getCacheItemMetrics(device_identifier);
334  std::sort(metric_cache.begin(),
335  metric_cache.end(),
336  [](const std::shared_ptr<CacheItemMetric>& left,
337  const std::shared_ptr<CacheItemMetric>& right) {
338  auto& elem1_metrics = left->getMetrics();
339  auto& elem2_metrics = right->getMetrics();
340  for (size_t i = 0; i < CacheMetricType::NUM_METRIC_TYPE; ++i) {
341  if (elem1_metrics[i] != elem2_metrics[i]) {
342  return elem1_metrics[i] < elem2_metrics[i];
343  }
344  }
345  return false;
346  });
347  }
348 
349  std::string toString() const {
350  std::ostringstream oss;
351  oss << "Current memory consumption of caches for each device:\n";
352  for (auto& kv : current_cache_size_in_bytes_) {
353  oss << "\t\tDevice " << kv.first << " : " << kv.second << " bytes\n";
354  }
355  return oss.str();
356  }
357 
358  size_t getTotalCacheSize() const { return total_cache_size_; }
359  size_t getMaxCacheItemSize() const { return max_cache_item_size_; }
360  void setTotalCacheSize(size_t new_total_cache_size) {
361  if (new_total_cache_size > 0) {
362  total_cache_size_ = new_total_cache_size;
363  }
364  }
365  void setMaxCacheItemSize(size_t new_max_cache_item_size) {
366  if (new_max_cache_item_size > 0) {
367  max_cache_item_size_ = new_max_cache_item_size;
368  }
369  }
370 
371  private:
375  // metadata of cached item that belongs to a cache of a specific device
376  // 1) ref_count: how many times this cached item is recycled
377  // 2) memory_usage: the size of cached item in bytes
378  // 3) compute_time: an elapsed time to generate this cached item
380 
381  // the total amount of currently cached data per device
383 };
384 
385 template <typename CACHED_ITEM_TYPE, typename META_INFO_TYPE>
386 struct CachedItem {
388  CACHED_ITEM_TYPE item,
389  std::shared_ptr<CacheItemMetric> item_metric_ptr,
390  std::optional<META_INFO_TYPE> metadata = std::nullopt)
391  : key(hashed_plan)
392  , cached_item(item)
393  , item_metric(item_metric_ptr)
394  , meta_info(metadata)
395  , dirty(false) {}
396 
397  void setDirty() { dirty = true; }
398  bool isDirty() const { return dirty; }
399 
401  CACHED_ITEM_TYPE cached_item;
402  std::shared_ptr<CacheItemMetric> item_metric;
403  std::optional<META_INFO_TYPE> meta_info;
404  bool dirty;
405 };
406 
407 // A main class of data recycler
408 // note that some tests which directly accesses APIs for update/modify/delete
409 // (meta)data may need to disable data recycler explicitly before running test suites
410 // to make test scenarios as expected
411 // i.e., UpdelStorageTest that calls fragmenter's updateColumn API
412 template <typename CACHED_ITEM_TYPE, typename META_INFO_TYPE>
414  public:
415  using CachedItemContainer = std::vector<CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>>;
417  std::unordered_map<DeviceIdentifier, std::shared_ptr<CachedItemContainer>>;
419  std::unordered_map<CacheItemType, std::shared_ptr<PerDeviceCacheItemContainer>>;
420  using PerTypeCacheMetricTracker = std::unordered_map<CacheItemType, CacheMetricTracker>;
421 
422  DataRecycler(const std::vector<CacheItemType>& item_types,
423  size_t total_cache_size,
424  size_t max_item_size,
425  int num_gpus) {
426  for (auto& item_type : item_types) {
427  cache_item_types_.insert(item_type);
428  metric_tracker_.emplace(
429  item_type,
430  CacheMetricTracker(item_type, total_cache_size, max_item_size, num_gpus));
431  auto item_container = std::make_shared<PerDeviceCacheItemContainer>();
432  for (int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
433  --gpu_device_identifier) {
434  item_container->emplace(gpu_device_identifier,
435  std::make_shared<CachedItemContainer>());
436  }
437  item_container->emplace(DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
438  std::make_shared<CachedItemContainer>());
439  cached_items_container_.emplace(item_type, item_container);
440  }
441  }
442 
443  virtual ~DataRecycler() = default;
444 
445  virtual CACHED_ITEM_TYPE getItemFromCache(
446  QueryPlanHash key,
447  CacheItemType item_type,
448  DeviceIdentifier device_identifier,
449  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
450 
451  virtual void putItemToCache(QueryPlanHash key,
452  CACHED_ITEM_TYPE item_ptr,
453  CacheItemType item_type,
454  DeviceIdentifier device_identifier,
455  size_t item_size,
456  size_t compute_time,
457  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
458 
459  virtual void initCache() = 0;
460 
461  virtual void clearCache() = 0;
462 
463  virtual void markCachedItemAsDirty(size_t table_key,
464  std::unordered_set<QueryPlanHash>& key_set,
465  CacheItemType item_type,
466  DeviceIdentifier device_identifier) = 0;
467 
469  auto candidate_it = std::find_if(
470  m.begin(),
471  m.end(),
472  [&key](const CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>& cached_item) {
473  return cached_item.key == key;
474  });
475  if (candidate_it != m.end()) {
476  candidate_it->setDirty();
477  }
478  }
479 
481  auto candidate_it = std::find_if(
482  m.begin(),
483  m.end(),
484  [&key](const CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>& cached_item) {
485  return cached_item.key == key;
486  });
487  return candidate_it != m.end() && candidate_it->isDirty();
488  }
489 
490  virtual std::string toString() const = 0;
491 
492  std::shared_ptr<CachedItemContainer> getCachedItemContainer(
493  CacheItemType item_type,
494  DeviceIdentifier device_identifier) const {
495  auto item_type_container_itr = cached_items_container_.find(item_type);
496  if (item_type_container_itr != cached_items_container_.end()) {
497  auto device_type_container_itr =
498  item_type_container_itr->second->find(device_identifier);
499  return device_type_container_itr != item_type_container_itr->second->end()
500  ? device_type_container_itr->second
501  : nullptr;
502  }
503  return nullptr;
504  }
505 
506  std::optional<CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>>
508  CacheItemType item_type,
509  DeviceIdentifier device_identifier,
511  std::lock_guard<std::mutex>& lock) {
512  auto candidate_it = std::find_if(
513  m.begin(),
514  m.end(),
515  [&key](const CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>& cached_item) {
516  return cached_item.key == key;
517  });
518  if (candidate_it != m.end()) {
519  if (candidate_it->isDirty()) {
521  key, item_type, device_identifier, lock, candidate_it->meta_info);
522  return std::nullopt;
523  }
524  return *candidate_it;
525  }
526  return std::nullopt;
527  }
528 
530  DeviceIdentifier device_identifier) const {
531  std::lock_guard<std::mutex> lock(cache_lock_);
532  auto container = getCachedItemContainer(item_type, device_identifier);
533  return container ? container->size() : 0;
534  }
535 
537  DeviceIdentifier device_identifier) const {
538  std::lock_guard<std::mutex> lock(cache_lock_);
539  auto container = getCachedItemContainer(item_type, device_identifier);
540  return std::count_if(container->begin(),
541  container->end(),
542  [](const auto& cached_item) { return cached_item.isDirty(); });
543  }
544 
546  DeviceIdentifier device_identifier) const {
547  std::lock_guard<std::mutex> lock(cache_lock_);
548  auto container = getCachedItemContainer(item_type, device_identifier);
549  return std::count_if(container->begin(),
550  container->end(),
551  [](const auto& cached_item) { return !cached_item.isDirty(); });
552  }
553 
555  DeviceIdentifier device_identifier) const {
556  std::lock_guard<std::mutex> lock(cache_lock_);
557  auto metric_tracker = getMetricTracker(item_type);
558  auto current_size_opt = metric_tracker.getCurrentCacheSize(device_identifier);
559  return current_size_opt ? current_size_opt.value() : 0;
560  }
561 
562  std::shared_ptr<CacheItemMetric> getCachedItemMetric(CacheItemType item_type,
563  DeviceIdentifier device_identifier,
564  QueryPlanHash key) const {
565  std::lock_guard<std::mutex> lock(cache_lock_);
566  auto cache_metric_tracker = getMetricTracker(item_type);
567  return cache_metric_tracker.getCacheItemMetric(key, device_identifier);
568  }
569 
570  void setTotalCacheSize(CacheItemType item_type, size_t new_total_cache_size) {
571  if (new_total_cache_size > 0) {
572  std::lock_guard<std::mutex> lock(cache_lock_);
573  getMetricTracker(item_type).setTotalCacheSize(new_total_cache_size);
574  }
575  }
576 
577  void setMaxCacheItemSize(CacheItemType item_type, size_t new_max_cache_item_size) {
578  if (new_max_cache_item_size > 0) {
579  std::lock_guard<std::mutex> lock(cache_lock_);
580  getMetricTracker(item_type).setMaxCacheItemSize(new_max_cache_item_size);
581  }
582  }
583 
584  protected:
586  DeviceIdentifier device_identifier,
587  int offset) {
588  // it removes cached items located from `idx 0` to `offset`
589  // so, call this function after sorting the cached items container vec
590  // and we should call this function under the proper locking scheme
591  auto container = getCachedItemContainer(item_type, device_identifier);
592  CHECK(container);
593  container->erase(container->begin(), container->begin() + offset);
594  }
595 
597  DeviceIdentifier device_identifier) {
598  // should call this function under the proper locking scheme
599  auto container = getCachedItemContainer(item_type, device_identifier);
600  CHECK(container);
601  std::sort(container->begin(),
602  container->end(),
605  auto& left_metrics = left.item_metric->getMetrics();
606  auto& right_metrics = right.item_metric->getMetrics();
607  for (size_t i = 0; i < CacheMetricType::NUM_METRIC_TYPE; ++i) {
608  if (left_metrics[i] != right_metrics[i]) {
609  return left_metrics[i] < right_metrics[i];
610  }
611  }
612  return false;
613  });
614  }
615 
616  std::mutex& getCacheLock() const { return cache_lock_; }
617 
619  auto metric_iter = metric_tracker_.find(item_type);
620  CHECK(metric_iter != metric_tracker_.end());
621  return metric_iter->second;
622  }
623 
625  return const_cast<DataRecycler*>(this)->getMetricTracker(item_type);
626  }
627 
628  std::unordered_set<CacheItemType> const& getCacheItemType() const {
629  return cache_item_types_;
630  }
631 
634  }
635 
636  private:
637  // internally called under the proper locking scheme
638  virtual bool hasItemInCache(
639  QueryPlanHash key,
640  CacheItemType item_type,
641  DeviceIdentifier device_identifier,
642  std::lock_guard<std::mutex>& lock,
643  std::optional<META_INFO_TYPE> meta_info = std::nullopt) const = 0;
644 
645  // internally called under the proper locking scheme
646  virtual void removeItemFromCache(
647  QueryPlanHash key,
648  CacheItemType item_type,
649  DeviceIdentifier device_identifier,
650  std::lock_guard<std::mutex>& lock,
651  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
652 
653  // internally called under the proper locking scheme
654  virtual void cleanupCacheForInsertion(
655  CacheItemType item_type,
656  DeviceIdentifier device_identifier,
657  size_t required_size,
658  std::lock_guard<std::mutex>& lock,
659  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
660 
661  // a set of cache item type that this recycler supports
662  std::unordered_set<CacheItemType> cache_item_types_;
663 
664  // cache metric tracker
666 
667  // per-device cached item containers for each cached item type
669 
670  mutable std::mutex cache_lock_;
671 };
Defines data structures for the semantic analysis phase of query processing.
CACHED_ITEM_TYPE cached_item
Definition: DataRecycler.h:401
std::mutex & getCacheLock() const
Definition: DataRecycler.h:616
std::unordered_map< CacheItemType, std::shared_ptr< PerDeviceCacheItemContainer >> PerTypeCacheItemContainer
Definition: DataRecycler.h:419
#define CHECK_EQ(x, y)
Definition: Logger.h:231
CacheUpdateAction
Definition: DataRecycler.h:77
size_t DeviceIdentifier
Definition: DataRecycler.h:129
static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:138
virtual std::string toString() const =0
size_t calculateRequiredSpaceForItemAddition(DeviceIdentifier device_identifier, size_t item_size) const
Definition: DataRecycler.h:273
std::shared_ptr< CacheItemMetric > putNewCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier, size_t mem_size, size_t compute_time)
Definition: DataRecycler.h:235
std::optional< CachedItem< CACHED_ITEM_TYPE, META_INFO_TYPE > > getCachedItemWithoutConsideringMetaInfo(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, CachedItemContainer &m, std::lock_guard< std::mutex > &lock)
Definition: DataRecycler.h:507
size_t getCurrentNumCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:529
CacheMetricTracker & getMetricTracker(CacheItemType item_type)
Definition: DataRecycler.h:618
CacheItemType item_type_
Definition: DataRecycler.h:372
std::unordered_map< DeviceIdentifier, size_t > CacheSizeMap
Definition: DataRecycler.h:130
DataRecycler(const std::vector< CacheItemType > &item_types, size_t total_cache_size, size_t max_item_size, int num_gpus)
Definition: DataRecycler.h:422
bool isDirty() const
Definition: DataRecycler.h:398
#define LOG(tag)
Definition: Logger.h:217
std::vector< CachedItem< std::optional< HashType >, EMPTY_META_INFO >> CachedItemContainer
Definition: DataRecycler.h:415
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:57
CacheMetricTracker const & getMetricTracker(CacheItemType item_type) const
Definition: DataRecycler.h:624
void setDirty()
Definition: DataRecycler.h:397
CacheItemMetric(QueryPlanHash query_plan_hash, size_t compute_time, size_t mem_size)
Definition: DataRecycler.h:89
std::unordered_set< CacheItemType > cache_item_types_
Definition: DataRecycler.h:662
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
QueryPlanHash key
Definition: DataRecycler.h:400
std::optional< size_t > getCurrentCacheSize(DeviceIdentifier key) const
Definition: DataRecycler.h:226
size_t getCurrentNumCleanCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:545
std::shared_ptr< CachedItemContainer > getCachedItemContainer(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:492
void setMaxCacheItemSize(size_t new_max_cache_item_size)
Definition: DataRecycler.h:365
void markCachedItemAsDirtyImpl(QueryPlanHash key, CachedItemContainer &m) const
Definition: DataRecycler.h:468
void setMaxCacheItemSize(CacheItemType item_type, size_t new_max_cache_item_size)
Definition: DataRecycler.h:577
CacheAvailability canAddItem(DeviceIdentifier device_identifier, size_t item_size) const
Definition: DataRecycler.h:297
void clearCacheMetricTracker()
Definition: DataRecycler.h:283
std::string to_string(char const *&&v)
PerTypeCacheItemContainer const & getItemCache() const
Definition: DataRecycler.h:632
CacheAvailability
Definition: DataRecycler.h:71
PerTypeCacheMetricTracker metric_tracker_
Definition: DataRecycler.h:665
size_t getCurrentCacheSizeForDevice(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:554
std::mutex cache_lock_
Definition: DataRecycler.h:670
void setCurrentCacheSize(DeviceIdentifier device_identifier, size_t bytes)
Definition: DataRecycler.h:217
size_t getMaxCacheItemSize() const
Definition: DataRecycler.h:359
std::vector< std::shared_ptr< CacheItemMetric > > & getCacheItemMetrics(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:202
void removeCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier)
Definition: DataRecycler.h:260
CacheItemType
Definition: DataRecycler.h:38
std::optional< META_INFO_TYPE > meta_info
Definition: DataRecycler.h:403
virtual void initCache()=0
size_t getTotalCacheSize() const
Definition: DataRecycler.h:358
bool isCachedItemDirty(QueryPlanHash key, CachedItemContainer &m) const
Definition: DataRecycler.h:480
std::shared_ptr< CacheItemMetric > getCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:209
std::string toString() const
Definition: DataRecycler.h:349
PerTypeCacheItemContainer cached_items_container_
Definition: DataRecycler.h:668
size_t max_cache_item_size_
Definition: DataRecycler.h:374
CacheSizeMap current_cache_size_in_bytes_
Definition: DataRecycler.h:382
void updateCurrentCacheSize(DeviceIdentifier device_identifier, CacheUpdateAction action, size_t size)
Definition: DataRecycler.h:318
virtual void putItemToCache(QueryPlanHash key, CACHED_ITEM_TYPE item_ptr, CacheItemType item_type, DeviceIdentifier device_identifier, size_t item_size, size_t compute_time, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::unordered_map< CacheItemType, CacheMetricTracker > PerTypeCacheMetricTracker
Definition: DataRecycler.h:420
std::shared_ptr< CacheItemMetric > getCachedItemMetric(CacheItemType item_type, DeviceIdentifier device_identifier, QueryPlanHash key) const
Definition: DataRecycler.h:562
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1453
void setTotalCacheSize(size_t new_total_cache_size)
Definition: DataRecycler.h:360
void setTotalCacheSize(CacheItemType item_type, size_t new_total_cache_size)
Definition: DataRecycler.h:570
static std::shared_ptr< CacheItemMetric > getCacheItemMetricImpl(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
Definition: DataRecycler.h:195
size_t getCurrentNumDirtyCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:536
std::unordered_map< DeviceIdentifier, std::shared_ptr< CachedItemContainer >> PerDeviceCacheItemContainer
Definition: DataRecycler.h:417
CacheMetricType
Definition: DataRecycler.h:84
void sortCacheContainerByQueryMetric(CacheItemType item_type, DeviceIdentifier device_identifier)
Definition: DataRecycler.h:596
#define CHECK_LE(x, y)
Definition: Logger.h:234
void sortCacheInfoByQueryMetric(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:332
std::unordered_set< CacheItemType > const & getCacheItemType() const
Definition: DataRecycler.h:628
CacheMetricTracker(CacheItemType cache_item_type, size_t total_cache_size, size_t max_cache_item_size, int num_gpus=0)
Definition: DataRecycler.h:151
virtual ~DataRecycler()=default
size_t QueryPlanHash
std::unordered_map< DeviceIdentifier, std::vector< std::shared_ptr< CacheItemMetric >>> CacheMetricInfoMap
Definition: DataRecycler.h:132
static CacheMetricInfoMap::mapped_type::const_iterator getCacheItemMetricItr(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
Definition: DataRecycler.h:188
CachedItem(QueryPlanHash hashed_plan, CACHED_ITEM_TYPE item, std::shared_ptr< CacheItemMetric > item_metric_ptr, std::optional< META_INFO_TYPE > metadata=std::nullopt)
Definition: DataRecycler.h:387
virtual bool hasItemInCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt) const =0
bool g_enable_watchdog false
Definition: Execute.cpp:79
CacheMetricInfoMap cache_metrics_
Definition: DataRecycler.h:379
#define CHECK(condition)
Definition: Logger.h:223
virtual void clearCache()=0
virtual void removeItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
Basic constructors and methods of the row set interface.
virtual CACHED_ITEM_TYPE getItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
virtual void markCachedItemAsDirty(size_t table_key, std::unordered_set< QueryPlanHash > &key_set, CacheItemType item_type, DeviceIdentifier device_identifier)=0
void removeMetricFromBeginning(DeviceIdentifier device_identifier, int offset)
Definition: DataRecycler.h:268
Execution unit for relational algebra. It&#39;s a low-level description of any relational algebra operati...
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
void removeCachedItemFromBeginning(CacheItemType item_type, DeviceIdentifier device_identifier, int offset)
Definition: DataRecycler.h:585
virtual void cleanupCacheForInsertion(CacheItemType item_type, DeviceIdentifier device_identifier, size_t required_size, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::shared_ptr< CacheItemMetric > item_metric
Definition: DataRecycler.h:402
#define VLOG(n)
Definition: Logger.h:317
std::array< size_t, CacheMetricType::NUM_METRIC_TYPE > metrics_
Definition: DataRecycler.h:90