OmniSciDB  4201147b46
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ResultSetRecycler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ResultSetRecycler.h"
18 
19 extern bool g_is_test_env;
20 
// ResultSetRecycler::hasItemInCache (locked overload): returns true iff the
// container for (item_type, device_identifier) holds a cached item whose key
// matches `key`. The caller must already hold the cache mutex, witnessed by
// the `lock` reference parameter; `meta_info` is accepted but unused here.
// NOTE(review): this extract is missing the function signature line and the
// leading data-recycler-enabled guard (original lines 21, 26-27) -- confirm
// against the repository source.
 22  CacheItemType item_type,
 23  DeviceIdentifier device_identifier,
 24  std::lock_guard<std::mutex>& lock,
 25  std::optional<ResultSetMetaInfo> meta_info) const {
 28  return false;
 29  }
 30  auto resultset_cache = getCachedItemContainer(item_type, device_identifier);
 31  CHECK(resultset_cache);
// Linear scan of the per-device container for a matching cache key.
 32  auto candidate_resultset_it = std::find_if(
 33  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
 34  return cached_item.key == key;
 35  });
 36  return candidate_resultset_it != resultset_cache->end();
 37 }
38 
// Public hasItemInCache(key): acquires the cache lock itself and forwards to
// the locked overload with no meta info.
// NOTE(review): the extract drops the signature (line 39) and the item-type /
// device-identifier arguments (lines 42-43); presumably these default to the
// query-resultset item type on the CPU device -- verify against the source.
 40  std::lock_guard<std::mutex> lock(getCacheLock());
 41  return hasItemInCache(key,
 44  lock,
 45  std::nullopt);
 46 }
47 
// ResultSetRecycler::getItemFromCache: looks up a cached resultset by query
// plan hash. On a hit with a clean (non-dirty) entry, returns a *copy* of the
// cached resultset (so concurrent consumers never share the cached object) and
// bumps the entry's reference count. A dirty hit removes the entry and returns
// nullptr; a miss returns nullptr.
// NOTE(review): the extract is missing the signature line (48), the
// recycler-enabled guard (53-54), and the removeItemFromCache call head (67)
// -- confirm against the repository source.
 49  QueryPlanHash key,
 50  CacheItemType item_type,
 51  DeviceIdentifier device_identifier,
 52  std::optional<ResultSetMetaInfo> meta_info) {
 55  return nullptr;
 56  }
 57  std::lock_guard<std::mutex> lock(getCacheLock());
 58  auto resultset_cache = getCachedItemContainer(item_type, device_identifier);
 59  CHECK(resultset_cache);
 60  auto candidate_resultset_it = std::find_if(
 61  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
 62  return cached_item.key == key;
 63  });
 64  if (candidate_resultset_it != resultset_cache->end()) {
 65  CHECK(candidate_resultset_it->meta_info);
// Dirty entries are evicted on access rather than served stale.
 66  if (candidate_resultset_it->isDirty()) {
 68  key, item_type, device_identifier, lock, candidate_resultset_it->meta_info);
 69  return nullptr;
 70  }
 71  auto candidate_resultset = candidate_resultset_it->cached_item;
// Timestamps bracket the copy so the VLOG below can report copy latency.
 72  decltype(std::chrono::steady_clock::now()) ts1, ts2;
 73  ts1 = std::chrono::steady_clock::now();
 74  // we need to copy cached resultset to support resultset recycler with concurrency
 75  auto copied_rs = candidate_resultset->copy();
 76  CHECK(copied_rs);
 77  copied_rs->setCached(true);
 78  copied_rs->initStatus();
 79  candidate_resultset_it->item_metric->incRefCount();
 80  ts2 = std::chrono::steady_clock::now();
 81  VLOG(1) << "[" << item_type << ", "
 82  << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
 83  << "] Get cached query resultset from cache (key: " << key
 84  << ", copying it takes "
 85  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
 86  << "ms)";
 87  return copied_rs;
 88  }
 89  return nullptr;
 90 }
91 
// ResultSetRecycler::getOutputMetaInfo: returns the target (output column)
// meta info of the cached resultset for `key`, or std::nullopt on a miss or
// when the cached entry is dirty (dirty entries are removed on access).
// NOTE(review): the extract drops the signature head (92), the enabled guard
// (94-95), the getCachedItemContainer call (99-100), and the head of the
// removeItemFromCache call (109-111) -- confirm against the source.
 93  QueryPlanHash key) {
 96  return std::nullopt;
 97  }
 98  std::lock_guard<std::mutex> lock(getCacheLock());
 101  CHECK(resultset_cache);
 102  auto candidate_resultset_it = std::find_if(
 103  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
 104  return cached_item.key == key;
 105  });
 106  if (candidate_resultset_it != resultset_cache->end()) {
 107  CHECK(candidate_resultset_it->meta_info);
// Evict-on-access for dirty entries, mirroring getItemFromCache.
 108  if (candidate_resultset_it->isDirty()) {
 112  lock,
 113  candidate_resultset_it->meta_info);
 114  return std::nullopt;
 115  }
 116  auto candidate_resultset = candidate_resultset_it->cached_item;
 117  auto output_meta_info = candidate_resultset->getTargetMetaInfo();
 118  return output_meta_info;
 119  }
 120  return std::nullopt;
 121 }
122 
// ResultSetRecycler::putItemToCache: inserts a resultset into the cache under
// `key`. An existing entry is evicted first when it is dirty or when its
// columnar/row-wise output layout differs from the new item's. Insertion
// consults the metric tracker for capacity: UNAVAILABLE aborts the insert,
// AVAILABLE_AFTER_CLEANUP evicts older entries to free space first. On
// success the item is marked cached, a cache metric is recorded, and the
// query-plan-DAG-to-table-key mapping is updated for dirty-marking support.
// NOTE(review): the extract drops the signature (123), part of the enabled
// guard (130), the removeItemFromCache call head (160), and the tail of the
// "exceeds system limit" log statement (171) -- confirm against the source.
 124  ResultSetPtr item_ptr,
 125  CacheItemType item_type,
 126  DeviceIdentifier device_identifier,
 127  size_t item_size,
 128  size_t compute_time,
 129  std::optional<ResultSetMetaInfo> meta_info) {
 131  key == EMPTY_HASHED_PLAN_DAG_KEY) {
 132  return;
 133  }
 134  CHECK(meta_info.has_value());
 135  std::lock_guard<std::mutex> lock(getCacheLock());
 136  auto resultset_cache = getCachedItemContainer(item_type, device_identifier);
 137  auto candidate_resultset_it = std::find_if(
 138  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
 139  return cached_item.key == key;
 140  });
 141  bool has_cached_resultset = false;
 142  bool need_to_cleanup = false;
 143  if (candidate_resultset_it != resultset_cache->end()) {
 144  has_cached_resultset = true;
 145  CHECK(candidate_resultset_it->meta_info);
 146  if (candidate_resultset_it->isDirty()) {
 147  need_to_cleanup = true;
 148  } else if (candidate_resultset_it->cached_item->didOutputColumnar() !=
 149  item_ptr->didOutputColumnar()) {
 150  // we already have a cached resultset for the given query plan dag but
 151  // requested resultset output layout and that of cached one is different
 152  // so we remove the cached one and make a room for the resultset with different
 153  // layout
 154  need_to_cleanup = true;
 155  VLOG(1) << "Failed to recycle query resultset: mismatched cached resultset layout";
 156  }
 157  }
 158  if (need_to_cleanup) {
 159  // remove dirty cached resultset
 161  key, item_type, device_identifier, lock, candidate_resultset_it->meta_info);
 162  has_cached_resultset = false;
 163  }
 164 
 165  if (!has_cached_resultset) {
 166  auto& metric_tracker = getMetricTracker(item_type);
 167  auto cache_status = metric_tracker.canAddItem(device_identifier, item_size);
 168  if (cache_status == CacheAvailability::UNAVAILABLE) {
 169  LOG(INFO) << "Failed to keep a query resultset: the size of the resultset ("
 170  << item_size << " bytes) exceeds the current system limit ("
 172  return;
 173  } else if (cache_status == CacheAvailability::AVAILABLE_AFTER_CLEANUP) {
 174  auto required_size = metric_tracker.calculateRequiredSpaceForItemAddition(
 175  device_identifier, item_size);
 176  CHECK_GT(required_size, 0UL);
 177  LOG(INFO) << "Cleanup cached query resultset(s) to make a free space ("
 178  << required_size << " bytes) to cache a new resultset";
 179  cleanupCacheForInsertion(item_type, device_identifier, required_size, lock);
 180  }
 181  auto new_cache_metric_ptr = metric_tracker.putNewCacheItemMetric(
 182  key, device_identifier, item_size, compute_time);
 183  CHECK_EQ(item_size, new_cache_metric_ptr->getMemSize());
 184  item_ptr->setCached(true);
 185  item_ptr->initStatus();
 186  VLOG(1) << "[" << item_type << ", "
 187  << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
 188  << "] Put query resultset to cache (key: " << key << ")";
 189  resultset_cache->emplace_back(key, item_ptr, new_cache_metric_ptr, meta_info);
// Record which input tables feed this plan so table updates can dirty it later.
 190  if (!meta_info->input_table_keys.empty()) {
 191  addQueryPlanDagForTableKeys(key, meta_info->input_table_keys, lock);
 192  }
 193  }
 194  return;
 195 }
196 
// ResultSetRecycler::removeItemFromCache: erases the cached entry for `key`
// from the (item_type, device) container, invalidating its resultset chunks
// first, then removes the matching cache metric and shrinks the tracked cache
// size. No-op when the key is absent. Caller must hold the cache mutex
// (witnessed by `lock`).
// NOTE(review): the extract drops the signature (197) and part of the enabled
// guard (202) -- confirm against the repository source.
 198  CacheItemType item_type,
 199  DeviceIdentifier device_identifier,
 200  std::lock_guard<std::mutex>& lock,
 201  std::optional<ResultSetMetaInfo> meta_info) {
 203  key == EMPTY_HASHED_PLAN_DAG_KEY) {
 204  return;
 205  }
 206  auto resultset_container = getCachedItemContainer(item_type, device_identifier);
 207  auto filter = [key](auto const& item) { return item.key == key; };
 208  auto itr =
 209  std::find_if(resultset_container->cbegin(), resultset_container->cend(), filter);
 210  if (itr == resultset_container->cend()) {
 211  return;
 212  } else {
// Release chunk references before the entry itself is erased.
 213  itr->cached_item->invalidateResultSetChunks();
 214  VLOG(1) << "[" << item_type << ", "
 215  << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
 216  << "] Remove item from cache (key: " << key << ")";
 217  resultset_container->erase(itr);
 218  }
// Keep metric bookkeeping consistent with the container contents.
 219  auto& cache_metrics = getMetricTracker(item_type);
 220  auto cache_metric = cache_metrics.getCacheItemMetric(key, device_identifier);
 221  CHECK(cache_metric);
 222  auto resultset_size = cache_metric->getMemSize();
 223  cache_metrics.removeCacheItemMetric(key, device_identifier);
 224  cache_metrics.updateCurrentCacheSize(
 225  device_identifier, CacheUpdateAction::REMOVE, resultset_size);
 226  return;
 227 }
228 
// ResultSetRecycler::cleanupCacheForInsertion: frees at least `required_size`
// bytes from the (item_type, device) cache by evicting entries in
// metric-sorted order (least valuable first). Outside test environments, when
// the request is smaller than half the total cache size, it frees that larger
// "moderate" amount instead, to avoid evicting again on every insertion.
// Caller must hold the cache mutex (witnessed by `lock`).
// NOTE(review): the extract is missing the signature line (229) -- confirm
// against the repository source.
 230  CacheItemType item_type,
 231  DeviceIdentifier device_identifier,
 232  size_t required_size,
 233  std::lock_guard<std::mutex>& lock,
 234  std::optional<ResultSetMetaInfo> meta_info) {
 235  int elimination_target_offset = 0;
 236  size_t removed_size = 0;
 237  auto& metric_tracker = getMetricTracker(item_type);
 238  auto actual_space_to_free = required_size;
 239  // cast total_cache_size to double for accurate calculation
 240  double moderate_free_space =
 241  static_cast<double>(metric_tracker.getTotalCacheSize()) / 2;
 242  if (!g_is_test_env && required_size < moderate_free_space) {
 243  // we try to make enough (and moderate) free space to avoid
 244  // too frequent cache clearance
 245  // we can expect that this strategy is likely to maintain cache hit ratio
 246  // since elimination targets are selected based on cache metrics including
 247  // # referenced (i.e., we try to keep frequently recycled items as long as we can)
 248  actual_space_to_free = moderate_free_space;
 249  }
// Metrics and container are sorted identically so positional removal below
// evicts the same logical entries from both.
 250  metric_tracker.sortCacheInfoByQueryMetric(device_identifier);
 251  auto cached_item_metrics = metric_tracker.getCacheItemMetrics(device_identifier);
 252  sortCacheContainerByQueryMetric(item_type, device_identifier);
// Count how many leading entries must go to reach the eviction target.
 253  for (auto& metric : cached_item_metrics) {
 254  auto target_size = metric->getMemSize();
 255  ++elimination_target_offset;
 256  removed_size += target_size;
 257  if (removed_size > actual_space_to_free) {
 258  break;
 259  }
 260  }
 261 
 262  removeCachedItemFromBeginning(item_type, device_identifier, elimination_target_offset);
 263  metric_tracker.removeMetricFromBeginning(device_identifier, elimination_target_offset);
 264 
 265  metric_tracker.updateCurrentCacheSize(
 266  device_identifier, CacheUpdateAction::REMOVE, removed_size);
 267 }
268 
// ResultSetRecycler::clearCache: under the cache lock, walks every registered
// cache item type and every per-device container, invalidates each cached
// resultset's chunks, then clears the container.
// NOTE(review): the extract drops the signature (269), an interior line (272),
// part of the VLOG statement (279-280), and what is presumably a final
// metric-tracker reset (285) -- confirm against the repository source.
 270  std::lock_guard<std::mutex> lock(getCacheLock());
 271  for (auto& item_type : getCacheItemType()) {
 273  auto item_cache = getItemCache().find(item_type)->second;
 274  for (auto& kv : *item_cache) {
// Drop chunk references before discarding the cached entries themselves.
 275  std::for_each(kv.second->begin(), kv.second->end(), [](const auto& container) {
 276  container.cached_item->invalidateResultSetChunks();
 277  });
 278  VLOG(1) << "[" << item_type << ", "
 281  << "] clear cache (# items: " << kv.second->size() << ")";
 282  kv.second->clear();
 283  }
 284  }
 286 }
287 
// ResultSetRecycler::markCachedItemAsDirty: for each query plan hash in
// `key_set`, removes the corresponding cached resultset outright (this
// recycler "dirties" entries by eviction rather than by flagging). No-op when
// recycling is disabled or the key set is empty.
// NOTE(review): the extract drops the signature head with the table_key
// parameter (288) and one line before the closing brace (300) -- confirm
// against the repository source.
 289  std::unordered_set<QueryPlanHash>& key_set,
 290  CacheItemType item_type,
 291  DeviceIdentifier device_identifier) {
 292  if (!g_enable_data_recycler || !g_use_query_resultset_cache || key_set.empty()) {
 293  return;
 294  }
 295  std::lock_guard<std::mutex> lock(getCacheLock());
 296  auto resultset_cache = getCachedItemContainer(item_type, device_identifier);
 297  for (auto key : key_set) {
 298  removeItemFromCache(key, item_type, device_identifier, lock, std::nullopt);
 299  }
 301 }
302 
303 std::string ResultSetRecycler::toString() const {
304  std::ostringstream oss;
305  oss << "A current status of the query resultSet Recycler:\n";
306  for (auto& item_type : getCacheItemType()) {
307  oss << "\t" << item_type;
308  auto& metric_tracker = getMetricTracker(item_type);
309  oss << "\n\t# cached query resultsets:\n";
310  auto item_cache = getItemCache().find(item_type)->second;
311  for (auto& cache_container : *item_cache) {
312  oss << "\t\tDevice"
313  << DataRecyclerUtil::getDeviceIdentifierString(cache_container.first)
314  << ", # query resultsets: " << cache_container.second->size() << "\n";
315  for (auto& ht : *cache_container.second) {
316  oss << "\t\t\tHT] " << ht.item_metric->toString() << "\n";
317  }
318  }
319  oss << "\t" << metric_tracker.toString() << "\n";
320  }
321  return oss.str();
322 }
323 
// ResultSetRecycler::getCachedResultSetWithoutCacheKey: returns the first
// cached entry (key, resultset, meta info) whose key is not in `visited`;
// used to iterate the cache without knowing keys up front. Returns
// (EMPTY_HASHED_PLAN_DAG_KEY, nullptr, nullopt) when all entries were visited.
// NOTE(review): the extract drops the signature head with the `visited`
// parameter (325) and the getCachedItemContainer call arguments (329) --
// confirm against the repository source.
324 std::tuple<QueryPlanHash, ResultSetPtr, std::optional<ResultSetMetaInfo>>
 326  DeviceIdentifier device_identifier) {
 327  std::lock_guard<std::mutex> lock(getCacheLock());
 328  auto resultset_cache =
 330  for (auto& rs : *resultset_cache) {
 331  if (!visited.count(rs.key)) {
 332  return std::make_tuple(rs.key, rs.cached_item, rs.meta_info);
 333  }
 334  }
 335  return std::make_tuple(EMPTY_HASHED_PLAN_DAG_KEY, nullptr, std::nullopt);
 336 }
337 
// ResultSetRecycler::addQueryPlanDagForTableKeys: records that the given
// hashed query plan DAG depends on each table key in `table_keys`, so a later
// table change can find and dirty the affected cached resultsets. Caller must
// hold the cache mutex (witnessed by `lock`).
// NOTE(review): the extract is missing the signature line (338) -- confirm
// against the repository source.
 339  size_t hashed_query_plan_dag,
 340  const std::unordered_set<size_t>& table_keys,
 341  std::lock_guard<std::mutex>& lock) {
 342  for (auto table_key : table_keys) {
// try_emplace inserts an empty set only when the table key is new.
 343  auto itr = table_key_to_query_plan_dag_map_.try_emplace(table_key).first;
 344  itr->second.insert(hashed_query_plan_dag);
 345  }
 346 }
347 
// ResultSetRecycler::getMappedQueryPlanDagsWithTableKey: returns a copy of the
// set of query plan DAG hashes registered for `table_key`, or std::nullopt if
// the table key has no mapping. Takes the cache lock for the lookup.
// NOTE(review): the extract drops the signature continuation with the
// `table_key` parameter (349) -- confirm against the repository source.
348 std::optional<std::unordered_set<size_t>>
 350  std::lock_guard<std::mutex> lock(getCacheLock());
 351  auto it = table_key_to_query_plan_dag_map_.find(table_key);
 352  return it != table_key_to_query_plan_dag_map_.end() ? std::make_optional(it->second)
 353  : std::nullopt;
 354 }
355 
// ResultSetRecycler::removeTableKeyInfoFromQueryPlanDagMap: drops the
// table-key -> query-plan-DAG mapping for `table_key`.
// NOTE(review): the signature line (356) is missing from this extract, and no
// cache lock is visible in the body -- presumably the caller holds it or the
// signature line acquired it; verify against the repository source.
 357  table_key_to_query_plan_dag_map_.erase(table_key);
 358 }
359 
// ResultSetRecycler::getTargetExprs: returns a reference to the target
// expressions stored in the meta info of the cached entry for `key`.
// CHECK-fails if the key is absent or the entry has no meta info, so callers
// must only use this for keys known to be cached.
// NOTE(review): the extract is missing the getCachedItemContainer call
// (363-364) that initializes `resultset_cache` -- confirm against the source.
// NOTE(review): returns a non-const reference from under a lock that is
// released on return; lifetime/thread-safety of the reference depends on the
// caller -- flagging for review.
360 std::vector<std::shared_ptr<Analyzer::Expr>>& ResultSetRecycler::getTargetExprs(
 361  QueryPlanHash key) const {
 362  std::lock_guard<std::mutex> lock(getCacheLock());
 365  CHECK(resultset_cache);
 366  auto candidate_resultset_it = std::find_if(
 367  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
 368  return cached_item.key == key;
 369  });
 370  CHECK(candidate_resultset_it != resultset_cache->end());
 371  CHECK(candidate_resultset_it->meta_info);
 372  return candidate_resultset_it->meta_info->getTargetExprs();
 373 }
void cleanupCacheForInsertion(CacheItemType item_type, DeviceIdentifier device_identifier, size_t required_size, std::lock_guard< std::mutex > &lock, std::optional< ResultSetMetaInfo > meta_info=std::nullopt) override
#define CHECK_EQ(x, y)
Definition: Logger.h:230
size_t DeviceIdentifier
Definition: DataRecycler.h:129
static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:138
std::vector< std::shared_ptr< Analyzer::Expr > > & getTargetExprs(QueryPlanHash key) const
bool g_use_query_resultset_cache
Definition: Execute.cpp:148
CacheMetricTracker & getMetricTracker(CacheItemType item_type)
Definition: DataRecycler.h:654
std::unordered_map< size_t, std::unordered_set< size_t > > table_key_to_query_plan_dag_map_
std::optional< std::vector< TargetMetaInfo > > getOutputMetaInfo(QueryPlanHash key)
void addQueryPlanDagForTableKeys(size_t hashed_query_plan_dag, const std::unordered_set< size_t > &table_keys, std::lock_guard< std::mutex > &lock)
#define LOG(tag)
Definition: Logger.h:216
void clearCache() override
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
std::shared_ptr< ResultSet > ResultSetPtr
std::shared_ptr< CachedItemContainer > getCachedItemContainer(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:528
bool g_is_test_env
Definition: Execute.cpp:141
bool g_enable_data_recycler
Definition: Execute.cpp:146
#define CHECK_GT(x, y)
Definition: Logger.h:234
void clearCacheMetricTracker()
Definition: DataRecycler.h:319
PerTypeCacheItemContainer const & getItemCache() const
Definition: DataRecycler.h:668
std::optional< std::unordered_set< size_t > > getMappedQueryPlanDagsWithTableKey(size_t table_key) const
CacheItemType
Definition: DataRecycler.h:38
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
bool g_enable_smem_group_by true
std::tuple< QueryPlanHash, ResultSetPtr, std::optional< ResultSetMetaInfo > > getCachedResultSetWithoutCacheKey(std::set< size_t > &visited, DeviceIdentifier device_identifier)
std::string toString() const override
void sortCacheContainerByQueryMetric(CacheItemType item_type, DeviceIdentifier device_identifier)
Definition: DataRecycler.h:632
std::unordered_set< CacheItemType > const & getCacheItemType() const
Definition: DataRecycler.h:664
void putItemToCache(QueryPlanHash key, ResultSetPtr item_ptr, CacheItemType item_type, DeviceIdentifier device_identifier, size_t item_size, size_t compute_time, std::optional< ResultSetMetaInfo > meta_info=std::nullopt) override
void removeItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< ResultSetMetaInfo > meta_info=std::nullopt) override
size_t QueryPlanHash
#define CHECK(condition)
Definition: Logger.h:222
bool hasItemInCache(QueryPlanHash key)
void markCachedItemAsDirty(size_t table_key, std::unordered_set< QueryPlanHash > &key_set, CacheItemType item_type, DeviceIdentifier device_identifier) override
virtual ResultSetPtr getItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::optional< ResultSetMetaInfo > meta_info=std::nullopt)=0
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
void removeCachedItemFromBeginning(CacheItemType item_type, DeviceIdentifier device_identifier, int offset)
Definition: DataRecycler.h:621
#define VLOG(n)
Definition: Logger.h:316
void removeTableKeyInfoFromQueryPlanDagMap(size_t table_key)