OmniSciDB  2c44a3935d
QueryFragmentDescriptor.cpp
/*
 * Copyright 2018 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryFragmentDescriptor.h"

#include <DataMgr/DataMgr.h>
#include "../Execute.h"
#include "Shared/misc.h"

QueryFragmentDescriptor::QueryFragmentDescriptor(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos,
    const std::vector<Data_Namespace::MemoryInfo>& gpu_mem_infos,
    const double gpu_input_mem_limit_percent,
    std::vector<size_t> allowed_outer_fragment_indices)
    : allowed_outer_fragment_indices_(allowed_outer_fragment_indices)
    , gpu_input_mem_limit_percent_(gpu_input_mem_limit_percent) {
  const size_t input_desc_count{ra_exe_unit.input_descs.size()};
  CHECK_EQ(query_infos.size(), input_desc_count);
  for (size_t table_idx = 0; table_idx < input_desc_count; ++table_idx) {
    const auto table_id = ra_exe_unit.input_descs[table_idx].getTableId();
    if (!selected_tables_fragments_.count(table_id)) {
      selected_tables_fragments_[ra_exe_unit.input_descs[table_idx].getTableId()] =
          &query_infos[table_idx].info.fragments;
    }
  }

  for (size_t device_id = 0; device_id < gpu_mem_infos.size(); device_id++) {
    const auto& gpu_mem_info = gpu_mem_infos[device_id];
    available_gpu_mem_bytes_[device_id] =
        gpu_mem_info.maxNumPages * gpu_mem_info.pageSize;
  }
}

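As a reference point for the loop above, each device's budget is simply maxNumPages * pageSize for that device. Below is a minimal standalone sketch of the same bookkeeping, using a hypothetical GpuMemInfoSketch stand-in (and illustrative numbers) rather than the real Data_Namespace::MemoryInfo:

#include <cstddef>
#include <cstdio>
#include <map>
#include <vector>

// Stand-in for the two MemoryInfo fields used above; names are the editor's.
struct GpuMemInfoSketch {
  std::size_t pageSize;
  std::size_t maxNumPages;
};

int main() {
  const std::vector<GpuMemInfoSketch> gpu_mem_infos{{2097152, 4096}, {2097152, 2048}};
  std::map<std::size_t, std::size_t> available_gpu_mem_bytes;
  for (std::size_t device_id = 0; device_id < gpu_mem_infos.size(); ++device_id) {
    available_gpu_mem_bytes[device_id] =
        gpu_mem_infos[device_id].maxNumPages * gpu_mem_infos[device_id].pageSize;
  }
  for (const auto& [device_id, bytes] : available_gpu_mem_bytes) {
    std::printf("device %zu: %zu bytes\n", device_id, bytes);  // 8 GiB, 4 GiB
  }
}
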
void QueryFragmentDescriptor::computeAllTablesFragments(
    std::map<int, const TableFragments*>& all_tables_fragments,
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos) {
  for (size_t tab_idx = 0; tab_idx < ra_exe_unit.input_descs.size(); ++tab_idx) {
    int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
    CHECK_EQ(query_infos[tab_idx].table_id, table_id);
    const auto& fragments = query_infos[tab_idx].info.fragments;
    if (!all_tables_fragments.count(table_id)) {
      all_tables_fragments.insert(std::make_pair(table_id, &fragments));
    }
  }
}

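computeAllTablesFragments keeps only the first fragment list seen for each table id, so a table id that appears more than once in input_descs (e.g. a self-join) is not overwritten. A reduced sketch of that first-wins insertion, with hypothetical types standing in for InputTableInfo:

#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

using FragmentIds = std::vector<std::size_t>;

int main() {
  // Two input descriptors referring to the same table id, as in a self-join.
  const std::vector<std::pair<int, FragmentIds>> inputs{{7, {0, 1, 2}}, {7, {0, 1, 2}}};
  std::map<int, const FragmentIds*> all_tables_fragments;
  for (const auto& [table_id, fragments] : inputs) {
    if (!all_tables_fragments.count(table_id)) {
      all_tables_fragments.insert(std::make_pair(table_id, &fragments));
    }
  }
  assert(all_tables_fragments.size() == 1);  // the second occurrence was ignored
}
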
void QueryFragmentDescriptor::buildFragmentKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    const bool enable_multifrag_kernels,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // For joins, only consider the cardinality of the LHS
  // columns in the bytes per row count.
  std::set<int> lhs_table_ids;
  for (const auto& input_desc : ra_exe_unit.input_descs) {
    if (input_desc.getNestLevel() == 0) {
      lhs_table_ids.insert(input_desc.getTableId());
    }
  }

  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow(lhs_table_ids);

  if (ra_exe_unit.union_all) {
    buildFragmentPerKernelMapForUnion(ra_exe_unit,
                                      frag_offsets,
                                      device_count,
                                      num_bytes_for_row,
                                      device_type,
                                      executor);
  } else if (enable_multifrag_kernels) {
    buildMultifragKernelMap(ra_exe_unit,
                            frag_offsets,
                            device_count,
                            num_bytes_for_row,
                            device_type,
                            enable_inner_join_fragment_skipping,
                            executor);
  } else {
    buildFragmentPerKernelMap(ra_exe_unit,
                              frag_offsets,
                              device_count,
                              num_bytes_for_row,
                              device_type,
                              executor);
  }
}

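buildFragmentKernelMap only chooses between the three builders defined below: the UNION ALL path, the one-kernel-per-device multifrag path, and the default one-kernel-per-outer-fragment path, in that order of precedence. A compressed restatement of the branch order (the enum and function names here are the editor's, not part of the codebase):

#include <cstdio>

enum class KernelMapKind { PerFragmentForUnion, Multifrag, PerFragment };

// Mirrors the if / else-if / else above: union_all wins, then the multifrag flag.
KernelMapKind choose_kernel_map(bool union_all, bool enable_multifrag_kernels) {
  if (union_all) {
    return KernelMapKind::PerFragmentForUnion;
  } else if (enable_multifrag_kernels) {
    return KernelMapKind::Multifrag;
  }
  return KernelMapKind::PerFragment;
}

int main() {
  std::printf("%d\n", static_cast<int>(choose_kernel_map(false, true)));  // Multifrag
}
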
namespace {
std::optional<size_t> compute_fragment_tuple_count(
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const ColumnDescriptor* deleted_cd) {
  if (deleted_cd) {
    return std::nullopt;
  } else {
    return fragment.getNumTuples();
  }
}

}  // namespace

void QueryFragmentDescriptor::buildFragmentPerKernelMapForUnion(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
    auto const& table_desc = ra_exe_unit.input_descs[j];
    int const table_id = table_desc.getTableId();
    TableFragments const* fragments = selected_tables_fragments_.at(table_id);

    const ColumnDescriptor* deleted_cd{nullptr};
    if (table_id > 0) {
      // Temporary tables will not have a table descriptor and will not have
      // deleted rows.
      const auto& catalog = executor->getCatalog();
      const auto td = catalog->getMetadataForTable(table_id);
      CHECK(td);
      deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
    }
    VLOG(1) << "table_id=" << table_id << " fragments->size()=" << fragments->size()
            << " fragments->front().physicalTableId="
            << fragments->front().physicalTableId
            << " fragments->front().getNumTuples()=" << fragments->front().getNumTuples()
            << " fragments->front().getPhysicalNumTuples()="
            << fragments->front().getPhysicalNumTuples();

    for (size_t i = 0; i < fragments->size(); ++i) {
      const auto& fragment = (*fragments)[i];
      const auto skip_frag = executor->skipFragment(
          table_desc, fragment, ra_exe_unit.simple_quals, frag_offsets, i);
      if (skip_frag.first) {
        continue;
      }
      rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
      const int chosen_device_count =
          device_type == ExecutorDeviceType::CPU ? 1 : device_count;
      CHECK_GT(chosen_device_count, 0);
      const auto memory_level = device_type == ExecutorDeviceType::GPU
                                    ? Data_Namespace::GPU_LEVEL
                                    : Data_Namespace::CPU_LEVEL;

      int device_id = (device_type == ExecutorDeviceType::CPU || fragment.shard == -1)
                          ? fragment.deviceIds[static_cast<int>(memory_level)]
                          : fragment.shard % chosen_device_count;

      VLOG(1) << "device_type_is_cpu=" << (device_type == ExecutorDeviceType::CPU)
              << " chosen_device_count=" << chosen_device_count
              << " fragment.shard=" << fragment.shard
              << " fragment.deviceIds.size()=" << fragment.deviceIds.size()
              << " int(memory_level)=" << int(memory_level) << " device_id=" << device_id;

      if (device_type == ExecutorDeviceType::GPU) {
        checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
      }

      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            i,
                                            selected_tables_fragments_,
                                            executor->getInnerTabIdToJoinCond());

      VLOG(1) << "table_id=" << table_id << " frag_ids.size()=" << frag_ids.size()
              << " frag_ids.front()=" << frag_ids.front();
      ExecutionKernel execution_kernel{
          device_id,
          {FragmentsPerTable{table_id, frag_ids}},
          compute_fragment_tuple_count(fragment, deleted_cd)};

      auto itr = execution_kernels_per_device_.find(device_id);
      if (itr == execution_kernels_per_device_.end()) {
        auto const pair = execution_kernels_per_device_.insert(std::make_pair(
            device_id, std::vector<ExecutionKernel>{std::move(execution_kernel)}));
        CHECK(pair.second);
      } else {
        itr->second.emplace_back(std::move(execution_kernel));
      }
    }
    std::vector<int> table_ids =
        std::accumulate(execution_kernels_per_device_[0].begin(),
                        execution_kernels_per_device_[0].end(),
                        std::vector<int>(),
                        [](auto&& vec, auto& exe_kern) {
                          vec.push_back(exe_kern.fragments[0].table_id);
                          return vec;
                        });
    VLOG(1) << "execution_kernels_per_device_.size()="
            << execution_kernels_per_device_.size()
            << " execution_kernels_per_device_[0].size()="
            << execution_kernels_per_device_[0].size()
            << " execution_kernels_per_device_[0][*].fragments[0].table_id="
            << shared::printContainer(table_ids);
  }
}

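The device choice in this builder (and in buildFragmentPerKernelMap below) follows one rule: CPU execution and unsharded fragments (shard == -1) use the device recorded in fragment.deviceIds for the chosen memory level, while sharded fragments are distributed round-robin via shard % chosen_device_count. A standalone restatement with simplified stand-in types; the enum values here are illustrative and not the real Data_Namespace::MemoryLevel numbering:

#include <cassert>
#include <vector>

enum class DeviceType { CPU, GPU };
enum MemoryLevelSketch { CPU_LEVEL = 0, GPU_LEVEL = 1 };  // editor's stand-in values

struct FragmentSketch {
  int shard;                   // -1 means the table is not sharded
  std::vector<int> deviceIds;  // indexed by memory level
};

int pick_device(DeviceType device_type, const FragmentSketch& frag, int device_count) {
  const int chosen_device_count = device_type == DeviceType::CPU ? 1 : device_count;
  const auto memory_level = device_type == DeviceType::GPU ? GPU_LEVEL : CPU_LEVEL;
  return (device_type == DeviceType::CPU || frag.shard == -1)
             ? frag.deviceIds[static_cast<int>(memory_level)]
             : frag.shard % chosen_device_count;
}

int main() {
  const FragmentSketch unsharded{-1, {0, 3}};
  const FragmentSketch sharded{5, {0, 0}};
  assert(pick_device(DeviceType::GPU, unsharded, 4) == 3);  // deviceIds[GPU_LEVEL]
  assert(pick_device(DeviceType::GPU, sharded, 4) == 1);    // 5 % 4
  assert(pick_device(DeviceType::CPU, sharded, 4) == 0);    // CPU ignores sharding
}
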
void QueryFragmentDescriptor::buildFragmentPerKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const ColumnDescriptor* deleted_cd{nullptr};
  if (outer_table_id > 0) {
    // Intermediate tables will not have a table descriptor and will also not have
    // deleted rows.
    const auto& catalog = executor->getCatalog();
    const auto td = catalog->getMetadataForTable(outer_table_id);
    CHECK(td);
    deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
  }

  for (size_t i = 0; i < outer_fragments->size(); ++i) {
    if (!allowed_outer_fragment_indices_.empty()) {
      if (std::find(allowed_outer_fragment_indices_.begin(),
                    allowed_outer_fragment_indices_.end(),
                    i) == allowed_outer_fragment_indices_.end()) {
        continue;
      }
    }

    const auto& fragment = (*outer_fragments)[i];
    const auto skip_frag = executor->skipFragment(
        outer_table_desc, fragment, ra_exe_unit.simple_quals, frag_offsets, i);
    if (skip_frag.first) {
      continue;
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
    const int chosen_device_count =
        device_type == ExecutorDeviceType::CPU ? 1 : device_count;
    CHECK_GT(chosen_device_count, 0);
    const auto memory_level = device_type == ExecutorDeviceType::GPU
                                  ? Data_Namespace::GPU_LEVEL
                                  : Data_Namespace::CPU_LEVEL;
    int device_id = (device_type == ExecutorDeviceType::CPU || fragment.shard == -1)
                        ? fragment.deviceIds[static_cast<int>(memory_level)]
                        : fragment.shard % chosen_device_count;

    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }

    ExecutionKernel execution_kernel{
        device_id, {}, compute_fragment_tuple_count(fragment, deleted_cd)};
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            i,
                                            selected_tables_fragments_,
                                            executor->getInnerTabIdToJoinCond());
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());

      execution_kernel.fragments.emplace_back(FragmentsPerTable{table_id, frag_ids});
    }

    if (execution_kernels_per_device_.find(device_id) ==
        execution_kernels_per_device_.end()) {
      CHECK(execution_kernels_per_device_
                .insert(std::make_pair(device_id,
                                       std::vector<ExecutionKernel>{execution_kernel}))
                .second);
    } else {
      execution_kernels_per_device_[device_id].emplace_back(execution_kernel);
    }
  }
}

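When allowed_outer_fragment_indices_ is non-empty it acts as a whitelist over outer fragment indices: anything not listed is skipped before a kernel is built, both here and in buildMultifragKernelMap below. A small self-contained sketch of that filter:

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
  const std::vector<std::size_t> allowed_outer_fragment_indices{1, 3};  // empty => allow all
  const std::size_t num_outer_fragments = 5;
  for (std::size_t i = 0; i < num_outer_fragments; ++i) {
    if (!allowed_outer_fragment_indices.empty() &&
        std::find(allowed_outer_fragment_indices.begin(),
                  allowed_outer_fragment_indices.end(),
                  i) == allowed_outer_fragment_indices.end()) {
      continue;  // not whitelisted: no kernel for this outer fragment
    }
    std::printf("would build a kernel for outer fragment %zu\n", i);  // prints 1 and 3
  }
}
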
void QueryFragmentDescriptor::buildMultifragKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // Allocate all the fragments of the tables involved in the query to available
  // devices. The basic idea: the device is decided by the outer table in the
  // query (the first table in a join) and we need to broadcast the fragments
  // in the inner table to each device. Sharding will change this model.
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();

  for (size_t outer_frag_id = 0; outer_frag_id < outer_fragments->size();
       ++outer_frag_id) {
    if (!allowed_outer_fragment_indices_.empty()) {
      if (std::find(allowed_outer_fragment_indices_.begin(),
                    allowed_outer_fragment_indices_.end(),
                    outer_frag_id) == allowed_outer_fragment_indices_.end()) {
        continue;
      }
    }

    const auto& fragment = (*outer_fragments)[outer_frag_id];
    auto skip_frag = executor->skipFragment(outer_table_desc,
                                            fragment,
                                            ra_exe_unit.simple_quals,
                                            frag_offsets,
                                            outer_frag_id);
    if (enable_inner_join_fragment_skipping &&
        (skip_frag == std::pair<bool, int64_t>(false, -1))) {
      skip_frag = executor->skipFragmentInnerJoins(
          outer_table_desc, ra_exe_unit, fragment, frag_offsets, outer_frag_id);
    }
    if (skip_frag.first) {
      continue;
    }
    const int device_id =
        fragment.shard == -1
            ? fragment.deviceIds[static_cast<int>(Data_Namespace::GPU_LEVEL)]
            : fragment.shard % device_count;
    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            outer_frag_id,
                                            selected_tables_fragments_,
                                            inner_table_id_to_join_condition);

      if (execution_kernels_per_device_.find(device_id) ==
          execution_kernels_per_device_.end()) {
        std::vector<ExecutionKernel> kernels{
            ExecutionKernel{device_id, FragmentsList{}, std::nullopt}};
        CHECK(execution_kernels_per_device_.insert(std::make_pair(device_id, kernels))
                  .second);
      }

      // Multifrag kernels only have one execution kernel per device. Grab the execution
      // kernel object and push back into its fragments list.
      CHECK_EQ(execution_kernels_per_device_[device_id].size(), size_t(1));
      auto& execution_kernel = execution_kernels_per_device_[device_id].front();

      auto& kernel_frag_list = execution_kernel.fragments;
      if (kernel_frag_list.size() < j + 1) {
        kernel_frag_list.emplace_back(FragmentsPerTable{table_id, frag_ids});
      } else {
        CHECK_EQ(kernel_frag_list[j].table_id, table_id);
        auto& curr_frag_ids = kernel_frag_list[j].fragment_ids;
        for (const int frag_id : frag_ids) {
          if (std::find(curr_frag_ids.begin(), curr_frag_ids.end(), frag_id) ==
              curr_frag_ids.end()) {
            curr_frag_ids.push_back(frag_id);
          }
        }
      }
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
  }
}

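In the multifrag path each device owns exactly one ExecutionKernel, and every surviving outer fragment appends its per-table fragment ids to that kernel's list, skipping ids already present (the linear std::find check above). A reduced, self-contained sketch of that merge step:

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Append the ids from src that dst does not already contain, preserving order,
// mirroring the curr_frag_ids loop above.
void merge_fragment_ids(std::vector<std::size_t>& dst,
                        const std::vector<std::size_t>& src) {
  for (const auto frag_id : src) {
    if (std::find(dst.begin(), dst.end(), frag_id) == dst.end()) {
      dst.push_back(frag_id);
    }
  }
}

int main() {
  std::vector<std::size_t> kernel_frag_ids{0, 1};
  merge_fragment_ids(kernel_frag_ids, {1, 2, 3});
  assert((kernel_frag_ids == std::vector<std::size_t>{0, 1, 2, 3}));
}
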
namespace {

bool is_sample_query(const RelAlgExecutionUnit& ra_exe_unit) {
  const bool result = ra_exe_unit.input_descs.size() == 1 &&
                      ra_exe_unit.simple_quals.empty() && ra_exe_unit.quals.empty() &&
                      ra_exe_unit.sort_info.order_entries.empty() &&
                      ra_exe_unit.scan_limit;
  if (result) {
    CHECK_EQ(size_t(1), ra_exe_unit.groupby_exprs.size());
    CHECK(!ra_exe_unit.groupby_exprs.front());
  }
  return result;
}

}  // namespace

bool QueryFragmentDescriptor::terminateDispatchMaybe(
    size_t& tuple_count,
    const RelAlgExecutionUnit& ra_exe_unit,
    const ExecutionKernel& kernel) const {
  const auto sample_query_limit =
      ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
  if (!kernel.outer_tuple_count) {
    return false;
  } else {
    tuple_count += *kernel.outer_tuple_count;
    if (is_sample_query(ra_exe_unit) && sample_query_limit > 0 &&
        tuple_count >= sample_query_limit) {
      return true;
    }
  }
  return false;
}

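terminateDispatchMaybe lets a bare sample-style scan (single input table, no quals, no ordering, non-zero scan limit) stop dispatching once the accumulated outer tuple count reaches limit + offset; kernels with an unknown count (empty outer_tuple_count, e.g. when compute_fragment_tuple_count returned nullopt above) never trigger the shortcut. A standalone sketch of the accumulation, assuming the sample-query test has already passed:

#include <cassert>
#include <cstddef>
#include <optional>

// Returns true once enough tuples are known to be produced; mirrors the logic above.
bool terminate_maybe(std::size_t& tuple_count,
                     const std::optional<std::size_t>& kernel_tuple_count,
                     std::size_t limit,
                     std::size_t offset) {
  if (!kernel_tuple_count) {
    return false;  // unknown count: keep dispatching
  }
  tuple_count += *kernel_tuple_count;
  return limit + offset > 0 && tuple_count >= limit + offset;
}

int main() {
  std::size_t running = 0;
  assert(!terminate_maybe(running, 60, /*limit=*/100, /*offset=*/0));
  assert(!terminate_maybe(running, std::nullopt, 100, 0));  // unknown, ignored
  assert(terminate_maybe(running, 50, 100, 0));             // 60 + 50 >= 100
}
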
void QueryFragmentDescriptor::checkDeviceMemoryUsage(
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const int device_id,
    const size_t num_bytes_for_row) {
  if (g_cluster) {
    // Disabled in distributed mode for now
    return;
  }
  CHECK_GE(device_id, 0);
  tuple_count_per_device_[device_id] += fragment.getNumTuples();
  const size_t gpu_bytes_limit =
      available_gpu_mem_bytes_[device_id] * gpu_input_mem_limit_percent_;
  if (tuple_count_per_device_[device_id] * num_bytes_for_row > gpu_bytes_limit) {
    LOG(WARNING) << "Not enough memory on device " << device_id
                 << " for input chunks totaling "
                 << tuple_count_per_device_[device_id] * num_bytes_for_row
                 << " bytes (available device memory: " << gpu_bytes_limit << " bytes)";
    throw QueryMustRunOnCpu();
  }
}

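The check above estimates the device's input footprint as assigned-tuples * num_bytes_for_row and compares it to a byte limit derived from the device budget and gpu_input_mem_limit_percent_; crossing it throws QueryMustRunOnCpu so the query falls back to CPU. A worked standalone version of the arithmetic (the numbers are illustrative only):

#include <cstddef>
#include <cstdio>

// True when the estimated input bytes exceed the scaled per-device budget.
bool would_exceed_gpu_budget(std::size_t tuples_on_device,
                             std::size_t num_bytes_for_row,
                             std::size_t available_gpu_mem_bytes,
                             double gpu_input_mem_limit_percent) {
  const auto gpu_bytes_limit =
      static_cast<std::size_t>(available_gpu_mem_bytes * gpu_input_mem_limit_percent);
  return tuples_on_device * num_bytes_for_row > gpu_bytes_limit;
}

int main() {
  // 8 GiB device budget, 90% input limit, 200 bytes per fetched row:
  // the limit is crossed a little past ~38.6 million rows.
  const std::size_t budget = 8ull * 1024 * 1024 * 1024;
  std::printf("%d %d\n",
              would_exceed_gpu_budget(38'000'000, 200, budget, 0.9),   // 0: fits
              would_exceed_gpu_budget(39'000'000, 200, budget, 0.9));  // 1: too big
}
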
std::ostream& operator<<(std::ostream& os, FragmentsPerTable const& fragments_per_table) {
  os << "table_id(" << fragments_per_table.table_id << ") fragment_ids";
  for (size_t i = 0; i < fragments_per_table.fragment_ids.size(); ++i) {
    os << (i ? ' ' : '(') << fragments_per_table.fragment_ids[i];
  }
  return os << ')';
}

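For reference, the printer above renders a value as table_id(<id>) fragment_ids(<id0> <id1> ...). A self-contained usage sketch with a stand-in struct, since the real FragmentsPerTable is declared in the descriptor headers:

#include <cstddef>
#include <iostream>
#include <vector>

struct FragmentsPerTableSketch {
  int table_id;
  std::vector<std::size_t> fragment_ids;
};

std::ostream& operator<<(std::ostream& os, FragmentsPerTableSketch const& fpt) {
  os << "table_id(" << fpt.table_id << ") fragment_ids";
  for (std::size_t i = 0; i < fpt.fragment_ids.size(); ++i) {
    os << (i ? ' ' : '(') << fpt.fragment_ids[i];
  }
  return os << ')';
}

int main() {
  std::cout << FragmentsPerTableSketch{7, {0, 1, 2}} << '\n';
  // Prints: table_id(7) fragment_ids(0 1 2)
}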