29 #include <boost/functional/hash.hpp>
33 #include <unordered_map>
55 constexpr
char const* cache_item_type_str[]{
56 "Perfect Join Hashtable",
57 "Baseline Join Hashtable",
58 "Overlaps Join Hashtable",
59 "Hashing Scheme for Join Hashtable",
60 "Baseline Join Hashtable's Approximated Cardinality",
61 "Overlaps Join Hashtable's Auto Tuner's Parameters",
64 static_assert(
sizeof(cache_item_type_str) /
sizeof(*cache_item_type_str) ==
66 return os << cache_item_type_str[item_type];
90 : query_plan_hash_(query_plan_hash),
metrics_({0, mem_size, compute_time}) {}
// Read-only accessor: returns the query_plan_hash_ member (initialized from the
// constructor's query_plan_hash argument) and leaves the object unmodified.
92 QueryPlanHash getQueryPlanHash()
const {
return query_plan_hash_; }
102 const std::array<size_t, CacheMetricType::NUM_METRIC_TYPE>& getMetrics()
const {
106 void setComputeTime(
size_t compute_time) {
110 void setMemSize(
const size_t mem_size) {
115 std::ostringstream oss;
116 oss <<
"Query plan hash: " << query_plan_hash_
// Fixed-size array of size_t values with CacheMetricType::NUM_METRIC_TYPE slots.
// The constructor's initializer list fills it with {0, mem_size, compute_time}.
// NOTE(review): the meaning of each slot follows the CacheMetricType enum, which
// is not visible here -- confirm the slot order against that enum.
125 std::array<size_t, CacheMetricType::NUM_METRIC_TYPE>
metrics_;
132 std::unordered_map<DeviceIdentifier, std::vector<std::shared_ptr<CacheItemMetric>>>;
152 size_t total_cache_size,
153 size_t max_cache_item_size,
160 for (
int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
161 --gpu_device_identifier) {
163 std::vector<std::shared_ptr<CacheItemMetric>>());
167 std::vector<std::shared_ptr<CacheItemMetric>>());
171 LOG(
INFO) <<
"The total cache size of " << cache_item_type
172 <<
" is set too low, so we suggest raising it larger than 256MB";
175 if (max_cache_item_size < 1024 * 1024 * 10) {
177 <<
"The maximum item size of " << cache_item_type
178 <<
" that can be cached is set too low, we suggest raising it larger than 10MB";
181 LOG(
INFO) <<
"The maximum item size of " << cache_item_type
182 <<
" is set larger than its total cache size, so we force to set the "
183 "maximum item size as equal to the total cache size";
190 CacheMetricInfoMap::mapped_type
const& metrics) {
191 auto same_hash = [key](
auto itr) {
return itr->getQueryPlanHash() == key; };
192 return std::find_if(metrics.cbegin(), metrics.cend(), same_hash);
197 CacheMetricInfoMap::mapped_type
const& metrics) {
199 return itr == metrics.cend() ?
nullptr : *itr;
227 auto same_hash = [key](
auto itr) {
return itr.first == key; };
232 : std::make_optional(itr->second);
239 size_t compute_time) {
243 if (cached_metric->getMemSize() != mem_size) {
248 cached_metric->incRefCount();
249 return cached_metric;
252 auto cache_metric = std::make_shared<CacheItemMetric>(key, compute_time, mem_size);
256 cache_metric->incRefCount();
257 return itr->second.emplace_back(std::move(cache_metric));
263 if (itr != cache_metrics.cend()) {
264 cache_metrics.erase(itr);
270 metrics.erase(metrics.begin(), metrics.begin() + offset);
274 size_t item_size)
const {
278 const auto current_cache_size = it->second;
280 return rem < 0 ? item_size : item_size - rem;
286 VLOG(1) <<
"Clear cache of " <<
item_type_ <<
" from device [" << kv.first
287 <<
"] (# cached items: " << cache_item_metrics.size() <<
", " << kv.second
298 size_t item_size)
const {
307 CHECK(current_cache_size.has_value());
308 auto cache_size_after_addition = *current_cache_size + item_size;
322 CHECK(current_cache_size.has_value());
327 CHECK_LE(size, *current_cache_size);
336 [](
const std::shared_ptr<CacheItemMetric>& left,
337 const std::shared_ptr<CacheItemMetric>& right) {
338 auto& elem1_metrics = left->getMetrics();
339 auto& elem2_metrics = right->getMetrics();
341 if (elem1_metrics[i] != elem2_metrics[i]) {
342 return elem1_metrics[i] < elem2_metrics[i];
350 std::ostringstream oss;
351 oss <<
"Current memory consumption of caches for each device:\n";
353 oss <<
"\t\tDevice " << kv.first <<
" : " << kv.second <<
" bytes\n";
361 if (new_total_cache_size > 0) {
366 if (new_max_cache_item_size > 0) {
385 template <
typename CACHED_ITEM_TYPE,
typename META_INFO_TYPE>
388 CACHED_ITEM_TYPE item,
389 std::shared_ptr<CacheItemMetric> item_metric_ptr,
390 std::optional<META_INFO_TYPE> metadata = std::nullopt)
412 template <
typename CACHED_ITEM_TYPE,
typename META_INFO_TYPE>
417 std::unordered_map<DeviceIdentifier, std::shared_ptr<CachedItemContainer>>;
419 std::unordered_map<CacheItemType, std::shared_ptr<PerDeviceCacheItemContainer>>;
423 size_t total_cache_size,
424 size_t max_item_size,
426 for (
auto& item_type : item_types) {
431 auto item_container = std::make_shared<PerDeviceCacheItemContainer>();
432 for (
int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
433 --gpu_device_identifier) {
434 item_container->emplace(gpu_device_identifier,
435 std::make_shared<CachedItemContainer>());
438 std::make_shared<CachedItemContainer>());
449 std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
452 CACHED_ITEM_TYPE item_ptr,
457 std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
464 std::unordered_set<QueryPlanHash>& key_set,
469 auto candidate_it = std::find_if(
473 return cached_item.key == key;
475 if (candidate_it != m.end()) {
476 candidate_it->setDirty();
481 auto candidate_it = std::find_if(
485 return cached_item.key == key;
487 return candidate_it != m.end() && candidate_it->isDirty();
// Pure virtual: every concrete subclass must provide its own const
// implementation that returns a std::string.
// NOTE(review): presumably a human-readable description of the cache state --
// confirm against the implementing classes.
490 virtual std::string
toString()
const = 0;
497 auto device_type_container_itr =
498 item_type_container_itr->second->find(device_identifier);
499 return device_type_container_itr != item_type_container_itr->second->end()
500 ? device_type_container_itr->second
506 std::optional<CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>>
511 std::lock_guard<std::mutex>& lock) {
512 auto candidate_it = std::find_if(
516 return cached_item.key == key;
518 if (candidate_it != m.end()) {
519 if (candidate_it->isDirty()) {
521 key, item_type, device_identifier, lock, candidate_it->meta_info);
524 return *candidate_it;
533 return container ? container->size() : 0;
540 return std::count_if(container->begin(),
542 [](
const auto& cached_item) {
return cached_item.isDirty(); });
549 return std::count_if(container->begin(),
551 [](
const auto& cached_item) {
return !cached_item.isDirty(); });
558 auto current_size_opt = metric_tracker.getCurrentCacheSize(device_identifier);
559 return current_size_opt ? current_size_opt.value() : 0;
567 return cache_metric_tracker.getCacheItemMetric(key, device_identifier);
571 if (new_total_cache_size > 0) {
578 if (new_max_cache_item_size > 0) {
593 container->erase(container->begin(), container->begin() + offset);
605 auto& left_metrics = left.
item_metric->getMetrics();
606 auto& right_metrics = right.item_metric->getMetrics();
608 if (left_metrics[i] != right_metrics[i]) {
609 return left_metrics[i] < right_metrics[i];
621 return metric_iter->second;
642 std::lock_guard<std::mutex>& lock,
643 std::optional<META_INFO_TYPE> meta_info = std::nullopt)
const = 0;
650 std::lock_guard<std::mutex>& lock,
651 std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
657 size_t required_size,
658 std::lock_guard<std::mutex>& lock,
659 std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
Defines data structures for the semantic analysis phase of query processing.
CACHED_ITEM_TYPE cached_item
std::mutex & getCacheLock() const
std::unordered_map< CacheItemType, std::shared_ptr< PerDeviceCacheItemContainer >> PerTypeCacheItemContainer
static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier)
virtual std::string toString() const =0
size_t calculateRequiredSpaceForItemAddition(DeviceIdentifier device_identifier, size_t item_size) const
std::shared_ptr< CacheItemMetric > putNewCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier, size_t mem_size, size_t compute_time)
std::optional< CachedItem< CACHED_ITEM_TYPE, META_INFO_TYPE > > getCachedItemWithoutConsideringMetaInfo(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, CachedItemContainer &m, std::lock_guard< std::mutex > &lock)
size_t getCurrentNumCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
CacheMetricTracker & getMetricTracker(CacheItemType item_type)
std::unordered_map< DeviceIdentifier, size_t > CacheSizeMap
DataRecycler(const std::vector< CacheItemType > &item_types, size_t total_cache_size, size_t max_item_size, int num_gpus)
std::vector< CachedItem< std::optional< HashType >, EMPTY_META_INFO >> CachedItemContainer
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
CacheMetricTracker const & getMetricTracker(CacheItemType item_type) const
CacheItemMetric(QueryPlanHash query_plan_hash, size_t compute_time, size_t mem_size)
std::unordered_set< CacheItemType > cache_item_types_
DEVICE void sort(ARGS &&...args)
std::optional< size_t > getCurrentCacheSize(DeviceIdentifier key) const
size_t getCurrentNumCleanCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
std::shared_ptr< CachedItemContainer > getCachedItemContainer(CacheItemType item_type, DeviceIdentifier device_identifier) const
void setMaxCacheItemSize(size_t new_max_cache_item_size)
void markCachedItemAsDirtyImpl(QueryPlanHash key, CachedItemContainer &m) const
void setMaxCacheItemSize(CacheItemType item_type, size_t new_max_cache_item_size)
CacheAvailability canAddItem(DeviceIdentifier device_identifier, size_t item_size) const
void clearCacheMetricTracker()
PerTypeCacheItemContainer const & getItemCache() const
PerTypeCacheMetricTracker metric_tracker_
size_t getCurrentCacheSizeForDevice(CacheItemType item_type, DeviceIdentifier device_identifier) const
void setCurrentCacheSize(DeviceIdentifier device_identifier, size_t bytes)
size_t getMaxCacheItemSize() const
std::vector< std::shared_ptr< CacheItemMetric > > & getCacheItemMetrics(DeviceIdentifier device_identifier)
void removeCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier)
std::optional< META_INFO_TYPE > meta_info
virtual void initCache()=0
size_t getTotalCacheSize() const
bool isCachedItemDirty(QueryPlanHash key, CachedItemContainer &m) const
std::shared_ptr< CacheItemMetric > getCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier) const
std::string toString() const
PerTypeCacheItemContainer cached_items_container_
size_t max_cache_item_size_
CacheSizeMap current_cache_size_in_bytes_
void updateCurrentCacheSize(DeviceIdentifier device_identifier, CacheUpdateAction action, size_t size)
virtual void putItemToCache(QueryPlanHash key, CACHED_ITEM_TYPE item_ptr, CacheItemType item_type, DeviceIdentifier device_identifier, size_t item_size, size_t compute_time, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::unordered_map< CacheItemType, CacheMetricTracker > PerTypeCacheMetricTracker
std::shared_ptr< CacheItemMetric > getCachedItemMetric(CacheItemType item_type, DeviceIdentifier device_identifier, QueryPlanHash key) const
std::string toString(const Executor::ExtModuleKinds &kind)
void setTotalCacheSize(size_t new_total_cache_size)
void setTotalCacheSize(CacheItemType item_type, size_t new_total_cache_size)
static std::shared_ptr< CacheItemMetric > getCacheItemMetricImpl(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
size_t getCurrentNumDirtyCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
std::unordered_map< DeviceIdentifier, std::shared_ptr< CachedItemContainer >> PerDeviceCacheItemContainer
void sortCacheContainerByQueryMetric(CacheItemType item_type, DeviceIdentifier device_identifier)
void sortCacheInfoByQueryMetric(DeviceIdentifier device_identifier)
std::unordered_set< CacheItemType > const & getCacheItemType() const
CacheMetricTracker(CacheItemType cache_item_type, size_t total_cache_size, size_t max_cache_item_size, int num_gpus=0)
virtual ~DataRecycler()=default
std::unordered_map< DeviceIdentifier, std::vector< std::shared_ptr< CacheItemMetric >>> CacheMetricInfoMap
static CacheMetricInfoMap::mapped_type::const_iterator getCacheItemMetricItr(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
CachedItem(QueryPlanHash hashed_plan, CACHED_ITEM_TYPE item, std::shared_ptr< CacheItemMetric > item_metric_ptr, std::optional< META_INFO_TYPE > metadata=std::nullopt)
virtual bool hasItemInCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt) const =0
bool g_enable_watchdog false
CacheMetricInfoMap cache_metrics_
virtual void clearCache()=0
virtual void removeItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
Basic constructors and methods of the row set interface.
virtual CACHED_ITEM_TYPE getItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
virtual void markCachedItemAsDirty(size_t table_key, std::unordered_set< QueryPlanHash > &key_set, CacheItemType item_type, DeviceIdentifier device_identifier)=0
void removeMetricFromBeginning(DeviceIdentifier device_identifier, int offset)
Execution unit for relational algebra. It's a low-level description of any relational algebra operation...
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
void removeCachedItemFromBeginning(CacheItemType item_type, DeviceIdentifier device_identifier, int offset)
virtual void cleanupCacheForInsertion(CacheItemType item_type, DeviceIdentifier device_identifier, size_t required_size, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::shared_ptr< CacheItemMetric > item_metric
std::array< size_t, CacheMetricType::NUM_METRIC_TYPE > metrics_