OmniSciDB  1dac507f6e
QueryFragmentDescriptor Class Reference

#include <QueryFragmentDescriptor.h>

Public Member Functions

 QueryFragmentDescriptor (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const std::vector< Data_Namespace::MemoryInfo > &gpu_mem_infos, const double gpu_input_mem_limit_percent)
 
void buildFragmentKernelMap (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< uint64_t > &frag_offsets, const int device_count, const ExecutorDeviceType &device_type, const bool enable_multifrag_kernels, const bool enable_inner_join_fragment_skipping, Executor *executor)
 
template<typename DISPATCH_FCN >
void assignFragsToMultiDispatch (DISPATCH_FCN f) const
 
template<typename DISPATCH_FCN >
void assignFragsToKernelDispatch (DISPATCH_FCN f, const RelAlgExecutionUnit &ra_exe_unit) const
 
bool shouldCheckWorkUnitWatchdog () const
 

Static Public Member Functions

static void computeAllTablesFragments (std::map< int, const TableFragments * > &all_tables_fragments, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos)
 

Protected Member Functions

void buildFragmentPerKernelMap (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< uint64_t > &frag_offsets, const int device_count, const ExecutorDeviceType &device_type, Executor *executor)
 
void buildMultifragKernelMap (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< uint64_t > &frag_offsets, const int device_count, const ExecutorDeviceType &device_type, const bool enable_inner_join_fragment_skipping, Executor *executor)
 
bool terminateDispatchMaybe (size_t &tuple_count, const RelAlgExecutionUnit &ra_exe_unit, const ExecutionKernel &kernel) const
 
void checkDeviceMemoryUsage (const Fragmenter_Namespace::FragmentInfo &fragment, const int device_id, const size_t num_cols)
 

Protected Attributes

size_t outer_fragments_size_ = 0
 
int64_t rowid_lookup_key_ = -1
 
std::map< int, const TableFragments * > selected_tables_fragments_
 
std::map< int, std::vector< ExecutionKernel > > execution_kernels_per_device_
 
double gpu_input_mem_limit_percent_
 
std::map< size_t, size_t > tuple_count_per_device_
 
std::map< size_t, size_t > available_gpu_mem_bytes_
 

Detailed Description

Definition at line 64 of file QueryFragmentDescriptor.h.

Constructor & Destructor Documentation

QueryFragmentDescriptor::QueryFragmentDescriptor ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< InputTableInfo > &  query_infos,
const std::vector< Data_Namespace::MemoryInfo > &  gpu_mem_infos,
const double  gpu_input_mem_limit_percent 
)

Definition at line 22 of file QueryFragmentDescriptor.cpp.

References available_gpu_mem_bytes_, CHECK_EQ, RelAlgExecutionUnit::input_descs, and selected_tables_fragments_.

    : gpu_input_mem_limit_percent_(gpu_input_mem_limit_percent) {
  const size_t input_desc_count{ra_exe_unit.input_descs.size()};
  CHECK_EQ(query_infos.size(), input_desc_count);
  for (size_t table_idx = 0; table_idx < input_desc_count; ++table_idx) {
    const auto table_id = ra_exe_unit.input_descs[table_idx].getTableId();
    if (!selected_tables_fragments_.count(table_id)) {
      selected_tables_fragments_[ra_exe_unit.input_descs[table_idx].getTableId()] =
          &query_infos[table_idx].info.fragments;
    }
  }

  for (size_t device_id = 0; device_id < gpu_mem_infos.size(); device_id++) {
    const auto& gpu_mem_info = gpu_mem_infos[device_id];
    available_gpu_mem_bytes_[device_id] =
        gpu_mem_info.maxNumPages * gpu_mem_info.pageSize;
  }
}
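For orientation, the sketch below isolates the per-device GPU memory budget computed by the constructor's second loop. It uses a simplified stand-in for Data_Namespace::MemoryInfo (only the two fields the loop reads) and is illustrative only, not the library's API; the budget for each device is simply maxNumPages * pageSize.

#include <cstddef>
#include <map>
#include <vector>

// Simplified stand-in for Data_Namespace::MemoryInfo (illustrative only).
struct MemoryInfoSketch {
  std::size_t maxNumPages;
  std::size_t pageSize;
};

// Mirrors the constructor's loop: one memory budget entry per GPU device.
std::map<std::size_t, std::size_t> compute_gpu_budgets(
    const std::vector<MemoryInfoSketch>& gpu_mem_infos) {
  std::map<std::size_t, std::size_t> available_gpu_mem_bytes;
  for (std::size_t device_id = 0; device_id < gpu_mem_infos.size(); ++device_id) {
    available_gpu_mem_bytes[device_id] =
        gpu_mem_infos[device_id].maxNumPages * gpu_mem_infos[device_id].pageSize;
  }
  return available_gpu_mem_bytes;
}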

Member Function Documentation

template<typename DISPATCH_FCN >
void QueryFragmentDescriptor::assignFragsToKernelDispatch ( DISPATCH_FCN  f,
const RelAlgExecutionUnit &  ra_exe_unit 
) const
inline

Dispatch kernels round-robin across devices: iterate the device map and dispatch one kernel per device on each pass. This balances the dispatch load and also allows early termination when the number of rows passing each kernel is known at dispatch time and the scan limit has been reached.

Definition at line 106 of file QueryFragmentDescriptor.h.

References CHECK(), execution_kernels_per_device_, rowid_lookup_key_, and terminateDispatchMaybe().

Referenced by Executor::dispatchFragments().

  {
    if (execution_kernels_per_device_.empty()) {
      return;
    }

    size_t tuple_count = 0;

    std::unordered_map<int, size_t> execution_kernel_index;
    for (const auto& device_itr : execution_kernels_per_device_) {
      CHECK(execution_kernel_index.insert(std::make_pair(device_itr.first, size_t(0)))
                .second);
    }

    bool dispatch_finished = false;
    while (!dispatch_finished) {
      dispatch_finished = true;
      for (const auto& device_itr : execution_kernels_per_device_) {
        auto& kernel_idx = execution_kernel_index[device_itr.first];
        if (kernel_idx < device_itr.second.size()) {
          dispatch_finished = false;
          const auto& execution_kernel = device_itr.second[kernel_idx++];
          f(device_itr.first, execution_kernel.fragments, rowid_lookup_key_);

          if (terminateDispatchMaybe(tuple_count, ra_exe_unit, execution_kernel)) {
            return;
          }
        }
      }
    }
  }

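The following is a minimal, self-contained sketch of the balanced round-robin dispatch pattern described above. It uses simplified stand-in types rather than the real ExecutionKernel and FragmentsList, and omits the early-termination check; it is illustrative only.

#include <cstddef>
#include <iostream>
#include <map>
#include <vector>

// Simplified stand-ins (illustrative only; not the real OmniSciDB types).
struct KernelSketch { int id; };
using KernelsPerDevice = std::map<int, std::vector<KernelSketch>>;

// Round-robin over devices: one kernel per device per outer pass, mirroring
// the balanced-dispatch loop in assignFragsToKernelDispatch.
template <typename DISPATCH_FCN>
void round_robin_dispatch(const KernelsPerDevice& kernels_per_device, DISPATCH_FCN f) {
  std::map<int, std::size_t> next_kernel;  // per-device cursor
  bool dispatch_finished = false;
  while (!dispatch_finished) {
    dispatch_finished = true;
    for (const auto& [device_id, kernels] : kernels_per_device) {
      auto& idx = next_kernel[device_id];
      if (idx < kernels.size()) {
        dispatch_finished = false;
        f(device_id, kernels[idx++]);  // dispatch one kernel, advance the cursor
      }
    }
  }
}

int main() {
  KernelsPerDevice kernels{{0, {{1}, {2}}}, {1, {{3}}}};
  round_robin_dispatch(kernels, [](int device, const KernelSketch& k) {
    std::cout << "device " << device << " runs kernel " << k.id << "\n";
  });
}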
template<typename DISPATCH_FCN >
void QueryFragmentDescriptor::assignFragsToMultiDispatch ( DISPATCH_FCN  f) const
inline

Dispatch multi-fragment kernels. Currently GPU only. Each GPU should have only one kernel, with multiple fragments in its fragments list.

Definition at line 89 of file QueryFragmentDescriptor.h.

References CHECK_EQ, execution_kernels_per_device_, and rowid_lookup_key_.

Referenced by Executor::dispatchFragments().

  {
    for (const auto& device_itr : execution_kernels_per_device_) {
      const auto& execution_kernels = device_itr.second;
      CHECK_EQ(execution_kernels.size(), size_t(1));

      const auto& fragments_list = execution_kernels.front().fragments;
      f(device_itr.first, fragments_list, rowid_lookup_key_);
    }
  }

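A hedged usage sketch: the functor is invoked exactly once per device and receives that device's single multi-fragment kernel's fragment list. The call shape mirrors the snippet above, but fragment_descriptor and enqueue_multifrag_kernel are hypothetical names introduced for illustration only.

// Hypothetical call site (illustrative only); the kernel map was built with
// multifrag kernels enabled, so each device holds exactly one kernel.
fragment_descriptor.assignFragsToMultiDispatch(
    [&](const int device_id,
        const FragmentsList& all_frags_for_device,
        const int64_t rowid_lookup_key) {
      // One kernel per GPU, covering every fragment assigned to that device.
      enqueue_multifrag_kernel(device_id, all_frags_for_device, rowid_lookup_key);
    });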

void QueryFragmentDescriptor::buildFragmentKernelMap ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< uint64_t > &  frag_offsets,
const int  device_count,
const ExecutorDeviceType &  device_type,
const bool  enable_multifrag_kernels,
const bool  enable_inner_join_fragment_skipping,
Executor *  executor 
)

Definition at line 59 of file QueryFragmentDescriptor.cpp.

References buildFragmentPerKernelMap(), and buildMultifragKernelMap().

Referenced by Executor::dispatchFragments().

{
  if (enable_multifrag_kernels) {
    buildMultifragKernelMap(ra_exe_unit,
                            frag_offsets,
                            device_count,
                            device_type,
                            enable_inner_join_fragment_skipping,
                            executor);
  } else {
    buildFragmentPerKernelMap(
        ra_exe_unit, frag_offsets, device_count, device_type, executor);
  }
}


void QueryFragmentDescriptor::buildFragmentPerKernelMap ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< uint64_t > &  frag_offsets,
const int  device_count,
const ExecutorDeviceType &  device_type,
Executor *  executor 
)
protected

Definition at line 93 of file QueryFragmentDescriptor.cpp.

References CHECK(), CHECK_GT, checkDeviceMemoryUsage(), anonymous_namespace{QueryFragmentDescriptor.cpp}::compute_fragment_tuple_count(), CPU, Data_Namespace::CPU_LEVEL, execution_kernels_per_device_, GPU, Data_Namespace::GPU_LEVEL, RelAlgExecutionUnit::input_descs, outer_fragments_size_, rowid_lookup_key_, selected_tables_fragments_, and RelAlgExecutionUnit::simple_quals.

Referenced by buildFragmentKernelMap().

{
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow();

  const ColumnDescriptor* deleted_cd{nullptr};
  if (outer_table_id > 0) {
    // Temporary tables will not have a table descriptor and will also not have deleted
    // rows.
    const auto& catalog = executor->getCatalog();
    const auto td = catalog->getMetadataForTable(outer_table_id);
    CHECK(td);
    deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
  }

  for (size_t i = 0; i < outer_fragments->size(); ++i) {
    const auto& fragment = (*outer_fragments)[i];
    const auto skip_frag = executor->skipFragment(
        outer_table_desc, fragment, ra_exe_unit.simple_quals, frag_offsets, i);
    if (skip_frag.first) {
      continue;
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
    const int chosen_device_count =
        device_type == ExecutorDeviceType::CPU ? 1 : device_count;
    CHECK_GT(chosen_device_count, 0);
    const auto memory_level = device_type == ExecutorDeviceType::GPU
                                  ? Data_Namespace::GPU_LEVEL
                                  : Data_Namespace::CPU_LEVEL;
    int device_id = (device_type == ExecutorDeviceType::CPU || fragment.shard == -1)
                        ? fragment.deviceIds[static_cast<int>(memory_level)]
                        : fragment.shard % chosen_device_count;

    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }

    ExecutionKernel execution_kernel{
        device_id, {}, compute_fragment_tuple_count(fragment, deleted_cd)};
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            i,
                                            selected_tables_fragments_,
                                            executor->getInnerTabIdToJoinCond());
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());

      execution_kernel.fragments.emplace_back(FragmentsPerTable{table_id, frag_ids});
    }

    if (execution_kernels_per_device_.find(device_id) ==
        execution_kernels_per_device_.end()) {
      CHECK(execution_kernels_per_device_
                .insert(std::make_pair(device_id,
                                       std::vector<ExecutionKernel>{execution_kernel}))
                .second);
    } else {
      execution_kernels_per_device_[device_id].emplace_back(execution_kernel);
    }
  }
}

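As a self-contained sketch of just the device-selection step above (stand-in struct, illustrative only): sharded fragments running on GPU are pinned to shard % device_count, while unsharded fragments, or any fragment under CPU execution, fall back to the fragment's stored device id for the chosen memory level.

#include <array>

// Simplified stand-in for a fragment's placement info (illustrative only).
struct FragmentSketch {
  int shard;                     // -1 means the table is not sharded
  std::array<int, 3> deviceIds;  // preferred device id per memory level
};

// Mirrors the per-kernel device choice in buildFragmentPerKernelMap.
int choose_device(const FragmentSketch& fragment,
                  const bool is_cpu,
                  const int memory_level,
                  const int device_count) {
  return (is_cpu || fragment.shard == -1)
             ? fragment.deviceIds[memory_level]
             : fragment.shard % device_count;
}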

void QueryFragmentDescriptor::buildMultifragKernelMap ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< uint64_t > &  frag_offsets,
const int  device_count,
const ExecutorDeviceType &  device_type,
const bool  enable_inner_join_fragment_skipping,
Executor *  executor 
)
protected

Definition at line 169 of file QueryFragmentDescriptor.cpp.

References CHECK(), CHECK_EQ, checkDeviceMemoryUsage(), execution_kernels_per_device_, GPU, Data_Namespace::GPU_LEVEL, RelAlgExecutionUnit::input_descs, outer_fragments_size_, rowid_lookup_key_, selected_tables_fragments_, and RelAlgExecutionUnit::simple_quals.

Referenced by buildFragmentKernelMap().

{
  // Allocate all the fragments of the tables involved in the query to available
  // devices. The basic idea: the device is decided by the outer table in the
  // query (the first table in a join) and we need to broadcast the fragments
  // in the inner table to each device. Sharding will change this model.
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const int outer_table_id = outer_table_desc.getTableId();
  auto it = selected_tables_fragments_.find(outer_table_id);
  CHECK(it != selected_tables_fragments_.end());
  const auto outer_fragments = it->second;
  outer_fragments_size_ = outer_fragments->size();

  const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();
  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow();

  for (size_t outer_frag_id = 0; outer_frag_id < outer_fragments->size();
       ++outer_frag_id) {
    const auto& fragment = (*outer_fragments)[outer_frag_id];
    auto skip_frag = executor->skipFragment(outer_table_desc,
                                            fragment,
                                            ra_exe_unit.simple_quals,
                                            frag_offsets,
                                            outer_frag_id);
    if (enable_inner_join_fragment_skipping &&
        (skip_frag == std::pair<bool, int64_t>(false, -1))) {
      skip_frag = executor->skipFragmentInnerJoins(
          outer_table_desc, ra_exe_unit, fragment, frag_offsets, outer_frag_id);
    }
    if (skip_frag.first) {
      continue;
    }
    const int device_id =
        fragment.shard == -1
            ? fragment.deviceIds[static_cast<int>(Data_Namespace::GPU_LEVEL)]
            : fragment.shard % device_count;
    if (device_type == ExecutorDeviceType::GPU) {
      checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
    }
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto table_id = ra_exe_unit.input_descs[j].getTableId();
      auto table_frags_it = selected_tables_fragments_.find(table_id);
      CHECK(table_frags_it != selected_tables_fragments_.end());
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            device_type,
                                            j,
                                            outer_frag_id,
                                            selected_tables_fragments_,
                                            inner_table_id_to_join_condition);

      if (execution_kernels_per_device_.find(device_id) ==
          execution_kernels_per_device_.end()) {
        std::vector<ExecutionKernel> kernels{
            ExecutionKernel{device_id, FragmentsList{}, std::nullopt}};
        CHECK(execution_kernels_per_device_.insert(std::make_pair(device_id, kernels))
                  .second);
      }

      // Multifrag kernels only have one execution kernel per device. Grab the execution
      // kernel object and push back into its fragments list.
      CHECK_EQ(execution_kernels_per_device_[device_id].size(), size_t(1));
      auto& execution_kernel = execution_kernels_per_device_[device_id].front();

      auto& kernel_frag_list = execution_kernel.fragments;
      if (kernel_frag_list.size() < j + 1) {
        kernel_frag_list.emplace_back(FragmentsPerTable{table_id, frag_ids});
      } else {
        CHECK_EQ(kernel_frag_list[j].table_id, table_id);
        auto& curr_frag_ids = kernel_frag_list[j].fragment_ids;
        for (const int frag_id : frag_ids) {
          if (std::find(curr_frag_ids.begin(), curr_frag_ids.end(), frag_id) ==
              curr_frag_ids.end()) {
            curr_frag_ids.push_back(frag_id);
          }
        }
      }
    }
    rowid_lookup_key_ = std::max(rowid_lookup_key_, skip_frag.second);
  }
}

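A small self-contained sketch of the duplicate-free fragment-id merge performed near the end of the loop above: a device's single multi-fragment kernel accumulates fragment ids per input table, skipping ids it has already recorded. This is illustrative only, not the library's API.

#include <algorithm>
#include <vector>

// Append new fragment ids to a kernel's per-table list, skipping duplicates,
// mirroring the merge loop in buildMultifragKernelMap (illustrative only).
void merge_fragment_ids(std::vector<int>& curr_frag_ids,
                        const std::vector<int>& frag_ids) {
  for (const int frag_id : frag_ids) {
    if (std::find(curr_frag_ids.begin(), curr_frag_ids.end(), frag_id) ==
        curr_frag_ids.end()) {
      curr_frag_ids.push_back(frag_id);
    }
  }
}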

void QueryFragmentDescriptor::checkDeviceMemoryUsage ( const Fragmenter_Namespace::FragmentInfo &  fragment,
const int  device_id,
const size_t  num_cols 
)
protected

Definition at line 290 of file QueryFragmentDescriptor.cpp.

References available_gpu_mem_bytes_, CHECK_GE, g_cluster, Fragmenter_Namespace::FragmentInfo::getNumTuples(), gpu_input_mem_limit_percent_, LOG, tuple_count_per_device_, and logger::WARNING.

Referenced by buildFragmentPerKernelMap(), and buildMultifragKernelMap().

{
  if (g_cluster) {
    // Disabled in distributed mode for now
    return;
  }
  CHECK_GE(device_id, 0);
  tuple_count_per_device_[device_id] += fragment.getNumTuples();
  const size_t gpu_bytes_limit =
      available_gpu_mem_bytes_[device_id] * gpu_input_mem_limit_percent_;
  if (tuple_count_per_device_[device_id] * num_bytes_for_row > gpu_bytes_limit) {
    LOG(WARNING) << "Not enough memory on device " << device_id
                 << " for input chunks totaling "
                 << tuple_count_per_device_[device_id] * num_bytes_for_row
                 << " bytes (available device memory: " << gpu_bytes_limit << " bytes)";
    throw QueryMustRunOnCpu();
  }
}

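A hedged restatement of the limit check as a free function, followed by an invented numeric example; the real member tracks the running per-device tuple count in tuple_count_per_device_ and throws QueryMustRunOnCpu when the limit is exceeded.

#include <cstddef>

// Returns true when the projected input size exceeds the device budget,
// mirroring the check above (illustrative only).
bool over_gpu_limit(const std::size_t tuples_on_device,
                    const std::size_t num_bytes_for_row,
                    const std::size_t available_gpu_mem_bytes,
                    const double gpu_input_mem_limit_percent) {
  const std::size_t gpu_bytes_limit = static_cast<std::size_t>(
      gpu_input_mem_limit_percent * available_gpu_mem_bytes);
  return tuples_on_device * num_bytes_for_row > gpu_bytes_limit;
}
// Invented example: 10,000,000 tuples * 64 bytes/row = 640 MB of input; with an
// 8 GB device and a 0.9 limit (7.2 GB budget) the check passes, so no exception.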

void QueryFragmentDescriptor::computeAllTablesFragments ( std::map< int, const TableFragments * > &  all_tables_fragments,
const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< InputTableInfo > &  query_infos 
)
static

Definition at line 45 of file QueryFragmentDescriptor.cpp.

References CHECK_EQ, and RelAlgExecutionUnit::input_descs.

Referenced by Executor::ExecutionDispatch::runImpl().

{
  for (size_t tab_idx = 0; tab_idx < ra_exe_unit.input_descs.size(); ++tab_idx) {
    int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
    CHECK_EQ(query_infos[tab_idx].table_id, table_id);
    const auto& fragments = query_infos[tab_idx].info.fragments;
    if (!all_tables_fragments.count(table_id)) {
      all_tables_fragments.insert(std::make_pair(table_id, &fragments));
    }
  }
}

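A hedged usage sketch for the static helper; the surrounding variable names are assumptions, but the map's contents follow from the loop above: each input table id maps to its fragment list, and only the first occurrence of a repeated table id is kept.

// Hypothetical call site (illustrative only).
std::map<int, const TableFragments*> all_tables_fragments;
QueryFragmentDescriptor::computeAllTablesFragments(
    all_tables_fragments, ra_exe_unit, query_infos);
// all_tables_fragments now holds one fragment-list pointer per distinct input table.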

bool QueryFragmentDescriptor::shouldCheckWorkUnitWatchdog ( ) const
inline

Definition at line 138 of file QueryFragmentDescriptor.h.

References execution_kernels_per_device_, and rowid_lookup_key_.

Referenced by Executor::dispatchFragments().

  {
    return rowid_lookup_key_ < 0 && !execution_kernels_per_device_.empty();
  }


bool QueryFragmentDescriptor::terminateDispatchMaybe ( size_t &  tuple_count,
const RelAlgExecutionUnit &  ra_exe_unit,
const ExecutionKernel &  kernel 
) const
protected

Definition at line 272 of file QueryFragmentDescriptor.cpp.

References anonymous_namespace{QueryFragmentDescriptor.cpp}::is_sample_query(), SortInfo::limit, SortInfo::offset, ExecutionKernel::outer_tuple_count, and RelAlgExecutionUnit::sort_info.

Referenced by assignFragsToKernelDispatch().

{
  const auto sample_query_limit =
      ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
  if (!kernel.outer_tuple_count) {
    return false;
  } else {
    tuple_count += *kernel.outer_tuple_count;
    if (is_sample_query(ra_exe_unit) && sample_query_limit > 0 &&
        tuple_count >= sample_query_limit) {
      return true;
    }
  }
  return false;
}

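A minimal, self-contained restatement of the termination rule, assuming the sample-query predicate and the limit + offset sum are supplied by the caller (illustrative only):

#include <cstddef>
#include <optional>

// Early-termination test for sample-style queries: once the running outer tuple
// count reaches limit + offset, further dispatch can stop. Kernels whose outer
// tuple count is unknown at dispatch time never trigger termination.
bool should_terminate(std::size_t& tuple_count,
                      const std::optional<std::size_t>& kernel_outer_tuple_count,
                      const bool is_sample_query,
                      const std::size_t limit_plus_offset) {
  if (!kernel_outer_tuple_count) {
    return false;  // unknown count at dispatch time: keep going
  }
  tuple_count += *kernel_outer_tuple_count;
  return is_sample_query && limit_plus_offset > 0 && tuple_count >= limit_plus_offset;
}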

Member Data Documentation

std::map<size_t, size_t> QueryFragmentDescriptor::available_gpu_mem_bytes_
protected

Definition at line 152 of file QueryFragmentDescriptor.h.

Referenced by checkDeviceMemoryUsage(), and QueryFragmentDescriptor().

std::map<int, std::vector<ExecutionKernel> > QueryFragmentDescriptor::execution_kernels_per_device_
protected
double QueryFragmentDescriptor::gpu_input_mem_limit_percent_
protected

Definition at line 150 of file QueryFragmentDescriptor.h.

Referenced by checkDeviceMemoryUsage().

size_t QueryFragmentDescriptor::outer_fragments_size_ = 0
protected
int64_t QueryFragmentDescriptor::rowid_lookup_key_ = -1
protected
std::map<int, const TableFragments*> QueryFragmentDescriptor::selected_tables_fragments_
protected
std::map<size_t, size_t> QueryFragmentDescriptor::tuple_count_per_device_
protected

Definition at line 151 of file QueryFragmentDescriptor.h.

Referenced by checkDeviceMemoryUsage().


The documentation for this class was generated from the following files:
QueryFragmentDescriptor.h
QueryFragmentDescriptor.cpp