OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ResultSetRecycler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ResultSetRecycler.h"
18 
19 extern bool g_is_test_env;
20 
22  CacheItemType item_type,
23  DeviceIdentifier device_identifier,
24  std::lock_guard<std::mutex>& lock,
25  std::optional<ResultSetMetaInfo> meta_info) const {
28  return false;
29  }
30  auto resultset_cache = getCachedItemContainer(item_type, device_identifier);
31  CHECK(resultset_cache);
32  auto candidate_resultset_it = std::find_if(
33  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
34  return cached_item.key == key;
35  });
36  return candidate_resultset_it != resultset_cache->end();
37 }
38 
40  std::lock_guard<std::mutex> lock(getCacheLock());
41  return hasItemInCache(key,
44  lock,
45  std::nullopt);
46 }
47 
49  QueryPlanHash key,
50  CacheItemType item_type,
51  DeviceIdentifier device_identifier,
52  std::optional<ResultSetMetaInfo> meta_info) {
55  return nullptr;
56  }
57  std::lock_guard<std::mutex> lock(getCacheLock());
58  auto resultset_cache = getCachedItemContainer(item_type, device_identifier);
59  CHECK(resultset_cache);
60  auto candidate_resultset_it = std::find_if(
61  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
62  return cached_item.key == key;
63  });
64  if (candidate_resultset_it != resultset_cache->end()) {
65  CHECK(candidate_resultset_it->meta_info);
66  if (candidate_resultset_it->isDirty()) {
68  key, item_type, device_identifier, lock, candidate_resultset_it->meta_info);
69  return nullptr;
70  }
71  auto candidate_resultset = candidate_resultset_it->cached_item;
72  decltype(std::chrono::steady_clock::now()) ts1, ts2;
73  ts1 = std::chrono::steady_clock::now();
74  // we need to copy cached resultset to support resultset recycler with concurrency
75  auto copied_rs = candidate_resultset->copy();
76  CHECK(copied_rs);
77  copied_rs->setCached(true);
78  copied_rs->initStatus();
79  candidate_resultset_it->item_metric->incRefCount();
80  ts2 = std::chrono::steady_clock::now();
81  VLOG(1) << "[" << item_type << ", "
82  << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
83  << "] Get cached query resultset from cache (key: " << key
84  << ", copying it takes "
85  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
86  << "ms)";
87  return copied_rs;
88  }
89  return nullptr;
90 }
91 
93  QueryPlanHash key) {
96  return std::nullopt;
97  }
98  std::lock_guard<std::mutex> lock(getCacheLock());
101  CHECK(resultset_cache);
102  auto candidate_resultset_it = std::find_if(
103  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
104  return cached_item.key == key;
105  });
106  if (candidate_resultset_it != resultset_cache->end()) {
107  CHECK(candidate_resultset_it->meta_info);
108  if (candidate_resultset_it->isDirty()) {
112  lock,
113  candidate_resultset_it->meta_info);
114  return std::nullopt;
115  }
116  auto candidate_resultset = candidate_resultset_it->cached_item;
117  auto output_meta_info = candidate_resultset->getTargetMetaInfo();
118  return output_meta_info;
119  }
120  return std::nullopt;
121 }
122 
124  ResultSetPtr item_ptr,
125  CacheItemType item_type,
126  DeviceIdentifier device_identifier,
127  size_t item_size,
128  size_t compute_time,
129  std::optional<ResultSetMetaInfo> meta_info) {
131  key == EMPTY_HASHED_PLAN_DAG_KEY) {
132  return;
133  }
134  CHECK(meta_info.has_value());
135  std::lock_guard<std::mutex> lock(getCacheLock());
136  auto resultset_cache = getCachedItemContainer(item_type, device_identifier);
137  auto candidate_resultset_it = std::find_if(
138  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
139  return cached_item.key == key;
140  });
141  bool has_cached_resultset = false;
142  bool need_to_cleanup = false;
143  if (candidate_resultset_it != resultset_cache->end()) {
144  has_cached_resultset = true;
145  CHECK(candidate_resultset_it->meta_info);
146  if (candidate_resultset_it->isDirty()) {
147  need_to_cleanup = true;
148  } else if (candidate_resultset_it->cached_item->didOutputColumnar() !=
149  item_ptr->didOutputColumnar()) {
150  // we already have a cached resultset for the given query plan dag but
151  // requested resultset output layout and that of cached one is different
152  // so we remove the cached one and make a room for the resultset with different
153  // layout
154  need_to_cleanup = true;
155  VLOG(1) << "Failed to recycle query resultset: mismatched cached resultset layout";
156  }
157  }
158  if (need_to_cleanup) {
159  // remove dirty cached resultset
161  key, item_type, device_identifier, lock, candidate_resultset_it->meta_info);
162  has_cached_resultset = false;
163  }
164 
165  if (!has_cached_resultset) {
166  auto& metric_tracker = getMetricTracker(item_type);
167  if (metric_tracker.getTotalCacheSize() != g_query_resultset_cache_total_bytes) {
168  metric_tracker.setTotalCacheSize(g_query_resultset_cache_total_bytes);
169  }
170  if (metric_tracker.getMaxCacheItemSize() !=
172  metric_tracker.setMaxCacheItemSize(g_max_cacheable_query_resultset_size_bytes);
173  }
174  auto cache_status = metric_tracker.canAddItem(device_identifier, item_size);
175  if (cache_status == CacheAvailability::UNAVAILABLE) {
176  LOG(INFO) << "Failed to keep a query resultset: the size of the resultset ("
177  << item_size << " bytes) exceeds the current system limit ("
179  return;
180  } else if (cache_status == CacheAvailability::AVAILABLE_AFTER_CLEANUP) {
181  auto required_size = metric_tracker.calculateRequiredSpaceForItemAddition(
182  device_identifier, item_size);
183  CHECK_GT(required_size, 0UL);
184  LOG(INFO) << "Cleanup cached query resultset(s) to make a free space ("
185  << required_size << " bytes) to cache a new resultset";
186  cleanupCacheForInsertion(item_type, device_identifier, required_size, lock);
187  }
188  auto new_cache_metric_ptr = metric_tracker.putNewCacheItemMetric(
189  key, device_identifier, item_size, compute_time);
190  CHECK_EQ(item_size, new_cache_metric_ptr->getMemSize());
191  item_ptr->setCached(true);
192  item_ptr->initStatus();
193  VLOG(1) << "[" << item_type << ", "
194  << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
195  << "] Put query resultset to cache (key: " << key << ")";
196  resultset_cache->emplace_back(key, item_ptr, new_cache_metric_ptr, meta_info);
197  if (!meta_info->input_table_keys.empty()) {
198  addQueryPlanDagForTableKeys(key, meta_info->input_table_keys, lock);
199  }
200  }
201  return;
202 }
203 
205  CacheItemType item_type,
206  DeviceIdentifier device_identifier,
207  std::lock_guard<std::mutex>& lock,
208  std::optional<ResultSetMetaInfo> meta_info) {
210  key == EMPTY_HASHED_PLAN_DAG_KEY) {
211  return;
212  }
213  auto resultset_container = getCachedItemContainer(item_type, device_identifier);
214  auto filter = [key](auto const& item) { return item.key == key; };
215  auto itr =
216  std::find_if(resultset_container->cbegin(), resultset_container->cend(), filter);
217  if (itr == resultset_container->cend()) {
218  return;
219  } else {
220  itr->cached_item->invalidateResultSetChunks();
221  VLOG(1) << "[" << item_type << ", "
222  << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
223  << "] Remove item from cache (key: " << key << ")";
224  resultset_container->erase(itr);
225  }
226  auto& cache_metrics = getMetricTracker(item_type);
227  auto cache_metric = cache_metrics.getCacheItemMetric(key, device_identifier);
228  CHECK(cache_metric);
229  auto resultset_size = cache_metric->getMemSize();
230  cache_metrics.removeCacheItemMetric(key, device_identifier);
231  cache_metrics.updateCurrentCacheSize(
232  device_identifier, CacheUpdateAction::REMOVE, resultset_size);
233  return;
234 }
235 
237  CacheItemType item_type,
238  DeviceIdentifier device_identifier,
239  size_t required_size,
240  std::lock_guard<std::mutex>& lock,
241  std::optional<ResultSetMetaInfo> meta_info) {
242  int elimination_target_offset = 0;
243  size_t removed_size = 0;
244  auto& metric_tracker = getMetricTracker(item_type);
245  auto actual_space_to_free = required_size;
246  // cast total_cache_size to double for accurate calculation
247  double moderate_free_space =
248  static_cast<double>(metric_tracker.getTotalCacheSize()) / 2;
249  if (!g_is_test_env && required_size < moderate_free_space) {
250  // we try to make enough (and moderate) free space to avoid
251  // too frequent cache clearance
252  // we can expect that this strategy is likely to maintain cache hit ratio
253  // since elimination targets are selected based on cache metrics including
254  // # referenced (i.e., we try to keep frequently recycled items as long as we can)
255  actual_space_to_free = moderate_free_space;
256  }
257  metric_tracker.sortCacheInfoByQueryMetric(device_identifier);
258  auto cached_item_metrics = metric_tracker.getCacheItemMetrics(device_identifier);
259  sortCacheContainerByQueryMetric(item_type, device_identifier);
260  for (auto& metric : cached_item_metrics) {
261  auto target_size = metric->getMemSize();
262  ++elimination_target_offset;
263  removed_size += target_size;
264  if (removed_size > actual_space_to_free) {
265  break;
266  }
267  }
268 
269  removeCachedItemFromBeginning(item_type, device_identifier, elimination_target_offset);
270  metric_tracker.removeMetricFromBeginning(device_identifier, elimination_target_offset);
271 
272  metric_tracker.updateCurrentCacheSize(
273  device_identifier, CacheUpdateAction::REMOVE, removed_size);
274 }
275 
277  std::lock_guard<std::mutex> lock(getCacheLock());
278  for (auto& item_type : getCacheItemType()) {
280  auto item_cache = getItemCache().find(item_type)->second;
281  for (auto& kv : *item_cache) {
282  if (!kv.second->empty()) {
283  VLOG(1) << "[" << item_type << ", "
286  << "] clear cache (# items: " << kv.second->size() << ")";
287  kv.second->clear();
288  }
289  }
290  }
292 }
293 
295  std::unordered_set<QueryPlanHash>& key_set,
296  CacheItemType item_type,
297  DeviceIdentifier device_identifier) {
298  if (!g_enable_data_recycler || !g_use_query_resultset_cache || key_set.empty()) {
299  return;
300  }
301  std::lock_guard<std::mutex> lock(getCacheLock());
302  auto resultset_cache = getCachedItemContainer(item_type, device_identifier);
303  for (auto key : key_set) {
304  removeItemFromCache(key, item_type, device_identifier, lock, std::nullopt);
305  }
307 }
308 
309 std::string ResultSetRecycler::toString() const {
310  std::ostringstream oss;
311  oss << "A current status of the query resultSet Recycler:\n";
312  for (auto& item_type : getCacheItemType()) {
313  oss << "\t" << item_type;
314  auto& metric_tracker = getMetricTracker(item_type);
315  oss << "\n\t# cached query resultsets:\n";
316  auto item_cache = getItemCache().find(item_type)->second;
317  for (auto& cache_container : *item_cache) {
318  oss << "\t\tDevice"
319  << DataRecyclerUtil::getDeviceIdentifierString(cache_container.first)
320  << ", # query resultsets: " << cache_container.second->size() << "\n";
321  for (auto& ht : *cache_container.second) {
322  oss << "\t\t\tHT] " << ht.item_metric->toString() << "\n";
323  }
324  }
325  oss << "\t" << metric_tracker.toString() << "\n";
326  }
327  return oss.str();
328 }
329 
330 std::tuple<QueryPlanHash, ResultSetPtr, std::optional<ResultSetMetaInfo>>
332  DeviceIdentifier device_identifier) {
333  std::lock_guard<std::mutex> lock(getCacheLock());
334  auto resultset_cache =
336  for (auto& rs : *resultset_cache) {
337  if (!visited.count(rs.key)) {
338  return std::make_tuple(rs.key, rs.cached_item, rs.meta_info);
339  }
340  }
341  return std::make_tuple(EMPTY_HASHED_PLAN_DAG_KEY, nullptr, std::nullopt);
342 }
343 
345  size_t hashed_query_plan_dag,
346  const std::unordered_set<size_t>& table_keys,
347  std::lock_guard<std::mutex>& lock) {
348  for (auto table_key : table_keys) {
349  auto itr = table_key_to_query_plan_dag_map_.try_emplace(table_key).first;
350  itr->second.insert(hashed_query_plan_dag);
351  }
352 }
353 
354 std::optional<std::unordered_set<size_t>>
356  std::lock_guard<std::mutex> lock(getCacheLock());
357  auto it = table_key_to_query_plan_dag_map_.find(table_key);
358  return it != table_key_to_query_plan_dag_map_.end() ? std::make_optional(it->second)
359  : std::nullopt;
360 }
361 
363  table_key_to_query_plan_dag_map_.erase(table_key);
364 }
365 
366 std::vector<std::shared_ptr<Analyzer::Expr>>& ResultSetRecycler::getTargetExprs(
367  QueryPlanHash key) const {
368  std::lock_guard<std::mutex> lock(getCacheLock());
371  CHECK(resultset_cache);
372  auto candidate_resultset_it = std::find_if(
373  resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
374  return cached_item.key == key;
375  });
376  CHECK(candidate_resultset_it != resultset_cache->end());
377  CHECK(candidate_resultset_it->meta_info);
378  return candidate_resultset_it->meta_info->getTargetExprs();
379 }
void cleanupCacheForInsertion(CacheItemType item_type, DeviceIdentifier device_identifier, size_t required_size, std::lock_guard< std::mutex > &lock, std::optional< ResultSetMetaInfo > meta_info=std::nullopt) override
#define CHECK_EQ(x, y)
Definition: Logger.h:301
size_t DeviceIdentifier
Definition: DataRecycler.h:129
static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:138
std::vector< std::shared_ptr< Analyzer::Expr > > & getTargetExprs(QueryPlanHash key) const
bool g_use_query_resultset_cache
Definition: Execute.cpp:156
CacheMetricTracker & getMetricTracker(CacheItemType item_type)
Definition: DataRecycler.h:654
std::unordered_map< size_t, std::unordered_set< size_t > > table_key_to_query_plan_dag_map_
std::optional< std::vector< TargetMetaInfo > > getOutputMetaInfo(QueryPlanHash key)
void addQueryPlanDagForTableKeys(size_t hashed_query_plan_dag, const std::unordered_set< size_t > &table_keys, std::lock_guard< std::mutex > &lock)
#define LOG(tag)
Definition: Logger.h:285
void clearCache() override
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
std::shared_ptr< ResultSet > ResultSetPtr
std::shared_ptr< CachedItemContainer > getCachedItemContainer(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:528
bool g_enable_data_recycler
Definition: Execute.cpp:154
#define CHECK_GT(x, y)
Definition: Logger.h:305
void clearCacheMetricTracker()
Definition: DataRecycler.h:317
PerTypeCacheItemContainer const & getItemCache() const
Definition: DataRecycler.h:668
bool g_is_test_env
Definition: Execute.cpp:149
std::optional< std::unordered_set< size_t > > getMappedQueryPlanDagsWithTableKey(size_t table_key) const
CacheItemType
Definition: DataRecycler.h:38
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
bool g_enable_smem_group_by true
std::tuple< QueryPlanHash, ResultSetPtr, std::optional< ResultSetMetaInfo > > getCachedResultSetWithoutCacheKey(std::set< size_t > &visited, DeviceIdentifier device_identifier)
std::string toString() const override
void sortCacheContainerByQueryMetric(CacheItemType item_type, DeviceIdentifier device_identifier)
Definition: DataRecycler.h:632
std::unordered_set< CacheItemType > const & getCacheItemType() const
Definition: DataRecycler.h:664
void putItemToCache(QueryPlanHash key, ResultSetPtr item_ptr, CacheItemType item_type, DeviceIdentifier device_identifier, size_t item_size, size_t compute_time, std::optional< ResultSetMetaInfo > meta_info=std::nullopt) override
void removeItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< ResultSetMetaInfo > meta_info=std::nullopt) override
size_t QueryPlanHash
#define CHECK(condition)
Definition: Logger.h:291
bool hasItemInCache(QueryPlanHash key)
void markCachedItemAsDirty(size_t table_key, std::unordered_set< QueryPlanHash > &key_set, CacheItemType item_type, DeviceIdentifier device_identifier) override
virtual ResultSetPtr getItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::optional< ResultSetMetaInfo > meta_info=std::nullopt)=0
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
void removeCachedItemFromBeginning(CacheItemType item_type, DeviceIdentifier device_identifier, int offset)
Definition: DataRecycler.h:621
#define VLOG(n)
Definition: Logger.h:388
void removeTableKeyInfoFromQueryPlanDagMap(size_t table_key)