OmniSciDB  04ee39c94c
QueryFragmentDescriptor.cpp
/*
 * Copyright 2018 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryFragmentDescriptor.h"

#include <DataMgr/DataMgr.h>
#include "../Execute.h"

QueryFragmentDescriptor::QueryFragmentDescriptor(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos,
    const std::vector<Data_Namespace::MemoryInfo>& gpu_mem_infos,
    const double gpu_input_mem_limit_percent)
    : gpu_input_mem_limit_percent_(gpu_input_mem_limit_percent) {
  const size_t input_desc_count{ra_exe_unit.input_descs.size()};
  CHECK_EQ(query_infos.size(), input_desc_count);
  for (size_t table_idx = 0; table_idx < input_desc_count; ++table_idx) {
    const auto table_id = ra_exe_unit.input_descs[table_idx].getTableId();
    if (!selected_tables_fragments_.count(table_id)) {
      selected_tables_fragments_[ra_exe_unit.input_descs[table_idx].getTableId()] =
          &query_infos[table_idx].info.fragments;
    }
  }

  for (size_t device_id = 0; device_id < gpu_mem_infos.size(); device_id++) {
    const auto& gpu_mem_info = gpu_mem_infos[device_id];
    available_gpu_mem_bytes_[device_id] =
        gpu_mem_info.maxNumPages * gpu_mem_info.pageSize;
  }
}
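
// The constructor records, per device, the total number of addressable GPU
// bytes (page count times page size). As an illustrative sketch with
// hypothetical numbers: a device reporting 4096 pages of 2 MB each yields
//   available_gpu_mem_bytes_[device_id] = 4096 * 2097152;  // 8 GB
// checkDeviceMemoryUsage() below scales this budget by
// gpu_input_mem_limit_percent_ before comparing it against the input size.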

void QueryFragmentDescriptor::computeAllTablesFragments(
    std::map<int, const TableFragments*>& all_tables_fragments,
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos) {
  for (size_t tab_idx = 0; tab_idx < ra_exe_unit.input_descs.size(); ++tab_idx) {
    int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
    CHECK_EQ(query_infos[tab_idx].table_id, table_id);
    const auto& fragments = query_infos[tab_idx].info.fragments;
    if (!all_tables_fragments.count(table_id)) {
      all_tables_fragments.insert(std::make_pair(table_id, &fragments));
    }
  }
}

void QueryFragmentDescriptor::buildFragmentKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    const bool enable_multifrag_kernels,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  if (enable_multifrag_kernels) {
    buildMultifragKernelMap(ra_exe_unit,
                            frag_offsets,
                            device_count,
                            device_type,
                            enable_inner_join_fragment_skipping,
                            executor);
  } else {
    buildFragmentPerKernelMap(
        ra_exe_unit, frag_offsets, device_count, device_type, executor);
  }
}
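
// Dispatch summary: with multifrag kernels enabled, fragments are grouped so
// that each device runs a single kernel over all of its assigned outer
// fragments; otherwise each unskipped outer fragment gets its own kernel.
// A minimal caller-side sketch, assuming ra_exe_unit, query_infos,
// gpu_mem_infos, frag_offsets, device_count and an Executor* are already in
// scope (the variable names here are illustrative only):
//
//   QueryFragmentDescriptor fragment_descriptor(
//       ra_exe_unit, query_infos, gpu_mem_infos, gpu_input_mem_limit_percent);
//   fragment_descriptor.buildFragmentKernelMap(
//       ra_exe_unit,
//       frag_offsets,
//       device_count,
//       ExecutorDeviceType::GPU,
//       /*enable_multifrag_kernels=*/true,
//       /*enable_inner_join_fragment_skipping=*/false,
//       executor);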

void QueryFragmentDescriptor::buildFragmentPerKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow();

  for (size_t i = 0; i < outer_fragments->size(); ++i) {
    const auto& fragment = (*outer_fragments)[i];
    const auto skip_frag = executor->skipFragment(
        outer_table_desc, fragment, ra_exe_unit.simple_quals, frag_offsets, i);
    if (skip_frag.first) {
      continue;
    }
    // NOTE: Using kernel index instead of frag index now
    outer_fragment_tuple_sizes_.push_back(fragment.getNumTuples());
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
    const int chosen_device_count =
        device_type == ExecutorDeviceType::CPU ? 1 : device_count;
    CHECK_GT(chosen_device_count, 0);
    const auto memory_level = device_type == ExecutorDeviceType::GPU
                                  ? Data_Namespace::GPU_LEVEL
                                  : Data_Namespace::CPU_LEVEL;
    int device_id = (device_type == ExecutorDeviceType::CPU || fragment.shard == -1)
                        ? fragment.deviceIds[static_cast<int>(memory_level)]
                        : fragment.shard % chosen_device_count;

    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }
    // Since we may have skipped fragments, the fragments_per_kernel_ vector may be
    // smaller than the outer_fragments size
    CHECK_LE(fragments_per_kernel_.size(), i);
    fragments_per_kernel_.emplace_back(FragmentsList{});
    const auto kernel_id = fragments_per_kernel_.size() - 1;
    CHECK(kernels_per_device_[device_id].insert(kernel_id).second);

    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            i,
                                            selected_tables_fragments_,
                                            executor->getInnerTabIdToJoinCond());
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());

      fragments_per_kernel_[kernel_id].emplace_back(
          FragmentsPerTable{table_id, frag_ids});
    }
  }
}
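
// Resulting shape in per-fragment mode, sketched with hypothetical ids: for a
// two-table join where outer fragments 0 and 2 survive skipping, one kernel is
// built per surviving outer fragment, and each kernel carries one
// FragmentsPerTable entry per input descriptor, e.g.
//   fragments_per_kernel_[0] == { {outer_table_id, {0}}, {inner_table_id, {0, 1}} }
//   fragments_per_kernel_[1] == { {outer_table_id, {2}}, {inner_table_id, {0, 1}} }
//   kernels_per_device_[device_id] == {0, 1}
// The inner fragment ids come from Executor::getTableFragmentIndices and are
// shown here only as an assumed example.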

void QueryFragmentDescriptor::buildMultifragKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // Allocate all the fragments of the tables involved in the query to available
  // devices. The basic idea: the device is decided by the outer table in the
  // query (the first table in a join) and we need to broadcast the fragments
  // in the inner table to each device. Sharding will change this model.
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();
  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow();

  for (size_t outer_frag_id = 0; outer_frag_id < outer_fragments->size();
       ++outer_frag_id) {
    const auto& fragment = (*outer_fragments)[outer_frag_id];
    auto skip_frag = executor->skipFragment(outer_table_desc,
                                            fragment,
                                            ra_exe_unit.simple_quals,
                                            frag_offsets,
                                            outer_frag_id);
    if (enable_inner_join_fragment_skipping &&
        (skip_frag == std::pair<bool, int64_t>(false, -1))) {
      skip_frag = executor->skipFragmentInnerJoins(
          outer_table_desc, ra_exe_unit, fragment, frag_offsets, outer_frag_id);
    }
    if (skip_frag.first) {
      continue;
    }
    const int device_id =
        fragment.shard == -1
            ? fragment.deviceIds[static_cast<int>(Data_Namespace::GPU_LEVEL)]
            : fragment.shard % device_count;
    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            outer_frag_id,
                                            selected_tables_fragments_,
                                            inner_table_id_to_join_condition);

      if (kernels_per_device_.find(device_id) == kernels_per_device_.end()) {
        fragments_per_kernel_.emplace_back(FragmentsList{});
        kernels_per_device_.insert(std::make_pair(
            device_id, std::set<size_t>({fragments_per_kernel_.size() - 1})));
      }
      const auto kernel_id = *kernels_per_device_[device_id].begin();
      CHECK_LT(kernel_id, fragments_per_kernel_.size());
      if (fragments_per_kernel_[kernel_id].size() < j + 1) {
        fragments_per_kernel_[kernel_id].emplace_back(
            FragmentsPerTable{table_id, frag_ids});
      } else {
        CHECK_EQ(fragments_per_kernel_[kernel_id][j].table_id, table_id);
        auto& curr_frag_ids = fragments_per_kernel_[kernel_id][j].fragment_ids;
        for (const int frag_id : frag_ids) {
          if (std::find(curr_frag_ids.begin(), curr_frag_ids.end(), frag_id) ==
              curr_frag_ids.end()) {
            curr_frag_ids.push_back(frag_id);
          }
        }
      }
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
  }
}
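
// Resulting shape in multifrag mode, again with hypothetical ids: each device
// owns exactly one kernel, and every surviving outer fragment assigned to that
// device is appended to that kernel's fragment list, with fragment ids merged
// and de-duplicated per table, e.g.
//   kernels_per_device_[0] == {0}
//   fragments_per_kernel_[0] ==
//       { {outer_table_id, {0, 2, 5}}, {inner_table_id, {0, 1}} }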

namespace {

bool is_sample_query(const RelAlgExecutionUnit& ra_exe_unit) {
  const bool result = ra_exe_unit.input_descs.size() == 1 &&
                      ra_exe_unit.simple_quals.empty() && ra_exe_unit.quals.empty() &&
                      ra_exe_unit.sort_info.order_entries.empty() &&
                      ra_exe_unit.scan_limit;
  if (result) {
    CHECK_EQ(size_t(1), ra_exe_unit.groupby_exprs.size());
    CHECK(!ra_exe_unit.groupby_exprs.front());
  }
  return result;
}

}  // namespace

bool QueryFragmentDescriptor::terminateDispatchMaybe(
    const RelAlgExecutionUnit& ra_exe_unit,
    const size_t kernel_id) const {
  const auto sample_query_limit =
      ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
  if (is_sample_query(ra_exe_unit) && sample_query_limit > 0 &&
      getOuterFragmentTupleSize(kernel_id) >= sample_query_limit) {
    return true;
  }
  return false;
}
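
// Example of the termination rule, with assumed numbers: for a bare projection
// with a scan limit and no filters, grouping, or ordering, is_sample_query()
// returns true; if the kernel's outer fragment already holds at least
// limit + offset tuples (say 100 + 0), dispatching further kernels cannot add
// needed rows and terminateDispatchMaybe() returns true.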

void QueryFragmentDescriptor::checkDeviceMemoryUsage(
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const int device_id,
    const size_t num_bytes_for_row) {
  if (g_cluster) {
    // Disabled in distributed mode for now
    return;
  }
  CHECK_GE(device_id, 0);
  tuple_count_per_device_[device_id] += fragment.getNumTuples();
  const size_t gpu_bytes_limit =
      available_gpu_mem_bytes_[device_id] * gpu_input_mem_limit_percent_;
  if (tuple_count_per_device_[device_id] * num_bytes_for_row > gpu_bytes_limit) {
    LOG(WARNING) << "Not enough memory on device " << device_id
                 << " for input chunks totaling "
                 << tuple_count_per_device_[device_id] * num_bytes_for_row
                 << " bytes (available device memory: " << gpu_bytes_limit << " bytes)";
    throw QueryMustRunOnCpu();
  }
}
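
// Worked example of the memory check, using hypothetical figures: with
// available_gpu_mem_bytes_[0] of 8 GB and gpu_input_mem_limit_percent_ of 0.9,
// gpu_bytes_limit is about 7.2 GB. If the fragments routed to device 0
// accumulate 100M tuples at 80 bytes per fetched row (8 GB of input), the
// product exceeds the limit and the query falls back to CPU via
// QueryMustRunOnCpu.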