OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StreamingTopN.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "StreamingTopN.h"
17 #include "RelAlgExecutor.h"
18 #include "Shared/checked_alloc.h"
19 #include "TopKSort.h"
20 
namespace streaming_top_n {

// Layout of the streaming top-n heaps buffer, as implied by the size/offset formulas
// below (all sizes in int64_t slots):
//   [ (1 + n) * thread_count header slots ][ n * thread_count rows, row_size bytes each ]
// NOTE(review): the meaning of the (1 + n) header slots per heap (presumably a heap
// length followed by n row indices) is defined by the runtime heap code, not here.

/**
 * @brief Total size in bytes of the heaps buffer for thread_count heaps of n rows.
 * @param row_size Size of one row in bytes; assumed to be a multiple of
 *        sizeof(int64_t) (any remainder is dropped by the integer division).
 * @param n Number of rows each heap holds.
 * @param thread_count Number of heaps in the buffer.
 * @return (1 + n + (row_size / 8) * n) * thread_count * sizeof(int64_t).
 */
size_t get_heap_size(const size_t row_size, const size_t n, const size_t thread_count) {
  const auto row_size_quad = row_size / sizeof(int64_t);
  return (1 + n + row_size_quad * n) * thread_count * sizeof(int64_t);
}

/**
 * @brief Byte offset of the row storage inside the heaps buffer, i.e. the size of
 *        the (1 + n) * thread_count header slots that precede it.
 * @param n Number of rows each heap holds.
 * @param thread_count Number of heaps in the buffer.
 */
size_t get_rows_offset_of_heaps(const size_t n, const size_t thread_count) {
  return (1 + n) * thread_count * sizeof(int64_t);
}

/**
 * @brief Copies the row storage (everything after the header slots) out of a heaps
 *        buffer.
 * @param heaps Start of the heaps buffer; must be readable for heaps_size bytes.
 * @param heaps_size Size of the heaps buffer in bytes; must be at least
 *        get_rows_offset_of_heaps(n, thread_count).
 * @param n Number of rows each heap holds.
 * @param thread_count Number of heaps in the buffer.
 * @return The bytes in [rows_offset, heaps_size); empty when the buffer has no rows.
 */
std::vector<int8_t> get_rows_copy_from_heaps(const int64_t* heaps,
                                             const size_t heaps_size,
                                             const size_t n,
                                             const size_t thread_count) {
  const auto rows_offset = streaming_top_n::get_rows_offset_of_heaps(n, thread_count);
  const auto row_buff_size = heaps_size - rows_offset;
  std::vector<int8_t> rows_copy(row_buff_size);
  // Skip the copy for an empty result: indexing an empty vector (the old
  // `&rows_copy[0]`) and calling memcpy with an invalid pointer are both UB.
  if (!rows_copy.empty()) {
    const auto rows_ptr = reinterpret_cast<const int8_t*>(heaps) + rows_offset;
    std::memcpy(rows_copy.data(), rows_ptr, row_buff_size);
  }
  return rows_copy;
}

}  // namespace streaming_top_n
45 
46 bool use_streaming_top_n(const RelAlgExecutionUnit& ra_exe_unit,
47  const bool output_columnar) {
48  if (g_cluster) {
49  return false; // TODO(miyu)
50  }
51 
52  for (const auto target_expr : ra_exe_unit.target_exprs) {
53  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
54  return false;
55  }
56  if (dynamic_cast<const Analyzer::WindowFunction*>(target_expr)) {
57  return false;
58  }
59  }
60 
61  // TODO: Allow streaming top n for columnar output
62  if (!output_columnar && ra_exe_unit.sort_info.order_entries.size() == 1 &&
63  ra_exe_unit.sort_info.limit &&
65  const auto only_order_entry = ra_exe_unit.sort_info.order_entries.front();
66  CHECK_GT(only_order_entry.tle_no, int(0));
67  CHECK_LE(static_cast<size_t>(only_order_entry.tle_no),
68  ra_exe_unit.target_exprs.size());
69  const auto order_entry_expr = ra_exe_unit.target_exprs[only_order_entry.tle_no - 1];
70  const auto n = ra_exe_unit.sort_info.offset + ra_exe_unit.sort_info.limit;
71  if ((order_entry_expr->get_type_info().is_number() ||
72  order_entry_expr->get_type_info().is_time()) &&
73  n <= 100000) { // TODO(miyu): relax?
74  return true;
75  }
76  }
77 
78  return false;
79 }
80 
81 size_t get_heap_key_slot_index(const std::vector<Analyzer::Expr*>& target_exprs,
82  const size_t target_idx) {
83  size_t slot_idx = 0;
84  for (size_t i = 0; i < target_idx; ++i) {
85  auto agg_info = get_target_info(target_exprs[i], g_bigint_count);
86  slot_idx = advance_slot(slot_idx, agg_info, false);
87  }
88  return slot_idx;
89 }
90 
#ifdef HAVE_CUDA
/**
 * @brief Pops the top n rows (n = offset + limit) out of the heaps buffer on a GPU.
 *
 * Builds the ordering description (PodOrderEntry) and the row layout of the ordered
 * column (GroupByBufferLayoutInfo), then delegates to
 * pop_n_rows_from_merged_heaps_gpu.
 *
 * @param data_mgr Data manager forwarded to the GPU routine.
 * @param dev_heaps_buffer Heaps buffer on device `device_id`.
 * @param ra_exe_unit Execution unit; must have exactly one order entry.
 * @param query_mem_desc Row layout of the heaps; must not be columnar.
 * @param thread_count Number of heaps in the buffer.
 * @param device_id Device that owns dev_heaps_buffer.
 * @return Bytes of the selected rows as returned by pop_n_rows_from_merged_heaps_gpu.
 */
std::vector<int8_t> pick_top_n_rows_from_dev_heaps(
    Data_Namespace::DataMgr* data_mgr,
    const int64_t* dev_heaps_buffer,
    const RelAlgExecutionUnit& ra_exe_unit,
    const QueryMemoryDescriptor& query_mem_desc,
    const size_t thread_count,
    const int device_id) {
  CHECK(!query_mem_desc.canOutputColumnar());
  CHECK_EQ(ra_exe_unit.sort_info.order_entries.size(), size_t(1));
  const auto& only_oe = ra_exe_unit.sort_info.order_entries.back();
  const auto oe_col_idx = only_oe.tle_no - 1;
  const auto n = ra_exe_unit.sort_info.offset + ra_exe_unit.sort_info.limit;
  const auto group_key_bytes = query_mem_desc.getEffectiveKeyWidth();
  const PodOrderEntry pod_oe{only_oe.tle_no, only_oe.is_desc, only_oe.nulls_first};
  // The ordered target's slot index is computed separately from its target index,
  // so the column offset AND the padded slot width must both be looked up by slot
  // index (getPaddedSlotWidthBytes takes a slot_idx). Passing oe_col_idx for the
  // width read the wrong slot whenever an earlier target spans several slots.
  const auto key_slot_idx = get_heap_key_slot_index(ra_exe_unit.target_exprs, oe_col_idx);
  GroupByBufferLayoutInfo oe_layout{
      n * thread_count,
      query_mem_desc.getColOffInBytes(key_slot_idx),
      static_cast<size_t>(query_mem_desc.getPaddedSlotWidthBytes(key_slot_idx)),
      query_mem_desc.getRowSize(),
      get_target_info(ra_exe_unit.target_exprs[oe_col_idx], g_bigint_count),
      -1};
  return pop_n_rows_from_merged_heaps_gpu(
      data_mgr,
      dev_heaps_buffer,
      query_mem_desc.getBufferSizeBytes(
          ra_exe_unit, thread_count, ExecutorDeviceType::GPU),
      n,
      pod_oe,
      oe_layout,
      group_key_bytes,
      thread_count,
      device_id);
}
#endif  // HAVE_CUDA
std::vector< Analyzer::Expr * > target_exprs
#define CHECK_EQ(x, y)
Definition: Logger.h:198
size_t getBufferSizeBytes(const RelAlgExecutionUnit &ra_exe_unit, const unsigned thread_count, const ExecutorDeviceType device_type) const
bool use_streaming_top_n(const RelAlgExecutionUnit &ra_exe_unit, const bool output_columnar)
bool g_cluster
Streaming Top N algorithm.
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:65
size_t get_rows_offset_of_heaps(const size_t n, const size_t thread_count)
const std::list< Analyzer::OrderEntry > order_entries
const SortAlgorithm algorithm
size_t getEffectiveKeyWidth() const
std::vector< int8_t > pop_n_rows_from_merged_heaps_gpu(Data_Namespace::DataMgr *data_mgr, const int64_t *dev_heaps, const size_t heaps_size, const size_t n, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, const size_t group_key_bytes, const size_t thread_count, const int device_id)
Definition: TopKSort.cu:281
size_t get_heap_key_slot_index(const std::vector< Analyzer::Expr * > &target_exprs, const size_t target_idx)
#define CHECK_GT(x, y)
Definition: Logger.h:202
const size_t limit
size_t advance_slot(const size_t j, const TargetInfo &target_info, const bool separate_varlen_storage)
CHECK(cgen_state)
const SortInfo sort_info
bool g_bigint_count
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
int tle_no
#define CHECK_LE(x, y)
Definition: Logger.h:201
std::vector< int8_t > get_rows_copy_from_heaps(const int64_t *heaps, const size_t heaps_size, const size_t n, const size_t thread_count)
size_t get_heap_size(const size_t row_size, const size_t n, const size_t thread_count)
const size_t offset
size_t getColOffInBytes(const size_t col_idx) const