OmniSciDB  2e3a973ef4
QueryFragmentDescriptor.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryFragmentDescriptor.h"

#include "Catalog/Catalog.h"
#include "DataMgr/DataMgr.h"
#include "QueryEngine/Execute.h"
#include "Shared/misc.h"

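// Records the fragment metadata of every input table and the usable memory on each
// GPU; both are consulted later when fragments are assigned to execution kernels.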
QueryFragmentDescriptor::QueryFragmentDescriptor(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos,
    const std::vector<Data_Namespace::MemoryInfo>& gpu_mem_infos,
    const double gpu_input_mem_limit_percent,
    std::vector<size_t> allowed_outer_fragment_indices)
    : allowed_outer_fragment_indices_(allowed_outer_fragment_indices)
    , gpu_input_mem_limit_percent_(gpu_input_mem_limit_percent) {
  const size_t input_desc_count{ra_exe_unit.input_descs.size()};
  CHECK_EQ(query_infos.size(), input_desc_count);
  for (size_t table_idx = 0; table_idx < input_desc_count; ++table_idx) {
    const auto table_id = ra_exe_unit.input_descs[table_idx].getTableId();
    if (!selected_tables_fragments_.count(table_id)) {
      selected_tables_fragments_[table_id] = &query_infos[table_idx].info.fragments;
    }
  }

  for (size_t device_id = 0; device_id < gpu_mem_infos.size(); device_id++) {
    const auto& gpu_mem_info = gpu_mem_infos[device_id];
    available_gpu_mem_bytes_[device_id] =
        gpu_mem_info.maxNumPages * gpu_mem_info.pageSize;
  }
}

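// Collects the fragment list of every input table into all_tables_fragments, keyed by
// table id. Only the first occurrence of a given table id is recorded.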
void QueryFragmentDescriptor::computeAllTablesFragments(
    std::map<int, const TableFragments*>& all_tables_fragments,
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos) {
  for (size_t tab_idx = 0; tab_idx < ra_exe_unit.input_descs.size(); ++tab_idx) {
    int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
    CHECK_EQ(query_infos[tab_idx].table_id, table_id);
    const auto& fragments = query_infos[tab_idx].info.fragments;
    if (!all_tables_fragments.count(table_id)) {
      all_tables_fragments.insert(std::make_pair(table_id, &fragments));
    }
  }
}

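// Entry point for kernel planning: computes the per-row fetch size from the LHS tables,
// then delegates to the UNION ALL, multi-fragment, or fragment-per-kernel builder.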
void QueryFragmentDescriptor::buildFragmentKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    const bool enable_multifrag_kernels,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // For joins, only consider the cardinality of the LHS
  // columns in the bytes per row count.
  std::set<int> lhs_table_ids;
  for (const auto& input_desc : ra_exe_unit.input_descs) {
    if (input_desc.getNestLevel() == 0) {
      lhs_table_ids.insert(input_desc.getTableId());
    }
  }

  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow(lhs_table_ids);

  if (ra_exe_unit.union_all) {
    buildFragmentPerKernelMapForUnion(ra_exe_unit,
                                      frag_offsets,
                                      device_count,
                                      num_bytes_for_row,
                                      device_type,
                                      executor);
  } else if (enable_multifrag_kernels) {
    buildMultifragKernelMap(ra_exe_unit,
                            frag_offsets,
                            device_count,
                            num_bytes_for_row,
                            device_type,
                            enable_inner_join_fragment_skipping,
                            executor);
  } else {
    buildFragmentPerKernelMap(ra_exe_unit,
                              frag_offsets,
                              device_count,
                              num_bytes_for_row,
                              device_type,
                              executor);
  }
}

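// Builds one execution kernel per non-skipped fragment of the given table. Each kernel
// is pinned to a device chosen from the fragment's shard or device ids, and its
// fragment list covers either the single table indicated by table_desc_offset
// (UNION ALL path) or every input table of the execution unit.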
void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
    const TableFragments* fragments,
    const RelAlgExecutionUnit& ra_exe_unit,
    const InputDescriptor& table_desc,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ChunkMetadataVector& deleted_chunk_metadata_vec,
    const std::optional<size_t> table_desc_offset,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  auto get_fragment_tuple_count =
      [&deleted_chunk_metadata_vec](const auto& fragment) -> std::optional<size_t> {
    // Returning std::nullopt disables execution dispatch optimizations based on tuple
    // counts, as it signals to the dispatch mechanism that a reliable tuple count cannot
    // be obtained. This is the case for fragments which have deleted rows, temporary
    // table fragments, or fragments in a UNION query.
    if (deleted_chunk_metadata_vec.empty()) {
      return std::nullopt;
    }
    const auto fragment_id = fragment.fragmentId;
    CHECK_GE(fragment_id, 0);
    if (static_cast<size_t>(fragment_id) < deleted_chunk_metadata_vec.size()) {
      const auto& chunk_metadata = deleted_chunk_metadata_vec[fragment_id];
      if (chunk_metadata.second->chunkStats.max.tinyintval == 1) {
        return std::nullopt;
      }
    }
    return fragment.getNumTuples();
  };

  for (size_t i = 0; i < fragments->size(); i++) {
    if (!allowed_outer_fragment_indices_.empty()) {
      if (std::find(allowed_outer_fragment_indices_.begin(),
                    allowed_outer_fragment_indices_.end(),
                    i) == allowed_outer_fragment_indices_.end()) {
        continue;
      }
    }

    const auto& fragment = (*fragments)[i];
    const auto skip_frag = executor->skipFragment(
        table_desc, fragment, ra_exe_unit.simple_quals, frag_offsets, i);
    if (skip_frag.first) {
      continue;
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
    const int chosen_device_count =
        device_type == ExecutorDeviceType::CPU ? 1 : device_count;
    CHECK_GT(chosen_device_count, 0);
    const auto memory_level = device_type == ExecutorDeviceType::GPU
                                  ? Data_Namespace::GPU_LEVEL
                                  : Data_Namespace::CPU_LEVEL;
    const int device_id = (device_type == ExecutorDeviceType::CPU || fragment.shard == -1)
                              ? fragment.deviceIds[static_cast<int>(memory_level)]
                              : fragment.shard % chosen_device_count;

    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }

    ExecutionKernelDescriptor execution_kernel_desc{
        device_id, {}, get_fragment_tuple_count(fragment)};
    if (table_desc_offset) {
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            *table_desc_offset,
                                            i,
                                            selected_tables_fragments_,
                                            executor->getInnerTabIdToJoinCond());
      const auto table_id = ra_exe_unit.input_descs[*table_desc_offset].getTableId();
      execution_kernel_desc.fragments.emplace_back(FragmentsPerTable{table_id, frag_ids});

    } else {
      for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
        const auto frag_ids =
            executor->getTableFragmentIndices(ra_exe_unit,
                                              device_type,
                                              j,
                                              i,
                                              selected_tables_fragments_,
                                              executor->getInnerTabIdToJoinCond());
        const auto table_id = ra_exe_unit.input_descs[j].getTableId();
        auto table_frags_it = selected_tables_fragments_.find(table_id);
        CHECK(table_frags_it != selected_tables_fragments_.end());

        execution_kernel_desc.fragments.emplace_back(
            FragmentsPerTable{table_id, frag_ids});
      }
    }

    auto itr = execution_kernels_per_device_.find(device_id);
    if (itr == execution_kernels_per_device_.end()) {
      auto const pair = execution_kernels_per_device_.insert(std::make_pair(
          device_id,
          std::vector<ExecutionKernelDescriptor>{std::move(execution_kernel_desc)}));
      CHECK(pair.second);
    } else {
      itr->second.emplace_back(std::move(execution_kernel_desc));
    }
  }
}

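// UNION ALL path: builds fragment-per-kernel descriptors table by table, so each input
// of the union contributes its own set of execution kernels.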
void QueryFragmentDescriptor::buildFragmentPerKernelMapForUnion(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  const auto& catalog = executor->getCatalog();

  for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
    auto const& table_desc = ra_exe_unit.input_descs[j];
    int const table_id = table_desc.getTableId();
    TableFragments const* fragments = selected_tables_fragments_.at(table_id);

    auto& data_mgr = catalog->getDataMgr();
    ChunkMetadataVector deleted_chunk_metadata_vec;
    if (table_id > 0) {
      // Temporary tables will not have a table descriptor and will not have deleted rows.
      const auto deleted_cd = executor->plan_state_->getDeletedColForTable(table_id);
      if (deleted_cd) {
        ChunkKey chunk_key_prefix = {
            catalog->getCurrentDB().dbId, table_id, deleted_cd->columnId};
        data_mgr.getChunkMetadataVecForKeyPrefix(deleted_chunk_metadata_vec,
                                                 chunk_key_prefix);
      }
    }

    buildFragmentPerKernelForTable(fragments,
                                   ra_exe_unit,
                                   table_desc,
                                   frag_offsets,
                                   device_count,
                                   num_bytes_for_row,
                                   {},
                                   j,
                                   device_type,
                                   executor);

    std::vector<int> table_ids =
        std::accumulate(execution_kernels_per_device_[0].begin(),
                        execution_kernels_per_device_[0].end(),
                        std::vector<int>(),
                        [](auto&& vec, auto& exe_kern) {
                          vec.push_back(exe_kern.fragments[0].table_id);
                          return vec;
                        });
    VLOG(1) << "execution_kernels_per_device_.size()="
            << execution_kernels_per_device_.size()
            << " execution_kernels_per_device_[0][*].fragments[0].table_id="
            << shared::printContainer(table_ids);
  }
}

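// Default (non-multifrag) path: builds one kernel per fragment of the outer table,
// consulting the deleted-rows column metadata (when present) for exact tuple counts.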
void QueryFragmentDescriptor::buildFragmentPerKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto& catalog = executor->getCatalog();

  auto& data_mgr = catalog->getDataMgr();
  ChunkMetadataVector deleted_chunk_metadata_vec;

  if (outer_table_id > 0) {
    // Temporary tables will not have a table descriptor and will not have deleted rows.
    const auto deleted_cd = executor->plan_state_->getDeletedColForTable(outer_table_id);
    if (deleted_cd) {
      ChunkKey chunk_key_prefix = {
          catalog->getCurrentDB().dbId, outer_table_id, deleted_cd->columnId};
      data_mgr.getChunkMetadataVecForKeyPrefix(deleted_chunk_metadata_vec,
                                               chunk_key_prefix);
    }
  }

  buildFragmentPerKernelForTable(outer_fragments,
                                 ra_exe_unit,
                                 outer_table_desc,
                                 frag_offsets,
                                 device_count,
                                 num_bytes_for_row,
                                 deleted_chunk_metadata_vec,
                                 std::nullopt,
                                 device_type,
                                 executor);
}

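// Multi-fragment path: one execution kernel per device, with each kernel accumulating
// all fragments assigned to that device.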
void QueryFragmentDescriptor::buildMultifragKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // Allocate all the fragments of the tables involved in the query to available
  // devices. The basic idea: the device is decided by the outer table in the
  // query (the first table in a join) and we need to broadcast the fragments
  // in the inner table to each device. Sharding will change this model.
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();

  for (size_t outer_frag_id = 0; outer_frag_id < outer_fragments->size();
       ++outer_frag_id) {
    if (!allowed_outer_fragment_indices_.empty()) {
      if (std::find(allowed_outer_fragment_indices_.begin(),
                    allowed_outer_fragment_indices_.end(),
                    outer_frag_id) == allowed_outer_fragment_indices_.end()) {
        continue;
      }
    }

    const auto& fragment = (*outer_fragments)[outer_frag_id];
    auto skip_frag = executor->skipFragment(outer_table_desc,
                                            fragment,
                                            ra_exe_unit.simple_quals,
                                            frag_offsets,
                                            outer_frag_id);
    if (enable_inner_join_fragment_skipping &&
        (skip_frag == std::pair<bool, int64_t>(false, -1))) {
      skip_frag = executor->skipFragmentInnerJoins(
          outer_table_desc, ra_exe_unit, fragment, frag_offsets, outer_frag_id);
    }
    if (skip_frag.first) {
      continue;
    }
    const int device_id =
        fragment.shard == -1
            ? fragment.deviceIds[static_cast<int>(Data_Namespace::GPU_LEVEL)]
            : fragment.shard % device_count;
    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            outer_frag_id,
                                            selected_tables_fragments_,
                                            inner_table_id_to_join_condition);

      if (execution_kernels_per_device_.find(device_id) ==
          execution_kernels_per_device_.end()) {
        std::vector<ExecutionKernelDescriptor> kernel_descs{
            ExecutionKernelDescriptor{device_id, FragmentsList{}, std::nullopt}};
        CHECK(
            execution_kernels_per_device_.insert(std::make_pair(device_id, kernel_descs))
                .second);
      }

      // Multifrag kernels only have one execution kernel per device. Grab the execution
      // kernel object and push back into its fragments list.
      CHECK_EQ(execution_kernels_per_device_[device_id].size(), size_t(1));
      auto& execution_kernel = execution_kernels_per_device_[device_id].front();

      auto& kernel_frag_list = execution_kernel.fragments;
      if (kernel_frag_list.size() < j + 1) {
        kernel_frag_list.emplace_back(FragmentsPerTable{table_id, frag_ids});
      } else {
        CHECK_EQ(kernel_frag_list[j].table_id, table_id);
        auto& curr_frag_ids = kernel_frag_list[j].fragment_ids;
        for (const int frag_id : frag_ids) {
          if (std::find(curr_frag_ids.begin(), curr_frag_ids.end(), frag_id) ==
              curr_frag_ids.end()) {
            curr_frag_ids.push_back(frag_id);
          }
        }
      }
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
  }
}

namespace {

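// A "sample" query is a projection with a scan limit over a single table, with no
// filters and no ordering; for such queries dispatch can stop early once enough rows
// have been produced.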
bool is_sample_query(const RelAlgExecutionUnit& ra_exe_unit) {
  const bool result = ra_exe_unit.input_descs.size() == 1 &&
                      ra_exe_unit.simple_quals.empty() && ra_exe_unit.quals.empty() &&
                      ra_exe_unit.sort_info.order_entries.empty() &&
                      ra_exe_unit.scan_limit;
  if (result) {
    CHECK_EQ(size_t(1), ra_exe_unit.groupby_exprs.size());
    CHECK(!ra_exe_unit.groupby_exprs.front());
  }
  return result;
}

}  // namespace

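// Accumulates the outer tuple count of the given kernel into tuple_count and returns
// true when a sample query has already produced at least limit + offset rows, signaling
// that no further kernels need to be dispatched.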
bool QueryFragmentDescriptor::terminateDispatchMaybe(
    size_t& tuple_count,
    const RelAlgExecutionUnit& ra_exe_unit,
    const ExecutionKernelDescriptor& kernel) const {
  const auto sample_query_limit =
      ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
  if (!kernel.outer_tuple_count) {
    return false;
  } else {
    tuple_count += *kernel.outer_tuple_count;
    if (is_sample_query(ra_exe_unit) && sample_query_limit > 0 &&
        tuple_count >= sample_query_limit) {
      return true;
    }
  }
  return false;
}

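// Estimates the input footprint of the fragments assigned to a device and throws
// QueryMustRunOnCpu if it exceeds the configured fraction of that device's memory.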
void QueryFragmentDescriptor::checkDeviceMemoryUsage(
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const int device_id,
    const size_t num_bytes_for_row) {
  if (g_cluster) {
    // Disabled in distributed mode for now
    return;
  }
  CHECK_GE(device_id, 0);
  tuple_count_per_device_[device_id] += fragment.getNumTuples();
  const size_t gpu_bytes_limit =
      available_gpu_mem_bytes_[device_id] * gpu_input_mem_limit_percent_;
  if (tuple_count_per_device_[device_id] * num_bytes_for_row > gpu_bytes_limit) {
    LOG(WARNING) << "Not enough memory on device " << device_id
                 << " for input chunks totaling "
                 << tuple_count_per_device_[device_id] * num_bytes_for_row
                 << " bytes (available device memory: " << gpu_bytes_limit << " bytes)";
    throw QueryMustRunOnCpu();
  }
}

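// Prints a FragmentsPerTable as "table_id(<id>) fragment_ids(<id0> <id1> ...)".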
std::ostream& operator<<(std::ostream& os, FragmentsPerTable const& fragments_per_table) {
  os << "table_id(" << fragments_per_table.table_id << ") fragment_ids";
  for (size_t i = 0; i < fragments_per_table.fragment_ids.size(); ++i) {
    os << (i ? ' ' : '(') << fragments_per_table.fragment_ids[i];
  }
  return os << ')';
}