OmniSciDB  72c90bc290
SpeculativeTopN.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "SpeculativeTopN.h"

#include "Logger/Logger.h"
#include "RelAlgExecutor.h"
#include "ResultSet.h"

SpeculativeTopNMap::SpeculativeTopNMap() : unknown_(0) {}

// Builds the speculative map from a two-column (key, count) result that has already
// been truncated: the first truncate_n rows become exact entries, and the value of the
// extra (truncate_n + 1)-th row, if present, is recorded in unknown_ and later used as
// a bound for groups missing from this map.
SpeculativeTopNMap::SpeculativeTopNMap(const ResultSet& rows,
                                       const std::vector<Analyzer::Expr*>& target_exprs,
                                       const size_t truncate_n)
    : unknown_(0) {
  CHECK_EQ(rows.colCount(), target_exprs.size());
  // The aggregate (count) target may come first or second in the projection.
  const bool count_first = dynamic_cast<const Analyzer::AggExpr*>(target_exprs[0]);
  for (size_t i = 0; i < truncate_n + 1; ++i) {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      break;
    }
    int64_t key{0};
    size_t val{0};
    CHECK_EQ(rows.colCount(), crt_row.size());
    {
      auto scalar_r = boost::get<ScalarTargetValue>(&crt_row[0]);
      CHECK(scalar_r);
      auto p = boost::get<int64_t>(scalar_r);
      CHECK(p);
      if (count_first) {
        val = *p;
      } else {
        key = *p;
      }
    }
    {
      auto scalar_r = boost::get<ScalarTargetValue>(&crt_row[1]);
      CHECK(scalar_r);
      auto p = boost::get<int64_t>(scalar_r);
      CHECK(p);
      if (count_first) {
        key = *p;
      } else {
        val = *p;
      }
    }
    if (i < truncate_n) {
      const auto it_ok = map_.emplace(key, SpeculativeTopNVal{val, false});
      CHECK(it_ok.second);
    } else {
      unknown_ = val;
    }
  }
}
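To make the roles of map_ and unknown_ concrete, here is a minimal standalone sketch of the same construction, assuming the truncated per-device result arrives as already decoded (key, count) pairs rather than a ResultSet; the TopNSketch and build_sketch names are illustrative and not part of this file.

#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

struct SketchVal {
  size_t val;
  bool unknown;
};

struct TopNSketch {
  std::unordered_map<int64_t, SketchVal> map;
  size_t unknown = 0;  // bound for groups cut off by the truncation
};

// rows: one device's result, already sorted by count and truncated to at most
// truncate_n + 1 entries.
TopNSketch build_sketch(const std::vector<std::pair<int64_t, size_t>>& rows,
                        const size_t truncate_n) {
  TopNSketch sketch;
  for (size_t i = 0; i < rows.size() && i < truncate_n + 1; ++i) {
    if (i < truncate_n) {
      sketch.map.emplace(rows[i].first, SketchVal{rows[i].second, false});
    } else {
      sketch.unknown = rows[i].second;  // count the truncated groups could have reached
    }
  }
  return sketch;
}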
void SpeculativeTopNMap::reduce(SpeculativeTopNMap& that) {
  for (auto& kv : map_) {
    auto& this_entry = kv.second;
    const auto that_it = that.map_.find(kv.first);
    if (that_it != that.map_.end()) {
      const auto& that_entry = that_it->second;
      CHECK(!that_entry.unknown);
      this_entry.val += that_entry.val;
      that.map_.erase(that_it);
    } else {
      // The group was truncated away in `that`; add its bound and mark the entry as
      // no longer exact (unless the bound is zero).
      this_entry.val += that.unknown_;
      this_entry.unknown = that.unknown_;
    }
  }
  // Groups present only in `that` get the symmetric treatment against unknown_.
  for (const auto& kv : that.map_) {
    const auto it_ok = map_.emplace(
        kv.first, SpeculativeTopNVal{kv.second.val + unknown_, unknown_ != 0});
    CHECK(it_ok.second);
  }
  unknown_ += that.unknown_;
}
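Continuing the illustrative sketch above, a simplified merge in the spirit of reduce(): counts for groups present in both sketches add exactly, while a group missing from one sketch can only be bounded by that sketch's unknown value, so the merged entry is flagged as inexact.

void merge_sketches(TopNSketch& self, TopNSketch that) {
  for (auto& kv : self.map) {
    const auto that_it = that.map.find(kv.first);
    if (that_it != that.map.end()) {
      kv.second.val += that_it->second.val;  // both sides exact: sum stays exact
      that.map.erase(that_it);
    } else {
      kv.second.val += that.unknown;             // optimistic upper bound
      kv.second.unknown |= (that.unknown != 0);  // exact only if nothing was cut off
    }
  }
  // Groups seen only in `that` get the symmetric treatment against self.unknown.
  for (const auto& kv : that.map) {
    self.map.emplace(kv.first,
                     SketchVal{kv.second.val + self.unknown, self.unknown != 0});
  }
  self.unknown += that.unknown;
}

For example, if one device yields {a: 10, b: 7} with unknown = 5 and another yields {a: 9, c: 6} with unknown = 4, the merged sketch holds a = 19 exactly, b = 11 and c = 11 flagged as unknown, and unknown = 9; whether b or c really belongs in the top N cannot be decided from the sketch alone.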
std::shared_ptr<ResultSet> SpeculativeTopNMap::asRows(
    const RelAlgExecutionUnit& ra_exe_unit,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const QueryMemoryDescriptor& query_mem_desc,
    const Executor* executor,
    const size_t top_n,
    const bool desc) const {
  std::vector<SpeculativeTopNEntry> vec;
  for (const auto& kv : map_) {
    vec.emplace_back(SpeculativeTopNEntry{kv.first, kv.second.val, kv.second.unknown});
  }
  if (desc) {
    std::sort(vec.begin(), vec.end(), std::greater<SpeculativeTopNEntry>());
  } else {
    std::sort(vec.begin(), vec.end());
  }
  const auto num_rows = std::min(top_n, vec.size());
  // If any entry within the requested top N is only known up to a bound, the
  // speculative result cannot be trusted and the query has to be re-run.
  for (size_t i = 0; i < num_rows; ++i) {
    if (vec[i].unknown) {
      throw SpeculativeTopNFailed();
    }
  }
  CHECK_EQ(size_t(2), ra_exe_unit.target_exprs.size());

  // Top N key-value pairs are stored into a new ResultSet with a new
  // QueryMemoryDescriptor to be passed over. We use row-wise GroupByBaselineHash, as it
  // is the most flexible layout for dealing with key-value pairs in the storage (for
  // iteration and reduction).
  auto query_mem_desc_rs = query_mem_desc;
  query_mem_desc_rs.setQueryDescriptionType(QueryDescriptionType::GroupByBaselineHash);
  query_mem_desc_rs.setOutputColumnar(false);
  query_mem_desc_rs.setEntryCount(num_rows);
  query_mem_desc_rs.clearSlotInfo();
  query_mem_desc_rs.addColSlotInfo({std::make_tuple(8, 8)});
  query_mem_desc_rs.addColSlotInfo({std::make_tuple(8, 8)});
  query_mem_desc_rs.setAllTargetGroupbyIndices({-1, -1});

  auto rs = std::make_shared<ResultSet>(
      target_exprs_to_infos(ra_exe_unit.target_exprs, query_mem_desc_rs),
      ExecutorDeviceType::CPU,
      query_mem_desc_rs,
      row_set_mem_owner,
      executor->blockSize(),
      executor->gridSize());
  auto rs_storage = rs->allocateStorage();
  auto rs_buff = reinterpret_cast<int64_t*>(rs_storage->getUnderlyingBuffer());
  const bool count_first =
      dynamic_cast<const Analyzer::AggExpr*>(ra_exe_unit.target_exprs[0]);

  // Going through the top N results and storing them into the GroupByBaselineHash
  // layout (the group column (key) and two agg columns (key and value)) to imitate a
  // regular group by query's result.
  for (size_t i = 0; i < num_rows; ++i) {
    rs_buff[0] = vec[i].key;
    int64_t col0 = vec[i].key;
    int64_t col1 = vec[i].val;
    if (count_first) {
      std::swap(col0, col1);
    }
    rs_buff[1] = col0;
    rs_buff[2] = col1;
    rs_buff += 3;
  }
  return rs;
}
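The storage filled above is plain row-wise baseline layout: each entry occupies three 64-bit slots, the group key followed by the two target columns, matching the two addColSlotInfo({std::make_tuple(8, 8)}) calls. A hedged sketch of that packing, with illustrative names only:

#include <cstdint>
#include <vector>

struct TopEntry {
  int64_t key;
  int64_t val;
};

// Packs (key, target0, target1) triples the way the loop above does; when the
// count is the first target, key and value swap places in the target slots.
std::vector<int64_t> pack_baseline_rows(const std::vector<TopEntry>& top,
                                        const bool count_first) {
  std::vector<int64_t> buff;
  buff.reserve(top.size() * 3);
  for (const auto& e : top) {
    buff.push_back(e.key);                        // group-by key slot
    buff.push_back(count_first ? e.val : e.key);  // first target slot
    buff.push_back(count_first ? e.key : e.val);  // second target slot
  }
  return buff;
}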
void SpeculativeTopNBlacklist::add(const std::shared_ptr<Analyzer::Expr> expr,
                                   const bool desc) {
  std::lock_guard<std::mutex> lock(mutex_);
  // The same (expression, direction) pair must not be blacklisted twice.
  for (const auto& e : blacklist_) {
    CHECK(!(*e.first == *expr) || e.second != desc);
  }
  blacklist_.emplace_back(expr, desc);
}

bool SpeculativeTopNBlacklist::contains(const std::shared_ptr<Analyzer::Expr> expr,
                                        const bool desc) const {
  std::lock_guard<std::mutex> lock(mutex_);
  for (const auto& e : blacklist_) {
    if (*e.first == *expr && e.second == desc) {
      return true;
    }
  }
  return false;
}

// The speculative path only applies to non-distributed, two-target (key, count)
// queries that sort on GPU with a limit and were planned with
// SortAlgorithm::SpeculativeTopN; any aggregate other than COUNT / COUNT_IF
// disqualifies the query.
bool use_speculative_top_n(const RelAlgExecutionUnit& ra_exe_unit,
                           const QueryMemoryDescriptor& query_mem_desc) {
  if (g_cluster) {
    return false;
  }
  if (ra_exe_unit.target_exprs.size() != 2) {
    return false;
  }
  for (const auto target_expr : ra_exe_unit.target_exprs) {
    const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
    if (agg_expr && !shared::is_any<kCOUNT, kCOUNT_IF>(agg_expr->get_aggtype())) {
      return false;
    }
  }
  return query_mem_desc.sortOnGpu() && ra_exe_unit.sort_info.limit &&
         ra_exe_unit.sort_info.algorithm == SortAlgorithm::SpeculativeTopN;
}
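How these pieces fit together is up to the caller (RelAlgExecutor in this codebase): the speculative path is only attempted when use_speculative_top_n() allows it and the sort has not been blacklisted, and when asRows() throws SpeculativeTopNFailed the sort gets blacklisted and the query is re-run with a regular sort. A minimal, self-contained sketch of that control flow, with stand-in names only:

#include <iostream>
#include <stdexcept>

// Stand-ins for the real machinery; illustrative only.
struct SpecTopNFailed : std::runtime_error {
  SpecTopNFailed() : std::runtime_error("speculative top N failed") {}
};

bool speculative_allowed = true;  // stands in for use_speculative_top_n(...)
bool blacklisted = false;         // stands in for blacklist.contains(expr, desc)

void run_full_sort() { std::cout << "regular sort\n"; }
void run_speculative_top_n() { throw SpecTopNFailed(); }  // pretend an unknown entry surfaced

int main() {
  if (!speculative_allowed || blacklisted) {
    run_full_sort();
    return 0;
  }
  try {
    run_speculative_top_n();  // fast path: per-device heaps, reduce(), asRows()
  } catch (const SpecTopNFailed&) {
    blacklisted = true;       // stands in for blacklist.add(expr, desc)
    run_full_sort();          // fall back to a full sort
  }
  return 0;
}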