QueryFragmentDescriptor.cpp
/*
 * Copyright 2018 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryFragmentDescriptor.h"

#include <DataMgr/DataMgr.h>
#include "../Execute.h"

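// Captures the physical inputs of the execution unit: the selected fragments for each
// input table and the total memory (max pages * page size) available on each GPU,
// which checkDeviceMemoryUsage() later scales by gpu_input_mem_limit_percent_.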
QueryFragmentDescriptor::QueryFragmentDescriptor(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos,
    const std::vector<Data_Namespace::MemoryInfo>& gpu_mem_infos,
    const double gpu_input_mem_limit_percent)
    : gpu_input_mem_limit_percent_(gpu_input_mem_limit_percent) {
  const size_t input_desc_count{ra_exe_unit.input_descs.size()};
  CHECK_EQ(query_infos.size(), input_desc_count);
  for (size_t table_idx = 0; table_idx < input_desc_count; ++table_idx) {
    const auto table_id = ra_exe_unit.input_descs[table_idx].getTableId();
    if (!selected_tables_fragments_.count(table_id)) {
      selected_tables_fragments_[ra_exe_unit.input_descs[table_idx].getTableId()] =
          &query_infos[table_idx].info.fragments;
    }
  }

  for (size_t device_id = 0; device_id < gpu_mem_infos.size(); device_id++) {
    const auto& gpu_mem_info = gpu_mem_infos[device_id];
    available_gpu_mem_bytes_[device_id] =
        gpu_mem_info.maxNumPages * gpu_mem_info.pageSize;
  }
}

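// Collects the fragment metadata of every input table into all_tables_fragments,
// keyed by table id; tables that are already present are left untouched.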
void QueryFragmentDescriptor::computeAllTablesFragments(
    std::map<int, const TableFragments*>& all_tables_fragments,
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos) {
  for (size_t tab_idx = 0; tab_idx < ra_exe_unit.input_descs.size(); ++tab_idx) {
    int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
    CHECK_EQ(query_infos[tab_idx].table_id, table_id);
    const auto& fragments = query_infos[tab_idx].info.fragments;
    if (!all_tables_fragments.count(table_id)) {
      all_tables_fragments.insert(std::make_pair(table_id, &fragments));
    }
  }
}

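// Entry point for kernel-map construction: delegates to the multi-fragment builder
// when multifrag kernels are enabled, otherwise creates one kernel per outer fragment.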
void QueryFragmentDescriptor::buildFragmentKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    const bool enable_multifrag_kernels,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  if (enable_multifrag_kernels) {
    buildMultifragKernelMap(ra_exe_unit,
                            frag_offsets,
                            device_count,
                            device_type,
                            enable_inner_join_fragment_skipping,
                            executor);
  } else {
    buildFragmentPerKernelMap(
        ra_exe_unit, frag_offsets, device_count, device_type, executor);
  }
}

namespace {
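// Returns the fragment's tuple count, or std::nullopt when the table has a delete
// column (the raw count could then include deleted rows, so it is not reported).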
std::optional<size_t> compute_fragment_tuple_count(
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const ColumnDescriptor* deleted_cd) {
  if (deleted_cd) {
    return std::nullopt;
  } else {
    return fragment.getNumTuples();
  }
}

}  // namespace

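// Builds one ExecutionKernel per non-skippable fragment of the outer table. Each
// fragment is assigned to a device (round robin by shard for sharded tables, otherwise
// the fragment's preferred device for the chosen memory level), and the kernel records
// the matching fragment ids of every input table.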
void QueryFragmentDescriptor::buildFragmentPerKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow();

  const ColumnDescriptor* deleted_cd{nullptr};
  if (outer_table_id > 0) {
    // Temporary tables will not have a table descriptor and will also not have deleted
    // rows.
    const auto& catalog = executor->getCatalog();
    const auto td = catalog->getMetadataForTable(outer_table_id);
    CHECK(td);
    deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
  }

  for (size_t i = 0; i < outer_fragments->size(); ++i) {
    const auto& fragment = (*outer_fragments)[i];
    const auto skip_frag = executor->skipFragment(
        outer_table_desc, fragment, ra_exe_unit.simple_quals, frag_offsets, i);
    if (skip_frag.first) {
      continue;
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
    const int chosen_device_count =
        device_type == ExecutorDeviceType::CPU ? 1 : device_count;
    CHECK_GT(chosen_device_count, 0);
    const auto memory_level = device_type == ExecutorDeviceType::GPU
                                  ? Data_Namespace::GPU_LEVEL
                                  : Data_Namespace::CPU_LEVEL;
    int device_id = (device_type == ExecutorDeviceType::CPU || fragment.shard == -1)
                        ? fragment.deviceIds[static_cast<int>(memory_level)]
                        : fragment.shard % chosen_device_count;

    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }

    ExecutionKernel execution_kernel{
        device_id, {}, compute_fragment_tuple_count(fragment, deleted_cd)};
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            i,
                                            selected_tables_fragments_,
                                            executor->getInnerTabIdToJoinCond());
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());

      execution_kernel.fragments.emplace_back(FragmentsPerTable{table_id, frag_ids});
    }

    if (execution_kernels_per_device_.find(device_id) ==
        execution_kernels_per_device_.end()) {
      CHECK(execution_kernels_per_device_
                .insert(std::make_pair(device_id,
                                       std::vector<ExecutionKernel>{execution_kernel}))
                .second);
    } else {
      execution_kernels_per_device_[device_id].emplace_back(execution_kernel);
    }
  }
}

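// Unlike the per-fragment path above, this builder keeps a single ExecutionKernel per
// device and appends each outer fragment's inputs to that kernel's fragment list. For
// example, with device_count = 2 a sharded outer fragment with shard == 3 lands on
// device 3 % 2 = 1, while an unsharded fragment (shard == -1) uses its preferred GPU
// device id.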
void QueryFragmentDescriptor::buildMultifragKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // Allocate all the fragments of the tables involved in the query to available
  // devices. The basic idea: the device is decided by the outer table in the
  // query (the first table in a join) and we need to broadcast the fragments
  // in the inner table to each device. Sharding will change this model.
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();
  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow();

  for (size_t outer_frag_id = 0; outer_frag_id < outer_fragments->size();
       ++outer_frag_id) {
    const auto& fragment = (*outer_fragments)[outer_frag_id];
    auto skip_frag = executor->skipFragment(outer_table_desc,
                                            fragment,
                                            ra_exe_unit.simple_quals,
                                            frag_offsets,
                                            outer_frag_id);
    if (enable_inner_join_fragment_skipping &&
        (skip_frag == std::pair<bool, int64_t>(false, -1))) {
      skip_frag = executor->skipFragmentInnerJoins(
          outer_table_desc, ra_exe_unit, fragment, frag_offsets, outer_frag_id);
    }
    if (skip_frag.first) {
      continue;
    }
    const int device_id =
        fragment.shard == -1
            ? fragment.deviceIds[static_cast<int>(Data_Namespace::GPU_LEVEL)]
            : fragment.shard % device_count;
    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            outer_frag_id,
                                            selected_tables_fragments_,
                                            inner_table_id_to_join_condition);

      if (execution_kernels_per_device_.find(device_id) ==
          execution_kernels_per_device_.end()) {
        std::vector<ExecutionKernel> kernels{
            ExecutionKernel{device_id, FragmentsList{}, std::nullopt}};
        CHECK(execution_kernels_per_device_.insert(std::make_pair(device_id, kernels))
                  .second);
      }

      // Multifrag kernels only have one execution kernel per device. Grab the execution
      // kernel object and push back into its fragments list.
      CHECK_EQ(execution_kernels_per_device_[device_id].size(), size_t(1));
      auto& execution_kernel = execution_kernels_per_device_[device_id].front();

      auto& kernel_frag_list = execution_kernel.fragments;
      if (kernel_frag_list.size() < j + 1) {
        kernel_frag_list.emplace_back(FragmentsPerTable{table_id, frag_ids});
      } else {
        CHECK_EQ(kernel_frag_list[j].table_id, table_id);
        auto& curr_frag_ids = kernel_frag_list[j].fragment_ids;
        for (const int frag_id : frag_ids) {
          if (std::find(curr_frag_ids.begin(), curr_frag_ids.end(), frag_id) ==
              curr_frag_ids.end()) {
            curr_frag_ids.push_back(frag_id);
          }
        }
      }
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
  }
}

namespace {

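// A "sample" query here is a plain projection with a scan limit: a single input table,
// no filters, no order-by, and no group-by expression.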
bool is_sample_query(const RelAlgExecutionUnit& ra_exe_unit) {
  const bool result = ra_exe_unit.input_descs.size() == 1 &&
                      ra_exe_unit.simple_quals.empty() && ra_exe_unit.quals.empty() &&
                      ra_exe_unit.sort_info.order_entries.empty() &&
                      ra_exe_unit.scan_limit;
  if (result) {
    CHECK_EQ(size_t(1), ra_exe_unit.groupby_exprs.size());
    CHECK(!ra_exe_unit.groupby_exprs.front());
  }
  return result;
}

}  // namespace

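// Accumulates the outer tuple count produced so far and returns true once a sample
// query has seen at least limit + offset tuples, allowing dispatch to stop early.
// Kernels without a known outer tuple count never trigger early termination.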
bool QueryFragmentDescriptor::terminateDispatchMaybe(
    size_t& tuple_count,
    const RelAlgExecutionUnit& ra_exe_unit,
    const ExecutionKernel& kernel) const {
  const auto sample_query_limit =
      ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
  if (!kernel.outer_tuple_count) {
    return false;
  } else {
    tuple_count += *kernel.outer_tuple_count;
    if (is_sample_query(ra_exe_unit) && sample_query_limit > 0 &&
        tuple_count >= sample_query_limit) {
      return true;
    }
  }
  return false;
}

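// Tracks the running tuple count assigned to each GPU and throws QueryMustRunOnCpu()
// when the estimated input size (tuples * bytes per fetched row) would exceed the
// device's memory budget (available_gpu_mem_bytes_ * gpu_input_mem_limit_percent_).
// Skipped in distributed (g_cluster) mode.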
void QueryFragmentDescriptor::checkDeviceMemoryUsage(
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const int device_id,
    const size_t num_bytes_for_row) {
  if (g_cluster) {
    // Disabled in distributed mode for now
    return;
  }
  CHECK_GE(device_id, 0);
  tuple_count_per_device_[device_id] += fragment.getNumTuples();
  const size_t gpu_bytes_limit =
      available_gpu_mem_bytes_[device_id] * gpu_input_mem_limit_percent_;
  if (tuple_count_per_device_[device_id] * num_bytes_for_row > gpu_bytes_limit) {
    LOG(WARNING) << "Not enough memory on device " << device_id
                 << " for input chunks totaling "
                 << tuple_count_per_device_[device_id] * num_bytes_for_row
                 << " bytes (available device memory: " << gpu_bytes_limit << " bytes)";
    throw QueryMustRunOnCpu();
  }
}