Execute.cpp
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 
23 #ifdef HAVE_CUDA
24 #include <cuda.h>
25 #endif // HAVE_CUDA
26 #include <chrono>
27 #include <ctime>
28 #include <future>
29 #include <iostream>
30 #include <memory>
31 #include <mutex>
32 #include <numeric>
33 #include <set>
34 #include <thread>
35 
36 #include "Catalog/Catalog.h"
37 #include "CudaMgr/CudaMgr.h"
39 #include "Parser/ParserNode.h"
69 #include "Shared/checked_alloc.h"
70 #include "Shared/measure.h"
71 #include "Shared/misc.h"
72 #include "Shared/scope.h"
73 #include "Shared/shard_key.h"
74 #include "Shared/threading.h"
75 
76 bool g_enable_watchdog{false};
79 size_t g_cpu_sub_task_size{500'000};
80 bool g_enable_filter_function{true};
81 unsigned g_dynamic_watchdog_time_limit{10000};
82 bool g_allow_cpu_retry{true};
83 bool g_allow_query_step_cpu_retry{true};
84 bool g_null_div_by_zero{false};
85 unsigned g_trivial_loop_join_threshold{1000};
86 bool g_from_table_reordering{true};
87 bool g_inner_join_fragment_skipping{true};
88 extern bool g_enable_smem_group_by;
89 extern std::unique_ptr<llvm::Module> udf_gpu_module;
90 extern std::unique_ptr<llvm::Module> udf_cpu_module;
91 bool g_enable_filter_push_down{false};
92 float g_filter_push_down_low_frac{-1.0f};
93 float g_filter_push_down_high_frac{-1.0f};
94 size_t g_filter_push_down_passing_row_ubound{0};
95 bool g_enable_columnar_output{false};
96 bool g_enable_left_join_filter_hoisting{true};
97 bool g_optimize_row_initialization{true};
98 bool g_enable_overlaps_hashjoin{true};
99 bool g_enable_distance_rangejoin{true};
100 bool g_enable_hashjoin_many_to_many{false};
101 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
102 double g_overlaps_target_entries_per_bin{1.3};
103 bool g_strip_join_covered_quals{false};
104 size_t g_constrained_by_in_threshold{10};
105 size_t g_default_max_groups_buffer_entry_guess{16384};
106 size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
107 bool g_enable_window_functions{true};
108 bool g_enable_table_functions{false};
109 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
110 size_t g_min_memory_allocation_size{
111  256}; // minimum memory allocation required for projection query output buffer
112  // without pre-flight count
113 bool g_enable_bump_allocator{false};
114 double g_bump_allocator_step_reduction{0.75};
115 bool g_enable_direct_columnarization{true};
116 extern bool g_enable_experimental_string_functions;
117 bool g_enable_lazy_fetch{true};
118 bool g_enable_runtime_query_interrupt{true};
119 bool g_enable_non_kernel_time_query_interrupt{true};
120 bool g_use_estimator_result_cache{true};
121 unsigned g_pending_query_interrupt_freq{1000};
122 double g_running_query_interrupt_freq{0.1};
123 size_t g_gpu_smem_threshold{
 124  4096}; // GPU shared memory threshold (in bytes); if larger
 125  // buffer sizes are required we do not use GPU shared
 126  // memory optimizations. Setting this to 0 means unlimited
 127  // (subject to other dynamically calculated caps)
128 bool g_enable_smem_grouped_non_count_agg{
129  true}; // enable use of shared memory when performing group-by with select non-count
130  // aggregates
131 bool g_enable_smem_non_grouped_agg{
132  true}; // enable optimizations for using GPU shared memory in implementation of
133  // non-grouped aggregates
134 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
135  // limits the allocation for the output buffer arena
136  // and data recycler test
137 size_t g_enable_parallel_linearization{
 138  10000}; // row-count threshold for linearizing a varlen column in parallel
139 bool g_enable_data_recycler{true};
140 bool g_use_hashtable_cache{true};
141 size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
142 size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
143 
144 size_t g_approx_quantile_buffer{1000};
145 size_t g_approx_quantile_centroids{300};
146 
147 bool g_enable_automatic_ir_metadata{true};
148 
149 extern bool g_cache_string_hash;
150 
151 int const Executor::max_gpu_count;
152 
153 const int32_t Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES;
154 
155 Executor::Executor(const ExecutorId executor_id,
156  Data_Namespace::DataMgr* data_mgr,
157  const size_t block_size_x,
158  const size_t grid_size_x,
159  const size_t max_gpu_slab_size,
160  const std::string& debug_dir,
161  const std::string& debug_file)
162  : cgen_state_(new CgenState({}, false))
163  , cpu_code_cache_(code_cache_size)
164  , gpu_code_cache_(code_cache_size)
165  , block_size_x_(block_size_x)
166  , grid_size_x_(grid_size_x)
167  , max_gpu_slab_size_(max_gpu_slab_size)
168  , debug_dir_(debug_dir)
169  , debug_file_(debug_file)
170  , executor_id_(executor_id)
171  , catalog_(nullptr)
172  , data_mgr_(data_mgr)
173  , temporary_tables_(nullptr)
174  , input_table_info_cache_(this) {}
175 
176 std::shared_ptr<Executor> Executor::getExecutor(
177  const ExecutorId executor_id,
178  const std::string& debug_dir,
179  const std::string& debug_file,
180  const SystemParameters& system_parameters) {
181  INJECT_TIMER(getExecutor);
182 
183  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
184  auto it = executors_.find(executor_id);
185  if (it != executors_.end()) {
186  return it->second;
187  }
188 
189  auto& data_mgr = Catalog_Namespace::SysCatalog::instance().getDataMgr();
190  auto executor = std::make_shared<Executor>(executor_id,
191  &data_mgr,
192  system_parameters.cuda_block_size,
193  system_parameters.cuda_grid_size,
194  system_parameters.max_gpu_slab_size,
195  debug_dir,
196  debug_file);
197  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
198  return executor;
199 }
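// --- Illustrative sketch (editorial addition, not part of the original source) ---
// getExecutor() above is a get-or-create cache: look the executor id up under a write
// lock, construct and insert on a miss, and return a shared_ptr either way. The helper
// below shows the same pattern in isolation; everything in the executor_doc_example
// namespace is hypothetical and relies only on facilities already visible in this
// translation unit (<memory>, <mutex>, std::unordered_map).
namespace executor_doc_example {

template <typename Key, typename Value, typename Factory>
std::shared_ptr<Value> get_or_create(std::unordered_map<Key, std::shared_ptr<Value>>& cache,
                                     std::mutex& cache_mutex,
                                     const Key& key,
                                     Factory make_value) {
  std::lock_guard<std::mutex> lock(cache_mutex);  // one lock covers lookup and insert
  auto it = cache.find(key);
  if (it != cache.end()) {
    return it->second;  // reuse the cached instance
  }
  auto value = make_value();   // construct only on a miss
  cache.emplace(key, value);   // publish while still holding the lock
  return value;
}

}  // namespace executor_doc_example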
200 
201 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
202  switch (memory_level) {
203  case Data_Namespace::MemoryLevel::CPU_LEVEL:
204  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
205  mapd_unique_lock<mapd_shared_mutex> flush_lock(
206  execute_mutex_); // Don't flush memory while queries are running
207 
209  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
210  // The hash table cache uses CPU memory not managed by the buffer manager. In the
211  // future, we should manage these allocations with the buffer manager directly.
212  // For now, assume the user wants to purge the hash table cache when they clear
213  // CPU memory (currently used in ExecuteTest to lower memory pressure)
215  }
216  break;
217  }
218  default: {
219  throw std::runtime_error(
220  "Clearing memory levels other than the CPU level or GPU level is not "
221  "supported.");
222  }
223  }
224 }
225 
227  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
228 }
229 
231  const int dict_id_in,
232  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
233  const bool with_generation) const {
234  CHECK(row_set_mem_owner);
235  std::lock_guard<std::mutex> lock(
236  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
237  return row_set_mem_owner->getOrAddStringDictProxy(
238  dict_id_in, with_generation, catalog_);
239 }
240 
242  const int dict_id_in,
243  const bool with_generation,
244  const Catalog_Namespace::Catalog* catalog) {
245  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
246  CHECK(catalog);
247  const auto dd = catalog->getMetadataForDict(dict_id);
248  if (dd) {
249  CHECK(dd->stringDict);
250  CHECK_LE(dd->dictNBits, 32);
251  const int64_t generation =
252  with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
253  return addStringDict(dd->stringDict, dict_id, generation);
254  }
255  CHECK_EQ(0, dict_id);
256  if (!lit_str_dict_proxy_) {
257  std::shared_ptr<StringDictionary> tsd =
258  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
260  tsd, 0, 0)); // use 0 string_dict_id to denote literal proxy
261  }
262  return lit_str_dict_proxy_.get();
263 }
264 
266  std::lock_guard<std::mutex> lock(state_mutex_);
267  return t_digests_
268  .emplace_back(std::make_unique<quantile::TDigest>(
270  .get();
271 }
272 
273 bool Executor::isCPUOnly() const {
274  CHECK(data_mgr_);
275  return !data_mgr_->getCudaMgr();
276 }
277 
279  const Analyzer::ColumnVar* col_var) const {
281  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
282 }
283 
285  const Analyzer::ColumnVar* col_var,
286  int n) const {
287  const auto cd = getColumnDescriptor(col_var);
288  if (!cd || n > cd->columnType.get_physical_cols()) {
289  return nullptr;
290  }
292  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
293 }
294 
296  return catalog_;
297 }
298 
300  catalog_ = catalog;
301 }
302 
303 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
304  return row_set_mem_owner_;
305 }
306 
308  return temporary_tables_;
309 }
310 
312  return input_table_info_cache_.getTableInfo(table_id);
313 }
314 
315 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
316  return table_generations_.getGeneration(table_id);
317 }
318 
320  return agg_col_range_cache_.getColRange(phys_input);
321 }
322 
323 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
324  size_t num_bytes = 0;
325  if (!plan_state_) {
326  return 0;
327  }
328  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
329  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
330  continue;
331  }
332 
333  if (fetched_col_pair.first < 0) {
334  num_bytes += 8;
335  } else {
336  const auto cd =
337  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
338  const auto& ti = cd->columnType;
339  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
340  ? 4
341  : ti.get_size();
342  if (sz < 0) {
343  // for varlen types, only account for the pointer/size for each row, for now
344  num_bytes += 16;
345  } else {
346  num_bytes += sz;
347  }
348  }
349  }
350  return num_bytes;
351 }
352 
354  const std::vector<Analyzer::Expr*>& target_exprs) const {
355  CHECK(plan_state_);
356  for (const auto target_expr : target_exprs) {
357  if (plan_state_->isLazyFetchColumn(target_expr)) {
358  return true;
359  }
360  }
361  return false;
362 }
363 
364 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
365  const std::vector<Analyzer::Expr*>& target_exprs) const {
366  CHECK(plan_state_);
367  CHECK(catalog_);
368  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
369  for (const auto target_expr : target_exprs) {
370  if (!plan_state_->isLazyFetchColumn(target_expr)) {
371  col_lazy_fetch_info.emplace_back(
372  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
373  } else {
374  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
375  CHECK(col_var);
376  auto col_id = col_var->get_column_id();
377  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
378  auto cd = (col_var->get_table_id() > 0)
379  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
380  : nullptr;
381  if (cd && IS_GEO(cd->columnType.get_type())) {
382  // Geo coords cols will be processed in sequence. So we only need to track the
383  // first coords col in lazy fetch info.
384  {
385  auto cd0 =
386  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
387  auto col0_ti = cd0->columnType;
388  CHECK(!cd0->isVirtualCol);
389  auto col0_var = makeExpr<Analyzer::ColumnVar>(
390  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
391  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
392  col_lazy_fetch_info.emplace_back(
393  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
394  }
395  } else {
396  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
397  const auto& col_ti = col_var->get_type_info();
398  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
399  }
400  }
401  }
402  return col_lazy_fetch_info;
403 }
404 
406  input_table_info_cache_.clear();
407  agg_col_range_cache_.clear();
408  table_generations_.clear();
409 }
410 
411 std::vector<int8_t> Executor::serializeLiterals(
412  const std::unordered_map<int, CgenState::LiteralValues>& literals,
413  const int device_id) {
414  if (literals.empty()) {
415  return {};
416  }
417  const auto dev_literals_it = literals.find(device_id);
418  CHECK(dev_literals_it != literals.end());
419  const auto& dev_literals = dev_literals_it->second;
420  size_t lit_buf_size{0};
421  std::vector<std::string> real_strings;
422  std::vector<std::vector<double>> double_array_literals;
423  std::vector<std::vector<int8_t>> align64_int8_array_literals;
424  std::vector<std::vector<int32_t>> int32_array_literals;
425  std::vector<std::vector<int8_t>> align32_int8_array_literals;
426  std::vector<std::vector<int8_t>> int8_array_literals;
427  for (const auto& lit : dev_literals) {
428  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
429  if (lit.which() == 7) {
430  const auto p = boost::get<std::string>(&lit);
431  CHECK(p);
432  real_strings.push_back(*p);
433  } else if (lit.which() == 8) {
434  const auto p = boost::get<std::vector<double>>(&lit);
435  CHECK(p);
436  double_array_literals.push_back(*p);
437  } else if (lit.which() == 9) {
438  const auto p = boost::get<std::vector<int32_t>>(&lit);
439  CHECK(p);
440  int32_array_literals.push_back(*p);
441  } else if (lit.which() == 10) {
442  const auto p = boost::get<std::vector<int8_t>>(&lit);
443  CHECK(p);
444  int8_array_literals.push_back(*p);
445  } else if (lit.which() == 11) {
446  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
447  CHECK(p);
448  if (p->second == 64) {
449  align64_int8_array_literals.push_back(p->first);
450  } else if (p->second == 32) {
451  align32_int8_array_literals.push_back(p->first);
452  } else {
453  CHECK(false);
454  }
455  }
456  }
457  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
458  throw TooManyLiterals();
459  }
460  int16_t crt_real_str_off = lit_buf_size;
461  for (const auto& real_str : real_strings) {
462  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
463  lit_buf_size += real_str.size();
464  }
465  if (double_array_literals.size() > 0) {
466  lit_buf_size = align(lit_buf_size, sizeof(double));
467  }
468  int16_t crt_double_arr_lit_off = lit_buf_size;
469  for (const auto& double_array_literal : double_array_literals) {
470  CHECK_LE(double_array_literal.size(),
471  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
472  lit_buf_size += double_array_literal.size() * sizeof(double);
473  }
474  if (align64_int8_array_literals.size() > 0) {
475  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
476  }
477  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
478  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
479  CHECK_LE(align64_int8_array_literals.size(),
480  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
481  lit_buf_size += align64_int8_array_literal.size();
482  }
483  if (int32_array_literals.size() > 0) {
484  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
485  }
486  int16_t crt_int32_arr_lit_off = lit_buf_size;
487  for (const auto& int32_array_literal : int32_array_literals) {
488  CHECK_LE(int32_array_literal.size(),
489  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
490  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
491  }
492  if (align32_int8_array_literals.size() > 0) {
493  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
494  }
495  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
496  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
497  CHECK_LE(align32_int8_array_literals.size(),
498  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
499  lit_buf_size += align32_int8_array_literal.size();
500  }
501  int16_t crt_int8_arr_lit_off = lit_buf_size;
502  for (const auto& int8_array_literal : int8_array_literals) {
503  CHECK_LE(int8_array_literal.size(),
504  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
505  lit_buf_size += int8_array_literal.size();
506  }
507  unsigned crt_real_str_idx = 0;
508  unsigned crt_double_arr_lit_idx = 0;
509  unsigned crt_align64_int8_arr_lit_idx = 0;
510  unsigned crt_int32_arr_lit_idx = 0;
511  unsigned crt_align32_int8_arr_lit_idx = 0;
512  unsigned crt_int8_arr_lit_idx = 0;
513  std::vector<int8_t> serialized(lit_buf_size);
514  size_t off{0};
515  for (const auto& lit : dev_literals) {
516  const auto lit_bytes = CgenState::literalBytes(lit);
517  off = CgenState::addAligned(off, lit_bytes);
518  switch (lit.which()) {
519  case 0: {
520  const auto p = boost::get<int8_t>(&lit);
521  CHECK(p);
522  serialized[off - lit_bytes] = *p;
523  break;
524  }
525  case 1: {
526  const auto p = boost::get<int16_t>(&lit);
527  CHECK(p);
528  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
529  break;
530  }
531  case 2: {
532  const auto p = boost::get<int32_t>(&lit);
533  CHECK(p);
534  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
535  break;
536  }
537  case 3: {
538  const auto p = boost::get<int64_t>(&lit);
539  CHECK(p);
540  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
541  break;
542  }
543  case 4: {
544  const auto p = boost::get<float>(&lit);
545  CHECK(p);
546  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
547  break;
548  }
549  case 5: {
550  const auto p = boost::get<double>(&lit);
551  CHECK(p);
552  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
553  break;
554  }
555  case 6: {
556  const auto p = boost::get<std::pair<std::string, int>>(&lit);
557  CHECK(p);
558  const auto str_id =
560  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
561  ->getOrAddTransient(p->first)
562  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
563  ->getIdOfString(p->first);
564  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
565  break;
566  }
567  case 7: {
568  const auto p = boost::get<std::string>(&lit);
569  CHECK(p);
570  int32_t off_and_len = crt_real_str_off << 16;
571  const auto& crt_real_str = real_strings[crt_real_str_idx];
572  off_and_len |= static_cast<int16_t>(crt_real_str.size());
573  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
574  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
575  ++crt_real_str_idx;
576  crt_real_str_off += crt_real_str.size();
577  break;
578  }
579  case 8: {
580  const auto p = boost::get<std::vector<double>>(&lit);
581  CHECK(p);
582  int32_t off_and_len = crt_double_arr_lit_off << 16;
583  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
584  int32_t len = crt_double_arr_lit.size();
585  CHECK_EQ((len >> 16), 0);
586  off_and_len |= static_cast<int16_t>(len);
587  int32_t double_array_bytesize = len * sizeof(double);
588  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
589  memcpy(&serialized[crt_double_arr_lit_off],
590  crt_double_arr_lit.data(),
591  double_array_bytesize);
592  ++crt_double_arr_lit_idx;
593  crt_double_arr_lit_off += double_array_bytesize;
594  break;
595  }
596  case 9: {
597  const auto p = boost::get<std::vector<int32_t>>(&lit);
598  CHECK(p);
599  int32_t off_and_len = crt_int32_arr_lit_off << 16;
600  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
601  int32_t len = crt_int32_arr_lit.size();
602  CHECK_EQ((len >> 16), 0);
603  off_and_len |= static_cast<int16_t>(len);
604  int32_t int32_array_bytesize = len * sizeof(int32_t);
605  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
606  memcpy(&serialized[crt_int32_arr_lit_off],
607  crt_int32_arr_lit.data(),
608  int32_array_bytesize);
609  ++crt_int32_arr_lit_idx;
610  crt_int32_arr_lit_off += int32_array_bytesize;
611  break;
612  }
613  case 10: {
614  const auto p = boost::get<std::vector<int8_t>>(&lit);
615  CHECK(p);
616  int32_t off_and_len = crt_int8_arr_lit_off << 16;
617  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
618  int32_t len = crt_int8_arr_lit.size();
619  CHECK_EQ((len >> 16), 0);
620  off_and_len |= static_cast<int16_t>(len);
621  int32_t int8_array_bytesize = len;
622  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
623  memcpy(&serialized[crt_int8_arr_lit_off],
624  crt_int8_arr_lit.data(),
625  int8_array_bytesize);
626  ++crt_int8_arr_lit_idx;
627  crt_int8_arr_lit_off += int8_array_bytesize;
628  break;
629  }
630  case 11: {
631  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
632  CHECK(p);
633  if (p->second == 64) {
634  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
635  const auto& crt_align64_int8_arr_lit =
636  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
637  int32_t len = crt_align64_int8_arr_lit.size();
638  CHECK_EQ((len >> 16), 0);
639  off_and_len |= static_cast<int16_t>(len);
640  int32_t align64_int8_array_bytesize = len;
641  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
642  memcpy(&serialized[crt_align64_int8_arr_lit_off],
643  crt_align64_int8_arr_lit.data(),
644  align64_int8_array_bytesize);
645  ++crt_align64_int8_arr_lit_idx;
646  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
647  } else if (p->second == 32) {
648  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
649  const auto& crt_align32_int8_arr_lit =
650  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
651  int32_t len = crt_align32_int8_arr_lit.size();
652  CHECK_EQ((len >> 16), 0);
653  off_and_len |= static_cast<int16_t>(len);
654  int32_t align32_int8_array_bytesize = len;
655  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
656  memcpy(&serialized[crt_align32_int8_arr_lit_off],
657  crt_align32_int8_arr_lit.data(),
658  align32_int8_array_bytesize);
659  ++crt_align32_int8_arr_lit_idx;
660  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
661  } else {
662  CHECK(false);
663  }
664  break;
665  }
666  default:
667  CHECK(false);
668  }
669  }
670  return serialized;
671 }
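// --- Illustrative sketch (editorial addition, not part of the original source) ---
// serializeLiterals() above encodes every out-of-line literal (strings and arrays) as a
// 32-bit "off_and_len" word: the 16-bit byte offset into the literal buffer goes in the
// upper half and the 16-bit length in the lower half, which is why the buffer is
// rejected once it grows past std::numeric_limits<int16_t>::max(). The helper names
// below are hypothetical.
namespace executor_doc_example {

inline int32_t pack_off_and_len(int16_t off, int16_t len) {
  return (static_cast<int32_t>(off) << 16) | static_cast<uint16_t>(len);
}

inline void unpack_off_and_len(int32_t off_and_len, int16_t& off, int16_t& len) {
  off = static_cast<int16_t>(off_and_len >> 16);     // upper 16 bits: buffer offset
  len = static_cast<int16_t>(off_and_len & 0xFFFF);  // lower 16 bits: payload length
}
// pack_off_and_len(40, 7) == 0x00280007; unpacking it yields off == 40 and len == 7.

}  // namespace executor_doc_example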
672 
673 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
674  if (device_type == ExecutorDeviceType::GPU) {
675  return cudaMgr()->getDeviceCount();
676  } else {
677  return 1;
678  }
679 }
680 
682  const Data_Namespace::MemoryLevel memory_level) const {
683  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
684  : deviceCount(ExecutorDeviceType::CPU);
685 }
686 
687 // TODO(alex): remove or split
688 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
689  const SQLTypeInfo& ti,
690  const int64_t agg_init_val,
691  const int8_t out_byte_width,
692  const int64_t* out_vec,
693  const size_t out_vec_sz,
694  const bool is_group_by,
695  const bool float_argument_input) {
696  switch (agg) {
697  case kAVG:
698  case kSUM:
699  if (0 != agg_init_val) {
700  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
701  int64_t agg_result = agg_init_val;
702  for (size_t i = 0; i < out_vec_sz; ++i) {
703  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
704  }
705  return {agg_result, 0};
706  } else {
707  CHECK(ti.is_fp());
708  switch (out_byte_width) {
709  case 4: {
710  int agg_result = static_cast<int32_t>(agg_init_val);
711  for (size_t i = 0; i < out_vec_sz; ++i) {
 712  agg_sum_float_skip_val(
 713  &agg_result,
714  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
715  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
716  }
717  const int64_t converted_bin =
718  float_argument_input
719  ? static_cast<int64_t>(agg_result)
720  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
721  return {converted_bin, 0};
722  break;
723  }
724  case 8: {
725  int64_t agg_result = agg_init_val;
726  for (size_t i = 0; i < out_vec_sz; ++i) {
 727  agg_sum_double_skip_val(
 728  &agg_result,
729  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
730  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
731  }
732  return {agg_result, 0};
733  break;
734  }
735  default:
736  CHECK(false);
737  }
738  }
739  }
740  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
741  int64_t agg_result = 0;
742  for (size_t i = 0; i < out_vec_sz; ++i) {
743  agg_result += out_vec[i];
744  }
745  return {agg_result, 0};
746  } else {
747  CHECK(ti.is_fp());
748  switch (out_byte_width) {
749  case 4: {
750  float r = 0.;
751  for (size_t i = 0; i < out_vec_sz; ++i) {
752  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
753  }
754  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
755  const int64_t converted_bin =
756  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
757  return {converted_bin, 0};
758  }
759  case 8: {
760  double r = 0.;
761  for (size_t i = 0; i < out_vec_sz; ++i) {
762  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
763  }
764  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
765  }
766  default:
767  CHECK(false);
768  }
769  }
770  break;
771  case kCOUNT: {
772  uint64_t agg_result = 0;
773  for (size_t i = 0; i < out_vec_sz; ++i) {
774  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
775  agg_result += out;
776  }
777  return {static_cast<int64_t>(agg_result), 0};
778  }
779  case kMIN: {
780  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
781  int64_t agg_result = agg_init_val;
782  for (size_t i = 0; i < out_vec_sz; ++i) {
783  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
784  }
785  return {agg_result, 0};
786  } else {
787  switch (out_byte_width) {
788  case 4: {
789  int32_t agg_result = static_cast<int32_t>(agg_init_val);
790  for (size_t i = 0; i < out_vec_sz; ++i) {
 791  agg_min_float_skip_val(
 792  &agg_result,
793  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
794  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
795  }
796  const int64_t converted_bin =
797  float_argument_input
798  ? static_cast<int64_t>(agg_result)
799  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
800  return {converted_bin, 0};
801  }
802  case 8: {
803  int64_t agg_result = agg_init_val;
804  for (size_t i = 0; i < out_vec_sz; ++i) {
 805  agg_min_double_skip_val(
 806  &agg_result,
807  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
808  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
809  }
810  return {agg_result, 0};
811  }
812  default:
813  CHECK(false);
814  }
815  }
816  }
817  case kMAX:
818  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
819  int64_t agg_result = agg_init_val;
820  for (size_t i = 0; i < out_vec_sz; ++i) {
821  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
822  }
823  return {agg_result, 0};
824  } else {
825  switch (out_byte_width) {
826  case 4: {
827  int32_t agg_result = static_cast<int32_t>(agg_init_val);
828  for (size_t i = 0; i < out_vec_sz; ++i) {
 829  agg_max_float_skip_val(
 830  &agg_result,
831  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
832  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
833  }
834  const int64_t converted_bin =
835  float_argument_input ? static_cast<int64_t>(agg_result)
836  : float_to_double_bin(agg_result, !ti.get_notnull());
837  return {converted_bin, 0};
838  }
839  case 8: {
840  int64_t agg_result = agg_init_val;
841  for (size_t i = 0; i < out_vec_sz; ++i) {
 842  agg_max_double_skip_val(
 843  &agg_result,
844  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
845  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
846  }
847  return {agg_result, 0};
848  }
849  default:
850  CHECK(false);
851  }
852  }
853  case kSINGLE_VALUE: {
854  int64_t agg_result = agg_init_val;
855  for (size_t i = 0; i < out_vec_sz; ++i) {
856  if (out_vec[i] != agg_init_val) {
857  if (agg_result == agg_init_val) {
858  agg_result = out_vec[i];
859  } else if (out_vec[i] != agg_result) {
 860  return {agg_result, ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES};
 861  }
862  }
863  }
864  return {agg_result, 0};
865  }
866  case kSAMPLE: {
867  int64_t agg_result = agg_init_val;
868  for (size_t i = 0; i < out_vec_sz; ++i) {
869  if (out_vec[i] != agg_init_val) {
870  agg_result = out_vec[i];
871  break;
872  }
873  }
874  return {agg_result, 0};
875  }
876  default:
877  CHECK(false);
878  }
879  abort();
880 }
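// --- Illustrative sketch (editorial addition, not part of the original source) ---
// reduceResults() above treats each int64_t output slot as a plain bit container: for
// floating-point aggregates the kernels stored float/double bit patterns in the slot,
// so the reducer reinterprets those bytes (via may_alias_ptr) rather than converting
// the integer value. A memcpy-based equivalent of that reinterpretation, with
// hypothetical helper names:
namespace executor_doc_example {

inline double double_from_slot(int64_t slot) {
  double value;
  memcpy(&value, &slot, sizeof(double));  // view the same 8 bytes as a double
  return value;
}

inline int64_t slot_from_double(double value) {
  int64_t slot = 0;
  memcpy(&slot, &value, sizeof(double));  // store the bit pattern, not a numeric cast
  return slot;
}
// double_from_slot(slot_from_double(2.5)) == 2.5 exactly; the bytes are never converted.

}  // namespace executor_doc_example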
881 
882 namespace {
883 
885  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
886  auto& first = results_per_device.front().first;
887  CHECK(first);
888  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
889  const auto& next = results_per_device[dev_idx].first;
890  CHECK(next);
891  first->append(*next);
892  }
893  return std::move(first);
894 }
895 
896 } // namespace
897 
899  const RelAlgExecutionUnit& ra_exe_unit) {
900  auto& results_per_device = shared_context.getFragmentResults();
901  if (results_per_device.empty()) {
902  std::vector<TargetInfo> targets;
903  for (const auto target_expr : ra_exe_unit.target_exprs) {
904  targets.push_back(get_target_info(target_expr, g_bigint_count));
905  }
906  return std::make_shared<ResultSet>(targets,
909  row_set_mem_owner_,
910  catalog_,
911  blockSize(),
912  gridSize());
913  }
914  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
915  std::sort(results_per_device.begin(),
916  results_per_device.end(),
917  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
918  CHECK_GE(lhs.second.size(), size_t(1));
919  CHECK_GE(rhs.second.size(), size_t(1));
920  return lhs.second.front() < rhs.second.front();
921  });
922 
923  return get_merged_result(results_per_device);
924 }
925 
927  const RelAlgExecutionUnit& ra_exe_unit,
928  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
929  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
930  const QueryMemoryDescriptor& query_mem_desc) const {
931  auto timer = DEBUG_TIMER(__func__);
932  if (ra_exe_unit.estimator) {
933  return reduce_estimator_results(ra_exe_unit, results_per_device);
934  }
935 
936  if (results_per_device.empty()) {
937  std::vector<TargetInfo> targets;
938  for (const auto target_expr : ra_exe_unit.target_exprs) {
939  targets.push_back(get_target_info(target_expr, g_bigint_count));
940  }
941  return std::make_shared<ResultSet>(targets,
944  nullptr,
945  catalog_,
946  blockSize(),
947  gridSize());
948  }
949 
950  return reduceMultiDeviceResultSets(
951  results_per_device,
952  row_set_mem_owner,
953  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
954 }
955 
956 namespace {
957 
959  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
960  int64_t* compilation_queue_time) {
961  auto clock_begin = timer_start();
962  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
963  *compilation_queue_time = timer_stop(clock_begin);
964  const auto& this_result_set = results_per_device[0].first;
965  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
966  this_result_set->getTargetInfos(),
967  this_result_set->getTargetInitVals());
968  return reduction_jit.codegen();
969 };
970 
971 } // namespace
972 
974  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
975  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
976  const QueryMemoryDescriptor& query_mem_desc) const {
977  auto timer = DEBUG_TIMER(__func__);
978  std::shared_ptr<ResultSet> reduced_results;
979 
980  const auto& first = results_per_device.front().first;
981 
982  if (query_mem_desc.getQueryDescriptionType() ==
984  results_per_device.size() > 1) {
985  const auto total_entry_count = std::accumulate(
986  results_per_device.begin(),
987  results_per_device.end(),
988  size_t(0),
989  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
990  const auto& r = rs.first;
991  return init + r->getQueryMemDesc().getEntryCount();
992  });
993  CHECK(total_entry_count);
994  auto query_mem_desc = first->getQueryMemDesc();
995  query_mem_desc.setEntryCount(total_entry_count);
996  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
999  row_set_mem_owner,
1000  catalog_,
1001  blockSize(),
1002  gridSize());
1003  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
1004  reduced_results->initializeStorage();
1005  switch (query_mem_desc.getEffectiveKeyWidth()) {
1006  case 4:
1007  first->getStorage()->moveEntriesToBuffer<int32_t>(
1008  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1009  break;
1010  case 8:
1011  first->getStorage()->moveEntriesToBuffer<int64_t>(
1012  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1013  break;
1014  default:
1015  CHECK(false);
1016  }
1017  } else {
1018  reduced_results = first;
1019  }
1020 
1021  int64_t compilation_queue_time = 0;
1022  const auto reduction_code =
1023  get_reduction_code(results_per_device, &compilation_queue_time);
1024 
1025  for (size_t i = 1; i < results_per_device.size(); ++i) {
1026  reduced_results->getStorage()->reduce(
1027  *(results_per_device[i].first->getStorage()), {}, reduction_code);
1028  }
1029  reduced_results->addCompilationQueueTime(compilation_queue_time);
1030  return reduced_results;
1031 }
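// --- Illustrative sketch (editorial addition, not part of the original source) ---
// reduceMultiDeviceResultSets() above is a fold: the first device's result set becomes
// the accumulator (after being resized to the summed entry count when needed), and every
// other device's storage is reduced into it with the JITed reduction code. The shape of
// that control flow, with a hypothetical Reducible type standing in for ResultSet:
namespace executor_doc_example {

template <typename Reducible>
Reducible* fold_into_first(std::vector<Reducible*>& per_device_results) {
  if (per_device_results.empty()) {
    return nullptr;
  }
  Reducible* accumulator = per_device_results.front();  // device 0 owns the merged result
  for (size_t i = 1; i < per_device_results.size(); ++i) {
    accumulator->reduce(*per_device_results[i]);         // fold device i into device 0
  }
  return accumulator;
}

}  // namespace executor_doc_example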
1032 
1034  const RelAlgExecutionUnit& ra_exe_unit,
1035  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1036  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1037  const QueryMemoryDescriptor& query_mem_desc) const {
1038  if (results_per_device.size() == 1) {
1039  return std::move(results_per_device.front().first);
1040  }
1041  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
 1042  SpeculativeTopNMap m;
 1043  for (const auto& result : results_per_device) {
1044  auto rows = result.first;
1045  CHECK(rows);
1046  if (!rows) {
1047  continue;
1048  }
1049  SpeculativeTopNMap that(
1050  *rows,
1051  ra_exe_unit.target_exprs,
1052  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1053  m.reduce(that);
1054  }
1055  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1056  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1057  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1058 }
1059 
1060 std::unordered_set<int> get_available_gpus(const Data_Namespace::DataMgr* data_mgr) {
1061  CHECK(data_mgr);
1062  std::unordered_set<int> available_gpus;
1063  if (data_mgr->gpusPresent()) {
1064  CHECK(data_mgr->getCudaMgr());
1065  const int gpu_count = data_mgr->getCudaMgr()->getDeviceCount();
1066  CHECK_GT(gpu_count, 0);
1067  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1068  available_gpus.insert(gpu_id);
1069  }
1070  }
1071  return available_gpus;
1072 }
1073 
1074 size_t get_context_count(const ExecutorDeviceType device_type,
1075  const size_t cpu_count,
1076  const size_t gpu_count) {
1077  return device_type == ExecutorDeviceType::GPU ? gpu_count
1078  : static_cast<size_t>(cpu_count);
1079 }
1080 
1081 namespace {
1082 
 1083 // Compute a very conservative guess for the output buffer entry count, using no
 1084 // other information than the number of tuples in each table and multiplying them
 1085 // together.
1086 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1088  // Check for overflows since we're multiplying potentially big table sizes.
1089  using checked_size_t = boost::multiprecision::number<
1090  boost::multiprecision::cpp_int_backend<64,
1091  64,
1092  boost::multiprecision::unsigned_magnitude,
1093  boost::multiprecision::checked,
1094  void>>;
1095  checked_size_t max_groups_buffer_entry_guess = 1;
1096  for (const auto& query_info : query_infos) {
1097  CHECK(!query_info.info.fragments.empty());
1098  auto it = std::max_element(query_info.info.fragments.begin(),
1099  query_info.info.fragments.end(),
1100  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1101  return f1.getNumTuples() < f2.getNumTuples();
1102  });
1103  max_groups_buffer_entry_guess *= it->getNumTuples();
1104  }
1105  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1106  // baseline group layout with that many entries anyway.
1107  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1108  try {
1109  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1110  max_groups_buffer_entry_guess_cap);
1111  } catch (...) {
1112  return max_groups_buffer_entry_guess_cap;
1113  }
1114 }
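// --- Illustrative sketch (editorial addition, not part of the original source) ---
// compute_buffer_entry_guess() above multiplies the largest fragment size of every input
// table using a checked multiprecision integer so that an overflow falls through to the
// 100M-entry cap. The same "multiply, but saturate at a cap" idea in plain size_t
// arithmetic, with hypothetical names:
namespace executor_doc_example {

inline size_t capped_product(const std::vector<size_t>& factors, const size_t cap) {
  size_t product = 1;
  for (const size_t factor : factors) {
    if (factor != 0 && product > cap / factor) {
      return cap;  // the next multiplication would exceed the cap (or overflow)
    }
    product *= factor;
  }
  return std::min(product, cap);
}
// capped_product({50'000'000, 3'000}, 100'000'000) == 100'000'000 (saturated at the cap).

}  // namespace executor_doc_example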
1115 
1116 std::string get_table_name(const InputDescriptor& input_desc,
 1117  const Catalog_Namespace::Catalog& cat) {
 1118  const auto source_type = input_desc.getSourceType();
1119  if (source_type == InputSourceType::TABLE) {
1120  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1121  CHECK(td);
1122  return td->tableName;
1123  } else {
1124  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1125  }
1126 }
1127 
1128 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1129  const int device_count) {
1130  if (device_type == ExecutorDeviceType::GPU) {
1131  return device_count * Executor::high_scan_limit;
1132  }
 1133  return Executor::high_scan_limit;
 1134 }
1135 
1137  const std::vector<InputTableInfo>& table_infos,
1139  const ExecutorDeviceType device_type,
1140  const int device_count) {
1141  for (const auto target_expr : ra_exe_unit.target_exprs) {
1142  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1143  return;
1144  }
1145  }
1146  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1147  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1148  // Allow a query with no scan limit to run on small tables
1149  return;
1150  }
1151  if (ra_exe_unit.use_bump_allocator) {
1152  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1153  // relative to the size of the input), so we bypass this check for now
1154  return;
1155  }
1156  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1157  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1158  (!ra_exe_unit.scan_limit ||
1159  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1160  std::vector<std::string> table_names;
1161  const auto& input_descs = ra_exe_unit.input_descs;
1162  for (const auto& input_desc : input_descs) {
1163  table_names.push_back(get_table_name(input_desc, cat));
1164  }
1165  if (!ra_exe_unit.scan_limit) {
1166  throw WatchdogException(
1167  "Projection query would require a scan without a limit on table(s): " +
1168  boost::algorithm::join(table_names, ", "));
1169  } else {
1170  throw WatchdogException(
1171  "Projection query output result set on table(s): " +
1172  boost::algorithm::join(table_names, ", ") + " would contain " +
1173  std::to_string(ra_exe_unit.scan_limit) +
1174  " rows, which is more than the current system limit of " +
1175  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1176  }
1177  }
1178 }
1179 
1180 } // namespace
1181 
1182 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1183  const RelAlgExecutionUnit& ra_exe_unit) {
1184  if (ra_exe_unit.input_descs.size() < 2) {
1185  return false;
1186  }
1187 
1188  // We only support loop join at the end of folded joins
1189  // where ra_exe_unit.input_descs.size() > 2 for now.
1190  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1191 
1192  std::optional<size_t> inner_table_idx;
1193  for (size_t i = 0; i < query_infos.size(); ++i) {
1194  if (query_infos[i].table_id == inner_table_id) {
1195  inner_table_idx = i;
1196  break;
1197  }
1198  }
1199  CHECK(inner_table_idx);
1200  return query_infos[*inner_table_idx].info.getNumTuples() <=
 1201  g_trivial_loop_join_threshold;
 1202 }
1203 
1204 namespace {
1205 
1206 template <typename T>
1207 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1208  std::vector<std::string> expr_strs;
1209  for (const auto& expr : expr_container) {
1210  if (!expr) {
1211  expr_strs.emplace_back("NULL");
1212  } else {
1213  expr_strs.emplace_back(expr->toString());
1214  }
1215  }
1216  return expr_strs;
1217 }
1218 
1219 template <>
1220 std::vector<std::string> expr_container_to_string(
1221  const std::list<Analyzer::OrderEntry>& expr_container) {
1222  std::vector<std::string> expr_strs;
1223  for (const auto& expr : expr_container) {
1224  expr_strs.emplace_back(expr.toString());
1225  }
1226  return expr_strs;
1227 }
1228 
1229 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1230  switch (algorithm) {
 1231  case SortAlgorithm::Default:
 1232  return "ResultSet";
 1233  case SortAlgorithm::SpeculativeTopN:
 1234  return "Speculative Top N";
 1235  case SortAlgorithm::StreamingTopN:
 1236  return "Streaming Top N";
1237  }
1238  UNREACHABLE();
1239  return "";
1240 }
1241 
1242 } // namespace
1243 
1244 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
 1245  // todo(yoonmin): replace the cache key with a DAG representation of the query plan
 1246  // instead of this ra_exec_unit description, if possible
1247  std::ostringstream os;
1248  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1249  const auto& scan_desc = input_col_desc->getScanDesc();
1250  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1251  << scan_desc.getNestLevel();
1252  }
1253  if (!ra_exe_unit.simple_quals.empty()) {
1254  for (const auto& qual : ra_exe_unit.simple_quals) {
1255  if (qual) {
1256  os << qual->toString() << ",";
1257  }
1258  }
1259  }
1260  if (!ra_exe_unit.quals.empty()) {
1261  for (const auto& qual : ra_exe_unit.quals) {
1262  if (qual) {
1263  os << qual->toString() << ",";
1264  }
1265  }
1266  }
1267  if (!ra_exe_unit.join_quals.empty()) {
1268  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1269  const auto& join_condition = ra_exe_unit.join_quals[i];
1270  os << std::to_string(i) << ::toString(join_condition.type);
1271  for (const auto& qual : join_condition.quals) {
1272  if (qual) {
1273  os << qual->toString() << ",";
1274  }
1275  }
1276  }
1277  }
1278  if (!ra_exe_unit.groupby_exprs.empty()) {
1279  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1280  if (qual) {
1281  os << qual->toString() << ",";
1282  }
1283  }
1284  }
1285  for (const auto& expr : ra_exe_unit.target_exprs) {
1286  if (expr) {
1287  os << expr->toString() << ",";
1288  }
1289  }
1290  os << ::toString(ra_exe_unit.estimator == nullptr);
1291  os << std::to_string(ra_exe_unit.scan_limit);
1292  return os.str();
1293 }
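// --- Illustrative sketch (editorial addition, not part of the original source) ---
// ra_exec_unit_desc_for_caching() above builds its cache key by concatenating
// stringified pieces of the execution unit into one comma-separated description. A
// consumer that wants a fixed-size key can hash that description; the helper below shows
// that step using only standard library facilities assumed to be available via the
// headers above (the function name is hypothetical):
namespace executor_doc_example {

inline size_t cache_key_hash(const std::vector<std::string>& key_parts) {
  std::ostringstream os;
  for (const auto& part : key_parts) {
    os << part << ",";  // same "piece,piece,..." shape as the description above
  }
  return std::hash<std::string>{}(os.str());
}

}  // namespace executor_doc_example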
1294 
1295 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1296  auto query_plan_dag =
1297  ra_exe_unit.query_plan_dag == EMPTY_QUERY_PLAN ? "N/A" : ra_exe_unit.query_plan_dag;
1298  os << "\n\tExtracted Query Plan Dag: " << query_plan_dag;
1299  os << "\n\tTable/Col/Levels: ";
1300  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1301  const auto& scan_desc = input_col_desc->getScanDesc();
1302  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1303  << scan_desc.getNestLevel() << ") ";
1304  }
1305  if (!ra_exe_unit.simple_quals.empty()) {
1306  os << "\n\tSimple Quals: "
 1307  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.simple_quals),
 1308  ", ");
1309  }
1310  if (!ra_exe_unit.quals.empty()) {
1311  os << "\n\tQuals: "
1312  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1313  }
1314  if (!ra_exe_unit.join_quals.empty()) {
1315  os << "\n\tJoin Quals: ";
1316  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1317  const auto& join_condition = ra_exe_unit.join_quals[i];
1318  os << "\t\t" << std::to_string(i) << " " << ::toString(join_condition.type);
1319  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1320  }
1321  }
1322  if (!ra_exe_unit.groupby_exprs.empty()) {
1323  os << "\n\tGroup By: "
 1324  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.groupby_exprs),
 1325  ", ");
1326  }
1327  os << "\n\tProjected targets: "
 1328  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.target_exprs), ", ");
 1329  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
1330  os << "\n\tSort Info: ";
1331  const auto& sort_info = ra_exe_unit.sort_info;
1332  os << "\n\t Order Entries: "
1333  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1334  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1335  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1336  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1337  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1338  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
1339  if (ra_exe_unit.union_all) {
1340  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1341  }
1342  return os;
1343 }
1344 
1345 namespace {
1346 
 1347 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
 1348  const size_t new_scan_limit) {
1349  return {ra_exe_unit_in.input_descs,
1350  ra_exe_unit_in.input_col_descs,
1351  ra_exe_unit_in.simple_quals,
1352  ra_exe_unit_in.quals,
1353  ra_exe_unit_in.join_quals,
1354  ra_exe_unit_in.groupby_exprs,
1355  ra_exe_unit_in.target_exprs,
1356  ra_exe_unit_in.estimator,
1357  ra_exe_unit_in.sort_info,
1358  new_scan_limit,
1359  ra_exe_unit_in.query_hint,
1360  ra_exe_unit_in.query_plan_dag,
1361  ra_exe_unit_in.hash_table_build_plan_dag,
1362  ra_exe_unit_in.table_id_to_node_map,
1363  ra_exe_unit_in.use_bump_allocator,
1364  ra_exe_unit_in.union_all,
1365  ra_exe_unit_in.query_state};
1366 }
1367 
1368 } // namespace
1369 
1370 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1371  const bool is_agg,
1372  const std::vector<InputTableInfo>& query_infos,
1373  const RelAlgExecutionUnit& ra_exe_unit_in,
1374  const CompilationOptions& co,
1375  const ExecutionOptions& eo,
 1376  const Catalog_Namespace::Catalog& cat,
 1377  RenderInfo* render_info,
1378  const bool has_cardinality_estimation,
1379  ColumnCacheMap& column_cache) {
1380  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1381 
1382  ScopeGuard cleanup_post_execution = [this] {
1383  // cleanup/unpin GPU buffer allocations
1384  // TODO: separate out this state into a single object
1385  plan_state_.reset(nullptr);
1386  if (cgen_state_) {
1387  cgen_state_->in_values_bitmaps_.clear();
1388  }
1389  };
1390 
1391  try {
1392  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1393  is_agg,
1394  true,
1395  query_infos,
1396  ra_exe_unit_in,
1397  co,
1398  eo,
1399  cat,
1400  row_set_mem_owner_,
1401  render_info,
1402  has_cardinality_estimation,
1403  column_cache);
1404  if (result) {
1405  result->setKernelQueueTime(kernel_queue_time_ms_);
1406  result->addCompilationQueueTime(compilation_queue_time_ms_);
1407  if (eo.just_validate) {
1408  result->setValidationOnlyRes();
1409  }
1410  }
1411  return result;
1412  } catch (const CompilationRetryNewScanLimit& e) {
1413  auto result =
1414  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1415  is_agg,
1416  false,
1417  query_infos,
1418  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1419  co,
1420  eo,
1421  cat,
1422  row_set_mem_owner_,
1423  render_info,
1424  has_cardinality_estimation,
1425  column_cache);
1426  if (result) {
1427  result->setKernelQueueTime(kernel_queue_time_ms_);
1428  result->addCompilationQueueTime(compilation_queue_time_ms_);
1429  if (eo.just_validate) {
1430  result->setValidationOnlyRes();
1431  }
1432  }
1433  return result;
1434  }
1435 }
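// --- Illustrative sketch (editorial addition, not part of the original source) ---
// executeWorkUnit() above runs the work unit once and, if compilation throws
// CompilationRetryNewScanLimit, retries a single time with the new scan limit swapped
// into the execution unit. The generic shape of that "run once, patch, run again"
// pattern, with hypothetical RetryWithNewLimit/run/rebuild names:
namespace executor_doc_example {

struct RetryWithNewLimit {
  size_t new_limit;
};

template <typename RunFn, typename RebuildFn>
auto run_with_one_retry(RunFn run, RebuildFn rebuild_with_limit) -> decltype(run()) {
  try {
    return run();  // first attempt with the original plan
  } catch (const RetryWithNewLimit& retry) {
    rebuild_with_limit(retry.new_limit);  // adjust the plan exactly once
    return run();                         // second and final attempt
  }
}

}  // namespace executor_doc_example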
1436 
 1437 ResultSetPtr Executor::executeWorkUnitImpl(
 1438  size_t& max_groups_buffer_entry_guess,
1439  const bool is_agg,
1440  const bool allow_single_frag_table_opt,
1441  const std::vector<InputTableInfo>& query_infos,
1442  const RelAlgExecutionUnit& ra_exe_unit_in,
1443  const CompilationOptions& co,
1444  const ExecutionOptions& eo,
 1445  const Catalog_Namespace::Catalog& cat,
 1446  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1447  RenderInfo* render_info,
1448  const bool has_cardinality_estimation,
1449  ColumnCacheMap& column_cache) {
1450  INJECT_TIMER(Exec_executeWorkUnit);
1451  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1452  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1453  CHECK(!query_infos.empty());
1454  if (!max_groups_buffer_entry_guess) {
1455  // The query has failed the first execution attempt because of running out
1456  // of group by slots. Make the conservative choice: allocate fragment size
1457  // slots and run on the CPU.
1458  CHECK(device_type == ExecutorDeviceType::CPU);
1459  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1460  }
1461 
1462  int8_t crt_min_byte_width{MAX_BYTE_WIDTH_SUPPORTED};
1463  do {
1464  SharedKernelContext shared_context(query_infos);
1465  ColumnFetcher column_fetcher(this, column_cache);
1466  ScopeGuard scope_guard = [&column_fetcher] {
1467  column_fetcher.freeLinearizedBuf();
1468  column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
1469  };
1470  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1471  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1472 
1473  if (eo.executor_type == ExecutorType::Native) {
1474  try {
1475  INJECT_TIMER(query_step_compilation);
1476  auto clock_begin = timer_start();
1477  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1478  compilation_queue_time_ms_ += timer_stop(clock_begin);
1479 
1480  query_mem_desc_owned =
1481  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1482  crt_min_byte_width,
1483  has_cardinality_estimation,
1484  ra_exe_unit,
1485  query_infos,
1486  deleted_cols_map,
1487  column_fetcher,
1488  {device_type,
1489  co.hoist_literals,
1490  co.opt_level,
1492  co.allow_lazy_fetch,
1494  co.explain_type,
1496  eo,
1497  render_info,
1498  this);
1499  CHECK(query_mem_desc_owned);
1500  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1501  } catch (CompilationRetryNoCompaction&) {
1502  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1503  continue;
1504  }
1505  } else {
1506  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1507  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1508  CHECK(!query_mem_desc_owned);
1509  query_mem_desc_owned.reset(
1511  }
1512  if (eo.just_explain) {
1513  return executeExplain(*query_comp_desc_owned);
1514  }
1515 
1516  for (const auto target_expr : ra_exe_unit.target_exprs) {
1517  plan_state_->target_exprs_.push_back(target_expr);
1518  }
1519 
1520  if (!eo.just_validate) {
1521  int available_cpus = cpu_threads();
1522  auto available_gpus = get_available_gpus(data_mgr_);
1523 
1524  const auto context_count =
1525  get_context_count(device_type, available_cpus, available_gpus.size());
1526  try {
1527  auto kernels = createKernels(shared_context,
1528  ra_exe_unit,
1529  column_fetcher,
1530  query_infos,
1531  eo,
1532  is_agg,
1533  allow_single_frag_table_opt,
1534  context_count,
1535  *query_comp_desc_owned,
1536  *query_mem_desc_owned,
1537  render_info,
1538  available_gpus,
1539  available_cpus);
1540  launchKernels(
1541  shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
1542  } catch (QueryExecutionError& e) {
1543  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1544  e.getErrorCode() == ERR_OUT_OF_TIME) {
1545  throw QueryExecutionError(ERR_INTERRUPTED);
1546  }
1547  if (e.getErrorCode() == ERR_INTERRUPTED) {
1548  throw QueryExecutionError(ERR_INTERRUPTED);
1549  }
1550  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1551  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1552  crt_min_byte_width <<= 1;
1553  continue;
1554  }
1555  throw;
1556  }
1557  }
1558  if (is_agg) {
1559  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
1560  // update query status to let user know we are now in the reduction phase
1561  std::string curRunningSession{""};
1562  std::string curRunningQuerySubmittedTime{""};
1563  bool sessionEnrolled = false;
1564  {
1565  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
1566  curRunningSession = getCurrentQuerySession(session_read_lock);
1567  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
1568  sessionEnrolled =
1569  checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
1570  }
1571  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
1572  sessionEnrolled) {
1573  updateQuerySessionStatus(curRunningSession,
1574  curRunningQuerySubmittedTime,
1576  }
1577  }
1578  try {
1579  return collectAllDeviceResults(shared_context,
1580  ra_exe_unit,
1581  *query_mem_desc_owned,
1582  query_comp_desc_owned->getDeviceType(),
1583  row_set_mem_owner);
1584  } catch (ReductionRanOutOfSlots&) {
1585  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1586  } catch (OverflowOrUnderflow&) {
1587  crt_min_byte_width <<= 1;
1588  continue;
1589  } catch (QueryExecutionError& e) {
1590  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
1591  << ", what(): " << e.what();
1592  throw QueryExecutionError(e.getErrorCode());
1593  }
1594  }
1595  return resultsUnion(shared_context, ra_exe_unit);
1596 
1597  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1598 
1599  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1602  nullptr,
1603  catalog_,
1604  blockSize(),
1605  gridSize());
1606 }
1607 
1609  const RelAlgExecutionUnit& ra_exe_unit_in,
1610  const InputTableInfo& table_info,
1611  const CompilationOptions& co,
1612  const ExecutionOptions& eo,
1614  PerFragmentCallBack& cb,
1615  const std::set<size_t>& fragment_indexes_param) {
1616  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1617  ColumnCacheMap column_cache;
1618 
1619  std::vector<InputTableInfo> table_infos{table_info};
1620  SharedKernelContext kernel_context(table_infos);
1621 
1622  ColumnFetcher column_fetcher(this, column_cache);
1623  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1624  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1625  {
1626  auto clock_begin = timer_start();
1627  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1628  compilation_queue_time_ms_ += timer_stop(clock_begin);
1629  query_mem_desc_owned =
1630  query_comp_desc_owned->compile(0,
1631  8,
1632  /*has_cardinality_estimation=*/false,
1633  ra_exe_unit,
1634  table_infos,
1635  deleted_cols_map,
1636  column_fetcher,
1637  co,
1638  eo,
1639  nullptr,
1640  this);
1641  }
1642  CHECK(query_mem_desc_owned);
1643  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1644  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1645  const auto& outer_fragments = table_info.info.fragments;
1646 
1647  std::set<size_t> fragment_indexes;
1648  if (fragment_indexes_param.empty()) {
1649  // An empty `fragment_indexes_param` set implies executing
1650  // the query for all fragments in the table. In this
1651  // case, populate `fragment_indexes` with all fragment indexes.
1652  for (size_t i = 0; i < outer_fragments.size(); i++) {
1653  fragment_indexes.emplace(i);
1654  }
1655  } else {
1656  fragment_indexes = fragment_indexes_param;
1657  }
1658 
1659  {
1660  auto clock_begin = timer_start();
1661  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
1662  kernel_queue_time_ms_ += timer_stop(clock_begin);
1663 
1664  for (auto fragment_index : fragment_indexes) {
1665  // We may want to consider in the future allowing this to execute on devices other
1666  // than CPU
1667  FragmentsList fragments_list{{table_id, {fragment_index}}};
1668  ExecutionKernel kernel(ra_exe_unit,
1669  co.device_type,
1670  /*device_id=*/0,
1671  eo,
1672  column_fetcher,
1673  *query_comp_desc_owned,
1674  *query_mem_desc_owned,
1675  fragments_list,
1677  /*render_info=*/nullptr,
1678  /*rowid_lookup_key=*/-1);
1679  kernel.run(this, 0, kernel_context);
1680  }
1681  }
1682 
1683  const auto& all_fragment_results = kernel_context.getFragmentResults();
1684 
1685  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
1686  CHECK_EQ(result_fragment_indexes.size(), 1);
1687  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
1688  }
1689 }
1690 
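As a rough illustration of the "empty selection means all fragments" convention used by the per-fragment execution path above, here is a minimal, self-contained sketch; the types and names (FragmentInfo, for_each_selected_fragment) are hypothetical stand-ins, not the ones in this file.

#include <cstddef>
#include <functional>
#include <iostream>
#include <set>
#include <vector>

namespace sketch {

struct FragmentInfo {
  size_t num_tuples;  // stand-in for the real fragment metadata
};

using PerFragmentCallback = std::function<void(size_t frag_idx, const FragmentInfo&)>;

// An empty `selected` set is interpreted as "run the callback for every fragment".
void for_each_selected_fragment(const std::vector<FragmentInfo>& fragments,
                                const std::set<size_t>& selected,
                                const PerFragmentCallback& cb) {
  std::set<size_t> indexes = selected;
  if (indexes.empty()) {
    for (size_t i = 0; i < fragments.size(); ++i) {
      indexes.insert(i);
    }
  }
  for (const auto idx : indexes) {
    cb(idx, fragments[idx]);
  }
}

}  // namespace sketch

int main() {
  std::vector<sketch::FragmentInfo> frags{{100}, {250}, {80}};
  sketch::for_each_selected_fragment(frags, /*selected=*/{}, [](size_t i, const auto& f) {
    std::cout << "fragment " << i << " has " << f.num_tuples << " tuples\n";
  });
}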
1692  const TableFunctionExecutionUnit exe_unit,
1693  const std::vector<InputTableInfo>& table_infos,
1694  const CompilationOptions& co,
1695  const ExecutionOptions& eo,
1697  INJECT_TIMER(Exec_executeTableFunction);
1698 
1699  if (eo.just_validate) {
1701  /*entry_count=*/0,
1703  /*is_table_function=*/true);
1704  query_mem_desc.setOutputColumnar(true);
1705  return std::make_shared<ResultSet>(
1706  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
1707  co.device_type,
1708  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
1709  this->getRowSetMemoryOwner(),
1710  this->getCatalog(),
1711  this->blockSize(),
1712  this->gridSize());
1713  }
1714 
1715  nukeOldState(false, table_infos, PlanState::DeletedColumnsMap{}, nullptr);
1716 
1717  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1718  // framework, we may want to move this up a level
1719 
1720  ColumnFetcher column_fetcher(this, column_cache);
1721 
1722  TableFunctionCompilationContext compilation_context;
1723  {
1724  auto clock_begin = timer_start();
1725  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1726  compilation_queue_time_ms_ += timer_stop(clock_begin);
1727  compilation_context.compile(exe_unit, co, this);
1728  }
1729 
1730  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
1731  return exe_context.execute(
1732  exe_unit, table_infos, &compilation_context, column_fetcher, co.device_type, this);
1733 }
1734 
1736  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1737 }
1738 
1740  const RelAlgExecutionUnit& ra_exe_unit,
1741  const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
1742  TransientDictIdVisitor dict_id_visitor;
1743 
1744  auto visit_expr =
1745  [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
1746  if (!expr) {
1747  return;
1748  }
1749  const auto dict_id = dict_id_visitor.visit(expr);
1750  if (dict_id >= 0) {
1751  auto sdp = getStringDictionaryProxy(dict_id, row_set_mem_owner, true);
1752  CHECK(sdp);
1753  TransientStringLiteralsVisitor visitor(sdp);
1754  visitor.visit(expr);
1755  }
1756  };
1757 
1758  for (const auto& group_expr : ra_exe_unit.groupby_exprs) {
1759  visit_expr(group_expr.get());
1760  }
1761 
1762  for (const auto& group_expr : ra_exe_unit.quals) {
1763  visit_expr(group_expr.get());
1764  }
1765 
1766  for (const auto& group_expr : ra_exe_unit.simple_quals) {
1767  visit_expr(group_expr.get());
1768  }
1769 
1770  for (const auto target_expr : ra_exe_unit.target_exprs) {
1771  const auto& target_type = target_expr->get_type_info();
1772  if (target_type.is_string() && target_type.get_compression() != kENCODING_DICT) {
1773  continue;
1774  }
1775  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
1776  if (agg_expr) {
1777  if (agg_expr->get_aggtype() == kSINGLE_VALUE ||
1778  agg_expr->get_aggtype() == kSAMPLE) {
1779  visit_expr(agg_expr->get_arg());
1780  }
1781  } else {
1782  visit_expr(target_expr);
1783  }
1784  }
1785 }
1786 
1788  const RelAlgExecutionUnit& ra_exe_unit,
1789  const ExecutorDeviceType requested_device_type) {
1790  for (const auto target_expr : ra_exe_unit.target_exprs) {
1791  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1792  if (!ra_exe_unit.groupby_exprs.empty() &&
1793  !isArchPascalOrLater(requested_device_type)) {
1794  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1795  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1796  return ExecutorDeviceType::CPU;
1797  }
1798  }
1799  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1800  return ExecutorDeviceType::CPU;
1801  }
1802  }
1803  return requested_device_type;
1804 }
1805 
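The device-selection rule above can be condensed into one predicate: grouped SUM/AVG over DOUBLE needs fast 64-bit floating-point atomics (Pascal or later), and REGEXP targets are only evaluated on CPU; either condition forces a requested GPU execution back to CPU. The sketch below restates that rule with hypothetical names and is purely illustrative.

#include <iostream>

namespace sketch {

enum class DeviceType { CPU, GPU };

struct TargetTraits {
  bool is_grouped_double_sum_or_avg;  // SUM/AVG on DOUBLE with GROUP BY
  bool is_regexp;                     // REGEXP target expression
};

DeviceType device_for_target(const TargetTraits& t,
                             DeviceType requested,
                             bool gpu_is_pascal_or_later) {
  if (t.is_grouped_double_sum_or_avg && requested == DeviceType::GPU &&
      !gpu_is_pascal_or_later) {
    return DeviceType::CPU;  // no fast double atomics before Pascal
  }
  if (t.is_regexp) {
    return DeviceType::CPU;  // regex evaluation is not generated for GPU
  }
  return requested;
}

}  // namespace sketch

int main() {
  using namespace sketch;
  TargetTraits avg_double{true, false};
  std::cout << (device_for_target(avg_double, DeviceType::GPU, false) ==
                DeviceType::CPU)
            << "\n";  // prints 1: falls back to CPU on a pre-Pascal GPU
}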
1806 namespace {
1807 
1808 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1809  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1810  if (ti.is_fp()) {
1811  if (float_argument_input && ti.get_type() == kFLOAT) {
1812  int64_t float_null_val = 0;
1813  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1814  static_cast<float>(inline_fp_null_val(ti));
1815  return float_null_val;
1816  }
1817  const auto double_null_val = inline_fp_null_val(ti);
1818  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1819  }
1820  return inline_int_null_val(ti);
1821 }
1822 
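The float branch of inline_null_val above stores a 32-bit float sentinel inside a 64-bit slot by reinterpreting its bits rather than converting the value. A standalone sketch of that bit-punning, using memcpy (the well-defined way to do it); the sentinel value here is an arbitrary placeholder, not the engine's actual NULL constant.

#include <cstdint>
#include <cstring>
#include <iostream>
#include <limits>

// Pack a float NULL sentinel into the low bits of an int64 slot without
// changing its bit pattern; the reader later reinterprets the slot as a float.
int64_t pack_float_null_into_slot(float null_sentinel) {
  int64_t slot = 0;
  std::memcpy(&slot, &null_sentinel, sizeof(float));
  return slot;
}

int main() {
  const float sentinel = std::numeric_limits<float>::min();  // placeholder sentinel
  const int64_t slot = pack_float_null_into_slot(sentinel);
  float back;
  std::memcpy(&back, &slot, sizeof(float));
  std::cout << (back == sentinel) << "\n";  // prints 1: round-trips exactly
}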
1823 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1824  std::vector<int64_t>& entry,
1825  const std::vector<Analyzer::Expr*>& target_exprs,
1827  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1828  const auto target_expr = target_exprs[target_idx];
1829  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1830  CHECK(agg_info.is_agg);
1831  target_infos.push_back(agg_info);
1832  if (g_cluster) {
1833  const auto executor = query_mem_desc.getExecutor();
1834  CHECK(executor);
1835  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1836  CHECK(row_set_mem_owner);
1837  const auto& count_distinct_desc =
1838  query_mem_desc.getCountDistinctDescriptor(target_idx);
1839  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1840  CHECK(row_set_mem_owner);
1841  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
1842  count_distinct_desc.bitmapPaddedSizeBytes(),
1843  /*thread_idx=*/0); // TODO: can we detect thread idx here?
1844  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1845  continue;
1846  }
1847  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1848  auto count_distinct_set = new std::set<int64_t>();
1849  CHECK(row_set_mem_owner);
1850  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1851  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1852  continue;
1853  }
1854  }
1855  const bool float_argument_input = takes_float_argument(agg_info);
1856  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1857  entry.push_back(0);
1858  } else if (agg_info.agg_kind == kAVG) {
1859  entry.push_back(0);
1860  entry.push_back(0);
1861  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
1862  if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
1863  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1864  entry.push_back(0);
1865  }
1866  } else if (agg_info.sql_type.is_varlen()) {
1867  entry.push_back(0);
1868  entry.push_back(0);
1869  } else {
1870  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1871  }
1872  } else {
1873  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1874  }
1875  }
1876 }
1877 
1879  const std::vector<Analyzer::Expr*>& target_exprs_in,
1881  const ExecutorDeviceType device_type) {
1882  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1883  std::vector<Analyzer::Expr*> target_exprs;
1884  for (const auto target_expr : target_exprs_in) {
1885  const auto target_expr_copy =
1886  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1887  CHECK(target_expr_copy);
1888  auto ti = target_expr->get_type_info();
1889  ti.set_notnull(false);
1890  target_expr_copy->set_type_info(ti);
1891  if (target_expr_copy->get_arg()) {
1892  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1893  arg_ti.set_notnull(false);
1894  target_expr_copy->get_arg()->set_type_info(arg_ti);
1895  }
1896  target_exprs_owned_copies.push_back(target_expr_copy);
1897  target_exprs.push_back(target_expr_copy.get());
1898  }
1899  std::vector<TargetInfo> target_infos;
1900  std::vector<int64_t> entry;
1901  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1902  const auto executor = query_mem_desc.getExecutor();
1903  CHECK(executor);
1904  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1905  CHECK(row_set_mem_owner);
1906  auto rs = std::make_shared<ResultSet>(target_infos,
1907  device_type,
1909  row_set_mem_owner,
1910  executor->getCatalog(),
1911  executor->blockSize(),
1912  executor->gridSize());
1913  rs->allocateStorage();
1914  rs->fillOneEntry(entry);
1915  return rs;
1916 }
1917 
1918 } // namespace
1919 
1921  SharedKernelContext& shared_context,
1922  const RelAlgExecutionUnit& ra_exe_unit,
1924  const ExecutorDeviceType device_type,
1925  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1926  auto timer = DEBUG_TIMER(__func__);
1927  auto& result_per_device = shared_context.getFragmentResults();
1928  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1931  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1932  }
1933  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1934  try {
1935  return reduceSpeculativeTopN(
1936  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1937  } catch (const std::bad_alloc&) {
1938  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1939  }
1940  }
1941  const auto shard_count =
1942  device_type == ExecutorDeviceType::GPU
1944  : 0;
1945 
1946  if (shard_count && !result_per_device.empty()) {
1947  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1948  }
1949  return reduceMultiDeviceResults(
1950  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1951 }
1952 
1953 namespace {
1964 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1965  const QueryMemoryDescriptor& input_query_mem_desc,
1966  const ResultSetStorage* output_storage,
1967  size_t output_row_index,
1968  const QueryMemoryDescriptor& output_query_mem_desc,
1969  const std::vector<uint32_t>& top_permutation) {
1970  const auto output_buffer = output_storage->getUnderlyingBuffer();
1971  const auto input_buffer = input_storage->getUnderlyingBuffer();
1972  for (const auto sorted_idx : top_permutation) {
1973  // permuting all group-columns in this result set into the final buffer:
1974  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1975  group_idx++) {
1976  const auto input_column_ptr =
1977  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1978  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1979  const auto output_column_ptr =
1980  output_buffer +
1981  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1982  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1983  memcpy(output_column_ptr,
1984  input_column_ptr,
1985  output_query_mem_desc.groupColWidth(group_idx));
1986  }
1987  // permuting all agg-columns in this result set into the final buffer:
1988  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1989  slot_idx++) {
1990  const auto input_column_ptr =
1991  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1992  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1993  const auto output_column_ptr =
1994  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1995  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1996  memcpy(output_column_ptr,
1997  input_column_ptr,
1998  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1999  }
2000  ++output_row_index;
2001  }
2002  return output_row_index;
2003 }
2004 
2014 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
2015  const ResultSetStorage* output_storage,
2016  size_t output_row_index,
2017  const QueryMemoryDescriptor& output_query_mem_desc,
2018  const std::vector<uint32_t>& top_permutation) {
2019  const auto output_buffer = output_storage->getUnderlyingBuffer();
2020  const auto input_buffer = input_storage->getUnderlyingBuffer();
2021  for (const auto sorted_idx : top_permutation) {
2022  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
2023  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
2024  row_ptr,
2025  output_query_mem_desc.getRowSize());
2026  ++output_row_index;
2027  }
2028  return output_row_index;
2029 }
2030 } // namespace
2031 
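The two permute_storage_* helpers above differ only in how a (row, column) pair is mapped onto a flat output buffer. A small sketch of the two addressing schemes, assuming fixed per-column byte widths; this is a simplified model, not the real QueryMemoryDescriptor layout.

#include <cstddef>
#include <iostream>
#include <vector>

// Columnar layout: all values of column 0, then all values of column 1, ...
size_t columnar_offset(size_t row, size_t col, size_t entry_count,
                       const std::vector<size_t>& col_widths) {
  size_t off = 0;
  for (size_t c = 0; c < col; ++c) {
    off += entry_count * col_widths[c];  // skip whole preceding columns
  }
  return off + row * col_widths[col];
}

// Row-wise layout: all columns of row 0, then all columns of row 1, ...
size_t row_wise_offset(size_t row, size_t col, size_t row_size,
                       const std::vector<size_t>& col_widths) {
  size_t off = row * row_size;
  for (size_t c = 0; c < col; ++c) {
    off += col_widths[c];
  }
  return off;
}

int main() {
  const std::vector<size_t> widths{8, 4};  // two columns: 8 and 4 bytes wide
  const size_t entries = 16, row_size = 12;
  std::cout << columnar_offset(3, 1, entries, widths) << " "     // 140
            << row_wise_offset(3, 1, row_size, widths) << "\n";  // 44
}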
2032 // Collect top results from each device, stitch them together and sort. Partial
2033 // results from each device are guaranteed to be disjoint because we only take
2034 // this path when one of the columns involved is a shard key.
2036  SharedKernelContext& shared_context,
2037  const RelAlgExecutionUnit& ra_exe_unit) const {
2038  auto& result_per_device = shared_context.getFragmentResults();
2039  const auto first_result_set = result_per_device.front().first;
2040  CHECK(first_result_set);
2041  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2042  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2043  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
2044  top_query_mem_desc.setEntryCount(0);
2045  for (auto& result : result_per_device) {
2046  const auto result_set = result.first;
2047  CHECK(result_set);
2048  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
2049  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2050  top_query_mem_desc.setEntryCount(new_entry_cnt);
2051  }
2052  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2053  first_result_set->getDeviceType(),
2054  top_query_mem_desc,
2055  first_result_set->getRowSetMemOwner(),
2056  catalog_,
2057  blockSize(),
2058  gridSize());
2059  auto top_storage = top_result_set->allocateStorage();
2060  size_t top_output_row_idx{0};
2061  for (auto& result : result_per_device) {
2062  const auto result_set = result.first;
2063  CHECK(result_set);
2064  const auto& top_permutation = result_set->getPermutationBuffer();
2065  CHECK_LE(top_permutation.size(), top_n);
2066  if (top_query_mem_desc.didOutputColumnar()) {
2067  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2068  result_set->getQueryMemDesc(),
2069  top_storage,
2070  top_output_row_idx,
2071  top_query_mem_desc,
2072  top_permutation);
2073  } else {
2074  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2075  top_storage,
2076  top_output_row_idx,
2077  top_query_mem_desc,
2078  top_permutation);
2079  }
2080  }
2081  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2082  return top_result_set;
2083 }
2084 
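A compact sketch of the stitch-and-sort idea in the sharded top-results path above: each device produces its own sorted top-N over disjoint shards, the partials are concatenated, and a final top-N is taken over the union. Plain integer vectors stand in for ResultSet storage; this is illustrative only.

#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>

// Merge per-device top-n partial results and keep the global top-n.
// This is valid because shard-keyed partials are pairwise disjoint.
std::vector<int> merge_sharded_top_n(const std::vector<std::vector<int>>& per_device,
                                     size_t top_n) {
  std::vector<int> merged;
  for (const auto& partial : per_device) {
    merged.insert(merged.end(), partial.begin(), partial.end());
  }
  const size_t k = std::min(top_n, merged.size());
  std::partial_sort(merged.begin(), merged.begin() + k, merged.end(),
                    std::greater<int>());  // descending, as for ORDER BY ... DESC LIMIT k
  merged.resize(k);
  return merged;
}

int main() {
  // Two devices, each already reduced to its local top-3.
  std::vector<std::vector<int>> per_device{{90, 70, 50}, {95, 60, 40}};
  for (int v : merge_sharded_top_n(per_device, 3)) {
    std::cout << v << " ";  // 95 90 70
  }
  std::cout << "\n";
}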
2085 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
2086  const {
2087  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
2088  const auto& join_info = plan_state_->join_info_;
2089  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2090  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2091  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
2092  id_to_cond.insert(
2093  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
2094  }
2095  return id_to_cond;
2096 }
2097 
2098 namespace {
2099 
2100 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2101  for (const auto& col : fetched_cols) {
2102  if (col.is_lazily_fetched) {
2103  return true;
2104  }
2105  }
2106  return false;
2107 }
2108 
2109 } // namespace
2110 
2111 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2112  SharedKernelContext& shared_context,
2113  const RelAlgExecutionUnit& ra_exe_unit,
2114  ColumnFetcher& column_fetcher,
2115  const std::vector<InputTableInfo>& table_infos,
2116  const ExecutionOptions& eo,
2117  const bool is_agg,
2118  const bool allow_single_frag_table_opt,
2119  const size_t context_count,
2120  const QueryCompilationDescriptor& query_comp_desc,
2122  RenderInfo* render_info,
2123  std::unordered_set<int>& available_gpus,
2124  int& available_cpus) {
2125  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2126 
2127  QueryFragmentDescriptor fragment_descriptor(
2128  ra_exe_unit,
2129  table_infos,
2130  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2132  : std::vector<Data_Namespace::MemoryInfo>{},
2135  CHECK(!ra_exe_unit.input_descs.empty());
2136 
2137  const auto device_type = query_comp_desc.getDeviceType();
2138  const bool uses_lazy_fetch =
2139  plan_state_->allow_lazy_fetch_ &&
2140  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2141  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2142  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2143  const auto device_count = deviceCount(device_type);
2144  CHECK_GT(device_count, 0);
2145 
2146  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2147  shared_context.getFragOffsets(),
2148  device_count,
2149  device_type,
2150  use_multifrag_kernel,
2152  this);
2153  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2154  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2155  }
2156 
2157  if (use_multifrag_kernel) {
2158  VLOG(1) << "Creating multifrag execution kernels";
2159  VLOG(1) << query_mem_desc.toString();
2160 
2161  // NB: We should never be on this path when the query is retried because of
2162  // running out of group-by slots; also, for scan-only queries on CPU we want the
2163  // high-granularity, fragment-by-fragment execution instead. For scan-only queries
2164  // on GPU, we want the multifrag kernel path to save the overhead of allocating an
2165  // output buffer per fragment.
2166  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2167  &execution_kernels,
2168  &column_fetcher,
2169  &eo,
2170  &query_comp_desc,
2171  &query_mem_desc,
2172  render_info](const int device_id,
2173  const FragmentsList& frag_list,
2174  const int64_t rowid_lookup_key) {
2175  execution_kernels.emplace_back(
2176  std::make_unique<ExecutionKernel>(ra_exe_unit,
2178  device_id,
2179  eo,
2180  column_fetcher,
2181  query_comp_desc,
2182  query_mem_desc,
2183  frag_list,
2185  render_info,
2186  rowid_lookup_key));
2187  };
2188  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2189  } else {
2190  VLOG(1) << "Creating one execution kernel per fragment";
2191  VLOG(1) << query_mem_desc.toString();
2192 
2193  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2195  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2196  const auto max_frag_size =
2197  table_infos.front().info.getFragmentNumTuplesUpperBound();
2198  if (max_frag_size < query_mem_desc.getEntryCount()) {
2199  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2200  << " to match max fragment size " << max_frag_size
2201  << " for kernel per fragment execution path.";
2202  throw CompilationRetryNewScanLimit(max_frag_size);
2203  }
2204  }
2205 
2206  size_t frag_list_idx{0};
2207  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2208  &execution_kernels,
2209  &column_fetcher,
2210  &eo,
2211  &frag_list_idx,
2212  &device_type,
2213  &query_comp_desc,
2214  &query_mem_desc,
2215  render_info](const int device_id,
2216  const FragmentsList& frag_list,
2217  const int64_t rowid_lookup_key) {
2218  if (!frag_list.size()) {
2219  return;
2220  }
2221  CHECK_GE(device_id, 0);
2222 
2223  execution_kernels.emplace_back(
2224  std::make_unique<ExecutionKernel>(ra_exe_unit,
2225  device_type,
2226  device_id,
2227  eo,
2228  column_fetcher,
2229  query_comp_desc,
2230  query_mem_desc,
2231  frag_list,
2233  render_info,
2234  rowid_lookup_key));
2235  ++frag_list_idx;
2236  };
2237 
2238  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2239  ra_exe_unit);
2240  }
2241 
2242  return execution_kernels;
2243 }
2244 
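The kernel-creation policy above boils down to one predicate: GPU queries that may use multi-fragment kernels, and that either do not rely on lazy fetch or are aggregations, get one kernel over many fragments; everything else gets one kernel per fragment. A sketch of that decision with hypothetical option names (not the real ExecutionOptions type):

#include <iostream>

namespace sketch {

enum class DeviceType { CPU, GPU };

struct DispatchOptions {
  DeviceType device_type;
  bool allow_multifrag;  // mirrors the allow_multifrag execution option
  bool uses_lazy_fetch;  // any lazily fetched column among the targets
  bool is_agg;           // aggregations tolerate lazy fetch on this path
};

bool use_multifrag_kernel(const DispatchOptions& o) {
  return o.device_type == DeviceType::GPU && o.allow_multifrag &&
         (!o.uses_lazy_fetch || o.is_agg);
}

}  // namespace sketch

int main() {
  using namespace sketch;
  std::cout << use_multifrag_kernel({DeviceType::GPU, true, false, false}) << " "   // 1
            << use_multifrag_kernel({DeviceType::GPU, true, true, false}) << " "    // 0
            << use_multifrag_kernel({DeviceType::CPU, true, false, true}) << "\n";  // 0
}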
2246  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
2247  const ExecutorDeviceType device_type) {
2248  auto clock_begin = timer_start();
2249  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2250  kernel_queue_time_ms_ += timer_stop(clock_begin);
2251 
2253  // A hack to have an (otherwise unused) unit available for results collection.
2254  const RelAlgExecutionUnit* ra_exe_unit =
2255  kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
2256 
2257 #ifdef HAVE_TBB
2258  if (g_enable_cpu_sub_tasks && device_type == ExecutorDeviceType::CPU) {
2259  shared_context.setThreadPool(&tg);
2260  }
2261  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
2262 #endif // HAVE_TBB
2263 
2264  VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
2265  << (device_type == ExecutorDeviceType::CPU ? "CPU"s : "GPU"s) << ".";
2266  size_t kernel_idx = 1;
2267  for (auto& kernel : kernels) {
2268  CHECK(kernel.get());
2269  tg.run([this,
2270  &kernel,
2271  &shared_context,
2272  parent_thread_id = logger::thread_id(),
2273  crt_kernel_idx = kernel_idx++] {
2274  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
2275  const size_t thread_i = crt_kernel_idx % cpu_threads();
2276  kernel->run(this, thread_i, shared_context);
2277  });
2278  }
2279  tg.wait();
2280 
2281  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
2282  // The first arg is used for GPU only; that's not our case.
2283  // TODO: add a QueryExecutionContext::getRowSet() interface
2284  // for our case.
2285  if (exec_ctx) {
2286  ResultSetPtr results;
2287  if (ra_exe_unit->estimator) {
2288  results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
2289  } else {
2290  results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
2291  }
2292  shared_context.addDeviceResults(std::move(results), {});
2293  }
2294  }
2295 }
2296 
2298  const RelAlgExecutionUnit& ra_exe_unit,
2299  const ExecutorDeviceType device_type,
2300  const size_t table_idx,
2301  const size_t outer_frag_idx,
2302  std::map<int, const TableFragments*>& selected_tables_fragments,
2303  const std::unordered_map<int, const Analyzer::BinOper*>&
2304  inner_table_id_to_join_condition) {
2305  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2306  auto table_frags_it = selected_tables_fragments.find(table_id);
2307  CHECK(table_frags_it != selected_tables_fragments.end());
2308  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2309  const auto outer_table_fragments_it =
2310  selected_tables_fragments.find(outer_input_desc.getTableId());
2311  const auto outer_table_fragments = outer_table_fragments_it->second;
2312  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2313  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2314  if (!table_idx) {
2315  return {outer_frag_idx};
2316  }
2317  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2318  auto& inner_frags = table_frags_it->second;
2319  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2320  std::vector<size_t> all_frag_ids;
2321  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2322  ++inner_frag_idx) {
2323  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2324  if (skipFragmentPair(outer_fragment_info,
2325  inner_frag_info,
2326  table_idx,
2327  inner_table_id_to_join_condition,
2328  ra_exe_unit,
2329  device_type)) {
2330  continue;
2331  }
2332  all_frag_ids.push_back(inner_frag_idx);
2333  }
2334  return all_frag_ids;
2335 }
2336 
2337 // Returns true iff the join between two fragments cannot yield any results, per
2338 // shard information. The pair can be skipped to avoid full broadcast.
2340  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2341  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2342  const int table_idx,
2343  const std::unordered_map<int, const Analyzer::BinOper*>&
2344  inner_table_id_to_join_condition,
2345  const RelAlgExecutionUnit& ra_exe_unit,
2346  const ExecutorDeviceType device_type) {
2347  if (device_type != ExecutorDeviceType::GPU) {
2348  return false;
2349  }
2350  CHECK(table_idx >= 0 &&
2351  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2352  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2353  // Both tables need to be sharded the same way.
2354  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2355  outer_fragment_info.shard == inner_fragment_info.shard) {
2356  return false;
2357  }
2358  const Analyzer::BinOper* join_condition{nullptr};
2359  if (ra_exe_unit.join_quals.empty()) {
2360  CHECK(!inner_table_id_to_join_condition.empty());
2361  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2362  CHECK(condition_it != inner_table_id_to_join_condition.end());
2363  join_condition = condition_it->second;
2364  CHECK(join_condition);
2365  } else {
2366  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2367  plan_state_->join_info_.join_hash_tables_.size());
2368  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2369  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2370  table_idx) {
2371  CHECK(!join_condition);
2372  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2373  }
2374  }
2375  }
2376  if (!join_condition) {
2377  return false;
2378  }
2379  // TODO(adb): support fragment skipping based on the overlaps operator
2380  if (join_condition->is_overlaps_oper()) {
2381  return false;
2382  }
2383  size_t shard_count{0};
2384  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2385  join_condition->get_left_operand())) {
2386  auto inner_outer_pairs = HashJoin::normalizeColumnPairs(
2387  join_condition, *getCatalog(), getTemporaryTables());
2389  join_condition, this, inner_outer_pairs);
2390  } else {
2391  shard_count = get_shard_count(join_condition, this);
2392  }
2393  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2394  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2395  }
2396  return shard_count;
2397 }
2398 
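The core of skipFragmentPair above is a shard comparison: if both fragments carry a shard id and the ids differ, an equi-join on the shard key cannot match, so the pair is skipped on GPU. A minimal sketch of that predicate (shard id -1 meaning "unsharded", as in the code above); names are illustrative.

#include <iostream>

// Returns true when a fragment pair can be skipped because the two sides
// live in different shards of a shard-key equi-join (-1 == unsharded).
bool can_skip_fragment_pair(int outer_shard, int inner_shard, bool join_is_on_shard_key) {
  if (!join_is_on_shard_key) {
    return false;  // no shard information to reason about
  }
  if (outer_shard == -1 || inner_shard == -1) {
    return false;  // at least one side is not sharded
  }
  return outer_shard != inner_shard;  // different shards can never match
}

int main() {
  std::cout << can_skip_fragment_pair(2, 3, true) << " "    // 1: disjoint shards
            << can_skip_fragment_pair(2, 2, true) << " "    // 0: same shard
            << can_skip_fragment_pair(2, -1, true) << "\n";  // 0: inner unsharded
}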
2399 namespace {
2400 
2403  const int table_id = col_desc->getScanDesc().getTableId();
2404  const int col_id = col_desc->getColId();
2405  return get_column_descriptor_maybe(col_id, table_id, cat);
2406 }
2407 
2408 } // namespace
2409 
2410 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2411  const std::vector<InputDescriptor>& input_descs,
2412  const std::map<int, const TableFragments*>& all_tables_fragments) {
2413  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2414  for (auto& desc : input_descs) {
2415  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2416  CHECK(fragments_it != all_tables_fragments.end());
2417  const auto& fragments = *fragments_it->second;
2418  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2419  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2420  frag_offsets[i] = off;
2421  off += fragments[i].getNumTuples();
2422  }
2423  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2424  }
2425  return tab_id_to_frag_offsets;
2426 }
2427 
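get_table_id_to_frag_offsets above computes, per table, the exclusive prefix sum of fragment row counts, so fragment i knows the global row offset of its first tuple (used by rowid). The same computation in isolation, as a hedged standalone sketch:

#include <cstdint>
#include <iostream>
#include <vector>

// Exclusive prefix sum of fragment row counts: offsets[i] is the global
// index of the first row of fragment i.
std::vector<uint64_t> fragment_offsets(const std::vector<uint64_t>& frag_row_counts) {
  std::vector<uint64_t> offsets(frag_row_counts.size(), 0);
  uint64_t off = 0;
  for (size_t i = 0; i < frag_row_counts.size(); ++i) {
    offsets[i] = off;
    off += frag_row_counts[i];
  }
  return offsets;
}

int main() {
  for (auto o : fragment_offsets({100, 250, 80})) {
    std::cout << o << " ";  // 0 100 350
  }
  std::cout << "\n";
}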
2428 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2430  const RelAlgExecutionUnit& ra_exe_unit,
2431  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2432  const std::vector<InputDescriptor>& input_descs,
2433  const std::map<int, const TableFragments*>& all_tables_fragments) {
2434  std::vector<std::vector<int64_t>> all_num_rows;
2435  std::vector<std::vector<uint64_t>> all_frag_offsets;
2436  const auto tab_id_to_frag_offsets =
2437  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2438  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2439  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2440  std::vector<int64_t> num_rows;
2441  std::vector<uint64_t> frag_offsets;
2442  if (!ra_exe_unit.union_all) {
2443  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2444  }
2445  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2446  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2447  const auto fragments_it =
2448  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2449  CHECK(fragments_it != all_tables_fragments.end());
2450  const auto& fragments = *fragments_it->second;
2451  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2452  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2453  const auto& fragment = fragments[frag_id];
2454  num_rows.push_back(fragment.getNumTuples());
2455  } else {
2456  size_t total_row_count{0};
2457  for (const auto& fragment : fragments) {
2458  total_row_count += fragment.getNumTuples();
2459  }
2460  num_rows.push_back(total_row_count);
2461  }
2462  const auto frag_offsets_it =
2463  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2464  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2465  const auto& offsets = frag_offsets_it->second;
2466  CHECK_LT(frag_id, offsets.size());
2467  frag_offsets.push_back(offsets[frag_id]);
2468  }
2469  all_num_rows.push_back(num_rows);
2470  // Fragment offsets of the outer table should ONLY be used by rowid for now.
2471  all_frag_offsets.push_back(frag_offsets);
2472  }
2473  return {all_num_rows, all_frag_offsets};
2474 }
2475 
2476 // Fetch a column from all table fragments only when it belongs to a hash-joined
2477 // inner fact table and its fetch is not deferred (lazy).
2479  const RelAlgExecutionUnit& ra_exe_unit,
2480  const FragmentsList& selected_fragments) const {
2481  const auto& input_descs = ra_exe_unit.input_descs;
2482  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2483  if (nest_level < 1 ||
2484  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2485  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2486  (ra_exe_unit.join_quals.empty() &&
2487  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2488  return false;
2489  }
2490  const int table_id = inner_col_desc.getScanDesc().getTableId();
2491  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2492  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2493  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2494  return fragments.size() > 1;
2495 }
2496 
2498  const ColumnDescriptor* cd,
2499  const InputColDescriptor& inner_col_desc,
2500  const RelAlgExecutionUnit& ra_exe_unit,
2501  const FragmentsList& selected_fragments,
2502  const Data_Namespace::MemoryLevel memory_level) const {
2503  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2504  const int table_id = inner_col_desc.getScanDesc().getTableId();
2505  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2506  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2507  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2508  auto need_linearize =
2509  cd->columnType.is_array() ||
2511  return table_id > 0 && need_linearize && fragments.size() > 1;
2512 }
2513 
2514 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2515  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2516  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2517  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2518 }
2519 
2521  const ColumnFetcher& column_fetcher,
2522  const RelAlgExecutionUnit& ra_exe_unit,
2523  const int device_id,
2524  const Data_Namespace::MemoryLevel memory_level,
2525  const std::map<int, const TableFragments*>& all_tables_fragments,
2526  const FragmentsList& selected_fragments,
2528  std::list<ChunkIter>& chunk_iterators,
2529  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2530  DeviceAllocator* device_allocator,
2531  const size_t thread_idx,
2532  const bool allow_runtime_interrupt) {
2533  auto timer = DEBUG_TIMER(__func__);
2534  INJECT_TIMER(fetchChunks);
2535  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2536  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2537  std::vector<size_t> local_col_to_frag_pos;
2538  buildSelectedFragsMapping(selected_fragments_crossjoin,
2539  local_col_to_frag_pos,
2540  col_global_ids,
2541  selected_fragments,
2542  ra_exe_unit);
2543 
2545  selected_fragments_crossjoin);
2546  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2547  std::vector<std::vector<int64_t>> all_num_rows;
2548  std::vector<std::vector<uint64_t>> all_frag_offsets;
2549  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2550  std::vector<const int8_t*> frag_col_buffers(
2551  plan_state_->global_to_local_col_ids_.size());
2552  for (const auto& col_id : col_global_ids) {
2553  if (allow_runtime_interrupt) {
2554  bool isInterrupted = false;
2555  {
2556  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2557  const auto query_session = getCurrentQuerySession(session_read_lock);
2558  isInterrupted =
2559  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2560  }
2561  if (isInterrupted) {
2562  throw QueryExecutionError(ERR_INTERRUPTED);
2563  }
2564  }
2565  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2566  throw QueryExecutionError(ERR_INTERRUPTED);
2567  }
2568  CHECK(col_id);
2569  const int table_id = col_id->getScanDesc().getTableId();
2570  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2571  if (cd && cd->isVirtualCol) {
2572  CHECK_EQ("rowid", cd->columnName);
2573  continue;
2574  }
2575  const auto fragments_it = all_tables_fragments.find(table_id);
2576  CHECK(fragments_it != all_tables_fragments.end());
2577  const auto fragments = fragments_it->second;
2578  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2579  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2580  CHECK_LT(static_cast<size_t>(it->second),
2581  plan_state_->global_to_local_col_ids_.size());
2582  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2583  if (!fragments->size()) {
2584  return {};
2585  }
2586  CHECK_LT(frag_id, fragments->size());
2587  auto memory_level_for_column = memory_level;
2588  auto tbl_col_ids =
2589  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId());
2590  if (plan_state_->columns_to_fetch_.find(tbl_col_ids) ==
2591  plan_state_->columns_to_fetch_.end()) {
2592  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2593  }
2594  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2595  frag_col_buffers[it->second] =
2596  column_fetcher.getResultSetColumn(col_id.get(),
2597  memory_level_for_column,
2598  device_id,
2599  device_allocator,
2600  thread_idx);
2601  } else {
2602  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2603  // Determine whether we need special treatment to linearize a multi-frag table
2604  // column, i.e., a column classified as a varlen type such as an array.
2605  // For now, we only support fixed-length arrays that contain geo point
2606  // coordinates, but more types can be supported in this way.
2607  if (needLinearizeAllFragments(
2608  cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
2609  bool for_lazy_fetch = false;
2610  if (plan_state_->columns_to_not_fetch_.find(tbl_col_ids) !=
2611  plan_state_->columns_to_not_fetch_.end()) {
2612  for_lazy_fetch = true;
2613  VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
2614  << ", col_name: " << cd->columnName << ")";
2615  }
2616  frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
2617  table_id,
2618  col_id->getColId(),
2619  all_tables_fragments,
2620  chunks,
2621  chunk_iterators,
2622  for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
2623  for_lazy_fetch ? 0 : device_id,
2624  device_allocator,
2625  thread_idx);
2626  } else {
2627  frag_col_buffers[it->second] =
2628  column_fetcher.getAllTableColumnFragments(table_id,
2629  col_id->getColId(),
2630  all_tables_fragments,
2631  memory_level_for_column,
2632  device_id,
2633  device_allocator,
2634  thread_idx);
2635  }
2636  } else {
2637  frag_col_buffers[it->second] =
2638  column_fetcher.getOneTableColumnFragment(table_id,
2639  frag_id,
2640  col_id->getColId(),
2641  all_tables_fragments,
2642  chunks,
2643  chunk_iterators,
2644  memory_level_for_column,
2645  device_id,
2646  device_allocator);
2647  }
2648  }
2649  }
2650  all_frag_col_buffers.push_back(frag_col_buffers);
2651  }
2652  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2653  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2654  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2655 }
2656 
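fetchChunks above walks the Cartesian product of the selected fragment ids of every input table (one list per nesting level), and each combination yields one set of chunk buffers. Below is a small, self-contained sketch of iterating such a cross product with an odometer-style index vector; the real code uses the CartesianProduct helper, so this is only a model of the iteration order.

#include <cstddef>
#include <iostream>
#include <vector>

// Invoke `visit` once per combination of one fragment id from each table.
// Assumes every per-table list is non-empty.
template <typename Visitor>
void for_each_frag_combination(const std::vector<std::vector<size_t>>& frag_ids_per_table,
                               Visitor visit) {
  if (frag_ids_per_table.empty()) {
    return;
  }
  std::vector<size_t> idx(frag_ids_per_table.size(), 0);
  while (true) {
    std::vector<size_t> combo;
    for (size_t t = 0; t < frag_ids_per_table.size(); ++t) {
      combo.push_back(frag_ids_per_table[t][idx[t]]);
    }
    visit(combo);
    // Advance the odometer: the rightmost table varies fastest.
    size_t t = frag_ids_per_table.size();
    while (t-- > 0) {
      if (++idx[t] < frag_ids_per_table[t].size()) {
        break;
      }
      idx[t] = 0;
      if (t == 0) {
        return;  // wrapped the leftmost digit: done
      }
    }
  }
}

int main() {
  // Outer table fragments {0, 1}, inner table fragments {0, 1, 2}.
  for_each_frag_combination({{0, 1}, {0, 1, 2}}, [](const std::vector<size_t>& c) {
    std::cout << "(" << c[0] << "," << c[1] << ") ";
  });
  std::cout << "\n";  // (0,0) (0,1) (0,2) (1,0) (1,1) (1,2)
}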
2657 // fetchChunks() is written under the assumption that multiple inputs imply a JOIN;
2658 // fetchUnionChunks() below assumes that multiple inputs imply a UNION ALL.
2660  const ColumnFetcher& column_fetcher,
2661  const RelAlgExecutionUnit& ra_exe_unit,
2662  const int device_id,
2663  const Data_Namespace::MemoryLevel memory_level,
2664  const std::map<int, const TableFragments*>& all_tables_fragments,
2665  const FragmentsList& selected_fragments,
2667  std::list<ChunkIter>& chunk_iterators,
2668  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2669  DeviceAllocator* device_allocator,
2670  const size_t thread_idx,
2671  const bool allow_runtime_interrupt) {
2672  auto timer = DEBUG_TIMER(__func__);
2673  INJECT_TIMER(fetchUnionChunks);
2674 
2675  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2676  std::vector<std::vector<int64_t>> all_num_rows;
2677  std::vector<std::vector<uint64_t>> all_frag_offsets;
2678 
2679  CHECK(!selected_fragments.empty());
2680  CHECK_LE(2u, ra_exe_unit.input_descs.size());
2681  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
2682  using TableId = int;
2683  TableId const selected_table_id = selected_fragments.front().table_id;
2684  bool const input_descs_index =
2685  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2686  if (!input_descs_index) {
2687  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
2688  }
2689  bool const input_col_descs_index =
2690  selected_table_id ==
2691  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2692  if (!input_col_descs_index) {
2693  CHECK_EQ(selected_table_id,
2694  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
2695  }
2696  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2697  << " selected_table_id=" << selected_table_id
2698  << " input_descs_index=" << int(input_descs_index)
2699  << " input_col_descs_index=" << int(input_col_descs_index)
2700  << " ra_exe_unit.input_descs="
2701  << shared::printContainer(ra_exe_unit.input_descs)
2702  << " ra_exe_unit.input_col_descs="
2703  << shared::printContainer(ra_exe_unit.input_col_descs);
2704 
2705  // Partition col_global_ids by table_id
2706  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2707  table_id_to_input_col_descs;
2708  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
2709  TableId const table_id = input_col_desc->getScanDesc().getTableId();
2710  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2711  }
2712  for (auto const& pair : table_id_to_input_col_descs) {
2713  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2714  std::vector<size_t> local_col_to_frag_pos;
2715 
2716  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
2717  local_col_to_frag_pos,
2718  pair.second,
2719  selected_fragments,
2720  ra_exe_unit);
2721 
2723  selected_fragments_crossjoin);
2724 
2725  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2726  if (allow_runtime_interrupt) {
2727  bool isInterrupted = false;
2728  {
2729  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2730  const auto query_session = getCurrentQuerySession(session_read_lock);
2731  isInterrupted =
2732  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2733  }
2734  if (isInterrupted) {
2735  throw QueryExecutionError(ERR_INTERRUPTED);
2736  }
2737  }
2738  std::vector<const int8_t*> frag_col_buffers(
2739  plan_state_->global_to_local_col_ids_.size());
2740  for (const auto& col_id : pair.second) {
2741  CHECK(col_id);
2742  const int table_id = col_id->getScanDesc().getTableId();
2743  CHECK_EQ(table_id, pair.first);
2744  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2745  if (cd && cd->isVirtualCol) {
2746  CHECK_EQ("rowid", cd->columnName);
2747  continue;
2748  }
2749  const auto fragments_it = all_tables_fragments.find(table_id);
2750  CHECK(fragments_it != all_tables_fragments.end());
2751  const auto fragments = fragments_it->second;
2752  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2753  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2754  CHECK_LT(static_cast<size_t>(it->second),
2755  plan_state_->global_to_local_col_ids_.size());
2756  const size_t frag_id = ra_exe_unit.union_all
2757  ? 0
2758  : selected_frag_ids[local_col_to_frag_pos[it->second]];
2759  if (!fragments->size()) {
2760  return {};
2761  }
2762  CHECK_LT(frag_id, fragments->size());
2763  auto memory_level_for_column = memory_level;
2764  if (plan_state_->columns_to_fetch_.find(
2765  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2766  plan_state_->columns_to_fetch_.end()) {
2767  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2768  }
2769  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2770  frag_col_buffers[it->second] =
2771  column_fetcher.getResultSetColumn(col_id.get(),
2772  memory_level_for_column,
2773  device_id,
2774  device_allocator,
2775  thread_idx);
2776  } else {
2777  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2778  frag_col_buffers[it->second] =
2779  column_fetcher.getAllTableColumnFragments(table_id,
2780  col_id->getColId(),
2781  all_tables_fragments,
2782  memory_level_for_column,
2783  device_id,
2784  device_allocator,
2785  thread_idx);
2786  } else {
2787  frag_col_buffers[it->second] =
2788  column_fetcher.getOneTableColumnFragment(table_id,
2789  frag_id,
2790  col_id->getColId(),
2791  all_tables_fragments,
2792  chunks,
2793  chunk_iterators,
2794  memory_level_for_column,
2795  device_id,
2796  device_allocator);
2797  }
2798  }
2799  }
2800  all_frag_col_buffers.push_back(frag_col_buffers);
2801  }
2802  std::vector<std::vector<int64_t>> num_rows;
2803  std::vector<std::vector<uint64_t>> frag_offsets;
2804  std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags(
2805  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2806  all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
2807  all_frag_offsets.insert(
2808  all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
2809  }
2810  // The hack below assumes a particular table traversal order, which is not
2811  // always achieved because of the unordered map in the outermost loop. The
2812  // code below expects NULLs in even positions of all_frag_col_buffers[0] and
2813  // in odd positions of all_frag_col_buffers[1]. As an additional hack, we
2814  // swap these vectors if the NULLs are not in the expected positions.
2815  if (all_frag_col_buffers[0].size() > 1 && all_frag_col_buffers[0][0] &&
2816  !all_frag_col_buffers[0][1]) {
2817  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2818  }
2819  // UNION ALL hacks.
2820  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers);
2821  for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
2822  all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
2823  }
2824  if (input_descs_index == input_col_descs_index) {
2825  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2826  }
2827 
2828  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers)
2829  << " all_num_rows=" << shared::printContainer(all_num_rows)
2830  << " all_frag_offsets=" << shared::printContainer(all_frag_offsets)
2831  << " input_col_descs_index=" << input_col_descs_index;
2832  return {{all_frag_col_buffers[input_descs_index]},
2833  {{all_num_rows[0][input_descs_index]}},
2834  {{all_frag_offsets[0][input_descs_index]}}};
2835 }
2836 
2837 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2838  const size_t scan_idx,
2839  const RelAlgExecutionUnit& ra_exe_unit) {
2840  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2841  scan_idx > 0 &&
2842  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2843  !selected_fragments[scan_idx].fragment_ids.empty()) {
2844  // Fetch all fragments
2845  return {size_t(0)};
2846  }
2847 
2848  return selected_fragments[scan_idx].fragment_ids;
2849 }
2850 
2852  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2853  std::vector<size_t>& local_col_to_frag_pos,
2854  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2855  const FragmentsList& selected_fragments,
2856  const RelAlgExecutionUnit& ra_exe_unit) {
2857  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2858  size_t frag_pos{0};
2859  const auto& input_descs = ra_exe_unit.input_descs;
2860  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2861  const int table_id = input_descs[scan_idx].getTableId();
2862  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2863  selected_fragments_crossjoin.push_back(
2864  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2865  for (const auto& col_id : col_global_ids) {
2866  CHECK(col_id);
2867  const auto& input_desc = col_id->getScanDesc();
2868  if (input_desc.getTableId() != table_id ||
2869  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2870  continue;
2871  }
2872  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2873  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2874  CHECK_LT(static_cast<size_t>(it->second),
2875  plan_state_->global_to_local_col_ids_.size());
2876  local_col_to_frag_pos[it->second] = frag_pos;
2877  }
2878  ++frag_pos;
2879  }
2880 }
2881 
2883  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2884  std::vector<size_t>& local_col_to_frag_pos,
2885  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2886  const FragmentsList& selected_fragments,
2887  const RelAlgExecutionUnit& ra_exe_unit) {
2888  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2889  size_t frag_pos{0};
2890  const auto& input_descs = ra_exe_unit.input_descs;
2891  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2892  const int table_id = input_descs[scan_idx].getTableId();
2893  // selected_fragments here is from assignFragsToKernelDispatch
2894  // execution_kernel.fragments
2895  if (selected_fragments[0].table_id != table_id) { // TODO 0
2896  continue;
2897  }
2898  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2899  selected_fragments_crossjoin.push_back(
2900  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2901  {size_t(1)}); // TODO
2902  for (const auto& col_id : col_global_ids) {
2903  CHECK(col_id);
2904  const auto& input_desc = col_id->getScanDesc();
2905  if (input_desc.getTableId() != table_id ||
2906  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2907  continue;
2908  }
2909  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2910  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2911  CHECK_LT(static_cast<size_t>(it->second),
2912  plan_state_->global_to_local_col_ids_.size());
2913  local_col_to_frag_pos[it->second] = frag_pos;
2914  }
2915  ++frag_pos;
2916  }
2917 }
2918 
2919 namespace {
2920 
2922  public:
2923  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2925  for (auto out : out_vec_) {
2926  delete[] out;
2927  }
2928  }
2929 
2930  private:
2931  std::vector<int64_t*> out_vec_;
2932 };
2933 } // namespace
2934 
2936  const RelAlgExecutionUnit& ra_exe_unit,
2937  const CompilationResult& compilation_result,
2938  const bool hoist_literals,
2939  ResultSetPtr* results,
2940  const std::vector<Analyzer::Expr*>& target_exprs,
2941  const ExecutorDeviceType device_type,
2942  std::vector<std::vector<const int8_t*>>& col_buffers,
2943  QueryExecutionContext* query_exe_context,
2944  const std::vector<std::vector<int64_t>>& num_rows,
2945  const std::vector<std::vector<uint64_t>>& frag_offsets,
2946  Data_Namespace::DataMgr* data_mgr,
2947  const int device_id,
2948  const uint32_t start_rowid,
2949  const uint32_t num_tables,
2950  const bool allow_runtime_interrupt,
2951  RenderInfo* render_info,
2952  const int64_t rows_to_process) {
2953  INJECT_TIMER(executePlanWithoutGroupBy);
2954  auto timer = DEBUG_TIMER(__func__);
2955  CHECK(!results || !(*results));
2956  if (col_buffers.empty()) {
2957  return 0;
2958  }
2959 
2960  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2961  if (render_info) {
2962  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2963  // here, we are in non-insitu mode.
2964  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2965  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2966  "currently unsupported.";
2967  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2968  }
2969 
2970  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2971  std::vector<int64_t*> out_vec;
2972  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2973  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2974  std::unique_ptr<OutVecOwner> output_memory_scope;
2975  if (allow_runtime_interrupt) {
2976  bool isInterrupted = false;
2977  {
2978  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2979  const auto query_session = getCurrentQuerySession(session_read_lock);
2980  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
2981  }
2982  if (isInterrupted) {
2983  throw QueryExecutionError(ERR_INTERRUPTED);
2984  }
2985  }
2986  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2987  throw QueryExecutionError(ERR_INTERRUPTED);
2988  }
2989  if (device_type == ExecutorDeviceType::CPU) {
2990  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2991  compilation_result.generated_code);
2992  CHECK(cpu_generated_code);
2993  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2994  cpu_generated_code.get(),
2995  hoist_literals,
2996  hoist_buf,
2997  col_buffers,
2998  num_rows,
2999  frag_offsets,
3000  0,
3001  &error_code,
3002  num_tables,
3003  join_hash_table_ptrs,
3004  rows_to_process);
3005  output_memory_scope.reset(new OutVecOwner(out_vec));
3006  } else {
3007  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
3008  compilation_result.generated_code);
3009  CHECK(gpu_generated_code);
3010  try {
3011  out_vec = query_exe_context->launchGpuCode(
3012  ra_exe_unit,
3013  gpu_generated_code.get(),
3014  hoist_literals,
3015  hoist_buf,
3016  col_buffers,
3017  num_rows,
3018  frag_offsets,
3019  0,
3020  data_mgr,
3021  blockSize(),
3022  gridSize(),
3023  device_id,
3024  compilation_result.gpu_smem_context.getSharedMemorySize(),
3025  &error_code,
3026  num_tables,
3027  allow_runtime_interrupt,
3028  join_hash_table_ptrs,
3029  render_allocator_map_ptr);
3030  output_memory_scope.reset(new OutVecOwner(out_vec));
3031  } catch (const OutOfMemory&) {
3032  return ERR_OUT_OF_GPU_MEM;
3033  } catch (const std::exception& e) {
3034  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3035  }
3036  }
3037  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3038  error_code == Executor::ERR_DIV_BY_ZERO ||
3039  error_code == Executor::ERR_OUT_OF_TIME ||
3040  error_code == Executor::ERR_INTERRUPTED ||
3042  error_code == Executor::ERR_GEOS ||
3044  return error_code;
3045  }
3046  if (ra_exe_unit.estimator) {
3047  CHECK(!error_code);
3048  if (results) {
3049  *results =
3050  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
3051  }
3052  return 0;
3053  }
3054  // Delayed results extraction (used for sub-fragments) is expected for the estimator only.
3055  CHECK(results);
3056  std::vector<int64_t> reduced_outs;
3057  const auto num_frags = col_buffers.size();
3058  const size_t entry_count =
3059  device_type == ExecutorDeviceType::GPU
3060  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
3061  ? 1
3062  : blockSize() * gridSize() * num_frags)
3063  : num_frags;
3064  if (size_t(1) == entry_count) {
3065  for (auto out : out_vec) {
3066  CHECK(out);
3067  reduced_outs.push_back(*out);
3068  }
3069  } else {
3070  size_t out_vec_idx = 0;
3071 
3072  for (const auto target_expr : target_exprs) {
3073  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3074  CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3075  << target_expr->toString();
3076 
3077  const int num_iterations = agg_info.sql_type.is_geometry()
3078  ? agg_info.sql_type.get_physical_coord_cols()
3079  : 1;
3080 
3081  for (int i = 0; i < num_iterations; i++) {
3082  int64_t val1;
3083  const bool float_argument_input = takes_float_argument(agg_info);
3084  if (is_distinct_target(agg_info) || agg_info.agg_kind == kAPPROX_QUANTILE) {
3085  CHECK(agg_info.agg_kind == kCOUNT ||
3086  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT ||
3087  agg_info.agg_kind == kAPPROX_QUANTILE);
3088  val1 = out_vec[out_vec_idx][0];
3089  error_code = 0;
3090  } else {
3091  const auto chosen_bytes = static_cast<size_t>(
3092  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
3093  std::tie(val1, error_code) = Executor::reduceResults(
3094  agg_info.agg_kind,
3095  agg_info.sql_type,
3096  query_exe_context->getAggInitValForIndex(out_vec_idx),
3097  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3098  out_vec[out_vec_idx],
3099  entry_count,
3100  false,
3101  float_argument_input);
3102  }
3103  if (error_code) {
3104  break;
3105  }
3106  reduced_outs.push_back(val1);
3107  if (agg_info.agg_kind == kAVG ||
3108  (agg_info.agg_kind == kSAMPLE &&
3109  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3110  const auto chosen_bytes = static_cast<size_t>(
3111  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3112  1));
3113  int64_t val2;
3114  std::tie(val2, error_code) = Executor::reduceResults(
3115  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3116  agg_info.sql_type,
3117  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3118  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3119  out_vec[out_vec_idx + 1],
3120  entry_count,
3121  false,
3122  false);
3123  if (error_code) {
3124  break;
3125  }
3126  reduced_outs.push_back(val2);
3127  ++out_vec_idx;
3128  }
3129  ++out_vec_idx;
3130  }
3131  }
3132  }
3133 
3134  if (error_code) {
3135  return error_code;
3136  }
3137 
3138  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3139  auto rows_ptr = std::shared_ptr<ResultSet>(
3140  query_exe_context->query_buffers_->result_sets_[0].release());
3141  rows_ptr->fillOneEntry(reduced_outs);
3142  *results = std::move(rows_ptr);
3143  return error_code;
3144 }
3145 
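In the no-group-by path above, each output slot holds one partial value per entry (per GPU thread block slot or per fragment), and the host reduces them to a single row; AVG occupies two adjacent slots (running sum and running count) that are reduced separately. A toy version of that reduction under those assumptions, with made-up slot names; it does not model the overflow and error-code handling of the real reduceResults.

#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

// Reduce per-entry partials for a no-group-by query: one COUNT slot plus an
// AVG represented as two slots (running sum and running count).
struct ReducedRow {
  int64_t count;
  int64_t avg_sum;
  int64_t avg_count;
};

ReducedRow reduce_partials(const std::vector<int64_t>& count_slot,
                           const std::vector<int64_t>& avg_sum_slot,
                           const std::vector<int64_t>& avg_count_slot) {
  ReducedRow row{};
  row.count = std::accumulate(count_slot.begin(), count_slot.end(), int64_t{0});
  row.avg_sum = std::accumulate(avg_sum_slot.begin(), avg_sum_slot.end(), int64_t{0});
  row.avg_count =
      std::accumulate(avg_count_slot.begin(), avg_count_slot.end(), int64_t{0});
  return row;
}

int main() {
  // Three "entries" (e.g. three fragments) with partial results each.
  const auto row = reduce_partials({10, 20, 5}, {100, 200, 50}, {10, 20, 5});
  std::cout << "count=" << row.count                          // 35
            << " avg=" << double(row.avg_sum) / row.avg_count  // 10
            << "\n";
}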
3146 namespace {
3147 
3148 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
3149  CHECK(scan_limit);
3150  return results && results->rowCount() < scan_limit;
3151 }
3152 
3153 } // namespace
3154 
3156  const RelAlgExecutionUnit& ra_exe_unit,
3157  const CompilationResult& compilation_result,
3158  const bool hoist_literals,
3159  ResultSetPtr* results,
3160  const ExecutorDeviceType device_type,
3161  std::vector<std::vector<const int8_t*>>& col_buffers,
3162  const std::vector<size_t> outer_tab_frag_ids,
3163  QueryExecutionContext* query_exe_context,
3164  const std::vector<std::vector<int64_t>>& num_rows,
3165  const std::vector<std::vector<uint64_t>>& frag_offsets,
3166  Data_Namespace::DataMgr* data_mgr,
3167  const int device_id,
3168  const int outer_table_id,
3169  const int64_t scan_limit,
3170  const uint32_t start_rowid,
3171  const uint32_t num_tables,
3172  const bool allow_runtime_interrupt,
3173  RenderInfo* render_info,
3174  const int64_t rows_to_process) {
3175  auto timer = DEBUG_TIMER(__func__);
3176  INJECT_TIMER(executePlanWithGroupBy);
3177  // TODO: get results via a separate method, but need to do something with literals.
3178  CHECK(!results || !(*results));
3179  if (col_buffers.empty()) {
3180  return 0;
3181  }
3182  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3183  // TODO(alex):
3184  // 1. Optimize size (make keys more compact).
3185  // 2. Resize on overflow.
3186  // 3. Optimize runtime.
3187  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3188  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3189  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3190  if (allow_runtime_interrupt) {
3191  bool isInterrupted = false;
3192  {
3193  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3194  const auto query_session = getCurrentQuerySession(session_read_lock);
3195  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3196  }
3197  if (isInterrupted) {
3198  throw QueryExecutionError(ERR_INTERRUPTED);
3199  }
3200  }
3201  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3202  return ERR_INTERRUPTED;
3203  }
3204 
3205  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3206  if (render_info && render_info->useCudaBuffers()) {
3207  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3208  }
3209 
3210  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3211  << " ra_exe_unit.input_descs="
3212  << shared::printContainer(ra_exe_unit.input_descs)
3213  << " ra_exe_unit.input_col_descs="
3214  << shared::printContainer(ra_exe_unit.input_col_descs)
3215  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3216  << " num_rows=" << shared::printContainer(num_rows)
3217  << " frag_offsets=" << shared::printContainer(frag_offsets)
3218  << " query_exe_context->query_buffers_->num_rows_="
3219  << query_exe_context->query_buffers_->num_rows_
3220  << " query_exe_context->query_mem_desc_.getEntryCount()="
3221  << query_exe_context->query_mem_desc_.getEntryCount()
3222  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3223  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3224  << " num_tables=" << num_tables;
3225 
3226  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3227  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3228  // with outer_table_id.
3229  if (ra_exe_unit_copy.union_all) {
3230  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
3231  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3232  ra_exe_unit_copy.input_descs.end(),
3233  [outer_table_id](auto const& a, auto const& b) {
3234  return a.getTableId() == outer_table_id &&
3235  b.getTableId() != outer_table_id;
3236  });
3237  while (!ra_exe_unit_copy.input_descs.empty() &&
3238  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3239  ra_exe_unit_copy.input_descs.pop_back();
3240  }
3241  // Filter ra_exe_unit_copy.input_col_descs.
3242  ra_exe_unit_copy.input_col_descs.remove_if(
3243  [outer_table_id](auto const& input_col_desc) {
3244  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3245  });
3246  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3247  }
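  // Editorial sketch (not part of Execute.cpp): the stable_sort predicate above moves
  // descriptors whose table id equals outer_table_id to the front while preserving
  // their relative order; the pop_back loop then drops everything else. The same idiom
  // with hypothetical integer ids, assuming outer_table_id == 7:
  //
  //   std::vector<int> ids{3, 7, 5, 7};
  //   std::stable_sort(ids.begin(), ids.end(),
  //                    [](int a, int b) { return a == 7 && b != 7; });
  //   // ids == {7, 7, 3, 5}; the trailing non-matching ids are then popped.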
3248 
3249  if (device_type == ExecutorDeviceType::CPU) {
3250  const int32_t scan_limit_for_query =
3251  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3252  const int32_t max_matched = scan_limit_for_query == 0
3253  ? query_exe_context->query_mem_desc_.getEntryCount()
3254  : scan_limit_for_query;
3255 
3256  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
3257  compilation_result.generated_code);
3258  CHECK(cpu_generated_code);
3259  query_exe_context->launchCpuCode(ra_exe_unit_copy,
3260  cpu_generated_code.get(),
3261  hoist_literals,
3262  hoist_buf,
3263  col_buffers,
3264  num_rows,
3265  frag_offsets,
3266  max_matched,
3267  &error_code,
3268  num_tables,
3269  join_hash_table_ptrs,
3270  rows_to_process);
3271  } else {
3272  try {
3273  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
3274  compilation_result.generated_code);
3275  CHECK(gpu_generated_code);
3276  query_exe_context->launchGpuCode(
3277  ra_exe_unit_copy,
3278  gpu_generated_code.get(),
3279  hoist_literals,
3280  hoist_buf,
3281  col_buffers,
3282  num_rows,
3283  frag_offsets,
3284  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3285  data_mgr,
3286  blockSize(),
3287  gridSize(),
3288  device_id,
3289  compilation_result.gpu_smem_context.getSharedMemorySize(),
3290  &error_code,
3291  num_tables,
3292  allow_runtime_interrupt,
3293  join_hash_table_ptrs,
3294  render_allocator_map_ptr);
3295  } catch (const OutOfMemory&) {
3296  return ERR_OUT_OF_GPU_MEM;
3297  } catch (const OutOfRenderMemory&) {
3298  return ERR_OUT_OF_RENDER_MEM;
3299  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3300  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
3301  } catch (const std::exception& e) {
3302  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3303  }
3304  }
3305 
3306  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3307  error_code == Executor::ERR_DIV_BY_ZERO ||
3308  error_code == Executor::ERR_OUT_OF_TIME ||
3309  error_code == Executor::ERR_INTERRUPTED ||
3310  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
3311  error_code == Executor::ERR_GEOS ||
3312  error_code == Executor::ERR_WIDTH_BUCKET_INVALID_ARGUMENT) {
3313  return error_code;
3314  }
3315 
3316  if (results && error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3317  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3318  *results = query_exe_context->getRowSet(ra_exe_unit_copy,
3319  query_exe_context->query_mem_desc_);
3320  CHECK(*results);
3321  VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
3322  (*results)->holdLiterals(hoist_buf);
3323  }
3324  if (error_code < 0 && render_allocator_map_ptr) {
3325  auto const adjusted_scan_limit =
3326  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3327  // More rows passed the filter than available slots. We don't have a count to check,
3328  // so assume we met the limit if a scan limit is set
3329  if (adjusted_scan_limit != 0) {
3330  return 0;
3331  } else {
3332  return error_code;
3333  }
3334  }
3335  if (results && error_code &&
3336  (!scan_limit || check_rows_less_than_needed(*results, scan_limit))) {
3337  return error_code; // unlucky, not enough results and we ran out of slots
3338  }
3339 
3340  return 0;
3341 }
3342 
3343 std::vector<int8_t*> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3344  const int device_id) {
3345  std::vector<int8_t*> table_ptrs;
3346  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3347  for (auto hash_table : join_hash_tables) {
3348  if (!hash_table) {
3349  CHECK(table_ptrs.empty());
3350  return {};
3351  }
3352  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3353  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3354  }
3355  return table_ptrs;
3356 }
3357 
3358 void Executor::nukeOldState(const bool allow_lazy_fetch,
3359  const std::vector<InputTableInfo>& query_infos,
3360  const PlanState::DeletedColumnsMap& deleted_cols_map,
3361  const RelAlgExecutionUnit* ra_exe_unit) {
3362  kernel_queue_time_ms_ = 0;
3363  compilation_queue_time_ms_ = 0;
3364  const bool contains_left_deep_outer_join =
3365  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3366  ra_exe_unit->join_quals.end(),
3367  [](const JoinCondition& join_condition) {
3368  return join_condition.type == JoinType::LEFT;
3369  }) != ra_exe_unit->join_quals.end();
3370  cgen_state_.reset(new CgenState(query_infos.size(), contains_left_deep_outer_join));
3371  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3372  query_infos,
3373  deleted_cols_map,
3374  this));
3375 }
3376 
3377 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3378  const std::vector<InputTableInfo>& query_infos) {
3379  AUTOMATIC_IR_METADATA(cgen_state_.get());
3380  const auto ld_count = input_descs.size();
3381  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3382  for (size_t i = 0; i < ld_count; ++i) {
3383  CHECK_LT(i, query_infos.size());
3384  const auto frag_count = query_infos[i].info.fragments.size();
3385  if (i > 0) {
3386  cgen_state_->frag_offsets_.push_back(nullptr);
3387  } else {
3388  if (frag_count > 1) {
3389  cgen_state_->frag_offsets_.push_back(
3390  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
3391  } else {
3392  cgen_state_->frag_offsets_.push_back(nullptr);
3393  }
3394  }
3395  }
3396 }
3397 
3398 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
3399  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3400  const std::vector<InputTableInfo>& query_infos,
3401  const MemoryLevel memory_level,
3402  const JoinType join_type,
3403  const HashType preferred_hash_type,
3404  ColumnCacheMap& column_cache,
3405  const HashTableBuildDagMap& hashtable_build_dag_map,
3406  const RegisteredQueryHint& query_hint,
3407  const TableIdToNodeMap& table_id_to_node_map) {
3408  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3409  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3410  }
3411  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3412  throw QueryExecutionError(ERR_INTERRUPTED);
3413  }
3414  try {
3415  auto tbl = HashJoin::getInstance(qual_bin_oper,
3416  query_infos,
3417  memory_level,
3418  join_type,
3419  preferred_hash_type,
3420  deviceCountForMemoryLevel(memory_level),
3421  column_cache,
3422  this,
3423  hashtable_build_dag_map,
3424  query_hint,
3425  table_id_to_node_map);
3426  return {tbl, ""};
3427  } catch (const HashJoinFail& e) {
3428  return {nullptr, e.what()};
3429  }
3430 }
3431 
3432 int8_t Executor::warpSize() const {
3433  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3434  CHECK(!dev_props.empty());
3435  return dev_props.front().warpSize;
3436 }
3437 
3438 // TODO(adb): should these three functions have consistent semantics if cuda mgr does not
3439 // exist?
3440 unsigned Executor::gridSize() const {
3441  CHECK(data_mgr_);
3442  const auto cuda_mgr = data_mgr_->getCudaMgr();
3443  if (!cuda_mgr) {
3444  return 0;
3445  }
3446  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3447 }
3448 
3449 unsigned Executor::numBlocksPerMP() const {
3450  return grid_size_x_ ? std::ceil(grid_size_x_ / cudaMgr()->getMinNumMPsForAllDevices())
3451  : 2;
3452 }
3453 
3454 unsigned Executor::blockSize() const {
3455  CHECK(data_mgr_);
3456  const auto cuda_mgr = data_mgr_->getCudaMgr();
3457  if (!cuda_mgr) {
3458  return 0;
3459  }
3460  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3461  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3462 }
3463 
3464 size_t Executor::maxGpuSlabSize() const {
3465  return max_gpu_slab_size_;
3466 }
3467 
3468 int64_t Executor::deviceCycles(int milliseconds) const {
3469  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3470  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3471 }
3472 
3473 llvm::Value* Executor::castToFP(llvm::Value* value,
3474  SQLTypeInfo const& from_ti,
3475  SQLTypeInfo const& to_ti) {
3476  AUTOMATIC_IR_METADATA(cgen_state_.get());
3477  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3478  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3479  llvm::Type* fp_type{nullptr};
3480  switch (to_ti.get_size()) {
3481  case 4:
3482  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3483  break;
3484  case 8:
3485  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3486  break;
3487  default:
3488  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3489  }
3490  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3491  if (from_ti.get_scale()) {
3492  value = cgen_state_->ir_builder_.CreateFDiv(
3493  value,
3494  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3495  }
3496  }
3497  return value;
3498 }
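For decimal sources, castToFP emits an SIToFP cast followed by an FDiv by 10^scale. A
minimal host-side sketch of the same arithmetic, assuming a decimal value stored as a
scaled integer (hypothetical helper, not the OmniSci API):

  #include <cmath>
  #include <cstdint>

  // Convert a DECIMAL stored as a scaled integer to floating point, mirroring the
  // SIToFP + FDiv(10^scale) sequence generated above.
  double decimal_to_fp(int64_t stored, int scale) {
    return static_cast<double>(stored) / std::pow(10.0, scale);
  }
  // decimal_to_fp(12345, 2) == 123.45 for the DECIMAL(10,2) value 123.45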
3499 
3500 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3501  AUTOMATIC_IR_METADATA(cgen_state_.get());
3502  CHECK(val->getType()->isPointerTy());
3503 
3504  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3505  const auto val_type = val_ptr_type->getElementType();
3506  size_t val_width = 0;
3507  if (val_type->isIntegerTy()) {
3508  val_width = val_type->getIntegerBitWidth();
3509  } else {
3510  if (val_type->isFloatTy()) {
3511  val_width = 32;
3512  } else {
3513  CHECK(val_type->isDoubleTy());
3514  val_width = 64;
3515  }
3516  }
3517  CHECK_LT(size_t(0), val_width);
3518  if (bitWidth == val_width) {
3519  return val;
3520  }
3521  return cgen_state_->ir_builder_.CreateBitCast(
3522  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3523 }
3524 
3525 #define EXECUTE_INCLUDE
3526 #include "ArrayOps.cpp"
3527 #include "DateAdd.cpp"
3528 #include "GeoOps.cpp"
3529 #include "StringFunctions.cpp"
3531 #undef EXECUTE_INCLUDE
3532 
3533 namespace {
3534 void add_deleted_col_to_map(PlanState::DeletedColumnsMap& deleted_cols_map,
3535  const ColumnDescriptor* deleted_cd) {
3536  auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3537  if (deleted_cols_it == deleted_cols_map.end()) {
3538  CHECK(
3539  deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd)).second);
3540  } else {
3541  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3542  }
3543 }
3544 } // namespace
3545 
3546 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3547  const RelAlgExecutionUnit& ra_exe_unit,
3548  const CompilationOptions& co) {
3549  if (!co.filter_on_deleted_column) {
3550  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3551  }
3552  auto ra_exe_unit_with_deleted = ra_exe_unit;
3553  PlanState::DeletedColumnsMap deleted_cols_map;
3554  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3555  if (input_table.getSourceType() != InputSourceType::TABLE) {
3556  continue;
3557  }
3558  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3559  CHECK(td);
3560  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3561  if (!deleted_cd) {
3562  continue;
3563  }
3564  CHECK(deleted_cd->columnType.is_boolean());
3565  // check deleted column is not already present
3566  bool found = false;
3567  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3568  if (input_col.get()->getColId() == deleted_cd->columnId &&
3569  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3570  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3571  found = true;
3572  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3573  break;
3574  }
3575  }
3576  if (!found) {
3577  // add deleted column
3578  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3579  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3580  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3581  }
3582  }
3583  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3584 }
3585 
3586 namespace {
3587 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_value` will return `true` for safe
3588 // scaled epoch value and `false` for overflow/underflow values as the first argument of
3589 // return type.
3590 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3591  const int64_t chunk_min,
3592  const int64_t chunk_max,
3593  const SQLTypeInfo& lhs_type,
3594  const SQLTypeInfo& rhs_type) {
3595  const int32_t ldim = lhs_type.get_dimension();
3596  const int32_t rdim = rhs_type.get_dimension();
3597  CHECK(ldim != rdim);
3598  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
3599  if (ldim > rdim) {
3600  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
3601  return {true, chunk_min / scale, chunk_max / scale};
3602  }
3603 
3604  using checked_int64_t = boost::multiprecision::number<
3605  boost::multiprecision::cpp_int_backend<64,
3606  64,
3607  boost::multiprecision::signed_magnitude,
3608  boost::multiprecision::checked,
3609  void>>;
3610 
3611  try {
3612  auto ret =
3613  std::make_tuple(true,
3614  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
3615  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
3616  return ret;
3617  } catch (const std::overflow_error& e) {
3618  // noop
3619  }
3620  return std::make_tuple(false, chunk_min, chunk_max);
3621 }
3622 
3623 } // namespace
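The helper above relies on Boost.Multiprecision's checked 64-bit backend throwing on
overflow instead of silently wrapping. A minimal standalone sketch of that detection
(hypothetical helper, not part of the executor):

  #include <boost/multiprecision/cpp_int.hpp>
  #include <cstdint>
  #include <stdexcept>

  using checked_int64_t = boost::multiprecision::number<
      boost::multiprecision::cpp_int_backend<64,
                                             64,
                                             boost::multiprecision::signed_magnitude,
                                             boost::multiprecision::checked,
                                             void>>;

  // Returns true when chunk_bound * scale would overflow a signed 64-bit value,
  // mirroring the try/catch in get_hpt_overflow_underflow_safe_scaled_values.
  bool scale_would_overflow(int64_t chunk_bound, int64_t scale) {
    try {
      checked_int64_t product = checked_int64_t(chunk_bound) * checked_int64_t(scale);
      static_cast<void>(product);
      return false;
    } catch (const std::overflow_error&) {
      return true;  // e.g. a bound near INT64_MAX scaled by 1'000'000
    }
  }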
3624 
3625 bool Executor::isFragmentFullyDeleted(
3626  const int table_id,
3627  const Fragmenter_Namespace::FragmentInfo& fragment) {
3628  // Skip temporary tables
3629  if (table_id < 0) {
3630  return false;
3631  }
3632 
3633  const auto td = catalog_->getMetadataForTable(fragment.physicalTableId);
3634  CHECK(td);
3635  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3636  if (!deleted_cd) {
3637  return false;
3638  }
3639 
3640  const auto& chunk_type = deleted_cd->columnType;
3641  CHECK(chunk_type.is_boolean());
3642 
3643  const auto deleted_col_id = deleted_cd->columnId;
3644  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
3645  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
3646  const int64_t chunk_min =
3647  extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3648  const int64_t chunk_max =
3649  extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3650  if (chunk_min == 1 && chunk_max == 1) { // Delete chunk if metadata says full bytemap
3651  // is true (signifying all rows deleted)
3652  return true;
3653  }
3654  }
3655  return false;
3656 }
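The metadata test above exploits the fact that the delete-flag column is boolean: if the
chunk's min and max are both 1, the fragment contains only 1s and every row is deleted.
A trivial standalone restatement (hypothetical stats struct, not the real ChunkStats
type):

  #include <cstdint>

  struct BoolChunkStats {
    int64_t min;
    int64_t max;
  };

  // All rows are deleted iff the boolean delete-flag column contains only 1s.
  bool fragment_fully_deleted(const BoolChunkStats& stats) {
    return stats.min == 1 && stats.max == 1;
  }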
3657 
3658 std::pair<bool, int64_t> Executor::skipFragment(
3659  const InputDescriptor& table_desc,
3660  const Fragmenter_Namespace::FragmentInfo& fragment,
3661  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3662  const std::vector<uint64_t>& frag_offsets,
3663  const size_t frag_idx) {
3664  const int table_id = table_desc.getTableId();
3665 
3666  // First check to see if all of fragment is deleted, in which case we know we can skip
3667  if (isFragmentFullyDeleted(table_id, fragment)) {
3668  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
3669  << ", fragment id: " << frag_idx;
3670  return {true, -1};
3671  }
3672 
3673  for (const auto& simple_qual : simple_quals) {
3674  const auto comp_expr =
3675  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3676  if (!comp_expr) {
3677  // is this possible?
3678  return {false, -1};
3679  }
3680  const auto lhs = comp_expr->get_left_operand();
3681  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3682  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3683  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3684  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3685  if (lhs_uexpr) {
3686  CHECK(lhs_uexpr->get_optype() ==
3687  kCAST); // We should have only been passed a cast expression
3688  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3689  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3690  continue;
3691  }
3692  } else {
3693  continue;
3694  }
3695  }
3696  const auto rhs = comp_expr->get_right_operand();
3697  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3698  if (!rhs_const) {
3699  // is this possible?
3700  return {false, -1};
3701  }
3702  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3703  continue;
3704  }
3705  const int col_id = lhs_col->get_column_id();
3706  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3707  int64_t chunk_min{0};
3708  int64_t chunk_max{0};
3709  bool is_rowid{false};
3710  size_t start_rowid{0};
3711  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3712  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3713  if (cd->isVirtualCol) {
3714  CHECK(cd->columnName == "rowid");
3715  const auto& table_generation = getTableGeneration(table_id);
3716  start_rowid = table_generation.start_rowid;
3717  chunk_min = frag_offsets[frag_idx] + start_rowid;
3718  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3719  is_rowid = true;
3720  }
3721  } else {
3722  const auto& chunk_type = lhs_col->get_type_info();
3723  chunk_min = extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3724  chunk_max = extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3725  }
3726  if (chunk_min > chunk_max) {
3727  // invalid metadata range, do not skip fragment
3728  return {false, -1};
3729  }
3730  if (lhs->get_type_info().is_timestamp() &&
3731  (lhs_col->get_type_info().get_dimension() !=
3732  rhs_const->get_type_info().get_dimension()) &&
3733  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3734  rhs_const->get_type_info().is_high_precision_timestamp())) {
3735  // If the original timestamp lhs column has a different precision,
3736  // the column metadata holds values in that original precision;
3737  // therefore adjust the rhs value to match the lhs precision.
3738 
3739  // Note(Wamsi): We adjust the rhs const value instead of the lhs value so as not to
3740  // artificially limit the lhs column range. RHS overflow/underflow has already
3741  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
3742  bool is_valid;
3743  std::tie(is_valid, chunk_min, chunk_max) =
3744  get_hpt_overflow_underflow_safe_scaled_values(
3745  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
3746  if (!is_valid) {
3747  VLOG(4) << "Overflow/Underflow detecting in fragments skipping logic.\nChunk min "
3748  "value: "
3749  << std::to_string(chunk_min)
3750  << "\nChunk max value: " << std::to_string(chunk_max)
3751  << "\nLHS col precision is: "
3752  << std::to_string(lhs_col->get_type_info().get_dimension())
3753  << "\nRHS precision is: "
3754  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
3755  return {false, -1};
3756  }
3757  }
3758  llvm::LLVMContext local_context;
3759  CgenState local_cgen_state(local_context);
3760  CodeGenerator code_generator(&local_cgen_state, nullptr);
3761 
3762  const auto rhs_val =
3763  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
3764 
3765  switch (comp_expr->get_optype()) {
3766  case kGE:
3767  if (chunk_max < rhs_val) {
3768  return {true, -1};
3769  }
3770  break;
3771  case kGT:
3772  if (chunk_max <= rhs_val) {
3773  return {true, -1};
3774  }
3775  break;
3776  case kLE:
3777  if (chunk_min > rhs_val) {
3778  return {true, -1};
3779  }
3780  break;
3781  case kLT:
3782  if (chunk_min >= rhs_val) {
3783  return {true, -1};
3784  }
3785  break;
3786  case kEQ:
3787  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3788  return {true, -1};
3789  } else if (is_rowid) {
3790  return {false, rhs_val - start_rowid};
3791  }
3792  break;
3793  default:
3794  break;
3795  }
3796  }
3797  return {false, -1};
3798 }
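The switch above prunes a fragment purely from chunk-level min/max metadata: for a
predicate `col >= rhs`, a fragment whose recorded max(col) is below rhs cannot contain a
matching row. A minimal sketch of that decision for the kGE case (hypothetical helper,
not the Executor API):

  #include <cstdint>

  // Mirrors the kGE branch in Executor::skipFragment: skip when even the largest
  // value in the fragment fails `col >= rhs_val`.
  bool can_skip_fragment_for_ge(int64_t chunk_max, int64_t rhs_val) {
    return chunk_max < rhs_val;
  }
  // e.g. a fragment whose max(ts) falls in 2020 is skipped for ts >= '2021-01-01'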
3799 
3800 /*
3801  * skipFragmentInnerJoins processes all quals stored in the execution unit's
3802  * join_quals and gathers all the ones that meet the "simple_qual" characteristics
3803  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
3804  * to decide whether the fragment should be skipped or not. The fragment will be skipped
3805  * if at least one of these skipFragment calls returns true in its first value.
3806  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
3807  * second value only if its first value is "false".
3808  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3809  * i.e., we only skip if none of the quals trigger the code to update the
3810  * rowid_lookup_key
3811  * - Only AND operations are valid and considered:
3812  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3813  * the skip
3814  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3815  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3816  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3817  * implemented differently, which cause the first one to skip correctly, but the second
3818  * one will not skip.
3819  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3820  * possible
3821  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3822  * projection, no skipping
3823  */
3824 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3825  const InputDescriptor& table_desc,
3826  const RelAlgExecutionUnit& ra_exe_unit,
3827  const Fragmenter_Namespace::FragmentInfo& fragment,
3828  const std::vector<uint64_t>& frag_offsets,
3829  const size_t frag_idx) {
3830  std::pair<bool, int64_t> skip_frag{false, -1};
3831  for (auto& inner_join : ra_exe_unit.join_quals) {
3832  if (inner_join.type != JoinType::INNER) {
3833  continue;
3834  }
3835 
3836  // extracting all the conjunctive simple_quals from the quals stored for the inner
3837  // join
3838  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3839  for (auto& qual : inner_join.quals) {
3840  auto temp_qual = qual_to_conjunctive_form(qual);
3841  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3842  temp_qual.simple_quals.begin(),
3843  temp_qual.simple_quals.end());
3844  }
3845  auto temp_skip_frag = skipFragment(
3846  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3847  if (temp_skip_frag.second != -1) {
3848  skip_frag.second = temp_skip_frag.second;
3849  return skip_frag;
3850  } else {
3851  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3852  }
3853  }
3854  return skip_frag;
3855 }
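A minimal sketch (hypothetical helper, not the Executor API) of the priority rule spelled
out in the comment above: a qual that yields a rowid lookup key ({false, key}) wins
immediately, otherwise the per-qual skip decisions are OR-ed together:

  #include <cstdint>
  #include <utility>
  #include <vector>

  std::pair<bool, int64_t> combine_skip_results(
      const std::vector<std::pair<bool, int64_t>>& per_qual_results) {
    std::pair<bool, int64_t> skip_frag{false, -1};
    for (const auto& result : per_qual_results) {
      if (result.second != -1) {
        // keep the rowid lookup key and stop: {false, n > -1} outranks {true, -1}
        return {skip_frag.first, result.second};
      }
      skip_frag.first = skip_frag.first || result.first;
    }
    return skip_frag;
  }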
3856 
3857 AggregatedColRange Executor::computeColRangesCache(
3858  const std::unordered_set<PhysicalInput>& phys_inputs) {
3859  AggregatedColRange agg_col_range_cache;
3860  CHECK(catalog_);
3861  std::unordered_set<int> phys_table_ids;
3862  for (const auto& phys_input : phys_inputs) {
3863  phys_table_ids.insert(phys_input.table_id);
3864  }
3865  std::vector<InputTableInfo> query_infos;
3866  for (const int table_id : phys_table_ids) {
3867  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3868  }
3869  for (const auto& phys_input : phys_inputs) {
3870  const auto cd =
3871  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3872  CHECK(cd);
3873  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3874  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3875  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3876  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3877  agg_col_range_cache.setColRange(phys_input, col_range);
3878  }
3879  }
3880  return agg_col_range_cache;
3881 }
3882 
3883 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3884  const std::unordered_set<PhysicalInput>& phys_inputs) {
3885  StringDictionaryGenerations string_dictionary_generations;
3886  CHECK(catalog_);
3887  for (const auto& phys_input : phys_inputs) {
3888  const auto cd =
3889  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3890  CHECK(cd);
3891  const auto& col_ti =
3892  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3893  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3894  const int dict_id = col_ti.get_comp_param();
3895  const auto dd = catalog_->getMetadataForDict(dict_id);
3896  CHECK(dd && dd->stringDict);
3897  string_dictionary_generations.setGeneration(dict_id,
3898  dd->stringDict->storageEntryCount());
3899  }
3900  }
3901  return string_dictionary_generations;
3902 }
3903 
3904 TableGenerations Executor::computeTableGenerations(
3905  std::unordered_set<int> phys_table_ids) {
3906  TableGenerations table_generations;
3907  for (const int table_id : phys_table_ids) {
3908  const auto table_info = getTableInfo(table_id);
3909  table_generations.setGeneration(
3910  table_id,
3911  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
3912  }
3913  return table_generations;
3914 }
3915 
3916 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3917  const std::unordered_set<int>& phys_table_ids) {
3918  CHECK(catalog_);
3919  row_set_mem_owner_ =
3920  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
3921  row_set_mem_owner_->setDictionaryGenerations(
3922  computeStringDictionaryGenerations(phys_inputs));
3923  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3924  table_generations_ = computeTableGenerations(phys_table_ids);
3925 }
3926 
3927 mapd_shared_mutex& Executor::getDataRecyclerLock() {
3928  return recycler_mutex_;
3929 }
3930 
3931 QueryPlanDagCache& Executor::getQueryPlanDagCache() {
3932  return query_plan_dag_cache_;
3933 }
3934 
3935 JoinColumnsInfo Executor::getJoinColumnsInfo(const Analyzer::Expr* join_expr,
3936  JoinColumnSide target_side,
3937  bool extract_only_col_id) {
3938  return query_plan_dag_cache_.getJoinColumnsInfoString(
3939  join_expr, target_side, extract_only_col_id);
3940 }
3941 
3942 mapd_shared_mutex& Executor::getSessionLock() {
3943  return executor_session_mutex_;
3944 }
3945 
3946 QuerySessionId& Executor::getCurrentQuerySession(
3947  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3948  return current_query_session_;
3949 }
3950 
3951 bool Executor::checkCurrentQuerySession(const QuerySessionId& candidate_query_session,
3952  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3953  // the candidate_query_session matches the current query session only if it is
3954  // non-empty and equal to current_query_session_
3955  return !candidate_query_session.empty() &&
3956  (current_query_session_ == candidate_query_session);
3957 }
3958 
3959 // used only for testing
3960 QuerySessionStatus::QueryStatus Executor::getQuerySessionStatus(
3961  const QuerySessionId& candidate_query_session,
3962  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3963  if (queries_session_map_.count(candidate_query_session) &&
3964  !queries_session_map_.at(candidate_query_session).empty()) {
3965  return queries_session_map_.at(candidate_query_session)
3966  .begin()
3967  ->second.getQueryStatus();
3968  }
3969  return QuerySessionStatus::QueryStatus::UNDEFINED;
3970 }
3971 
3972 void Executor::invalidateRunningQuerySession(
3973  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3974  current_query_session_ = "";
3975 }
3976 
3977 CurrentQueryStatus Executor::attachExecutorToQuerySession(
3978  const QuerySessionId& query_session_id,
3979  const std::string& query_str,
3980  const std::string& query_submitted_time) {
3981  if (!query_session_id.empty()) {
3982  // if session is valid, do update 1) the exact executor id and 2) query status
3983  mapd_unique_lock<mapd_shared_mutex> write_lock(executor_session_mutex_);
3984  updateQuerySessionExecutorAssignment(
3985  query_session_id, query_submitted_time, executor_id_, write_lock);
3986  updateQuerySessionStatusWithLock(query_session_id,
3987  query_submitted_time,
3988  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
3989  write_lock);
3990  }
3991  return {query_session_id, query_str};
3992 }
3993 
3995  // check whether we are okay to execute the "pending" query
3996  // i.e., before running the query check if this query session is "ALREADY" interrupted
3997  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3998  if (query_session.empty()) {
3999  return;
4000  }
4001  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
4002  // something went wrong, since we assume this is the caller's responsibility
4003  // (call this function only for an enrolled query session)
4004  if (!queries_session_map_.count(query_session)) {
4005  VLOG(1) << "Interrupting pending query is not available since the query session is "
4006  "not enrolled";
4007  } else {
4008  // here the query session is enrolled but the interrupt flag is not registered
4009  VLOG(1)
4010  << "Interrupting pending query is not available since its interrupt flag is "
4011  "not registered";
4012  }
4013  return;
4014  }
4015  if (queries_interrupt_flag_[query_session]) {
4016  throw QueryExecutionError(ERR_INTERRUPTED);
4017  }
4018 }
4019 
4020 void Executor::clearQuerySessionStatus(const QuerySessionId& query_session,
4021  const std::string& submitted_time_str) {
4022  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
4023  // clear the interrupt-related info for a finished query
4024  if (query_session.empty()) {
4025  return;
4026  }
4027  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
4028  if (query_session.compare(current_query_session_) == 0) {
4029  invalidateRunningQuerySession(session_write_lock);
4030  resetInterrupt();
4031  }
4032 }
4033 
4034 void Executor::updateQuerySessionStatus(
4035  const QuerySessionId& query_session,
4036  const std::string& submitted_time_str,
4037  const QuerySessionStatus::QueryStatus new_query_status) {
4038  // update the running query session's the current status
4039  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
4040  if (query_session.empty()) {
4041  return;
4042  }
4043  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4044  current_query_session_ = query_session;
4045  }
4046  updateQuerySessionStatusWithLock(
4047  query_session, submitted_time_str, new_query_status, session_write_lock);
4048 }
4049 
4050 void Executor::enrollQuerySession(
4051  const QuerySessionId& query_session,
4052  const std::string& query_str,
4053  const std::string& submitted_time_str,
4054  const size_t executor_id,
4055  const QuerySessionStatus::QueryStatus query_session_status) {
4056  // enroll the query session into the Executor's session map
4057  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
4058  if (query_session.empty()) {
4059  return;
4060  }
4061 
4062  addToQuerySessionList(query_session,
4063  query_str,
4064  submitted_time_str,
4065  executor_id,
4066  query_session_status,
4067  session_write_lock);
4068 
4069  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4070  current_query_session_ = query_session;
4071  }
4072 }
4073 
4074 bool Executor::addToQuerySessionList(const QuerySessionId& query_session,
4075  const std::string& query_str,
4076  const std::string& submitted_time_str,
4077  const size_t executor_id,
4078  const QuerySessionStatus::QueryStatus query_status,
4079  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4080  // an internal API that enrolls the query session into the Executor's session map
4081  if (queries_session_map_.count(query_session)) {
4082  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
4083  queries_session_map_.at(query_session).erase(submitted_time_str);
4084  queries_session_map_.at(query_session)
4085  .emplace(submitted_time_str,
4086  QuerySessionStatus(query_session,
4087  executor_id,
4088  query_str,
4089  submitted_time_str,
4090  query_status));
4091  } else {
4092  queries_session_map_.at(query_session)
4093  .emplace(submitted_time_str,
4094  QuerySessionStatus(query_session,
4095  executor_id,
4096  query_str,
4097  submitted_time_str,
4098  query_status));
4099  }
4100  } else {
4101  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4102  executor_per_query_map.emplace(
4103  submitted_time_str,
4104  QuerySessionStatus(
4105  query_session, executor_id, query_str, submitted_time_str, query_status));
4106  queries_session_map_.emplace(query_session, executor_per_query_map);
4107  }
4108  return queries_interrupt_flag_.emplace(query_session, false).second;
4109 }
4110 
4111 bool Executor::updateQuerySessionStatusWithLock(
4112  const QuerySessionId& query_session,
4113  const std::string& submitted_time_str,
4114  const QuerySessionStatus::QueryStatus updated_query_status,
4115  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4116  // an internal API that updates query session status
4117  if (query_session.empty()) {
4118  return false;
4119  }
4120  if (queries_session_map_.count(query_session)) {
4121  for (auto& query_status : queries_session_map_.at(query_session)) {
4122  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4123  // no time difference --> found the target query status
4124  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4125  auto prev_status = query_status.second.getQueryStatus();
4126  if (prev_status == updated_query_status) {
4127  return false;
4128  }
4129  query_status.second.setQueryStatus(updated_query_status);
4130  return true;
4131  }
4132  }
4133  }
4134  return false;
4135 }
4136 
4137 bool Executor::updateQuerySessionExecutorAssignment(
4138  const QuerySessionId& query_session,
4139  const std::string& submitted_time_str,
4140  const size_t executor_id,
4141  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4142  // update the executor id of the query session
4143  if (query_session.empty()) {
4144  return false;
4145  }
4146  if (queries_session_map_.count(query_session)) {
4147  auto storage = queries_session_map_.at(query_session);
4148  for (auto it = storage.begin(); it != storage.end(); it++) {
4149  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4150  // no time difference --> found the target query status
4151  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4152  queries_session_map_.at(query_session)
4153  .at(submitted_time_str)
4154  .setExecutorId(executor_id);
4155  return true;
4156  }
4157  }
4158  }
4159  return false;
4160 }
4161 
4162 bool Executor::removeFromQuerySessionList(
4163  const QuerySessionId& query_session,
4164  const std::string& submitted_time_str,
4165  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4166  if (query_session.empty()) {
4167  return false;
4168  }
4169  if (queries_session_map_.count(query_session)) {
4170  auto& storage = queries_session_map_.at(query_session);
4171  if (storage.size() > 1) {
4172  // in this case we only remove query executor info
4173  for (auto it = storage.begin(); it != storage.end(); it++) {
4174  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4175  // no time difference && have the same executor id--> found the target query
4176  if (it->second.getExecutorId() == executor_id_ &&
4177  submitted_time_str.compare(target_submitted_t_str) == 0) {
4178  storage.erase(it);
4179  return true;
4180  }
4181  }
4182  } else if (storage.size() == 1) {
4183  // here this session only has a single query executor
4184  // so we clear both executor info and its interrupt flag
4185  queries_session_map_.erase(query_session);
4186  queries_interrupt_flag_.erase(query_session);
4187  if (interrupted_.load()) {
4188  interrupted_.store(false);
4189  }
4190  return true;
4191  }
4192  }
4193  return false;
4194 }
4195 
4196 void Executor::setQuerySessionAsInterrupted(
4197  const QuerySessionId& query_session,
4198  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4199  if (query_session.empty()) {
4200  return;
4201  }
4202  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
4203  queries_interrupt_flag_[query_session] = true;
4204  }
4205 }
4206 
4207 bool Executor::checkIsQuerySessionInterrupted(
4208  const QuerySessionId& query_session,
4209  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4210  if (query_session.empty()) {
4211  return false;
4212  }
4213  auto flag_it = queries_interrupt_flag_.find(query_session);
4214  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4215  flag_it->second;
4216 }
4217 
4218 bool Executor::checkIsQuerySessionEnrolled(
4219  const QuerySessionId& query_session,
4220  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4221  if (query_session.empty()) {
4222  return false;
4223  }
4224  return !query_session.empty() && queries_session_map_.count(query_session);
4225 }
4226 
4227 void Executor::enableRuntimeQueryInterrupt(
4228  const double runtime_query_check_freq,
4229  const unsigned pending_query_check_freq) const {
4230  // The only scenario in which we intentionally call this function is
4231  // to allow runtime query interrupt in QueryRunner for test cases.
4232  // Because the test machine's default setting does not allow runtime query interrupt,
4233  // we have to turn it on within the test code when necessary.
4234  g_enable_runtime_query_interrupt = true;
4235  g_pending_query_interrupt_freq = pending_query_check_freq;
4236  g_running_query_interrupt_freq = runtime_query_check_freq;
4239  }
4240 }
4241 
4242 void Executor::addToCardinalityCache(const std::string& cache_key,
4243  const size_t cache_value) {
4244  if (g_use_estimator_result_cache) {
4245  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
4246  cardinality_cache_[cache_key] = cache_value;
4247  VLOG(1) << "Put estimated cardinality to the cache";
4248  }
4249 }
4250 
4251 Executor::CachedCardinality Executor::getCachedCardinality(const std::string& cache_key) {
4252  mapd_shared_lock<mapd_shared_mutex> lock(recycler_mutex_);
4253  if (g_use_estimator_result_cache &&
4254  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
4255  VLOG(1) << "Reuse cached cardinality";
4256  return {true, cardinality_cache_[cache_key]};
4257  }
4258  return {false, -1};
4259 }
4260 
4261 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
4262  const QuerySessionId& query_session,
4263  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4264  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
4265  auto& query_infos = queries_session_map_.at(query_session);
4266  std::vector<QuerySessionStatus> ret;
4267  for (auto& info : query_infos) {
4268  ret.push_back(QuerySessionStatus(query_session,
4269  info.second.getExecutorId(),
4270  info.second.getQueryStr(),
4271  info.second.getQuerySubmittedTime(),
4272  info.second.getQueryStatus()));
4273  }
4274  return ret;
4275  }
4276  return {};
4277 }
4278 
4279 const std::vector<size_t> Executor::getExecutorIdsRunningQuery(
4280  const QuerySessionId& interrupt_session) const {
4281  std::vector<size_t> res;
4282  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4283  auto it = queries_session_map_.find(interrupt_session);
4284  if (it != queries_session_map_.end()) {
4285  for (auto& kv : it->second) {
4286  if (kv.second.getQueryStatus() ==
4287  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4288  res.push_back(kv.second.getExecutorId());
4289  }
4290  }
4291  }
4292  return res;
4293 }
4294 
4295 bool Executor::checkNonKernelTimeInterrupted() const {
4296  // this function should be called within an executor which is assigned
4297  // to the specific query thread (that indicates we already enroll the session)
4298  // check whether this is called from non unitary executor
4299  if (executor_id_ == UNITARY_EXECUTOR_ID) {
4300  return false;
4301  };
4302  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4303  auto flag_it = queries_interrupt_flag_.find(current_query_session_);
4304  return !current_query_session_.empty() && flag_it != queries_interrupt_flag_.end() &&
4305  flag_it->second;
4306 }
4307 
4308 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
4309 
4310 // contain the interrupt flag's status per query session
4311 InterruptFlagMap Executor::queries_interrupt_flag_;
4312 // contain a list of queries per query session
4313 QuerySessionMap Executor::queries_session_map_;
4314 // session lock
4315 mapd_shared_mutex Executor::executor_session_mutex_;
4316 
4319 
4322 void* Executor::gpu_active_modules_[max_gpu_count];
4323 
4324 std::mutex Executor::compilation_mutex_;
4325 std::mutex Executor::kernel_mutex_;
4326 
4329 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;
Definition: Execute.cpp:3377
bool isFragmentFullyDeleted(const int table_id, const Fragmenter_Namespace::FragmentInfo &fragment)
Definition: Execute.cpp:3625
bool checkIsQuerySessionEnrolled(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4218
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
FetchResult fetchUnionChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:2659
This file contains the class specification and related data structures for Catalog.
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:217
int8_t warpSize() const
Definition: Execute.cpp:3432
std::map< QuerySessionId, bool > InterruptFlagMap
Definition: Execute.h:82
const size_t limit
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1033
ResultSetPtr collectAllDeviceResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
Definition: Execute.cpp:1920
const ColumnDescriptor * getPhysicalColumnDescriptor(const Analyzer::ColumnVar *, int) const
Definition: Execute.cpp:284
int8_t groupColWidth(const size_t key_idx) const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1155
static SysCatalog & instance()
Definition: SysCatalog.h:325
max_gpu_slab_size_(max_gpu_slab_size)
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const int64_t generation)
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4074
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:164
Classes representing a parse tree.
void setGeneration(const uint32_t id, const uint64_t generation)
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
int getDeviceCount() const
Definition: CudaMgr.h:86
size_t get_context_count(const ExecutorDeviceType device_type, const size_t cpu_count, const size_t gpu_count)
Definition: Execute.cpp:1074
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:218
int64_t deviceCycles(int milliseconds) const
Definition: Execute.cpp:3468
ExecutorType executor_type
void init(LogOptions const &log_opts)
Definition: Logger.cpp:290
bool is_integer() const
Definition: sqltypes.h:511
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static size_t addAligned(const size_t off_in, const size_t alignment)
Definition: CgenState.h:402
#define CHECK_NE(x, y)
Definition: Logger.h:218
void setQuerySessionAsInterrupted(const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4196
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:241
std::unordered_map< TableId, const ColumnDescriptor * > DeletedColumnsMap
Definition: PlanState.h:44
const JoinQualsPerNestingLevel join_quals
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:926
std::vector< std::string > expr_container_to_string(const T &expr_container)
Definition: Execute.cpp:1207
std::shared_timed_mutex mapd_shared_mutex
SortAlgorithm
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
Definition: Execute.cpp:2035
CachedCardinality getCachedCardinality(const std::string &cache_key)
Definition: Execute.cpp:4251
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples (rows) currently stored by that fragment.
Definition: Fragmenter.h:77
void agg_min_double_skip_val(int64_t *agg, const double val, const double skip_val)
void setCatalog(const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:299
std::map< size_t, std::vector< uint64_t > > get_table_id_to_frag_offsets(const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:2410
ExecutorExplainType explain_type
TableIdToNodeMap table_id_to_node_map
StringDictionaryProxy * getStringDictionaryProxy(const int dict_id, const bool with_generation) const
Definition: Execute.h:421
int getColId() const
int64_t inline_null_val(const SQLTypeInfo &ti, const bool float_argument_input)
Definition: Execute.cpp:1808
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1161
static std::unordered_map< std::string, size_t > cardinality_cache_
Definition: Execute.h:1152
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1123
int getTableId() const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1162
bool g_bigint_count
void agg_min_float_skip_val(int32_t *agg, const float val, const float skip_val)
Definition: sqldefs.h:75
const TableGeneration & getTableGeneration(const int table_id) const
Definition: Execute.cpp:315
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
llvm::Value * castToFP(llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
Definition: Execute.cpp:3473
std::pair< bool, size_t > CachedCardinality
Definition: Execute.h:1031
bool is_dict_encoded_type() const
Definition: sqltypes.h:554
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:153
size_t g_approx_quantile_buffer
Definition: Execute.cpp:144
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1136
void agg_max_float_skip_val(int32_t *agg, const float val, const float skip_val)
const ColumnDescriptor * getColumnDescriptor(const Analyzer::ColumnVar *) const
Definition: Execute.cpp:278
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1537
Specifies the in-memory content of a row in the column metadata table.
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
int64_t getAggInitValForIndex(const size_t index) const
const ChunkMetadataMap & getChunkMetadataMap() const
bool is_boolean() const
Definition: sqltypes.h:516
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1168
StringDictionaryGenerations string_dictionary_generations_
const std::shared_ptr< Analyzer::Estimator > estimator
static std::map< int, std::shared_ptr< Executor > > executors_
Definition: Execute.h:1126
#define AUTOMATIC_IR_METADATA(CGENSTATE)
const TemporaryTables * getTemporaryTables()
Definition: Execute.h:416
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
Definition: Execute.cpp:1347
QueryDescriptionType getQueryDescriptionType() const
void freeTemporaryCpuLinearizedIdxBuf()
std::shared_ptr< CompilationContext > generated_code
QueryMemoryDescriptor query_mem_desc_
const Catalog_Namespace::Catalog * getCatalog() const
Definition: Execute.cpp:295
ExecutorDeviceType device_type
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
executor_id_(executor_id)
bool updateQuerySessionExecutorAssignment(const QuerySessionId &query_session, const std::string &submitted_time_str, const size_t executor_id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4137
virtual ReductionCode codegen() const
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
Definition: sqldefs.h:34
std::string sort_algorithm_to_string(const SortAlgorithm algorithm)
Definition: Execute.cpp:1229
InputSourceType getSourceType() const
const std::vector< size_t > getExecutorIdsRunningQuery(const QuerySessionId &interrupt_session) const
Definition: Execute.cpp:4279
#define CHECK_LT(x, y)
Definition: Logger.h:219
Definition: sqltypes.h:52
ResultSetPtr resultsUnion(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:898
#define REGULAR_DICT(TRANSIENTID)
Definition: sqltypes.h:261
static llvm::ConstantInt * codegenIntConst(const Analyzer::Constant *constant, CgenState *cgen_state)
Definition: ConstantIR.cpp:89
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Definition: Execute.cpp:3398
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2837
#define CHECK_LE(x, y)
Definition: Logger.h:220
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
Definition: Execute.cpp:411
mapd_shared_mutex & getDataRecyclerLock()
Definition: Execute.cpp:3927
Speculative top N algorithm.
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
void setGeneration(const uint32_t id, const TableGeneration &generation)
std::pair< bool, int64_t > skipFragment(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3658
bool g_enable_experimental_string_functions
Definition: sqldefs.h:76
unsigned gridSize() const
Definition: Execute.cpp:3440
std::unordered_map< int, CgenState::LiteralValues > literal_values
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:331
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags(const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:2429
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:31
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1964
StringDictionaryGenerations computeStringDictionaryGenerations(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:3883
static const int32_t ERR_WIDTH_BUCKET_INVALID_ARGUMENT
Definition: Execute.h:1170
bool needLinearizeAllFragments(const ColumnDescriptor *cd, const InputColDescriptor &inner_col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments, const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:2497
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
Definition: ResultSet.cpp:491
bool g_cache_string_hash
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:3358
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
Definition: Execute.cpp:3148
std::vector< std::vector< const int8_t * > > col_buffers
Definition: ColumnFetcher.h:42
QuerySessionStatus::QueryStatus getQuerySessionStatus(const QuerySessionId &candidate_query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3960
std::pair< bool, int64_t > skipFragmentInnerJoins(const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3824
void buildSelectedFragsMapping(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2851
data_mgr_(data_mgr)
int64_t extract_min_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr * > &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
Definition: Execute.cpp:1878
std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> PerFragmentCallBack
Definition: Execute.h:573
ThreadId thread_id()
Definition: Logger.cpp:791
mapd_shared_lock< mapd_shared_mutex > read_lock
size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1128
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool g_use_estimator_result_cache
Definition: Execute.cpp:120
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::string &submitted_time_str, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4162
static std::vector< InnerOuter > normalizeColumnPairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:744
std::string QuerySessionId
Definition: Execute.h:80
ResultSetPtr reduceMultiDeviceResultSets(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:973
RegisteredQueryHint query_hint
void set_notnull(bool n)
Definition: sqltypes.h:426
void addToCardinalityCache(const std::string &cache_key, const size_t cache_value)
Definition: Execute.cpp:4242
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:63
constexpr char const * EMPTY_QUERY_PLAN
#define CHECK(condition)
Definition: Logger.h:209
QueryPlanDagCache & getQueryPlanDagCache()
Definition: Execute.cpp:3931
#define DEBUG_TIMER(name)
Definition: Logger.h:352
char * t
void agg_sum_double_skip_val(int64_t *agg, const double val, const double skip_val)
constexpr int64_t get_timestamp_precision_scale(const int32_t dimen)
Definition: DateTimeUtils.h:51
bool gpusPresent() const
Definition: DataMgr.h:213
uint64_t exp_to_scale(const unsigned exp)
double gpu_input_mem_limit_percent
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
bool g_enable_cpu_sub_tasks
Definition: Execute.cpp:78
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CompilationContext *compilation_context, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const bool allow_runtime_interrupt, const std::vector< int8_t * > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
bool g_cluster
static std::mutex compilation_mutex_
Definition: Execute.h:1172
std::vector< std::vector< int64_t > > num_rows
Definition: ColumnFetcher.h:43
Definition: sqldefs.h:33
size_t g_approx_quantile_centroids
Definition: Execute.cpp:145
constexpr int8_t MAX_BYTE_WIDTH_SUPPORTED
void setColRange(const PhysicalInput &, const ExpressionRange &)
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
const Expr * get_left_operand() const
Definition: Analyzer.h:441
FetchResult fetchChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:2520
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1437
JoinColumnsInfo getJoinColumnsInfo(const Analyzer::Expr *join_expr, JoinColumnSide target_side, bool extract_only_col_id)
Definition: Execute.cpp:3935
static bool typeSupportsRange(const SQLTypeInfo &ti)
std::unordered_map< int, const Analyzer::BinOper * > getInnerTabIdToJoinCond() const
Definition: Execute.cpp:2085
ExecutorDeviceType getDeviceTypeForTargets(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
Definition: Execute.cpp:1787
std::shared_ptr< const query_state::QueryState > query_state
ReductionCode get_reduction_code(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, int64_t *compilation_queue_time)
Definition: Execute.cpp:958
ResultSetPtr reduce_estimator_results(const RelAlgExecutionUnit &ra_exe_unit, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
ExpressionRange getColRange(const PhysicalInput &) const
Definition: Execute.cpp:319
CurrentQueryStatus attachExecutorToQuerySession(const QuerySessionId &query_session_id, const std::string &query_str, const std::string &query_submitted_time)
Definition: Execute.cpp:3977
mapd_unique_lock< mapd_shared_mutex > write_lock
std::vector< Analyzer::Expr * > target_exprs
SQLTypeInfo columnType
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:105
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
int get_column_id() const
Definition: Analyzer.h:194
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
bool is_string() const
Definition: sqltypes.h:509
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
unsigned blockSize() const
Definition: Execute.cpp:3454
std::vector< std::vector< uint64_t > > frag_offsets
Definition: ColumnFetcher.h:44
constexpr double n
Definition: Utm.h:46
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
std::unique_ptr< ResultSet > estimator_result_set_
const size_t offset
Definition: sqldefs.h:74
int cpu_threads()
Definition: thread_count.h:24
void clearQuerySessionStatus(const QuerySessionId &query_session, const std::string &submitted_time_str)
Definition: Execute.cpp:4020
bool is_decimal() const
Definition: sqltypes.h:512
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:790
void add_deleted_col_to_map(PlanState::DeletedColumnsMap &deleted_cols_map, const ColumnDescriptor *deleted_cd)
Definition: Execute.cpp:3534
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:681
temporary_tables_(nullptr)
Descriptor for the fragments required for an execution kernel.
Definition: sqldefs.h:72
size_t getColOffInBytes(const size_t col_idx) const
void agg_sum_float_skip_val(int32_t *agg, const float val, const float skip_val)
static size_t getArenaBlockSize()
Definition: Execute.cpp:226
ResultSetPtr executeWorkUnit(size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1370
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:2100
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: HashJoin.cpp:238
int getNestLevel() const
HashType
Definition: HashTable.h:19
const InputDescriptor & getScanDesc() const
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:251
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:118
static mapd_shared_mutex recycler_mutex_
Definition: Execute.h:1151
bool is_array() const
Definition: sqltypes.h:517
#define VLOG(n)
Definition: Logger.h:303
Type timer_start()
Definition: measure.h:42
static QueryPlanDagCache query_plan_dag_cache_
Definition: Execute.h:1149
Functions to support array operations used by the executor.
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
int64_t extract_max_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
const Executor * getExecutor() const
static std::mutex gpu_active_modules_mutex_
Definition: Execute.h:1078
void setupCaching(const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< int > &phys_table_ids)
Definition: Execute.cpp:3916
void clearMetaInfoCache()
Definition: Execute.cpp:405
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:189
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
HashTableBuildDagMap hash_table_build_plan_dag
std::unordered_map< JoinColumnsInfo, HashTableBuildDag > HashTableBuildDagMap
bool skipFragmentPair(const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
Definition: Execute.cpp:2339
ResultSetPtr executeExplain(const QueryCompilationDescriptor &)
Definition: Execute.cpp:1735
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const
void compile(const TableFunctionExecutionUnit &exe_unit, const CompilationOptions &co, Executor *executor)