OmniSciDB  62f1aeabe6
SpeculativeTopN.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "SpeculativeTopN.h"
18 
19 #include "RelAlgExecutor.h"
20 #include "ResultSet.h"
21 #include "Shared/Logger.h"
22 
24 
26  const std::vector<Analyzer::Expr*>& target_exprs,
27  const size_t truncate_n)
28  : unknown_(0) {
    // NOTE(review): the line that names this definition and declares its first
    // parameter is not visible in this listing; the body below reads `rows`,
    // `target_exprs` and `truncate_n`, and initializes the member `unknown_`.
    //
    // Reads at most truncate_n + 1 rows from `rows`. The first truncate_n rows are
    // stored in map_ as exact (unknown == false) key/value pairs; the value of the
    // (truncate_n + 1)-th row, if present, is kept in unknown_.
    // Presumably callers guarantee exactly two int64 columns (indices 0 and 1 are
    // accessed unconditionally) -- confirm against the call sites.
29  CHECK_EQ(rows.colCount(), target_exprs.size());
    // True when the first target is an aggregate expression; in that case column 0
    // carries the value and column 1 the key, otherwise the roles are reversed.
30  const bool count_first = dynamic_cast<const Analyzer::AggExpr*>(target_exprs[0]);
31  for (size_t i = 0; i < truncate_n + 1; ++i) {
32  const auto crt_row = rows.getNextRow(false, false);
    // An empty row ends the scan early (fewer than truncate_n + 1 rows available).
33  if (crt_row.empty()) {
34  break;
35  }
36  int64_t key{0};
37  size_t val{0};
38  CHECK_EQ(rows.colCount(), crt_row.size());
    // Column 0: must hold an int64_t scalar (both extractions are CHECKed).
39  {
40  auto scalar_r = boost::get<ScalarTargetValue>(&crt_row[0]);
41  CHECK(scalar_r);
42  auto p = boost::get<int64_t>(scalar_r);
43  CHECK(p);
44  if (count_first) {
45  val = *p;
46  } else {
47  key = *p;
48  }
49  }
    // Column 1: same extraction, filling whichever of key/val column 0 did not.
50  {
51  auto scalar_r = boost::get<ScalarTargetValue>(&crt_row[1]);
52  CHECK(scalar_r);
53  auto p = boost::get<int64_t>(scalar_r);
54  CHECK(p);
55  if (count_first) {
56  key = *p;
57  } else {
58  val = *p;
59  }
60  }
61  if (i < truncate_n) {
    // Keys are expected to be unique: a failed insertion trips the CHECK.
62  const auto it_ok = map_.emplace(key, SpeculativeTopNVal{val, false});
63  CHECK(it_ok.second);
64  } else {
    // Row number truncate_n + 1: only its value is remembered, in unknown_.
65  unknown_ = val;
66  }
67  }
68 }
69 
// NOTE(review): the signature line of this definition is not visible in this
// listing; the body merges another map, referred to as `that`, into this one and
// modifies `that` while doing so (matched keys are erased from that.map_).
//
// Pass 1: update every entry already present in this map.
71  for (auto& kv : map_) {
72  auto& this_entry = kv.second;
73  const auto that_it = that.map_.find(kv.first);
74  if (that_it != that.map_.end()) {
    // Key present on both sides: add the values. The entry coming from `that`
    // must not be flagged unknown (CHECKed). It is then removed from that.map_
    // so that pass 2 only sees keys missing from this map.
75  const auto& that_entry = that_it->second;
76  CHECK(!that_entry.unknown);
77  this_entry.val += that_entry.val;
78  that.map_.erase(that_it);
79  } else {
    // Key missing from `that`: add that.unknown_ to the value and set the flag
    // from that.unknown_ (non-zero converts to true).
    // NOTE(review): this assignment overwrites any flag set earlier on this
    // entry; confirm whether a previously true flag should be preserved.
80  this_entry.val += that.unknown_;
81  this_entry.unknown = that.unknown_;
82  }
83  }
    // Pass 2: the keys left in that.map_ are absent from this map. Insert them with
    // this side's unknown_ added to the value, flagged unknown when unknown_ != 0.
84  for (const auto& kv : that.map_) {
85  const auto it_ok = map_.emplace(
86  kv.first, SpeculativeTopNVal{kv.second.val + unknown_, unknown_ != 0});
87  CHECK(it_ok.second);
88  }
    // The merged map's unknown_ is the sum of both sides' unknown_ values.
89  unknown_ += that.unknown_;
90 }
91 
// Converts the contents of map_ into a ResultSet holding at most top_n rows.
//
// The entries are copied into a vector and sorted (descending when `desc` is true,
// ascending otherwise) using the comparison operators of SpeculativeTopNEntry, which
// are defined outside this listing. The first min(top_n, map_.size()) entries are
// then written into freshly allocated ResultSet storage.
//
// Throws SpeculativeTopNFailed if any of the selected entries is flagged unknown.
// CHECK-fails unless ra_exe_unit has exactly two target expressions.
//
// NOTE(review): the embedded listing numbers jump from 120 to 122 and from 130 to
// 132, so one statement on query_mem_desc_rs and one ResultSet constructor argument
// are not visible here.
92 std::shared_ptr<ResultSet> SpeculativeTopNMap::asRows(
93  const RelAlgExecutionUnit& ra_exe_unit,
94  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
95  const QueryMemoryDescriptor& query_mem_desc,
96  const Executor* executor,
97  const size_t top_n,
98  const bool desc) const {
    // Flatten the map into (key, val, unknown) entries so they can be sorted.
99  std::vector<SpeculativeTopNEntry> vec;
100  for (const auto& kv : map_) {
101  vec.emplace_back(SpeculativeTopNEntry{kv.first, kv.second.val, kv.second.unknown});
102  }
103  if (desc) {
104  std::sort(vec.begin(), vec.end(), std::greater<SpeculativeTopNEntry>());
105  } else {
106  std::sort(vec.begin(), vec.end());
107  }
108  const auto num_rows = std::min(top_n, vec.size());
    // Any unknown-flagged entry among the rows about to be returned aborts the
    // conversion with SpeculativeTopNFailed.
109  for (size_t i = 0; i < num_rows; ++i) {
110  if (vec[i].unknown) {
111  throw SpeculativeTopNFailed();
112  }
113  }
114  CHECK_EQ(size_t(2), ra_exe_unit.target_exprs.size());
115 
116  // Top N key-value pairs are stored into a new ResultSet with a new
117  // QueryMemoryDescriptor to be passed over. We use row-wise GroupByBaselineHash, as it
118  // is the most flexible layout for dealing with key-value pairs in the storage (for
119  // iterations and reduction).
    // The descriptor is a modified copy; the caller's query_mem_desc is untouched.
120  auto query_mem_desc_rs = query_mem_desc;
122  query_mem_desc_rs.setOutputColumnar(false);
123  query_mem_desc_rs.setEntryCount(num_rows);
124  query_mem_desc_rs.clearSlotInfo();
    // Two slots, each described by the tuple (8, 8), one per target column.
125  query_mem_desc_rs.addColSlotInfo({std::make_tuple(8, 8)});
126  query_mem_desc_rs.addColSlotInfo({std::make_tuple(8, 8)});
127  query_mem_desc_rs.setAllTargetGroupbyIndices({-1, -1});
128 
129  auto rs = std::make_shared<ResultSet>(
130  target_exprs_to_infos(ra_exe_unit.target_exprs, query_mem_desc_rs),
132  query_mem_desc_rs,
133  row_set_mem_owner,
134  executor);
135  auto rs_storage = rs->allocateStorage();
136  auto rs_buff = reinterpret_cast<int64_t*>(rs_storage->getUnderlyingBuffer());
    // True when the first target is an aggregate expression; the value column then
    // precedes the key column in the output.
137  const bool count_first =
138  dynamic_cast<const Analyzer::AggExpr*>(ra_exe_unit.target_exprs[0]);
139 
140  // Going through the TopN results, and properly storing them into the
141  // GroupByBaselineHash layout (including the group column (key) and two agg columns
142  // (key and value)) to imitate the regular Group By query's result.
    // Each row occupies three consecutive int64 slots: key, column 0, column 1.
143  for (size_t i = 0; i < num_rows; ++i) {
144  rs_buff[0] = vec[i].key;
145  int64_t col0 = vec[i].key;
146  int64_t col1 = vec[i].val;
147  if (count_first) {
148  std::swap(col0, col1);
149  }
150  rs_buff[1] = col0;
151  rs_buff[2] = col1;
152  rs_buff += 3;
153  }
154  return rs;
155 }
156 
157 void SpeculativeTopNBlacklist::add(const std::shared_ptr<Analyzer::Expr> expr,
158  const bool desc) {
159  std::lock_guard<std::mutex> lock(mutex_);
160  for (const auto& e : blacklist_) {
161  CHECK(!(*e.first == *expr) || e.second != desc);
162  }
163  blacklist_.emplace_back(expr, desc);
164 }
165 
166 bool SpeculativeTopNBlacklist::contains(const std::shared_ptr<Analyzer::Expr> expr,
167  const bool desc) const {
168  std::lock_guard<std::mutex> lock(mutex_);
169  for (const auto& e : blacklist_) {
170  if (*e.first == *expr && e.second == desc) {
171  return true;
172  }
173  }
174  return false;
175 }
176 
188  const QueryMemoryDescriptor& query_mem_desc) {
    // NOTE(review): the line that names this definition and declares `ra_exe_unit`
    // is not visible in this listing, and neither is the last operand of the final
    // return expression (the listing numbers jump from 201 to 203).
    //
    // Returns false early when any of the checks below rules the query out.
    // Ruled out whenever the global g_cluster flag is set.
189  if (g_cluster) {
190  return false;
191  }
    // Exactly two target expressions are required.
192  if (ra_exe_unit.target_exprs.size() != 2) {
193  return false;
194  }
    // Any aggregate among the targets must be of type kCOUNT; non-aggregate
    // targets pass this loop unchecked.
195  for (const auto target_expr : ra_exe_unit.target_exprs) {
196  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
197  if (agg_expr && agg_expr->get_aggtype() != kCOUNT) {
198  return false;
199  }
200  }
    // Requires query_mem_desc.sortOnGpu() and a non-zero sort limit, combined with
    // a further condition that is not visible here.
201  return query_mem_desc.sortOnGpu() && ra_exe_unit.sort_info.limit &&
203 }
std::vector< Analyzer::Expr * > target_exprs
#define CHECK_EQ(x, y)
Definition: Logger.h:205
void reduce(SpeculativeTopNMap &that)
const int8_t const int64_t * num_rows
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
const SortAlgorithm algorithm
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
void setQueryDescriptionType(const QueryDescriptionType val)
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
const size_t limit
size_t val
const SortInfo sort_info
Speculative top N algorithm.
Definition: sqldefs.h:76
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr *> &targets, const QueryMemoryDescriptor &query_mem_desc)
#define CHECK(condition)
Definition: Logger.h:197
bool g_cluster
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
Basic constructors and methods of the row set interface.
std::unordered_map< int64_t, SpeculativeTopNVal > map_