OmniSciDB  d2f719934e
QueryFragmentDescriptor Class Reference

#include <QueryFragmentDescriptor.h>

Public Member Functions

 QueryFragmentDescriptor (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const std::vector< Data_Namespace::MemoryInfo > &gpu_mem_infos, const double gpu_input_mem_limit_percent, const std::vector< size_t > allowed_outer_fragment_indices)
 
void buildFragmentKernelMap (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< uint64_t > &frag_offsets, const int device_count, const ExecutorDeviceType &device_type, const bool enable_multifrag_kernels, const bool enable_inner_join_fragment_skipping, Executor *executor)
 
template<typename DISPATCH_FCN >
void assignFragsToMultiDispatch (DISPATCH_FCN f) const
 
template<typename DISPATCH_FCN >
void assignFragsToKernelDispatch (DISPATCH_FCN f, const RelAlgExecutionUnit &ra_exe_unit) const
 
bool shouldCheckWorkUnitWatchdog () const
 

Static Public Member Functions

static void computeAllTablesFragments (std::map< int, const TableFragments * > &all_tables_fragments, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos)
 

Protected Member Functions

void buildFragmentPerKernelMapForUnion (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< uint64_t > &frag_offsets, const int device_count, const size_t num_bytes_for_row, const ExecutorDeviceType &device_type, Executor *executor)
 
void buildFragmentPerKernelMap (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< uint64_t > &frag_offsets, const int device_count, const size_t num_bytes_for_row, const ExecutorDeviceType &device_type, Executor *executor)
 
void buildMultifragKernelMap (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< uint64_t > &frag_offsets, const int device_count, const size_t num_bytes_for_row, const ExecutorDeviceType &device_type, const bool enable_inner_join_fragment_skipping, Executor *executor)
 
void buildFragmentPerKernelForTable (const TableFragments *fragments, const RelAlgExecutionUnit &ra_exe_unit, const InputDescriptor &table_desc, const bool is_temporary_table, const std::vector< uint64_t > &frag_offsets, const int device_count, const size_t num_bytes_for_row, const ChunkMetadataVector &deleted_chunk_metadata_vec, const std::optional< size_t > table_desc_offset, const ExecutorDeviceType &device_type, Executor *executor)
 
bool terminateDispatchMaybe (size_t &tuple_count, const RelAlgExecutionUnit &ra_exe_unit, const ExecutionKernelDescriptor &kernel) const
 
void checkDeviceMemoryUsage (const Fragmenter_Namespace::FragmentInfo &fragment, const int device_id, const size_t num_cols)
 

Protected Attributes

std::vector< size_t > allowed_outer_fragment_indices_
 
size_t outer_fragments_size_ = 0
 
int64_t rowid_lookup_key_ = -1
 
std::map< int, const TableFragments * > selected_tables_fragments_
 
std::map< int, std::vector< ExecutionKernelDescriptor > > execution_kernels_per_device_
 
double gpu_input_mem_limit_percent_
 
std::map< size_t, size_t > tuple_count_per_device_
 
std::map< size_t, size_t > available_gpu_mem_bytes_
 

Detailed Description

Definition at line 67 of file QueryFragmentDescriptor.h.

Constructor & Destructor Documentation

QueryFragmentDescriptor::QueryFragmentDescriptor ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< InputTableInfo > &  query_infos,
const std::vector< Data_Namespace::MemoryInfo > &  gpu_mem_infos,
const double  gpu_input_mem_limit_percent,
const std::vector< size_t >  allowed_outer_fragment_indices 
)

Definition at line 25 of file QueryFragmentDescriptor.cpp.

References available_gpu_mem_bytes_, CHECK_EQ, RelAlgExecutionUnit::input_descs, and selected_tables_fragments_.

    : allowed_outer_fragment_indices_(allowed_outer_fragment_indices)
    , gpu_input_mem_limit_percent_(gpu_input_mem_limit_percent) {
  const size_t input_desc_count{ra_exe_unit.input_descs.size()};
  CHECK_EQ(query_infos.size(), input_desc_count);
  for (size_t table_idx = 0; table_idx < input_desc_count; ++table_idx) {
    const auto table_id = ra_exe_unit.input_descs[table_idx].getTableId();
    if (!selected_tables_fragments_.count(table_id)) {
      selected_tables_fragments_[ra_exe_unit.input_descs[table_idx].getTableId()] =
          &query_infos[table_idx].info.fragments;
    }
  }

  for (size_t device_id = 0; device_id < gpu_mem_infos.size(); device_id++) {
    const auto& gpu_mem_info = gpu_mem_infos[device_id];
    available_gpu_mem_bytes_[device_id] =
        gpu_mem_info.maxNumPages * gpu_mem_info.pageSize;
  }
}
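The constructor records one available-memory budget per GPU device by multiplying the page count and page size reported for that device. The following standalone sketch mirrors only that bookkeeping; MemoryInfoStub is a hypothetical stand-in for Data_Namespace::MemoryInfo reduced to the two fields the constructor reads.

#include <cstddef>
#include <iostream>
#include <map>
#include <vector>

// Hypothetical stand-in for Data_Namespace::MemoryInfo, reduced to the two
// fields the constructor reads.
struct MemoryInfoStub {
  size_t maxNumPages;
  size_t pageSize;
};

// Mirrors the constructor's second loop: one byte budget per GPU device.
std::map<size_t, size_t> compute_available_gpu_mem_bytes(
    const std::vector<MemoryInfoStub>& gpu_mem_infos) {
  std::map<size_t, size_t> available;
  for (size_t device_id = 0; device_id < gpu_mem_infos.size(); device_id++) {
    const auto& info = gpu_mem_infos[device_id];
    available[device_id] = info.maxNumPages * info.pageSize;
  }
  return available;
}

int main() {
  // Two GPUs: 4096 pages of 2 MB each -> 8 GB budget per device.
  const auto budgets =
      compute_available_gpu_mem_bytes({{4096, 2097152}, {4096, 2097152}});
  for (const auto& [device_id, bytes] : budgets) {
    std::cout << "device " << device_id << ": " << bytes << " bytes\n";
  }
}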

Member Function Documentation

template<typename DISPATCH_FCN >
void QueryFragmentDescriptor::assignFragsToKernelDispatch ( DISPATCH_FCN  f,
const RelAlgExecutionUnit &  ra_exe_unit 
) const
inline

Dispatch one fragment for each device. Iterate the device map and dispatch one kernel for each device per iteration. This allows balanced dispatch as well as early termination if the number of rows passing the kernel can be computed at dispatch time and the scan limit is reached.

Definition at line 110 of file QueryFragmentDescriptor.h.

References CHECK, execution_kernels_per_device_, f, rowid_lookup_key_, and terminateDispatchMaybe().

{
  if (execution_kernels_per_device_.empty()) {
    return;
  }

  size_t tuple_count = 0;

  std::unordered_map<int, size_t> execution_kernel_index;
  for (const auto& device_itr : execution_kernels_per_device_) {
    CHECK(execution_kernel_index.insert(std::make_pair(device_itr.first, size_t(0)))
              .second);
  }

  bool dispatch_finished = false;
  while (!dispatch_finished) {
    dispatch_finished = true;
    for (const auto& device_itr : execution_kernels_per_device_) {
      auto& kernel_idx = execution_kernel_index[device_itr.first];
      if (kernel_idx < device_itr.second.size()) {
        dispatch_finished = false;
        const auto& execution_kernel = device_itr.second[kernel_idx++];
        f(device_itr.first, execution_kernel.fragments, rowid_lookup_key_);
        if (terminateDispatchMaybe(tuple_count, ra_exe_unit, execution_kernel)) {
          return;
        }
      }
    }
  }
}
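The loop interleaves devices: each pass over the device map dispatches at most one pending kernel per device, so no device runs far ahead, and terminateDispatchMaybe() can stop the whole dispatch once enough outer tuples are covered. Below is a minimal standalone sketch of the same interleaving, with KernelStub as a hypothetical stand-in for ExecutionKernelDescriptor and the early-termination hook omitted.

#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for ExecutionKernelDescriptor: just a label here.
struct KernelStub {
  std::string name;
};

// Round-robin over devices, dispatching at most one kernel per device per
// pass, mirroring the interleaving of assignFragsToKernelDispatch.
template <typename DISPATCH_FCN>
void round_robin_dispatch(
    const std::map<int, std::vector<KernelStub>>& kernels_per_device,
    DISPATCH_FCN f) {
  std::unordered_map<int, size_t> next_kernel;
  for (const auto& [device_id, kernels] : kernels_per_device) {
    next_kernel[device_id] = 0;
  }
  bool finished = false;
  while (!finished) {
    finished = true;
    for (const auto& [device_id, kernels] : kernels_per_device) {
      auto& idx = next_kernel[device_id];
      if (idx < kernels.size()) {
        finished = false;
        f(device_id, kernels[idx++]);
      }
    }
  }
}

int main() {
  std::map<int, std::vector<KernelStub>> per_device{
      {0, {{"frag 0"}, {"frag 2"}}}, {1, {{"frag 1"}}}};
  // Prints: device 0 -> frag 0, device 1 -> frag 1, device 0 -> frag 2.
  round_robin_dispatch(per_device, [](int device_id, const KernelStub& k) {
    std::cout << "device " << device_id << " -> " << k.name << "\n";
  });
}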

template<typename DISPATCH_FCN >
void QueryFragmentDescriptor::assignFragsToMultiDispatch ( DISPATCH_FCN  f) const
inline

Dispatch multi-fragment kernels. Currently GPU only. Each GPU should have only one kernel, with multiple fragments in its fragments list.

Definition at line 93 of file QueryFragmentDescriptor.h.

References CHECK_EQ, execution_kernels_per_device_, f, and rowid_lookup_key_.

{
  for (const auto& device_itr : execution_kernels_per_device_) {
    const auto& execution_kernels = device_itr.second;
    CHECK_EQ(execution_kernels.size(), size_t(1));

    const auto& fragments_list = execution_kernels.front().fragments;
    f(device_itr.first, fragments_list, rowid_lookup_key_);
  }
}

void QueryFragmentDescriptor::buildFragmentKernelMap ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< uint64_t > &  frag_offsets,
const int  device_count,
const ExecutorDeviceType &  device_type,
const bool  enable_multifrag_kernels,
const bool  enable_inner_join_fragment_skipping,
Executor *  executor 
)

Definition at line 64 of file QueryFragmentDescriptor.cpp.

References buildFragmentPerKernelMap(), buildFragmentPerKernelMapForUnion(), buildMultifragKernelMap(), RelAlgExecutionUnit::input_descs, and RelAlgExecutionUnit::union_all.

{
  // For joins, only consider the cardinality of the LHS
  // columns in the bytes per row count.
  std::set<int> lhs_table_ids;
  for (const auto& input_desc : ra_exe_unit.input_descs) {
    if (input_desc.getNestLevel() == 0) {
      lhs_table_ids.insert(input_desc.getTableId());
    }
  }

  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow(lhs_table_ids);

  if (ra_exe_unit.union_all) {
    buildFragmentPerKernelMapForUnion(ra_exe_unit,
                                      frag_offsets,
                                      device_count,
                                      num_bytes_for_row,
                                      device_type,
                                      executor);
  } else if (enable_multifrag_kernels) {
    buildMultifragKernelMap(ra_exe_unit,
                            frag_offsets,
                            device_count,
                            num_bytes_for_row,
                            device_type,
                            enable_inner_join_fragment_skipping,
                            executor);
  } else {
    buildFragmentPerKernelMap(ra_exe_unit,
                              frag_offsets,
                              device_count,
                              num_bytes_for_row,
                              device_type,
                              executor);
  }
}

void QueryFragmentDescriptor::buildFragmentPerKernelForTable ( const TableFragments *  fragments,
const RelAlgExecutionUnit &  ra_exe_unit,
const InputDescriptor &  table_desc,
const bool  is_temporary_table,
const std::vector< uint64_t > &  frag_offsets,
const int  device_count,
const size_t  num_bytes_for_row,
const ChunkMetadataVector &  deleted_chunk_metadata_vec,
const std::optional< size_t >  table_desc_offset,
const ExecutorDeviceType &  device_type,
Executor *  executor 
)
protected

Definition at line 108 of file QueryFragmentDescriptor.cpp.

References allowed_outer_fragment_indices_, CHECK, CHECK_GE, CHECK_GT, checkDeviceMemoryUsage(), CPU, Data_Namespace::CPU_LEVEL, execution_kernels_per_device_, GPU, Data_Namespace::GPU_LEVEL, i, RelAlgExecutionUnit::input_descs, rowid_lookup_key_, selected_tables_fragments_, and RelAlgExecutionUnit::simple_quals.

Referenced by buildFragmentPerKernelMap(), and buildFragmentPerKernelMapForUnion().

{
  auto get_fragment_tuple_count = [&deleted_chunk_metadata_vec, &is_temporary_table](
                                      const auto& fragment) -> std::optional<size_t> {
    // returning std::nullopt disables execution dispatch optimizations based on tuple
    // counts as it signals to the dispatch mechanism that a reliable tuple count cannot
    // be obtained. This is the case for fragments which have deleted rows, temporary
    // table fragments, or fragments in a UNION query.
    if (is_temporary_table) {
      // 31 Mar 2021 MAT TODO I think that the fragment Tuple count should be ok
      // need to double check that at some later date
      return std::nullopt;
    }
    if (deleted_chunk_metadata_vec.empty()) {
      return fragment.getNumTuples();
    }
    const auto fragment_id = fragment.fragmentId;
    CHECK_GE(fragment_id, 0);
    if (static_cast<size_t>(fragment_id) < deleted_chunk_metadata_vec.size()) {
      const auto& chunk_metadata = deleted_chunk_metadata_vec[fragment_id];
      if (chunk_metadata.second->chunkStats.max.tinyintval == 1) {
        return std::nullopt;
      }
    }
    return fragment.getNumTuples();
  };

  for (size_t i = 0; i < fragments->size(); i++) {
    if (!allowed_outer_fragment_indices_.empty()) {
      if (std::find(allowed_outer_fragment_indices_.begin(),
                    allowed_outer_fragment_indices_.end(),
                    i) == allowed_outer_fragment_indices_.end()) {
        continue;
      }
    }

    const auto& fragment = (*fragments)[i];
    const auto skip_frag = executor->skipFragment(
        table_desc, fragment, ra_exe_unit.simple_quals, frag_offsets, i);
    if (skip_frag.first) {
      continue;
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
    const int chosen_device_count =
        device_type == ExecutorDeviceType::CPU ? 1 : device_count;
    CHECK_GT(chosen_device_count, 0);
    const auto memory_level = device_type == ExecutorDeviceType::GPU
                                  ? Data_Namespace::GPU_LEVEL
                                  : Data_Namespace::CPU_LEVEL;
    const int device_id = (device_type == ExecutorDeviceType::CPU || fragment.shard == -1)
                              ? fragment.deviceIds[static_cast<int>(memory_level)]
                              : fragment.shard % chosen_device_count;

    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }

    ExecutionKernelDescriptor execution_kernel_desc{
        device_id, {}, get_fragment_tuple_count(fragment)};
    if (table_desc_offset) {
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            *table_desc_offset,
                                            i,
                                            selected_tables_fragments_,
                                            executor->getInnerTabIdToJoinCond());
      const auto table_id = ra_exe_unit.input_descs[*table_desc_offset].getTableId();
      execution_kernel_desc.fragments.emplace_back(FragmentsPerTable{table_id, frag_ids});

    } else {
      for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
        const auto frag_ids =
            executor->getTableFragmentIndices(ra_exe_unit,
                                              device_type,
                                              j,
                                              i,
                                              selected_tables_fragments_,
                                              executor->getInnerTabIdToJoinCond());
        const auto table_id = ra_exe_unit.input_descs[j].getTableId();
        auto table_frags_it = selected_tables_fragments_.find(table_id);
        CHECK(table_frags_it != selected_tables_fragments_.end());

        execution_kernel_desc.fragments.emplace_back(
            FragmentsPerTable{table_id, frag_ids});
      }
    }

    auto itr = execution_kernels_per_device_.find(device_id);
    if (itr == execution_kernels_per_device_.end()) {
      auto const pair = execution_kernels_per_device_.insert(std::make_pair(
          device_id,
          std::vector<ExecutionKernelDescriptor>{std::move(execution_kernel_desc)}));
      CHECK(pair.second);
    } else {
      itr->second.emplace_back(std::move(execution_kernel_desc));
    }
  }
}

void QueryFragmentDescriptor::buildFragmentPerKernelMap ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< uint64_t > &  frag_offsets,
const int  device_count,
const size_t  num_bytes_for_row,
const ExecutorDeviceType &  device_type,
Executor *  executor 
)
protected

Definition at line 283 of file QueryFragmentDescriptor.cpp.

References buildFragmentPerKernelForTable(), CHECK, RelAlgExecutionUnit::input_descs, outer_fragments_size_, selected_tables_fragments_, and table_is_temporary().

Referenced by buildFragmentKernelMap().

{
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto& catalog = executor->getCatalog();

  ChunkMetadataVector deleted_chunk_metadata_vec;

  bool is_temporary_table = false;
  if (outer_table_id > 0) {
    // Temporary tables will not have a table descriptor and not have deleted rows.
    const auto td = catalog->getMetadataForTable(outer_table_id);
    CHECK(td);
    if (table_is_temporary(td)) {
      // for temporary tables, we won't have delete column metadata available. However, we
      // know the table fits in memory as it is a temporary table, so signal to the lower
      // layers that we can disregard the early out select * optimization
      is_temporary_table = true;
    } else {
      const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
      if (deleted_cd) {
        // 01 Apr 2021 MAT TODO this code is called on logical tables (ie not the shards)
        // I wonder if this makes sense in those cases
        td->fragmenter->getFragmenterId();
        auto frags = td->fragmenter->getFragmentsForQuery().fragments;
        for (auto frag : frags) {
          auto chunk_meta_it =
              frag.getChunkMetadataMapPhysical().find(deleted_cd->columnId);
          if (chunk_meta_it != frag.getChunkMetadataMapPhysical().end()) {
            const auto& chunk_meta = chunk_meta_it->second;
            ChunkKey chunk_key_prefix = {catalog->getCurrentDB().dbId,
                                         outer_table_id,
                                         deleted_cd->columnId,
                                         frag.fragmentId};
            deleted_chunk_metadata_vec.emplace_back(
                std::pair{chunk_key_prefix, chunk_meta});
          }
        }
      }
    }
  }

  buildFragmentPerKernelForTable(outer_fragments,
                                 ra_exe_unit,
                                 outer_table_desc,
                                 is_temporary_table,
                                 frag_offsets,
                                 device_count,
                                 num_bytes_for_row,
                                 deleted_chunk_metadata_vec,
                                 std::nullopt,
                                 device_type,
                                 executor);
}

void QueryFragmentDescriptor::buildFragmentPerKernelMapForUnion ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< uint64_t > &  frag_offsets,
const int  device_count,
const size_t  num_bytes_for_row,
const ExecutorDeviceType &  device_type,
Executor *  executor 
)
protected

Definition at line 218 of file QueryFragmentDescriptor.cpp.

References gpu_enabled::accumulate(), buildFragmentPerKernelForTable(), CHECK, execution_kernels_per_device_, RelAlgExecutionUnit::input_descs, shared::printContainer(), selected_tables_fragments_, table_is_temporary(), and VLOG.

Referenced by buildFragmentKernelMap().

{
  const auto& catalog = executor->getCatalog();

  for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
    auto const& table_desc = ra_exe_unit.input_descs[j];
    int const table_id = table_desc.getTableId();
    TableFragments const* fragments = selected_tables_fragments_.at(table_id);

    auto data_mgr = executor->getDataMgr();
    ChunkMetadataVector deleted_chunk_metadata_vec;

    bool is_temporary_table = false;
    if (table_id > 0) {
      // Temporary tables will not have a table descriptor and not have deleted rows.
      const auto td = catalog->getMetadataForTable(table_id);
      CHECK(td);
      if (table_is_temporary(td)) {
        // for temporary tables, we won't have delete column metadata available. However,
        // we know the table fits in memory as it is a temporary table, so signal to the
        // lower layers that we can disregard the early out select * optimization
        is_temporary_table = true;
      } else {
        const auto deleted_cd = executor->plan_state_->getDeletedColForTable(table_id);
        if (deleted_cd) {
          ChunkKey chunk_key_prefix = {
              catalog->getCurrentDB().dbId, table_id, deleted_cd->columnId};
          data_mgr->getChunkMetadataVecForKeyPrefix(deleted_chunk_metadata_vec,
                                                    chunk_key_prefix);
        }
      }
    }

    buildFragmentPerKernelForTable(fragments,
                                   ra_exe_unit,
                                   table_desc,
                                   is_temporary_table,
                                   frag_offsets,
                                   device_count,
                                   num_bytes_for_row,
                                   {},
                                   j,
                                   device_type,
                                   executor);

    std::vector<int> table_ids =
        std::accumulate(execution_kernels_per_device_[0].begin(),
                        execution_kernels_per_device_[0].end(),
                        std::vector<int>(),
                        [](auto&& vec, auto& exe_kern) {
                          vec.push_back(exe_kern.fragments[0].table_id);
                          return vec;
                        });
    VLOG(1) << "execution_kernels_per_device_.size()="
            << execution_kernels_per_device_.size()
            << " execution_kernels_per_device_[0][*].fragments[0].table_id="
            << shared::printContainer(table_ids);
  }
}

void QueryFragmentDescriptor::buildMultifragKernelMap ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< uint64_t > &  frag_offsets,
const int  device_count,
const size_t  num_bytes_for_row,
const ExecutorDeviceType &  device_type,
const bool  enable_inner_join_fragment_skipping,
Executor *  executor 
)
protected

Definition at line 348 of file QueryFragmentDescriptor.cpp.

References allowed_outer_fragment_indices_, CHECK, CHECK_EQ, checkDeviceMemoryUsage(), execution_kernels_per_device_, GPU, Data_Namespace::GPU_LEVEL, RelAlgExecutionUnit::input_descs, outer_fragments_size_, rowid_lookup_key_, selected_tables_fragments_, and RelAlgExecutionUnit::simple_quals.

Referenced by buildFragmentKernelMap().

{
  // Allocate all the fragments of the tables involved in the query to available
  // devices. The basic idea: the device is decided by the outer table in the
  // query (the first table in a join) and we need to broadcast the fragments
  // in the inner table to each device. Sharding will change this model.
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();

  for (size_t outer_frag_id = 0; outer_frag_id < outer_fragments->size();
       ++outer_frag_id) {
    if (!allowed_outer_fragment_indices_.empty()) {
      if (std::find(allowed_outer_fragment_indices_.begin(),
                    allowed_outer_fragment_indices_.end(),
                    outer_frag_id) == allowed_outer_fragment_indices_.end()) {
        continue;
      }
    }

    const auto& fragment = (*outer_fragments)[outer_frag_id];
    auto skip_frag = executor->skipFragment(outer_table_desc,
                                            fragment,
                                            ra_exe_unit.simple_quals,
                                            frag_offsets,
                                            outer_frag_id);
    if (enable_inner_join_fragment_skipping &&
        (skip_frag == std::pair<bool, int64_t>(false, -1))) {
      skip_frag = executor->skipFragmentInnerJoins(
          outer_table_desc, ra_exe_unit, fragment, frag_offsets, outer_frag_id);
    }
    if (skip_frag.first) {
      continue;
    }
    const int device_id =
        fragment.shard == -1
            ? fragment.deviceIds[static_cast<int>(Data_Namespace::GPU_LEVEL)]
            : fragment.shard % device_count;
    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            outer_frag_id,
                                            selected_tables_fragments_,
                                            inner_table_id_to_join_condition);

      if (execution_kernels_per_device_.find(device_id) ==
          execution_kernels_per_device_.end()) {
        std::vector<ExecutionKernelDescriptor> kernel_descs{
            ExecutionKernelDescriptor{device_id, FragmentsList{}, std::nullopt}};
        CHECK(
            execution_kernels_per_device_.insert(std::make_pair(device_id, kernel_descs))
                .second);
      }

      // Multifrag kernels only have one execution kernel per device. Grab the execution
      // kernel object and push back into its fragments list.
      CHECK_EQ(execution_kernels_per_device_[device_id].size(), size_t(1));
      auto& execution_kernel = execution_kernels_per_device_[device_id].front();

      auto& kernel_frag_list = execution_kernel.fragments;
      if (kernel_frag_list.size() < j + 1) {
        kernel_frag_list.emplace_back(FragmentsPerTable{table_id, frag_ids});
      } else {
        CHECK_EQ(kernel_frag_list[j].table_id, table_id);
        auto& curr_frag_ids = kernel_frag_list[j].fragment_ids;
        for (const int frag_id : frag_ids) {
          if (std::find(curr_frag_ids.begin(), curr_frag_ids.end(), frag_id) ==
              curr_frag_ids.end()) {
            curr_frag_ids.push_back(frag_id);
          }
        }
      }
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
  }
}
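Unlike the per-kernel path, which emits one ExecutionKernelDescriptor per surviving outer fragment, the multifrag path keeps exactly one kernel per device and appends fragment ids into that kernel's per-table entries, skipping duplicates. The following simplified standalone sketch shows that merge; FragsPerTable and merge_into_device_kernel are hypothetical reductions of FragmentsPerTable and the loop body above.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <map>
#include <vector>

// Hypothetical, simplified mirror of FragmentsPerTable.
struct FragsPerTable {
  int table_id;
  std::vector<size_t> fragment_ids;
};

// Append frag_ids for input table position j into the single kernel owned by
// device_id, skipping ids already present (as buildMultifragKernelMap does).
void merge_into_device_kernel(
    std::map<int, std::vector<FragsPerTable>>& kernel_per_device,
    const int device_id,
    const size_t j,
    const int table_id,
    const std::vector<size_t>& frag_ids) {
  auto& frag_list = kernel_per_device[device_id];  // fragments of the device's one kernel
  if (frag_list.size() < j + 1) {
    frag_list.push_back({table_id, frag_ids});
    return;
  }
  auto& curr = frag_list[j].fragment_ids;
  for (const auto frag_id : frag_ids) {
    if (std::find(curr.begin(), curr.end(), frag_id) == curr.end()) {
      curr.push_back(frag_id);
    }
  }
}

int main() {
  std::map<int, std::vector<FragsPerTable>> kernel_per_device;
  // Outer fragments 0 and 2 both land on device 0; the inner table (id 20)
  // is broadcast, so its fragment 0 is only recorded once.
  merge_into_device_kernel(kernel_per_device, 0, 0, 10, {0});
  merge_into_device_kernel(kernel_per_device, 0, 1, 20, {0});
  merge_into_device_kernel(kernel_per_device, 0, 0, 10, {2});
  merge_into_device_kernel(kernel_per_device, 0, 1, 20, {0});
  for (const auto& fpt : kernel_per_device[0]) {
    std::cout << "table " << fpt.table_id << ": " << fpt.fragment_ids.size()
              << " fragment(s)\n";
  }
}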

void QueryFragmentDescriptor::checkDeviceMemoryUsage ( const Fragmenter_Namespace::FragmentInfo &  fragment,
const int  device_id,
const size_t  num_cols 
)
protected

Definition at line 478 of file QueryFragmentDescriptor.cpp.

References available_gpu_mem_bytes_, CHECK_GE, g_cluster, Fragmenter_Namespace::FragmentInfo::getNumTuples(), gpu_input_mem_limit_percent_, LOG, tuple_count_per_device_, and logger::WARNING.

Referenced by buildFragmentPerKernelForTable(), and buildMultifragKernelMap().

{
  if (g_cluster) {
    // Disabled in distributed mode for now
    return;
  }
  CHECK_GE(device_id, 0);
  tuple_count_per_device_[device_id] += fragment.getNumTuples();
  const size_t gpu_bytes_limit =
      available_gpu_mem_bytes_.at(device_id) * gpu_input_mem_limit_percent_;
  if (tuple_count_per_device_[device_id] * num_bytes_for_row > gpu_bytes_limit) {
    LOG(WARNING) << "Not enough memory on device " << device_id
                 << " for input chunks totaling "
                 << tuple_count_per_device_[device_id] * num_bytes_for_row
                 << " bytes (available device memory: " << gpu_bytes_limit << " bytes)";
    throw QueryMustRunOnCpu();
  }
}
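The check multiplies the running per-device tuple count by the estimated bytes per row and compares the product against the device budget scaled by gpu_input_mem_limit_percent_; exceeding it throws QueryMustRunOnCpu. A minimal sketch of the same arithmetic follows, with hypothetical names (check_device_budget, limit_fraction) and a generic exception standing in for QueryMustRunOnCpu.

#include <cstddef>
#include <iostream>
#include <stdexcept>

// Hypothetical standalone version of the budget check: the running tuple
// count per device times the estimated bytes per row must stay under
// available_bytes scaled by limit_fraction.
void check_device_budget(const size_t tuple_count,
                         const size_t num_bytes_for_row,
                         const size_t available_bytes,
                         const double limit_fraction) {
  const size_t gpu_bytes_limit =
      static_cast<size_t>(available_bytes * limit_fraction);
  if (tuple_count * num_bytes_for_row > gpu_bytes_limit) {
    throw std::runtime_error("query must fall back to CPU");
  }
}

int main() {
  // 10M rows * 64 bytes/row = 640 MB needed; 1 GB * 0.9 = ~966 MB allowed.
  check_device_budget(10'000'000, 64, 1'073'741'824, 0.9);
  std::cout << "fits on device\n";
}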

void QueryFragmentDescriptor::computeAllTablesFragments ( std::map< int, const TableFragments * > &  all_tables_fragments,
const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< InputTableInfo > &  query_infos 
)
static

Definition at line 50 of file QueryFragmentDescriptor.cpp.

References CHECK_EQ, and RelAlgExecutionUnit::input_descs.

Referenced by ExecutionKernel::runImpl().

{
  for (size_t tab_idx = 0; tab_idx < ra_exe_unit.input_descs.size(); ++tab_idx) {
    int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
    CHECK_EQ(query_infos[tab_idx].table_id, table_id);
    const auto& fragments = query_infos[tab_idx].info.fragments;
    if (!all_tables_fragments.count(table_id)) {
      all_tables_fragments.insert(std::make_pair(table_id, &fragments));
    }
  }
}

bool QueryFragmentDescriptor::shouldCheckWorkUnitWatchdog ( ) const
inline

Definition at line 141 of file QueryFragmentDescriptor.h.

References execution_kernels_per_device_, and rowid_lookup_key_.

{
  return rowid_lookup_key_ < 0 && !execution_kernels_per_device_.empty();
}

bool QueryFragmentDescriptor::terminateDispatchMaybe ( size_t &  tuple_count,
const RelAlgExecutionUnit &  ra_exe_unit,
const ExecutionKernelDescriptor &  kernel 
) const
protected

Definition at line 460 of file QueryFragmentDescriptor.cpp.

References anonymous_namespace{QueryFragmentDescriptor.cpp}::is_sample_query(), SortInfo::limit, SortInfo::offset, ExecutionKernelDescriptor::outer_tuple_count, and RelAlgExecutionUnit::sort_info.

Referenced by assignFragsToKernelDispatch().

{
  const auto sample_query_limit =
      ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
  if (!kernel.outer_tuple_count) {
    return false;
  } else {
    tuple_count += *kernel.outer_tuple_count;
    if (is_sample_query(ra_exe_unit) && sample_query_limit > 0 &&
        tuple_count >= sample_query_limit) {
      return true;
    }
  }
  return false;
}
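Dispatch stops early only when the accumulated outer tuple counts are known and already cover LIMIT plus OFFSET; the real predicate additionally requires is_sample_query() to hold, which the sketch below omits. maybe_terminate is a hypothetical standalone reduction of the same test.

#include <cstddef>
#include <iostream>
#include <optional>

// Accumulate known per-kernel outer tuple counts and stop once
// LIMIT + OFFSET rows are guaranteed. A kernel with an unknown count
// (std::nullopt) never triggers termination.
bool maybe_terminate(size_t& tuple_count,
                     const std::optional<size_t>& kernel_outer_tuple_count,
                     const size_t limit_plus_offset) {
  if (!kernel_outer_tuple_count) {
    return false;
  }
  tuple_count += *kernel_outer_tuple_count;
  return limit_plus_offset > 0 && tuple_count >= limit_plus_offset;
}

int main() {
  size_t seen = 0;
  const size_t needed = 100;  // e.g. LIMIT 90 OFFSET 10
  std::cout << maybe_terminate(seen, 60, needed) << "\n";            // 0: keep going
  std::cout << maybe_terminate(seen, std::nullopt, needed) << "\n";  // 0: unknown count
  std::cout << maybe_terminate(seen, 50, needed) << "\n";            // 1: 110 >= 100
}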

Member Data Documentation

std::vector<size_t> QueryFragmentDescriptor::allowed_outer_fragment_indices_
protected
std::map<size_t, size_t> QueryFragmentDescriptor::available_gpu_mem_bytes_
protected

Definition at line 156 of file QueryFragmentDescriptor.h.

Referenced by checkDeviceMemoryUsage(), and QueryFragmentDescriptor().

std::map<int, std::vector<ExecutionKernelDescriptor> > QueryFragmentDescriptor::execution_kernels_per_device_
protected
double QueryFragmentDescriptor::gpu_input_mem_limit_percent_
protected

Definition at line 154 of file QueryFragmentDescriptor.h.

Referenced by checkDeviceMemoryUsage().

size_t QueryFragmentDescriptor::outer_fragments_size_ = 0
protected
int64_t QueryFragmentDescriptor::rowid_lookup_key_ = -1
protected
std::map<int, const TableFragments*> QueryFragmentDescriptor::selected_tables_fragments_
protected
std::map<size_t, size_t> QueryFragmentDescriptor::tuple_count_per_device_
protected

Definition at line 155 of file QueryFragmentDescriptor.h.

Referenced by checkDeviceMemoryUsage().


The documentation for this class was generated from the following files:

QueryFragmentDescriptor.h
QueryFragmentDescriptor.cpp