OmniSciDB  bf83d84833
QueryFragmentDescriptor.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryFragmentDescriptor.h"

#include "Catalog/Catalog.h"
#include "DataMgr/DataMgr.h"
#include "QueryEngine/Execute.h"
#include "Shared/misc.h"

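// Illustrative usage sketch (not taken from this file; the surrounding call site and
// argument values are assumptions). The executor builds a descriptor from the
// execution unit and per-table fragment metadata, then asks it to assign fragments to
// per-device execution kernels:
//
//   QueryFragmentDescriptor fragment_descriptor(ra_exe_unit,
//                                               query_infos,
//                                               gpu_mem_infos,
//                                               gpu_input_mem_limit_percent,
//                                               /*allowed_outer_fragment_indices=*/{});
//   fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
//                                              frag_offsets,
//                                              device_count,
//                                              device_type,
//                                              enable_multifrag_kernels,
//                                              enable_inner_join_fragment_skipping,
//                                              executor);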
QueryFragmentDescriptor::QueryFragmentDescriptor(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos,
    const std::vector<Data_Namespace::MemoryInfo>& gpu_mem_infos,
    const double gpu_input_mem_limit_percent,
    std::vector<size_t> allowed_outer_fragment_indices)
    : allowed_outer_fragment_indices_(allowed_outer_fragment_indices)
    , gpu_input_mem_limit_percent_(gpu_input_mem_limit_percent) {
  const size_t input_desc_count{ra_exe_unit.input_descs.size()};
  CHECK_EQ(query_infos.size(), input_desc_count);
  for (size_t table_idx = 0; table_idx < input_desc_count; ++table_idx) {
    const auto table_id = ra_exe_unit.input_descs[table_idx].getTableId();
    if (!selected_tables_fragments_.count(table_id)) {
      selected_tables_fragments_[table_id] = &query_infos[table_idx].info.fragments;
    }
  }

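  // Cache the total usable memory per GPU device; checkDeviceMemoryUsage() later
  // compares projected input sizes against this value (scaled by
  // gpu_input_mem_limit_percent_) to decide whether a query must fall back to CPU.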
  for (size_t device_id = 0; device_id < gpu_mem_infos.size(); device_id++) {
    const auto& gpu_mem_info = gpu_mem_infos[device_id];
    available_gpu_mem_bytes_[device_id] =
        gpu_mem_info.maxNumPages * gpu_mem_info.pageSize;
  }
}

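// Populates all_tables_fragments with the fragment list for every input table of the
// execution unit, keyed by table id. Tables referenced more than once keep the
// fragment list recorded for their first occurrence.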
void QueryFragmentDescriptor::computeAllTablesFragments(
    std::map<int, const TableFragments*>& all_tables_fragments,
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos) {
  for (size_t tab_idx = 0; tab_idx < ra_exe_unit.input_descs.size(); ++tab_idx) {
    int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
    CHECK_EQ(query_infos[tab_idx].table_id, table_id);
    const auto& fragments = query_infos[tab_idx].info.fragments;
    if (!all_tables_fragments.count(table_id)) {
      all_tables_fragments.insert(std::make_pair(table_id, &fragments));
    }
  }
}

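// Top-level entry point for kernel planning: computes the per-row fetch width from the
// LHS tables, then delegates to one of the three map builders below depending on
// whether the query is a UNION ALL, whether multi-fragment kernels are enabled, or
// neither (one kernel per outer fragment).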
void QueryFragmentDescriptor::buildFragmentKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    const bool enable_multifrag_kernels,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // For joins, only consider the cardinality of the LHS
  // columns in the bytes per row count.
  std::set<int> lhs_table_ids;
  for (const auto& input_desc : ra_exe_unit.input_descs) {
    if (input_desc.getNestLevel() == 0) {
      lhs_table_ids.insert(input_desc.getTableId());
    }
  }

  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow(lhs_table_ids);

  if (ra_exe_unit.union_all) {
    buildFragmentPerKernelMapForUnion(ra_exe_unit,
                                      frag_offsets,
                                      device_count,
                                      num_bytes_for_row,
                                      device_type,
                                      executor);
  } else if (enable_multifrag_kernels) {
    buildMultifragKernelMap(ra_exe_unit,
                            frag_offsets,
                            device_count,
                            num_bytes_for_row,
                            device_type,
                            enable_inner_join_fragment_skipping,
                            executor);
  } else {
    buildFragmentPerKernelMap(ra_exe_unit,
                              frag_offsets,
                              device_count,
                              num_bytes_for_row,
                              device_type,
                              executor);
  }
}

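// Builds one ExecutionKernelDescriptor per surviving fragment of the given table,
// assigning each fragment to a device and recording an optional outer tuple count used
// for early dispatch termination. Shared by the per-kernel and UNION code paths.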
void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
    const TableFragments* fragments,
    const RelAlgExecutionUnit& ra_exe_unit,
    const InputDescriptor& table_desc,
    const bool is_temporary_table,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ChunkMetadataVector& deleted_chunk_metadata_vec,
    const std::optional<size_t> table_desc_offset,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  auto get_fragment_tuple_count = [&deleted_chunk_metadata_vec, &is_temporary_table](
                                      const auto& fragment) -> std::optional<size_t> {
    // Returning std::nullopt disables execution dispatch optimizations based on tuple
    // counts, as it signals to the dispatch mechanism that a reliable tuple count
    // cannot be obtained. This is the case for fragments which have deleted rows,
    // temporary table fragments, or fragments in a UNION query.
    if (is_temporary_table) {
      return std::nullopt;
    }
    if (deleted_chunk_metadata_vec.empty()) {
      return fragment.getNumTuples();
    }
    const auto fragment_id = fragment.fragmentId;
    CHECK_GE(fragment_id, 0);
    if (static_cast<size_t>(fragment_id) < deleted_chunk_metadata_vec.size()) {
      const auto& chunk_metadata = deleted_chunk_metadata_vec[fragment_id];
      if (chunk_metadata.second->chunkStats.max.tinyintval == 1) {
        return std::nullopt;
      }
    }
    return fragment.getNumTuples();
  };

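  // Walk every fragment of the table: honor any outer-fragment whitelist, skip
  // fragments pruned by qualifier metadata, pick a device (shard-aware on GPU), and
  // append the resulting kernel descriptor to execution_kernels_per_device_.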
  for (size_t i = 0; i < fragments->size(); i++) {
    if (!allowed_outer_fragment_indices_.empty()) {
      if (std::find(allowed_outer_fragment_indices_.begin(),
                    allowed_outer_fragment_indices_.end(),
                    i) == allowed_outer_fragment_indices_.end()) {
        continue;
      }
    }

    const auto& fragment = (*fragments)[i];
    const auto skip_frag = executor->skipFragment(
        table_desc, fragment, ra_exe_unit.simple_quals, frag_offsets, i);
    if (skip_frag.first) {
      continue;
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
    const int chosen_device_count =
        device_type == ExecutorDeviceType::CPU ? 1 : device_count;
    CHECK_GT(chosen_device_count, 0);
    const auto memory_level = device_type == ExecutorDeviceType::GPU
                                  ? Data_Namespace::GPU_LEVEL
                                  : Data_Namespace::CPU_LEVEL;
    const int device_id = (device_type == ExecutorDeviceType::CPU || fragment.shard == -1)
                              ? fragment.deviceIds[static_cast<int>(memory_level)]
                              : fragment.shard % chosen_device_count;

    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }

    ExecutionKernelDescriptor execution_kernel_desc{
        device_id, {}, get_fragment_tuple_count(fragment)};
    if (table_desc_offset) {
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            *table_desc_offset,
                                            i,
                                            selected_tables_fragments_,
                                            executor->getInnerTabIdToJoinCond());
      const auto table_id = ra_exe_unit.input_descs[*table_desc_offset].getTableId();
      execution_kernel_desc.fragments.emplace_back(FragmentsPerTable{table_id, frag_ids});

    } else {
      for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
        const auto frag_ids =
            executor->getTableFragmentIndices(ra_exe_unit,
                                              device_type,
                                              j,
                                              i,
                                              selected_tables_fragments_,
                                              executor->getInnerTabIdToJoinCond());
        const auto table_id = ra_exe_unit.input_descs[j].getTableId();
        auto table_frags_it = selected_tables_fragments_.find(table_id);
        CHECK(table_frags_it != selected_tables_fragments_.end());

        execution_kernel_desc.fragments.emplace_back(
            FragmentsPerTable{table_id, frag_ids});
      }
    }

    auto itr = execution_kernels_per_device_.find(device_id);
    if (itr == execution_kernels_per_device_.end()) {
      auto const pair = execution_kernels_per_device_.insert(std::make_pair(
          device_id,
          std::vector<ExecutionKernelDescriptor>{std::move(execution_kernel_desc)}));
      CHECK(pair.second);
    } else {
      itr->second.emplace_back(std::move(execution_kernel_desc));
    }
  }
}

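// UNION ALL variant: each input table contributes its own kernels, so the per-table
// helper above is invoked once per input descriptor with that table's position passed
// as table_desc_offset. Note that the deleted-row metadata gathered in this loop is
// not forwarded to the per-table helper; an empty vector is supplied in its place.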
void QueryFragmentDescriptor::buildFragmentPerKernelMapForUnion(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  const auto& catalog = executor->getCatalog();

  for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
    auto const& table_desc = ra_exe_unit.input_descs[j];
    int const table_id = table_desc.getTableId();
    TableFragments const* fragments = selected_tables_fragments_.at(table_id);

    auto& data_mgr = catalog->getDataMgr();
    ChunkMetadataVector deleted_chunk_metadata_vec;

    bool is_temporary_table = false;
    if (table_id > 0) {
      // Temporary tables will not have a table descriptor and will not have deleted
      // rows.
      const auto td = catalog->getMetadataForTable(table_id);
      CHECK(td);
      if (table_is_temporary(td)) {
        // For temporary tables, we won't have delete column metadata available.
        // However, we know the table fits in memory as it is a temporary table, so
        // signal to the lower layers that we can disregard the early out select *
        // optimization.
        is_temporary_table = true;
      } else {
        const auto deleted_cd = executor->plan_state_->getDeletedColForTable(table_id);
        if (deleted_cd) {
          ChunkKey chunk_key_prefix = {
              catalog->getCurrentDB().dbId, table_id, deleted_cd->columnId};
          data_mgr.getChunkMetadataVecForKeyPrefix(deleted_chunk_metadata_vec,
                                                   chunk_key_prefix);
        }
      }
    }

    buildFragmentPerKernelForTable(fragments,
                                   ra_exe_unit,
                                   table_desc,
                                   is_temporary_table,
                                   frag_offsets,
                                   device_count,
                                   num_bytes_for_row,
                                   {},
                                   j,
                                   device_type,
                                   executor);

    std::vector<int> table_ids =
        std::accumulate(execution_kernels_per_device_[0].begin(),
                        execution_kernels_per_device_[0].end(),
                        std::vector<int>(),
                        [](auto&& vec, auto& exe_kern) {
                          vec.push_back(exe_kern.fragments[0].table_id);
                          return vec;
                        });
    VLOG(1) << "execution_kernels_per_device_.size()="
            << execution_kernels_per_device_.size()
            << " execution_kernels_per_device_[0][*].fragments[0].table_id="
            << shared::printContainer(table_ids);
  }
}

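// Default (non-multifrag, non-UNION) path: only the outermost table's fragments drive
// kernel creation. Deleted-row metadata for the outer table is collected here so that
// per-fragment tuple counts can be invalidated when a fragment may contain deleted
// rows.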
void QueryFragmentDescriptor::buildFragmentPerKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto& catalog = executor->getCatalog();

  auto& data_mgr = catalog->getDataMgr();
  ChunkMetadataVector deleted_chunk_metadata_vec;

  bool is_temporary_table = false;
  if (outer_table_id > 0) {
    // Temporary tables will not have a table descriptor and will not have deleted rows.
    const auto td = catalog->getMetadataForTable(outer_table_id);
    CHECK(td);
    if (table_is_temporary(td)) {
      // For temporary tables, we won't have delete column metadata available. However,
      // we know the table fits in memory as it is a temporary table, so signal to the
      // lower layers that we can disregard the early out select * optimization.
      is_temporary_table = true;
    } else {
      const auto deleted_cd =
          executor->plan_state_->getDeletedColForTable(outer_table_id);
      if (deleted_cd) {
        ChunkKey chunk_key_prefix = {
            catalog->getCurrentDB().dbId, outer_table_id, deleted_cd->columnId};
        data_mgr.getChunkMetadataVecForKeyPrefix(deleted_chunk_metadata_vec,
                                                 chunk_key_prefix);
      }
    }
  }

  buildFragmentPerKernelForTable(outer_fragments,
                                 ra_exe_unit,
                                 outer_table_desc,
                                 is_temporary_table,
                                 frag_offsets,
                                 device_count,
                                 num_bytes_for_row,
                                 deleted_chunk_metadata_vec,
                                 std::nullopt,
                                 device_type,
                                 executor);
}

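// Multi-fragment kernels: every device gets at most one execution kernel, and all
// fragments routed to that device are appended to the same descriptor's fragment
// lists, one FragmentsPerTable entry per input table.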
void QueryFragmentDescriptor::buildMultifragKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // Allocate all the fragments of the tables involved in the query to available
  // devices. The basic idea: the device is decided by the outer table in the
  // query (the first table in a join) and we need to broadcast the fragments
  // in the inner table to each device. Sharding will change this model.
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();

  for (size_t outer_frag_id = 0; outer_frag_id < outer_fragments->size();
       ++outer_frag_id) {
    if (!allowed_outer_fragment_indices_.empty()) {
      if (std::find(allowed_outer_fragment_indices_.begin(),
                    allowed_outer_fragment_indices_.end(),
                    outer_frag_id) == allowed_outer_fragment_indices_.end()) {
        continue;
      }
    }

    const auto& fragment = (*outer_fragments)[outer_frag_id];
    auto skip_frag = executor->skipFragment(outer_table_desc,
                                            fragment,
                                            ra_exe_unit.simple_quals,
                                            frag_offsets,
                                            outer_frag_id);
    if (enable_inner_join_fragment_skipping &&
        (skip_frag == std::pair<bool, int64_t>(false, -1))) {
      skip_frag = executor->skipFragmentInnerJoins(
          outer_table_desc, ra_exe_unit, fragment, frag_offsets, outer_frag_id);
    }
    if (skip_frag.first) {
      continue;
    }
    const int device_id =
        fragment.shard == -1
            ? fragment.deviceIds[static_cast<int>(Data_Namespace::GPU_LEVEL)]
            : fragment.shard % device_count;
    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            outer_frag_id,
                                            selected_tables_fragments_,
                                            inner_table_id_to_join_condition);

      if (execution_kernels_per_device_.find(device_id) ==
          execution_kernels_per_device_.end()) {
        std::vector<ExecutionKernelDescriptor> kernel_descs{
            ExecutionKernelDescriptor{device_id, FragmentsList{}, std::nullopt}};
        CHECK(
            execution_kernels_per_device_.insert(std::make_pair(device_id, kernel_descs))
                .second);
      }

      // Multifrag kernels only have one execution kernel per device. Grab the
      // execution kernel object and push back into its fragments list.
      CHECK_EQ(execution_kernels_per_device_[device_id].size(), size_t(1));
      auto& execution_kernel = execution_kernels_per_device_[device_id].front();

      auto& kernel_frag_list = execution_kernel.fragments;
      if (kernel_frag_list.size() < j + 1) {
        kernel_frag_list.emplace_back(FragmentsPerTable{table_id, frag_ids});
      } else {
        CHECK_EQ(kernel_frag_list[j].table_id, table_id);
        auto& curr_frag_ids = kernel_frag_list[j].fragment_ids;
        for (const int frag_id : frag_ids) {
          if (std::find(curr_frag_ids.begin(), curr_frag_ids.end(), frag_id) ==
              curr_frag_ids.end()) {
            curr_frag_ids.push_back(frag_id);
          }
        }
      }
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
  }
}

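// A "sample" query here is a single-table projection with a scan limit and no filters,
// grouping, or ordering (e.g. SELECT ... FROM t LIMIT n); for such queries dispatch can
// stop as soon as enough tuples have been produced.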
namespace {

bool is_sample_query(const RelAlgExecutionUnit& ra_exe_unit) {
  const bool result = ra_exe_unit.input_descs.size() == 1 &&
                      ra_exe_unit.simple_quals.empty() && ra_exe_unit.quals.empty() &&
                      ra_exe_unit.sort_info.order_entries.empty() &&
                      ra_exe_unit.scan_limit;
  if (result) {
    CHECK_EQ(size_t(1), ra_exe_unit.groupby_exprs.size());
    CHECK(!ra_exe_unit.groupby_exprs.front());
  }
  return result;
}

}  // namespace

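// Accumulates the known outer tuple count of each dispatched kernel and returns true
// once a LIMIT/OFFSET "sample" query has seen enough tuples to satisfy its limit,
// allowing the caller to stop dispatching further kernels.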
bool QueryFragmentDescriptor::terminateDispatchMaybe(
    size_t& tuple_count,
    const RelAlgExecutionUnit& ra_exe_unit,
    const ExecutionKernelDescriptor& kernel) const {
  const auto sample_query_limit =
      ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
  if (!kernel.outer_tuple_count) {
    return false;
  } else {
    tuple_count += *kernel.outer_tuple_count;
    if (is_sample_query(ra_exe_unit) && sample_query_limit > 0 &&
        tuple_count >= sample_query_limit) {
      return true;
    }
  }
  return false;
}

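// Tracks per-device input tuple counts and throws QueryMustRunOnCpu when the projected
// input size for a GPU exceeds the configured fraction of that device's memory.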
void QueryFragmentDescriptor::checkDeviceMemoryUsage(
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const int device_id,
    const size_t num_bytes_for_row) {
  if (g_cluster) {
    // Disabled in distributed mode for now
    return;
  }
  CHECK_GE(device_id, 0);
  tuple_count_per_device_[device_id] += fragment.getNumTuples();
  const size_t gpu_bytes_limit =
      available_gpu_mem_bytes_[device_id] * gpu_input_mem_limit_percent_;
  if (tuple_count_per_device_[device_id] * num_bytes_for_row > gpu_bytes_limit) {
    LOG(WARNING) << "Not enough memory on device " << device_id
                 << " for input chunks totaling "
                 << tuple_count_per_device_[device_id] * num_bytes_for_row
                 << " bytes (available device memory: " << gpu_bytes_limit << " bytes)";
    throw QueryMustRunOnCpu();
  }
}

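// Stream formatter used by dispatch logging; prints e.g. "table_id(7) fragment_ids(0 1 2)".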
std::ostream& operator<<(std::ostream& os, FragmentsPerTable const& fragments_per_table) {
  os << "table_id(" << fragments_per_table.table_id << ") fragment_ids";
  for (size_t i = 0; i < fragments_per_table.fragment_ids.size(); ++i) {
    os << (i ? ' ' : '(') << fragments_per_table.fragment_ids[i];
  }
  return os << ')';
}