OmniSciDB  2e3a973ef4
Execute.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Execute.h"
18 
19 #include "AggregateUtils.h"
20 #include "CodeGenerator.h"
21 #include "ColumnFetcher.h"
24 #include "DynamicWatchdog.h"
25 #include "EquiJoinCondition.h"
26 #include "ErrorHandling.h"
27 #include "ExpressionRewrite.h"
29 #include "GpuMemUtils.h"
30 #include "InPlaceSort.h"
33 #include "JsonAccessors.h"
36 #include "QueryRewrite.h"
37 #include "QueryTemplateGenerator.h"
38 #include "ResultSetReductionJIT.h"
39 #include "RuntimeFunctions.h"
40 #include "SpeculativeTopN.h"
41 
44 
45 #include "CudaMgr/CudaMgr.h"
47 #include "Parser/ParserNode.h"
50 #include "Shared/checked_alloc.h"
51 #include "Shared/measure.h"
52 #include "Shared/misc.h"
53 #include "Shared/scope.h"
54 #include "Shared/shard_key.h"
56 #include "Shared/threadpool.h"
57 
58 #include "AggregatedColRange.h"
60 
61 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
62 #include <boost/filesystem/operations.hpp>
63 #include <boost/filesystem/path.hpp>
64 
65 #ifdef HAVE_CUDA
66 #include <cuda.h>
67 #endif // HAVE_CUDA
68 #include <future>
69 #include <memory>
70 #include <numeric>
71 #include <set>
72 #include <thread>
73 
74 bool g_enable_watchdog{false};
76 bool g_use_tbb_pool{false};
79 bool g_allow_cpu_retry{true};
80 bool g_null_div_by_zero{false};
84 extern bool g_enable_smem_group_by;
85 extern std::unique_ptr<llvm::Module> udf_gpu_module;
86 extern std::unique_ptr<llvm::Module> udf_cpu_module;
94 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
97 size_t g_big_group_threshold{20000};
100 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
101 size_t g_min_memory_allocation_size{
102  256}; // minimum memory allocation required for projection query output buffer
103  // without pre-flight count
111 size_t g_gpu_smem_threshold{
112  4096}; // GPU shared memory threshold (in bytes), if larger
113  // buffer sizes are required we do not use GPU shared
114  // memory optimizations. Setting this to 0 means unlimited
115  // (subject to other dynamically calculated caps)
116 bool g_enable_smem_grouped_non_count_agg{
117  true}; // enable use of shared memory when performing group-by with select non-count
118  // aggregates
119 bool g_enable_smem_non_grouped_agg{
120  true}; // enable optimizations for using GPU shared memory in implementation of
121  // non-grouped aggregates
122 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
123  // limits the allocation for the output buffer arena
124 
125 extern bool g_cache_string_hash;
126 
127 int const Executor::max_gpu_count;
128 
130 
131 Executor::Executor(const ExecutorId executor_id,
132  const size_t block_size_x,
133  const size_t grid_size_x,
134  const size_t max_gpu_slab_size,
135  const std::string& debug_dir,
136  const std::string& debug_file)
137  : cgen_state_(new CgenState({}, false))
140  , block_size_x_(block_size_x)
141  , grid_size_x_(grid_size_x)
142  , max_gpu_slab_size_(max_gpu_slab_size)
143  , debug_dir_(debug_dir)
144  , debug_file_(debug_file)
145  , executor_id_(executor_id)
146  , catalog_(nullptr)
147  , temporary_tables_(nullptr)
148  , input_table_info_cache_(this) {}
149 
150 std::shared_ptr<Executor> Executor::getExecutor(
151  const ExecutorId executor_id,
152  const std::string& debug_dir,
153  const std::string& debug_file,
154  const SystemParameters system_parameters) {
156 
157  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
158  auto it = executors_.find(executor_id);
159  if (it != executors_.end()) {
160  return it->second;
161  }
162  auto executor = std::make_shared<Executor>(executor_id,
163  system_parameters.cuda_block_size,
164  system_parameters.cuda_grid_size,
165  system_parameters.max_gpu_slab_size,
166  debug_dir,
167  debug_file);
168  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
169  return executor;
170 }
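// Illustrative usage sketch (not part of the original file; the id value and default
// SystemParameters below are hypothetical): executors are cached per id, so repeated
// lookups hand back the same shared instance rather than constructing a new one.
//
//   SystemParameters params;  // assume defaults for block/grid/slab sizes
//   auto executor = Executor::getExecutor(/*executor_id=*/0, "", "", params);
//   auto same_executor = Executor::getExecutor(/*executor_id=*/0, "", "", params);
//   CHECK(executor.get() == same_executor.get());  // second call hits the cache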
171 
172 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
173  switch (memory_level) {
174  case Data_Namespace::MemoryLevel::CPU_LEVEL:
175  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
176  mapd_unique_lock<mapd_shared_mutex> flush_lock(
177  execute_mutex_); // Don't flush memory while queries are running
178 
180  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
181  // The hash table cache uses CPU memory not managed by the buffer manager. In the
182  // future, we should manage these allocations with the buffer manager directly.
183  // For now, assume the user wants to purge the hash table cache when they clear
184  // CPU memory (currently used in ExecuteTest to lower memory pressure)
186  }
187  break;
188  }
189  default: {
190  throw std::runtime_error(
191  "Clearing memory levels other than the CPU level or GPU level is not "
192  "supported.");
193  }
194  }
195 }
196 
197 size_t Executor::getArenaBlockSize() {
198  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
199 }
200 
201 StringDictionaryProxy* Executor::getStringDictionaryProxy(
202  const int dict_id_in,
203  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
204  const bool with_generation) const {
205  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
206  CHECK(catalog_);
207  const auto dd = catalog_->getMetadataForDict(dict_id);
208  std::lock_guard<std::mutex> lock(str_dict_mutex_);
209  if (dd) {
210  CHECK(dd->stringDict);
211  CHECK_LE(dd->dictNBits, 32);
212  CHECK(row_set_mem_owner);
213  const int64_t generation =
214  with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
215  return row_set_mem_owner->addStringDict(dd->stringDict, dict_id, generation);
216  }
217  CHECK_EQ(0, dict_id);
218  if (!lit_str_dict_proxy_) {
219  std::shared_ptr<StringDictionary> tsd =
220  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
221  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
222  }
223  return lit_str_dict_proxy_.get();
224 }
225 
226 bool Executor::isCPUOnly() const {
227  CHECK(catalog_);
228  return !catalog_->getDataMgr().getCudaMgr();
229 }
230 
231 const ColumnDescriptor* Executor::getColumnDescriptor(
232  const Analyzer::ColumnVar* col_var) const {
233  return get_column_descriptor_maybe(
234  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
235 }
236 
238  const Analyzer::ColumnVar* col_var,
239  int n) const {
240  const auto cd = getColumnDescriptor(col_var);
241  if (!cd || n > cd->columnType.get_physical_cols()) {
242  return nullptr;
243  }
245  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
246 }
247 
248 const Catalog_Namespace::Catalog* Executor::getCatalog() const {
249  return catalog_;
250 }
251 
252 void Executor::setCatalog(const Catalog_Namespace::Catalog* catalog) {
253  catalog_ = catalog;
254 }
255 
256 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
257  return row_set_mem_owner_;
258 }
259 
261  return temporary_tables_;
262 }
263 
265  return input_table_info_cache_.getTableInfo(table_id);
266 }
267 
268 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
269  return table_generations_.getGeneration(table_id);
270 }
271 
273  return agg_col_range_cache_.getColRange(phys_input);
274 }
275 
276 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
277  size_t num_bytes = 0;
278  if (!plan_state_) {
279  return 0;
280  }
281  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
282  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
283  continue;
284  }
285 
286  if (fetched_col_pair.first < 0) {
287  num_bytes += 8;
288  } else {
289  const auto cd =
290  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
291  const auto& ti = cd->columnType;
292  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
293  ? 4
294  : ti.get_size();
295  if (sz < 0) {
296  // for varlen types, only account for the pointer/size for each row, for now
297  num_bytes += 16;
298  } else {
299  num_bytes += sz;
300  }
301  }
302  }
303  return num_bytes;
304 }
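// Worked example (illustrative only): for a fetched row that touches an iterator or
// other synthetic column (negative table id), a dictionary-encoded TEXT column, and a
// variable-length column, the estimate above adds 8 + 4 + 16 bytes respectively, while
// fixed-width columns contribute their ti.get_size() directly.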
305 
306 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
307  const std::vector<Analyzer::Expr*>& target_exprs) const {
309  CHECK(catalog_);
310  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
311  for (const auto target_expr : target_exprs) {
312  if (!plan_state_->isLazyFetchColumn(target_expr)) {
313  col_lazy_fetch_info.emplace_back(
314  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
315  } else {
316  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
317  CHECK(col_var);
318  auto col_id = col_var->get_column_id();
319  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
320  auto cd = (col_var->get_table_id() > 0)
321  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
322  : nullptr;
323  if (cd && IS_GEO(cd->columnType.get_type())) {
324  // Geo coords cols will be processed in sequence. So we only need to track the
325  // first coords col in lazy fetch info.
326  {
327  auto cd0 =
328  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
329  auto col0_ti = cd0->columnType;
330  CHECK(!cd0->isVirtualCol);
331  auto col0_var = makeExpr<Analyzer::ColumnVar>(
332  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
333  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
334  col_lazy_fetch_info.emplace_back(
335  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
336  }
337  } else {
338  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
339  const auto& col_ti = col_var->get_type_info();
340  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
341  }
342  }
343  }
344  return col_lazy_fetch_info;
345 }
346 
352 }
353 
354 std::vector<int8_t> Executor::serializeLiterals(
355  const std::unordered_map<int, CgenState::LiteralValues>& literals,
356  const int device_id) {
357  if (literals.empty()) {
358  return {};
359  }
360  const auto dev_literals_it = literals.find(device_id);
361  CHECK(dev_literals_it != literals.end());
362  const auto& dev_literals = dev_literals_it->second;
363  size_t lit_buf_size{0};
364  std::vector<std::string> real_strings;
365  std::vector<std::vector<double>> double_array_literals;
366  std::vector<std::vector<int8_t>> align64_int8_array_literals;
367  std::vector<std::vector<int32_t>> int32_array_literals;
368  std::vector<std::vector<int8_t>> align32_int8_array_literals;
369  std::vector<std::vector<int8_t>> int8_array_literals;
370  for (const auto& lit : dev_literals) {
371  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
372  if (lit.which() == 7) {
373  const auto p = boost::get<std::string>(&lit);
374  CHECK(p);
375  real_strings.push_back(*p);
376  } else if (lit.which() == 8) {
377  const auto p = boost::get<std::vector<double>>(&lit);
378  CHECK(p);
379  double_array_literals.push_back(*p);
380  } else if (lit.which() == 9) {
381  const auto p = boost::get<std::vector<int32_t>>(&lit);
382  CHECK(p);
383  int32_array_literals.push_back(*p);
384  } else if (lit.which() == 10) {
385  const auto p = boost::get<std::vector<int8_t>>(&lit);
386  CHECK(p);
387  int8_array_literals.push_back(*p);
388  } else if (lit.which() == 11) {
389  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
390  CHECK(p);
391  if (p->second == 64) {
392  align64_int8_array_literals.push_back(p->first);
393  } else if (p->second == 32) {
394  align32_int8_array_literals.push_back(p->first);
395  } else {
396  CHECK(false);
397  }
398  }
399  }
400  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
401  throw TooManyLiterals();
402  }
403  int16_t crt_real_str_off = lit_buf_size;
404  for (const auto& real_str : real_strings) {
405  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
406  lit_buf_size += real_str.size();
407  }
408  if (double_array_literals.size() > 0) {
409  lit_buf_size = align(lit_buf_size, sizeof(double));
410  }
411  int16_t crt_double_arr_lit_off = lit_buf_size;
412  for (const auto& double_array_literal : double_array_literals) {
413  CHECK_LE(double_array_literal.size(),
414  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
415  lit_buf_size += double_array_literal.size() * sizeof(double);
416  }
417  if (align64_int8_array_literals.size() > 0) {
418  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
419  }
420  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
421  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
422  CHECK_LE(align64_int8_array_literal.size(),
423  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
424  lit_buf_size += align64_int8_array_literal.size();
425  }
426  if (int32_array_literals.size() > 0) {
427  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
428  }
429  int16_t crt_int32_arr_lit_off = lit_buf_size;
430  for (const auto& int32_array_literal : int32_array_literals) {
431  CHECK_LE(int32_array_literal.size(),
432  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
433  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
434  }
435  if (align32_int8_array_literals.size() > 0) {
436  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
437  }
438  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
439  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
440  CHECK_LE(align32_int8_array_literal.size(),
441  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
442  lit_buf_size += align32_int8_array_literal.size();
443  }
444  int16_t crt_int8_arr_lit_off = lit_buf_size;
445  for (const auto& int8_array_literal : int8_array_literals) {
446  CHECK_LE(int8_array_literal.size(),
447  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
448  lit_buf_size += int8_array_literal.size();
449  }
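  // Layout sketch of the serialized literal buffer built below (illustrative summary):
  //
  //   [ fixed-width literal slots, each aligned to its own width ]
  //   [ real string payloads                 ] <- crt_real_str_off
  //   [ double array payloads (8-aligned)    ] <- crt_double_arr_lit_off
  //   [ 64-bit aligned int8 array payloads   ] <- crt_align64_int8_arr_lit_off
  //   [ int32 array payloads (4-aligned)     ] <- crt_int32_arr_lit_off
  //   [ 32-bit aligned int8 array payloads   ] <- crt_align32_int8_arr_lit_off
  //   [ int8 array payloads                  ] <- crt_int8_arr_lit_off
  //
  // Variable-length literals are encoded in their fixed-width slot as a packed 32-bit
  // value: payload offset in the upper 16 bits, length in the lower 16 bits (hence the
  // int16_t range checks above).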
450  unsigned crt_real_str_idx = 0;
451  unsigned crt_double_arr_lit_idx = 0;
452  unsigned crt_align64_int8_arr_lit_idx = 0;
453  unsigned crt_int32_arr_lit_idx = 0;
454  unsigned crt_align32_int8_arr_lit_idx = 0;
455  unsigned crt_int8_arr_lit_idx = 0;
456  std::vector<int8_t> serialized(lit_buf_size);
457  size_t off{0};
458  for (const auto& lit : dev_literals) {
459  const auto lit_bytes = CgenState::literalBytes(lit);
460  off = CgenState::addAligned(off, lit_bytes);
461  switch (lit.which()) {
462  case 0: {
463  const auto p = boost::get<int8_t>(&lit);
464  CHECK(p);
465  serialized[off - lit_bytes] = *p;
466  break;
467  }
468  case 1: {
469  const auto p = boost::get<int16_t>(&lit);
470  CHECK(p);
471  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
472  break;
473  }
474  case 2: {
475  const auto p = boost::get<int32_t>(&lit);
476  CHECK(p);
477  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
478  break;
479  }
480  case 3: {
481  const auto p = boost::get<int64_t>(&lit);
482  CHECK(p);
483  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
484  break;
485  }
486  case 4: {
487  const auto p = boost::get<float>(&lit);
488  CHECK(p);
489  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
490  break;
491  }
492  case 5: {
493  const auto p = boost::get<double>(&lit);
494  CHECK(p);
495  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
496  break;
497  }
498  case 6: {
499  const auto p = boost::get<std::pair<std::string, int>>(&lit);
500  CHECK(p);
501  const auto str_id =
503  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
504  ->getOrAddTransient(p->first)
505  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
506  ->getIdOfString(p->first);
507  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
508  break;
509  }
510  case 7: {
511  const auto p = boost::get<std::string>(&lit);
512  CHECK(p);
513  int32_t off_and_len = crt_real_str_off << 16;
514  const auto& crt_real_str = real_strings[crt_real_str_idx];
515  off_and_len |= static_cast<int16_t>(crt_real_str.size());
516  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
517  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
518  ++crt_real_str_idx;
519  crt_real_str_off += crt_real_str.size();
520  break;
521  }
522  case 8: {
523  const auto p = boost::get<std::vector<double>>(&lit);
524  CHECK(p);
525  int32_t off_and_len = crt_double_arr_lit_off << 16;
526  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
527  int32_t len = crt_double_arr_lit.size();
528  CHECK_EQ((len >> 16), 0);
529  off_and_len |= static_cast<int16_t>(len);
530  int32_t double_array_bytesize = len * sizeof(double);
531  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
532  memcpy(&serialized[crt_double_arr_lit_off],
533  crt_double_arr_lit.data(),
534  double_array_bytesize);
535  ++crt_double_arr_lit_idx;
536  crt_double_arr_lit_off += double_array_bytesize;
537  break;
538  }
539  case 9: {
540  const auto p = boost::get<std::vector<int32_t>>(&lit);
541  CHECK(p);
542  int32_t off_and_len = crt_int32_arr_lit_off << 16;
543  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
544  int32_t len = crt_int32_arr_lit.size();
545  CHECK_EQ((len >> 16), 0);
546  off_and_len |= static_cast<int16_t>(len);
547  int32_t int32_array_bytesize = len * sizeof(int32_t);
548  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
549  memcpy(&serialized[crt_int32_arr_lit_off],
550  crt_int32_arr_lit.data(),
551  int32_array_bytesize);
552  ++crt_int32_arr_lit_idx;
553  crt_int32_arr_lit_off += int32_array_bytesize;
554  break;
555  }
556  case 10: {
557  const auto p = boost::get<std::vector<int8_t>>(&lit);
558  CHECK(p);
559  int32_t off_and_len = crt_int8_arr_lit_off << 16;
560  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
561  int32_t len = crt_int8_arr_lit.size();
562  CHECK_EQ((len >> 16), 0);
563  off_and_len |= static_cast<int16_t>(len);
564  int32_t int8_array_bytesize = len;
565  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
566  memcpy(&serialized[crt_int8_arr_lit_off],
567  crt_int8_arr_lit.data(),
568  int8_array_bytesize);
569  ++crt_int8_arr_lit_idx;
570  crt_int8_arr_lit_off += int8_array_bytesize;
571  break;
572  }
573  case 11: {
574  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
575  CHECK(p);
576  if (p->second == 64) {
577  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
578  const auto& crt_align64_int8_arr_lit =
579  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
580  int32_t len = crt_align64_int8_arr_lit.size();
581  CHECK_EQ((len >> 16), 0);
582  off_and_len |= static_cast<int16_t>(len);
583  int32_t align64_int8_array_bytesize = len;
584  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
585  memcpy(&serialized[crt_align64_int8_arr_lit_off],
586  crt_align64_int8_arr_lit.data(),
587  align64_int8_array_bytesize);
588  ++crt_align64_int8_arr_lit_idx;
589  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
590  } else if (p->second == 32) {
591  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
592  const auto& crt_align32_int8_arr_lit =
593  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
594  int32_t len = crt_align32_int8_arr_lit.size();
595  CHECK_EQ((len >> 16), 0);
596  off_and_len |= static_cast<int16_t>(len);
597  int32_t align32_int8_array_bytesize = len;
598  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
599  memcpy(&serialized[crt_align32_int8_arr_lit_off],
600  crt_align32_int8_arr_lit.data(),
601  align32_int8_array_bytesize);
602  ++crt_align32_int8_arr_lit_idx;
603  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
604  } else {
605  CHECK(false);
606  }
607  break;
608  }
609  default:
610  CHECK(false);
611  }
612  }
613  return serialized;
614 }
615 
616 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
617  if (device_type == ExecutorDeviceType::GPU) {
618  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
619  CHECK(cuda_mgr);
620  return cuda_mgr->getDeviceCount();
621  } else {
622  return 1;
623  }
624 }
625 
626 int Executor::deviceCountForMemoryLevel(
627  const Data_Namespace::MemoryLevel memory_level) const {
628  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
629  : deviceCount(ExecutorDeviceType::CPU);
630 }
631 
632 // TODO(alex): remove or split
633 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
634  const SQLTypeInfo& ti,
635  const int64_t agg_init_val,
636  const int8_t out_byte_width,
637  const int64_t* out_vec,
638  const size_t out_vec_sz,
639  const bool is_group_by,
640  const bool float_argument_input) {
641  switch (agg) {
642  case kAVG:
643  case kSUM:
644  if (0 != agg_init_val) {
645  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
646  int64_t agg_result = agg_init_val;
647  for (size_t i = 0; i < out_vec_sz; ++i) {
648  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
649  }
650  return {agg_result, 0};
651  } else {
652  CHECK(ti.is_fp());
653  switch (out_byte_width) {
654  case 4: {
655  int agg_result = static_cast<int32_t>(agg_init_val);
656  for (size_t i = 0; i < out_vec_sz; ++i) {
657  agg_sum_float_skip_val(
658  &agg_result,
659  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
660  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
661  }
662  const int64_t converted_bin =
663  float_argument_input
664  ? static_cast<int64_t>(agg_result)
665  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
666  return {converted_bin, 0};
667  break;
668  }
669  case 8: {
670  int64_t agg_result = agg_init_val;
671  for (size_t i = 0; i < out_vec_sz; ++i) {
672  agg_sum_double_skip_val(
673  &agg_result,
674  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
675  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
676  }
677  return {agg_result, 0};
678  break;
679  }
680  default:
681  CHECK(false);
682  }
683  }
684  }
685  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
686  int64_t agg_result = 0;
687  for (size_t i = 0; i < out_vec_sz; ++i) {
688  agg_result += out_vec[i];
689  }
690  return {agg_result, 0};
691  } else {
692  CHECK(ti.is_fp());
693  switch (out_byte_width) {
694  case 4: {
695  float r = 0.;
696  for (size_t i = 0; i < out_vec_sz; ++i) {
697  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
698  }
699  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
700  const int64_t converted_bin =
701  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
702  return {converted_bin, 0};
703  }
704  case 8: {
705  double r = 0.;
706  for (size_t i = 0; i < out_vec_sz; ++i) {
707  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
708  }
709  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
710  }
711  default:
712  CHECK(false);
713  }
714  }
715  break;
716  case kCOUNT: {
717  uint64_t agg_result = 0;
718  for (size_t i = 0; i < out_vec_sz; ++i) {
719  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
720  agg_result += out;
721  }
722  return {static_cast<int64_t>(agg_result), 0};
723  }
724  case kMIN: {
725  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
726  int64_t agg_result = agg_init_val;
727  for (size_t i = 0; i < out_vec_sz; ++i) {
728  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
729  }
730  return {agg_result, 0};
731  } else {
732  switch (out_byte_width) {
733  case 4: {
734  int32_t agg_result = static_cast<int32_t>(agg_init_val);
735  for (size_t i = 0; i < out_vec_sz; ++i) {
736  agg_min_float_skip_val(
737  &agg_result,
738  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
739  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
740  }
741  const int64_t converted_bin =
742  float_argument_input
743  ? static_cast<int64_t>(agg_result)
744  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
745  return {converted_bin, 0};
746  }
747  case 8: {
748  int64_t agg_result = agg_init_val;
749  for (size_t i = 0; i < out_vec_sz; ++i) {
750  agg_min_double_skip_val(
751  &agg_result,
752  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
753  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
754  }
755  return {agg_result, 0};
756  }
757  default:
758  CHECK(false);
759  }
760  }
761  }
762  case kMAX:
763  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
764  int64_t agg_result = agg_init_val;
765  for (size_t i = 0; i < out_vec_sz; ++i) {
766  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
767  }
768  return {agg_result, 0};
769  } else {
770  switch (out_byte_width) {
771  case 4: {
772  int32_t agg_result = static_cast<int32_t>(agg_init_val);
773  for (size_t i = 0; i < out_vec_sz; ++i) {
774  agg_max_float_skip_val(
775  &agg_result,
776  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
777  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
778  }
779  const int64_t converted_bin =
780  float_argument_input ? static_cast<int64_t>(agg_result)
781  : float_to_double_bin(agg_result, !ti.get_notnull());
782  return {converted_bin, 0};
783  }
784  case 8: {
785  int64_t agg_result = agg_init_val;
786  for (size_t i = 0; i < out_vec_sz; ++i) {
787  agg_max_double_skip_val(
788  &agg_result,
789  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
790  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
791  }
792  return {agg_result, 0};
793  }
794  default:
795  CHECK(false);
796  }
797  }
798  case kSINGLE_VALUE: {
799  int64_t agg_result = agg_init_val;
800  for (size_t i = 0; i < out_vec_sz; ++i) {
801  if (out_vec[i] != agg_init_val) {
802  if (agg_result == agg_init_val) {
803  agg_result = out_vec[i];
804  } else if (out_vec[i] != agg_result) {
806  }
807  }
808  }
809  return {agg_result, 0};
810  }
811  case kSAMPLE: {
812  int64_t agg_result = agg_init_val;
813  for (size_t i = 0; i < out_vec_sz; ++i) {
814  if (out_vec[i] != agg_init_val) {
815  agg_result = out_vec[i];
816  break;
817  }
818  }
819  return {agg_result, 0};
820  }
821  default:
822  CHECK(false);
823  }
824  abort();
825 }
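// Worked example (illustrative): reducing a per-device SUM. With agg_init_val acting as
// the skip (null) sentinel, out_vec = {5, <sentinel>, 7} folds to 12 because
// agg_sum_skip_val() ignores entries equal to the sentinel; a COUNT, by contrast, simply
// adds up the per-device partial counts.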
826 
827 namespace {
828 
829 ResultSetPtr get_merged_result(
830  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
831  auto& first = results_per_device.front().first;
832  CHECK(first);
833  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
834  const auto& next = results_per_device[dev_idx].first;
835  CHECK(next);
836  first->append(*next);
837  }
838  return std::move(first);
839 }
840 
841 } // namespace
842 
843 ResultSetPtr Executor::resultsUnion(SharedKernelContext& shared_context,
844  const RelAlgExecutionUnit& ra_exe_unit) {
845  auto& results_per_device = shared_context.getFragmentResults();
846  if (results_per_device.empty()) {
847  std::vector<TargetInfo> targets;
848  for (const auto target_expr : ra_exe_unit.target_exprs) {
849  targets.push_back(get_target_info(target_expr, g_bigint_count));
850  }
851  return std::make_shared<ResultSet>(targets,
855  this);
856  }
857  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
858  std::sort(results_per_device.begin(),
859  results_per_device.end(),
860  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
861  CHECK_GE(lhs.second.size(), size_t(1));
862  CHECK_GE(rhs.second.size(), size_t(1));
863  return lhs.second.front() < rhs.second.front();
864  });
865 
866  return get_merged_result(results_per_device);
867 }
868 
869 ResultSetPtr Executor::reduceMultiDeviceResults(
870  const RelAlgExecutionUnit& ra_exe_unit,
871  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
872  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
873  const QueryMemoryDescriptor& query_mem_desc) const {
874  auto timer = DEBUG_TIMER(__func__);
875  if (ra_exe_unit.estimator) {
876  return reduce_estimator_results(ra_exe_unit, results_per_device);
877  }
878 
879  if (results_per_device.empty()) {
880  std::vector<TargetInfo> targets;
881  for (const auto target_expr : ra_exe_unit.target_exprs) {
882  targets.push_back(get_target_info(target_expr, g_bigint_count));
883  }
884  return std::make_shared<ResultSet>(
885  targets, ExecutorDeviceType::CPU, QueryMemoryDescriptor(), nullptr, this);
886  }
887 
888  return reduceMultiDeviceResultSets(
889  results_per_device,
890  row_set_mem_owner,
891  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
892 }
893 
894 namespace {
895 
896 ReductionCode get_reduction_code(
897  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
898  int64_t* compilation_queue_time) {
899  auto clock_begin = timer_start();
900  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
901  *compilation_queue_time = timer_stop(clock_begin);
902  const auto& this_result_set = results_per_device[0].first;
903  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
904  this_result_set->getTargetInfos(),
905  this_result_set->getTargetInitVals());
906  return reduction_jit.codegen();
907 };
908 
909 } // namespace
910 
911 ResultSetPtr Executor::reduceMultiDeviceResultSets(
912  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
913  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
914  const QueryMemoryDescriptor& query_mem_desc) const {
915  auto timer = DEBUG_TIMER(__func__);
916  std::shared_ptr<ResultSet> reduced_results;
917 
918  const auto& first = results_per_device.front().first;
919 
920  if (query_mem_desc.getQueryDescriptionType() ==
921  QueryDescriptionType::NonGroupedAggregate &&
922  results_per_device.size() > 1) {
923  const auto total_entry_count = std::accumulate(
924  results_per_device.begin(),
925  results_per_device.end(),
926  size_t(0),
927  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
928  const auto& r = rs.first;
929  return init + r->getQueryMemDesc().getEntryCount();
930  });
931  CHECK(total_entry_count);
932  auto query_mem_desc = first->getQueryMemDesc();
933  query_mem_desc.setEntryCount(total_entry_count);
934  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
935  ExecutorDeviceType::CPU,
936  query_mem_desc,
937  row_set_mem_owner,
938  this);
939  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
940  reduced_results->initializeStorage();
941  switch (query_mem_desc.getEffectiveKeyWidth()) {
942  case 4:
943  first->getStorage()->moveEntriesToBuffer<int32_t>(
944  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
945  break;
946  case 8:
947  first->getStorage()->moveEntriesToBuffer<int64_t>(
948  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
949  break;
950  default:
951  CHECK(false);
952  }
953  } else {
954  reduced_results = first;
955  }
956 
957  int64_t compilation_queue_time = 0;
958  const auto reduction_code =
959  get_reduction_code(results_per_device, &compilation_queue_time);
960 
961  for (size_t i = 1; i < results_per_device.size(); ++i) {
962  reduced_results->getStorage()->reduce(
963  *(results_per_device[i].first->getStorage()), {}, reduction_code);
964  }
965  reduced_results->addCompilationQueueTime(compilation_queue_time);
966  return reduced_results;
967 }
968 
969 ResultSetPtr Executor::reduceSpeculativeTopN(
970  const RelAlgExecutionUnit& ra_exe_unit,
971  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
972  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
973  const QueryMemoryDescriptor& query_mem_desc) const {
974  if (results_per_device.size() == 1) {
975  return std::move(results_per_device.front().first);
976  }
977  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
978  SpeculativeTopNMap m;
979  for (const auto& result : results_per_device) {
980  auto rows = result.first;
981  CHECK(rows);
982  if (!rows) {
983  continue;
984  }
985  SpeculativeTopNMap that(
986  *rows,
987  ra_exe_unit.target_exprs,
988  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
989  m.reduce(that);
990  }
991  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
992  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
993  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
994 }
995 
996 std::unordered_set<int> get_available_gpus(const Catalog_Namespace::Catalog& cat) {
997  std::unordered_set<int> available_gpus;
998  if (cat.getDataMgr().gpusPresent()) {
999  int gpu_count = cat.getDataMgr().getCudaMgr()->getDeviceCount();
1000  CHECK_GT(gpu_count, 0);
1001  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1002  available_gpus.insert(gpu_id);
1003  }
1004  }
1005  return available_gpus;
1006 }
1007 
1008 size_t get_context_count(const ExecutorDeviceType device_type,
1009  const size_t cpu_count,
1010  const size_t gpu_count) {
1011  return device_type == ExecutorDeviceType::GPU ? gpu_count
1012  : static_cast<size_t>(cpu_count);
1013 }
1014 
1015 namespace {
1016 
1017 // Compute a very conservative guess for the output buffer entry count, using no
1018 // information other than the number of tuples in each table and multiplying them
1019 // together.
1020 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1022  // Check for overflows since we're multiplying potentially big table sizes.
1023  using checked_size_t = boost::multiprecision::number<
1024  boost::multiprecision::cpp_int_backend<64,
1025  64,
1026  boost::multiprecision::unsigned_magnitude,
1027  boost::multiprecision::checked,
1028  void>>;
1029  checked_size_t max_groups_buffer_entry_guess = 1;
1030  for (const auto& query_info : query_infos) {
1031  CHECK(!query_info.info.fragments.empty());
1032  auto it = std::max_element(query_info.info.fragments.begin(),
1033  query_info.info.fragments.end(),
1034  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1035  return f1.getNumTuples() < f2.getNumTuples();
1036  });
1037  max_groups_buffer_entry_guess *= it->getNumTuples();
1038  }
1039  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1040  // baseline group layout with that many entries anyway.
1041  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1042  try {
1043  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1044  max_groups_buffer_entry_guess_cap);
1045  } catch (...) {
1046  return max_groups_buffer_entry_guess_cap;
1047  }
1048 }
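// Worked example (illustrative): with two inputs whose largest fragments hold
// 3,000,000 and 50 tuples, the guess is 3,000,000 * 50 = 150,000,000, which is then
// clamped to the 100M cap; a checked-arithmetic overflow likewise falls back to the cap.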
1049 
1050 std::string get_table_name(const InputDescriptor& input_desc,
1051  const Catalog_Namespace::Catalog& cat) {
1052  const auto source_type = input_desc.getSourceType();
1053  if (source_type == InputSourceType::TABLE) {
1054  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1055  CHECK(td);
1056  return td->tableName;
1057  } else {
1058  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1059  }
1060 }
1061 
1062 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1063  const int device_count) {
1064  if (device_type == ExecutorDeviceType::GPU) {
1065  return device_count * Executor::high_scan_limit;
1066  }
1067  return Executor::high_scan_limit;
1068 }
1069 
1070 void checkWorkUnitWatchdog(const RelAlgExecutionUnit& ra_exe_unit,
1071  const std::vector<InputTableInfo>& table_infos,
1072  const Catalog_Namespace::Catalog& cat,
1073  const ExecutorDeviceType device_type,
1074  const int device_count) {
1075  for (const auto target_expr : ra_exe_unit.target_exprs) {
1076  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1077  return;
1078  }
1079  }
1080  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1081  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1082  // Allow a query with no scan limit to run on small tables
1083  return;
1084  }
1085  if (ra_exe_unit.use_bump_allocator) {
1086  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1087  // relative to the size of the input), so we bypass this check for now
1088  return;
1089  }
1090  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1091  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1092  (!ra_exe_unit.scan_limit ||
1093  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1094  std::vector<std::string> table_names;
1095  const auto& input_descs = ra_exe_unit.input_descs;
1096  for (const auto& input_desc : input_descs) {
1097  table_names.push_back(get_table_name(input_desc, cat));
1098  }
1099  if (!ra_exe_unit.scan_limit) {
1100  throw WatchdogException(
1101  "Projection query would require a scan without a limit on table(s): " +
1102  boost::algorithm::join(table_names, ", "));
1103  } else {
1104  throw WatchdogException(
1105  "Projection query output result set on table(s): " +
1106  boost::algorithm::join(table_names, ", ") + " would contain " +
1107  std::to_string(ra_exe_unit.scan_limit) +
1108  " rows, which is more than the current system limit of " +
1109  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1110  }
1111  }
1112 }
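// Illustrative numbers: on a 4-GPU system the projection scan limit used above is
// 4 * Executor::high_scan_limit, while CPU execution allows a single
// Executor::high_scan_limit; a projection with no LIMIT over a large table, or with a
// LIMIT beyond that bound, is rejected with the WatchdogException messages above.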
1113 
1114 } // namespace
1115 
1116 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1117  const RelAlgExecutionUnit& ra_exe_unit) {
1118  if (ra_exe_unit.input_descs.size() < 2) {
1119  return false;
1120  }
1121 
1122  // We only support loop join at the end of folded joins
1123  // where ra_exe_unit.input_descs.size() > 2 for now.
1124  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1125 
1126  std::optional<size_t> inner_table_idx;
1127  for (size_t i = 0; i < query_infos.size(); ++i) {
1128  if (query_infos[i].table_id == inner_table_id) {
1129  inner_table_idx = i;
1130  break;
1131  }
1132  }
1133  CHECK(inner_table_idx);
1134  return query_infos[*inner_table_idx].info.getNumTuples() <=
1135  g_trivial_loop_join_threshold;
1136 }
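// Example (illustrative): joining a large fact table against a tiny dimension table
// whose tuple count is at or below the trivial-loop-join threshold is treated as
// trivial; the inner table is small enough that scanning it once per outer row batch
// stays cheap, so the loop join is allowed to proceed.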
1137 
1138 namespace {
1139 
1140 template <typename T>
1141 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1142  std::vector<std::string> expr_strs;
1143  for (const auto& expr : expr_container) {
1144  if (!expr) {
1145  expr_strs.emplace_back("NULL");
1146  } else {
1147  expr_strs.emplace_back(expr->toString());
1148  }
1149  }
1150  return expr_strs;
1151 }
1152 
1153 template <>
1154 std::vector<std::string> expr_container_to_string(
1155  const std::list<Analyzer::OrderEntry>& expr_container) {
1156  std::vector<std::string> expr_strs;
1157  for (const auto& expr : expr_container) {
1158  expr_strs.emplace_back(expr.toString());
1159  }
1160  return expr_strs;
1161 }
1162 
1163 std::string join_type_to_string(const JoinType type) {
1164  switch (type) {
1165  case JoinType::INNER:
1166  return "INNER";
1167  case JoinType::LEFT:
1168  return "LEFT";
1169  case JoinType::INVALID:
1170  return "INVALID";
1171  }
1172  UNREACHABLE();
1173  return "";
1174 }
1175 
1176 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1177  switch (algorithm) {
1178  case SortAlgorithm::Default:
1179  return "ResultSet";
1180  case SortAlgorithm::SpeculativeTopN:
1181  return "Speculative Top N";
1182  case SortAlgorithm::StreamingTopN:
1183  return "Streaming Top N";
1184  }
1185  UNREACHABLE();
1186  return "";
1187 }
1188 
1189 } // namespace
1190 
1191 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
1192  // todo(yoonmin): replace the cache key with a DAG representation of the query plan
1193  // instead of the ra_exec_unit description, if possible
1194  std::ostringstream os;
1195  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1196  const auto& scan_desc = input_col_desc->getScanDesc();
1197  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1198  << scan_desc.getNestLevel();
1199  }
1200  if (!ra_exe_unit.simple_quals.empty()) {
1201  for (const auto& qual : ra_exe_unit.simple_quals) {
1202  if (qual) {
1203  os << qual->toString() << ",";
1204  }
1205  }
1206  }
1207  if (!ra_exe_unit.quals.empty()) {
1208  for (const auto& qual : ra_exe_unit.quals) {
1209  if (qual) {
1210  os << qual->toString() << ",";
1211  }
1212  }
1213  }
1214  if (!ra_exe_unit.join_quals.empty()) {
1215  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1216  const auto& join_condition = ra_exe_unit.join_quals[i];
1217  os << std::to_string(i) << join_type_to_string(join_condition.type);
1218  for (const auto& qual : join_condition.quals) {
1219  if (qual) {
1220  os << qual->toString() << ",";
1221  }
1222  }
1223  }
1224  }
1225  if (!ra_exe_unit.groupby_exprs.empty()) {
1226  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1227  if (qual) {
1228  os << qual->toString() << ",";
1229  }
1230  }
1231  }
1232  for (const auto& expr : ra_exe_unit.target_exprs) {
1233  if (expr) {
1234  os << expr->toString() << ",";
1235  }
1236  }
1237  os << bool_to_string(ra_exe_unit.estimator == nullptr);
1238  os << std::to_string(ra_exe_unit.scan_limit);
1239  return os.str();
1240 }
1241 
1242 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1243  os << "\n\tTable/Col/Levels: ";
1244  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1245  const auto& scan_desc = input_col_desc->getScanDesc();
1246  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1247  << scan_desc.getNestLevel() << ") ";
1248  }
1249  if (!ra_exe_unit.simple_quals.empty()) {
1250  os << "\n\tSimple Quals: "
1251  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.simple_quals),
1252  ", ");
1253  }
1254  if (!ra_exe_unit.quals.empty()) {
1255  os << "\n\tQuals: "
1256  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1257  }
1258  if (!ra_exe_unit.join_quals.empty()) {
1259  os << "\n\tJoin Quals: ";
1260  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1261  const auto& join_condition = ra_exe_unit.join_quals[i];
1262  os << "\t\t" << std::to_string(i) << " "
1263  << join_type_to_string(join_condition.type);
1264  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1265  }
1266  }
1267  if (!ra_exe_unit.groupby_exprs.empty()) {
1268  os << "\n\tGroup By: "
1269  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.groupby_exprs),
1270  ", ");
1271  }
1272  os << "\n\tProjected targets: "
1273  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.target_exprs), ", ");
1274  os << "\n\tHas Estimator: " << bool_to_string(ra_exe_unit.estimator == nullptr);
1275  os << "\n\tSort Info: ";
1276  const auto& sort_info = ra_exe_unit.sort_info;
1277  os << "\n\t Order Entries: "
1278  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1279  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1280  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1281  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1282  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1283  os << "\n\tBump Allocator: " << bool_to_string(ra_exe_unit.use_bump_allocator);
1284  if (ra_exe_unit.union_all) {
1285  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1286  }
1287  return os;
1288 }
1289 
1290 namespace {
1291 
1292 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
1293  const size_t new_scan_limit) {
1294  return {ra_exe_unit_in.input_descs,
1295  ra_exe_unit_in.input_col_descs,
1296  ra_exe_unit_in.simple_quals,
1297  ra_exe_unit_in.quals,
1298  ra_exe_unit_in.join_quals,
1299  ra_exe_unit_in.groupby_exprs,
1300  ra_exe_unit_in.target_exprs,
1301  ra_exe_unit_in.estimator,
1302  ra_exe_unit_in.sort_info,
1303  new_scan_limit,
1304  ra_exe_unit_in.use_bump_allocator,
1305  ra_exe_unit_in.union_all,
1306  ra_exe_unit_in.query_state};
1307 }
1308 
1309 } // namespace
1310 
1311 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1312  const bool is_agg,
1313  const std::vector<InputTableInfo>& query_infos,
1314  const RelAlgExecutionUnit& ra_exe_unit_in,
1315  const CompilationOptions& co,
1316  const ExecutionOptions& eo,
1317  const Catalog_Namespace::Catalog& cat,
1318  RenderInfo* render_info,
1319  const bool has_cardinality_estimation,
1320  ColumnCacheMap& column_cache) {
1321  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1322 
1323  ScopeGuard cleanup_post_execution = [this] {
1324  // cleanup/unpin GPU buffer allocations
1325  // TODO: separate out this state into a single object
1326  plan_state_.reset(nullptr);
1327  if (cgen_state_) {
1328  cgen_state_->in_values_bitmaps_.clear();
1329  }
1330  };
1331 
1332  try {
1333  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1334  is_agg,
1335  true,
1336  query_infos,
1337  ra_exe_unit_in,
1338  co,
1339  eo,
1340  cat,
1342  render_info,
1343  has_cardinality_estimation,
1344  column_cache);
1345  if (result) {
1346  result->setKernelQueueTime(kernel_queue_time_ms_);
1347  result->addCompilationQueueTime(compilation_queue_time_ms_);
1348  }
1349  return result;
1350  } catch (const CompilationRetryNewScanLimit& e) {
1351  auto result =
1352  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1353  is_agg,
1354  false,
1355  query_infos,
1356  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1357  co,
1358  eo,
1359  cat,
1361  render_info,
1362  has_cardinality_estimation,
1363  column_cache);
1364  if (result) {
1365  result->setKernelQueueTime(kernel_queue_time_ms_);
1366  result->addCompilationQueueTime(compilation_queue_time_ms_);
1367  }
1368  return result;
1369  }
1370 }
1371 
1372 ResultSetPtr Executor::executeWorkUnitImpl(
1373  size_t& max_groups_buffer_entry_guess,
1374  const bool is_agg,
1375  const bool allow_single_frag_table_opt,
1376  const std::vector<InputTableInfo>& query_infos,
1377  const RelAlgExecutionUnit& ra_exe_unit_in,
1378  const CompilationOptions& co,
1379  const ExecutionOptions& eo,
1380  const Catalog_Namespace::Catalog& cat,
1381  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1382  RenderInfo* render_info,
1383  const bool has_cardinality_estimation,
1384  ColumnCacheMap& column_cache) {
1385  INJECT_TIMER(Exec_executeWorkUnit);
1386  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1387  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1388  CHECK(!query_infos.empty());
1389  if (!max_groups_buffer_entry_guess) {
1390  // The query has failed the first execution attempt because of running out
1391  // of group by slots. Make the conservative choice: allocate fragment size
1392  // slots and run on the CPU.
1393  CHECK(device_type == ExecutorDeviceType::CPU);
1394  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1395  }
1396 
1397  int8_t crt_min_byte_width{get_min_byte_width()};
1398  do {
1399  SharedKernelContext shared_context(query_infos);
1400  ColumnFetcher column_fetcher(this, column_cache);
1401  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1402  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1403  if (eo.executor_type == ExecutorType::Native) {
1404  try {
1405  INJECT_TIMER(query_step_compilation);
1406  auto clock_begin = timer_start();
1407  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1408  compilation_queue_time_ms_ += timer_stop(clock_begin);
1409 
1410  query_mem_desc_owned =
1411  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1412  crt_min_byte_width,
1413  has_cardinality_estimation,
1414  ra_exe_unit,
1415  query_infos,
1416  deleted_cols_map,
1417  column_fetcher,
1418  {device_type,
1419  co.hoist_literals,
1420  co.opt_level,
1422  co.allow_lazy_fetch,
1424  co.explain_type,
1426  eo,
1427  render_info,
1428  this);
1429  CHECK(query_mem_desc_owned);
1430  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1431  } catch (CompilationRetryNoCompaction&) {
1432  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1433  continue;
1434  }
1435  } else {
1436  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1437  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1438  CHECK(!query_mem_desc_owned);
1439  query_mem_desc_owned.reset(
1441  }
1442  if (eo.just_explain) {
1443  return executeExplain(*query_comp_desc_owned);
1444  }
1445 
1446  for (const auto target_expr : ra_exe_unit.target_exprs) {
1447  plan_state_->target_exprs_.push_back(target_expr);
1448  }
1449 
1450  if (!eo.just_validate) {
1451  int available_cpus = cpu_threads();
1452  auto available_gpus = get_available_gpus(cat);
1453 
1454  const auto context_count =
1455  get_context_count(device_type, available_cpus, available_gpus.size());
1456  try {
1457  auto kernels = createKernels(shared_context,
1458  ra_exe_unit,
1459  column_fetcher,
1460  query_infos,
1461  eo,
1462  is_agg,
1463  allow_single_frag_table_opt,
1464  context_count,
1465  *query_comp_desc_owned,
1466  *query_mem_desc_owned,
1467  render_info,
1468  available_gpus,
1469  available_cpus);
1470  if (g_use_tbb_pool) {
1471 #ifdef HAVE_TBB
1472  VLOG(1) << "Using TBB thread pool for kernel dispatch.";
1473  launchKernels<threadpool::TbbThreadPool<void>>(shared_context,
1474  std::move(kernels));
1475 #else
1476  throw std::runtime_error(
1477  "This build is not TBB enabled. Restart the server with "
1478  "\"enable-modern-thread-pool\" disabled.");
1479 #endif
1480  } else {
1481  launchKernels<threadpool::FuturesThreadPool<void>>(shared_context,
1482  std::move(kernels));
1483  }
1484  } catch (QueryExecutionError& e) {
1485  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1486  e.getErrorCode() == ERR_OUT_OF_TIME) {
1487  resetInterrupt();
1489  }
1490  if (eo.allow_runtime_query_interrupt && interrupted_.load()) {
1491  resetInterrupt();
1493  }
1495  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1496  crt_min_byte_width <<= 1;
1497  continue;
1498  }
1499  throw;
1500  }
1501  }
1502  if (is_agg) {
1503  try {
1504  return collectAllDeviceResults(shared_context,
1505  ra_exe_unit,
1506  *query_mem_desc_owned,
1507  query_comp_desc_owned->getDeviceType(),
1508  row_set_mem_owner);
1509  } catch (ReductionRanOutOfSlots&) {
1511  } catch (OverflowOrUnderflow&) {
1512  crt_min_byte_width <<= 1;
1513  continue;
1514  }
1515  }
1516  return resultsUnion(shared_context, ra_exe_unit);
1517 
1518  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1519 
1520  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1523  nullptr,
1524  this);
1525 }
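// Note on the retry loop above (illustrative summary): compilation and execution are
// first attempted with the narrowest byte width that fits the aggregates; on
// CompilationRetryNoCompaction, or on overflow/underflow during reduction, the minimum
// byte width is doubled and the whole work unit is retried, up to sizeof(int64_t).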
1526 
1527 void Executor::executeWorkUnitPerFragment(const RelAlgExecutionUnit& ra_exe_unit_in,
1528  const InputTableInfo& table_info,
1529  const CompilationOptions& co,
1530  const ExecutionOptions& eo,
1531  const Catalog_Namespace::Catalog& cat,
1532  PerFragmentCallBack& cb) {
1533  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1534  ColumnCacheMap column_cache;
1535 
1536  std::vector<InputTableInfo> table_infos{table_info};
1537  SharedKernelContext kernel_context(table_infos);
1538 
1539  ColumnFetcher column_fetcher(this, column_cache);
1540  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1541  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1542  {
1543  auto clock_begin = timer_start();
1544  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1545  compilation_queue_time_ms_ += timer_stop(clock_begin);
1546  query_mem_desc_owned =
1547  query_comp_desc_owned->compile(0,
1548  8,
1549  /*has_cardinality_estimation=*/false,
1550  ra_exe_unit,
1551  table_infos,
1552  deleted_cols_map,
1553  column_fetcher,
1554  co,
1555  eo,
1556  nullptr,
1557  this);
1558  }
1559  CHECK(query_mem_desc_owned);
1560  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1561  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1562  const auto& outer_fragments = table_info.info.fragments;
1563 
1564  {
1565  auto clock_begin = timer_start();
1566  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
1567  kernel_queue_time_ms_ += timer_stop(clock_begin);
1568 
1569  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1570  ++fragment_index) {
1571  // We may want to consider in the future allowing this to execute on devices other
1572  // than CPU
1573  FragmentsList fragments_list{{table_id, {fragment_index}}};
1574  ExecutionKernel kernel(ra_exe_unit,
1575  co.device_type,
1576  /*device_id=*/0,
1577  eo,
1578  column_fetcher,
1579  *query_comp_desc_owned,
1580  *query_mem_desc_owned,
1581  fragments_list,
1583  /*render_info=*/nullptr,
1584  /*rowid_lookup_key=*/-1);
1585  kernel.run(this, kernel_context);
1586  }
1587  }
1588 
1589  const auto& all_fragment_results = kernel_context.getFragmentResults();
1590 
1591  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1592  ++fragment_index) {
1593  const auto fragment_results = all_fragment_results[fragment_index];
1594  cb(fragment_results.first, outer_fragments[fragment_index]);
1595  }
1596 }
1597 
1598 ResultSetPtr Executor::executeTableFunction(
1599  const TableFunctionExecutionUnit exe_unit,
1600  const std::vector<InputTableInfo>& table_infos,
1601  const CompilationOptions& co,
1602  const ExecutionOptions& eo,
1603  const Catalog_Namespace::Catalog& cat) {
1604  INJECT_TIMER(Exec_executeTableFunction);
1605  nukeOldState(false, table_infos, PlanState::DeletedColumnsMap{}, nullptr);
1606 
1607  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1608  // framework, we may want to move this up a level
1609 
1610  ColumnFetcher column_fetcher(this, column_cache);
1611  TableFunctionCompilationContext compilation_context;
1612  compilation_context.compile(exe_unit, co, this);
1613 
1615  return exe_context.execute(
1616  exe_unit, table_infos, &compilation_context, column_fetcher, co.device_type, this);
1617 }
1618 
1619 ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
1620  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1621 }
1622 
1623 ExecutorDeviceType Executor::getDeviceTypeForTargets(
1624  const RelAlgExecutionUnit& ra_exe_unit,
1625  const ExecutorDeviceType requested_device_type) {
1626  for (const auto target_expr : ra_exe_unit.target_exprs) {
1627  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1628  if (!ra_exe_unit.groupby_exprs.empty() &&
1629  !isArchPascalOrLater(requested_device_type)) {
1630  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1631  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1632  return ExecutorDeviceType::CPU;
1633  }
1634  }
1635  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1636  return ExecutorDeviceType::CPU;
1637  }
1638  }
1639  return requested_device_type;
1640 }
1641 
1642 namespace {
1643 
1644 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1645  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1646  if (ti.is_fp()) {
1647  if (float_argument_input && ti.get_type() == kFLOAT) {
1648  int64_t float_null_val = 0;
1649  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1650  static_cast<float>(inline_fp_null_val(ti));
1651  return float_null_val;
1652  }
1653  const auto double_null_val = inline_fp_null_val(ti);
1654  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1655  }
1656  return inline_int_null_val(ti);
1657 }
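// Example (illustrative): for a nullable FLOAT target with float_argument_input set, the
// float NULL sentinel is bit-copied into the low 32 bits of the returned int64; for
// DOUBLE the full 64-bit pattern of the double NULL sentinel is returned, and
// integer/time/boolean types return their inline integer NULL value.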
1658 
1659 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1660  std::vector<int64_t>& entry,
1661  const std::vector<Analyzer::Expr*>& target_exprs,
1662  const QueryMemoryDescriptor& query_mem_desc) {
1663  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1664  const auto target_expr = target_exprs[target_idx];
1665  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1666  CHECK(agg_info.is_agg);
1667  target_infos.push_back(agg_info);
1668  if (g_cluster) {
1669  const auto executor = query_mem_desc.getExecutor();
1670  CHECK(executor);
1671  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1672  CHECK(row_set_mem_owner);
1673  const auto& count_distinct_desc =
1674  query_mem_desc.getCountDistinctDescriptor(target_idx);
1675  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1676  CHECK(row_set_mem_owner);
1677  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
1678  count_distinct_desc.bitmapPaddedSizeBytes());
1679  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1680  continue;
1681  }
1682  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1683  auto count_distinct_set = new std::set<int64_t>();
1684  CHECK(row_set_mem_owner);
1685  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1686  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1687  continue;
1688  }
1689  }
1690  const bool float_argument_input = takes_float_argument(agg_info);
1691  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1692  entry.push_back(0);
1693  } else if (agg_info.agg_kind == kAVG) {
1694  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1695  entry.push_back(0);
1696  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
1697  if (agg_info.sql_type.is_geometry()) {
1698  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1699  entry.push_back(0);
1700  }
1701  } else if (agg_info.sql_type.is_varlen()) {
1702  entry.push_back(0);
1703  entry.push_back(0);
1704  } else {
1705  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1706  }
1707  } else {
1708  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1709  }
1710  }
1711 }
1712 
1713 ResultSetPtr build_row_for_empty_input(
1714  const std::vector<Analyzer::Expr*>& target_exprs_in,
1715  const QueryMemoryDescriptor& query_mem_desc,
1716  const ExecutorDeviceType device_type) {
1717  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1718  std::vector<Analyzer::Expr*> target_exprs;
1719  for (const auto target_expr : target_exprs_in) {
1720  const auto target_expr_copy =
1721  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1722  CHECK(target_expr_copy);
1723  auto ti = target_expr->get_type_info();
1724  ti.set_notnull(false);
1725  target_expr_copy->set_type_info(ti);
1726  if (target_expr_copy->get_arg()) {
1727  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1728  arg_ti.set_notnull(false);
1729  target_expr_copy->get_arg()->set_type_info(arg_ti);
1730  }
1731  target_exprs_owned_copies.push_back(target_expr_copy);
1732  target_exprs.push_back(target_expr_copy.get());
1733  }
1734  std::vector<TargetInfo> target_infos;
1735  std::vector<int64_t> entry;
1736  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1737  const auto executor = query_mem_desc.getExecutor();
1738  CHECK(executor);
1739  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1740  CHECK(row_set_mem_owner);
1741  auto rs = std::make_shared<ResultSet>(
1742  target_infos, device_type, query_mem_desc, row_set_mem_owner, executor);
1743  rs->allocateStorage();
1744  rs->fillOneEntry(entry);
1745  return rs;
1746 }
1747 
1748 } // namespace
1749 
1750 ResultSetPtr Executor::collectAllDeviceResults(
1751  SharedKernelContext& shared_context,
1752  const RelAlgExecutionUnit& ra_exe_unit,
1753  const QueryMemoryDescriptor& query_mem_desc,
1754  const ExecutorDeviceType device_type,
1755  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1756  auto timer = DEBUG_TIMER(__func__);
1757  auto& result_per_device = shared_context.getFragmentResults();
1758  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1759  QueryDescriptionType::NonGroupedAggregate) {
1760  return build_row_for_empty_input(
1761  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1762  }
1763  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1764  try {
1765  return reduceSpeculativeTopN(
1766  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1767  } catch (const std::bad_alloc&) {
1768  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1769  }
1770  }
1771  const auto shard_count =
1772  device_type == ExecutorDeviceType::GPU
1773  ? shard_count_for_top_groups(ra_exe_unit, *catalog_)
1774  : 0;
1775 
1776  if (shard_count && !result_per_device.empty()) {
1777  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1778  }
1779  return reduceMultiDeviceResults(
1780  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1781 }
1782 
1783 namespace {
1794 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1795  const QueryMemoryDescriptor& input_query_mem_desc,
1796  const ResultSetStorage* output_storage,
1797  size_t output_row_index,
1798  const QueryMemoryDescriptor& output_query_mem_desc,
1799  const std::vector<uint32_t>& top_permutation) {
1800  const auto output_buffer = output_storage->getUnderlyingBuffer();
1801  const auto input_buffer = input_storage->getUnderlyingBuffer();
1802  for (const auto sorted_idx : top_permutation) {
1803  // permuting all group-columns in this result set into the final buffer:
1804  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1805  group_idx++) {
1806  const auto input_column_ptr =
1807  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1808  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1809  const auto output_column_ptr =
1810  output_buffer +
1811  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1812  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1813  memcpy(output_column_ptr,
1814  input_column_ptr,
1815  output_query_mem_desc.groupColWidth(group_idx));
1816  }
1817  // permuting all agg-columns in this result set into the final buffer:
1818  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1819  slot_idx++) {
1820  const auto input_column_ptr =
1821  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1822  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1823  const auto output_column_ptr =
1824  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1825  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1826  memcpy(output_column_ptr,
1827  input_column_ptr,
1828  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1829  }
1830  ++output_row_index;
1831  }
1832  return output_row_index;
1833 }
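// Address-arithmetic sketch (illustrative only): in the columnar layout, group column g of
// the sorted row p is read from
//   input_buffer + getPrependedGroupColOffInBytes(g) + p * groupColWidth(g)
// and written at output_row_index with the same per-column stride, so the permutation is
// applied column by column rather than by copying whole rows.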
1834 
1844 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
1845  const ResultSetStorage* output_storage,
1846  size_t output_row_index,
1847  const QueryMemoryDescriptor& output_query_mem_desc,
1848  const std::vector<uint32_t>& top_permutation) {
1849  const auto output_buffer = output_storage->getUnderlyingBuffer();
1850  const auto input_buffer = input_storage->getUnderlyingBuffer();
1851  for (const auto sorted_idx : top_permutation) {
1852  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
1853  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
1854  row_ptr,
1855  output_query_mem_desc.getRowSize());
1856  ++output_row_index;
1857  }
1858  return output_row_index;
1859 }
1860 } // namespace
1861 
1862 // Collect top results from each device, stitch them together and sort. Partial
1863 // results from each device are guaranteed to be disjoint because we only go on
1864 // this path when one of the columns involved is a shard key.
1865 ResultSetPtr Executor::collectAllDeviceShardedTopResults(
1866  SharedKernelContext& shared_context,
1867  const RelAlgExecutionUnit& ra_exe_unit) const {
1868  auto& result_per_device = shared_context.getFragmentResults();
1869  const auto first_result_set = result_per_device.front().first;
1870  CHECK(first_result_set);
1871  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1872  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1873  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1874  top_query_mem_desc.setEntryCount(0);
1875  for (auto& result : result_per_device) {
1876  const auto result_set = result.first;
1877  CHECK(result_set);
1878  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n);
1879  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1880  top_query_mem_desc.setEntryCount(new_entry_cnt);
1881  }
1882  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1883  first_result_set->getDeviceType(),
1884  top_query_mem_desc,
1885  first_result_set->getRowSetMemOwner(),
1886  this);
1887  auto top_storage = top_result_set->allocateStorage();
1888  size_t top_output_row_idx{0};
1889  for (auto& result : result_per_device) {
1890  const auto result_set = result.first;
1891  CHECK(result_set);
1892  const auto& top_permutation = result_set->getPermutationBuffer();
1893  CHECK_LE(top_permutation.size(), top_n);
1894  if (top_query_mem_desc.didOutputColumnar()) {
1895  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
1896  result_set->getQueryMemDesc(),
1897  top_storage,
1898  top_output_row_idx,
1899  top_query_mem_desc,
1900  top_permutation);
1901  } else {
1902  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
1903  top_storage,
1904  top_output_row_idx,
1905  top_query_mem_desc,
1906  top_permutation);
1907  }
1908  }
1909  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
1910  return top_result_set;
1911 }
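// Worked example (illustrative): for `... ORDER BY cnt DESC LIMIT 3` on a table sharded
// across two GPUs, each per-device result is sorted locally and truncated to
// limit + offset = 3 rows. Because the group keys are partitioned by the shard key, the two
// 3-row sets are disjoint; stitching them into top_result_set (up to 6 entries) and letting
// the caller run the final sort yields the same rows as a global top-3.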
1912 
1913 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
1914  const {
1915  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
1916  const auto& join_info = plan_state_->join_info_;
1917  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
1918  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
1919  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
1920  id_to_cond.insert(
1921  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
1922  }
1923  return id_to_cond;
1924 }
1925 
1926 namespace {
1927 
1928 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
1929  for (const auto& col : fetched_cols) {
1930  if (col.is_lazily_fetched) {
1931  return true;
1932  }
1933  }
1934  return false;
1935 }
1936 
1937 } // namespace
1938 
1939 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
1940  SharedKernelContext& shared_context,
1941  const RelAlgExecutionUnit& ra_exe_unit,
1942  ColumnFetcher& column_fetcher,
1943  const std::vector<InputTableInfo>& table_infos,
1944  const ExecutionOptions& eo,
1945  const bool is_agg,
1946  const bool allow_single_frag_table_opt,
1947  const size_t context_count,
1948  const QueryCompilationDescriptor& query_comp_desc,
1949  const QueryMemoryDescriptor& query_mem_desc,
1950  RenderInfo* render_info,
1951  std::unordered_set<int>& available_gpus,
1952  int& available_cpus) {
1953  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
1954 
1955  QueryFragmentDescriptor fragment_descriptor(
1956  ra_exe_unit,
1957  table_infos,
1958  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
1959  ? catalog_->getDataMgr().getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
1960  : std::vector<Data_Namespace::MemoryInfo>{},
1961  eo.gpu_input_mem_limit_percent,
1962  eo.outer_fragment_indices);
1963  CHECK(!ra_exe_unit.input_descs.empty());
1964 
1965  const auto device_type = query_comp_desc.getDeviceType();
1966  const bool uses_lazy_fetch =
1967  plan_state_->allow_lazy_fetch_ &&
1968  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
1969  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
1970  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
1971  const auto device_count = deviceCount(device_type);
1972  CHECK_GT(device_count, 0);
1973 
1974  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
1975  shared_context.getFragOffsets(),
1976  device_count,
1977  device_type,
1978  use_multifrag_kernel,
1979  g_inner_join_fragment_skipping,
1980  this);
1981  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
1982  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
1983  }
1984 
1985  if (use_multifrag_kernel) {
1986  VLOG(1) << "Creating multifrag execution kernels";
1987  VLOG(1) << query_mem_desc.toString();
1988 
1989  // NB: We should never be on this path when the query is retried because of running
1990  // out of group-by slots; also, for scan-only queries on CPU we want the
1991  // high-granularity, fragment-by-fragment execution instead. For scan-only queries on
1992  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
1993  // buffer per fragment.
1994  auto multifrag_kernel_dispatch = [&ra_exe_unit,
1995  &execution_kernels,
1996  &column_fetcher,
1997  &eo,
1998  &query_comp_desc,
1999  &query_mem_desc,
2000  render_info](const int device_id,
2001  const FragmentsList& frag_list,
2002  const int64_t rowid_lookup_key) {
2003  execution_kernels.emplace_back(
2004  std::make_unique<ExecutionKernel>(ra_exe_unit,
2005  ExecutorDeviceType::GPU,
2006  device_id,
2007  eo,
2008  column_fetcher,
2009  query_comp_desc,
2010  query_mem_desc,
2011  frag_list,
2012  ExecutorDispatchMode::MultifragmentKernel,
2013  render_info,
2014  rowid_lookup_key));
2015  };
2016  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2017  } else {
2018  VLOG(1) << "Creating one execution kernel per fragment";
2019  VLOG(1) << query_mem_desc.toString();
2020 
2021  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2022  (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
2023  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2024  const auto max_frag_size =
2025  table_infos.front().info.getFragmentNumTuplesUpperBound();
2026  if (max_frag_size < query_mem_desc.getEntryCount()) {
2027  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2028  << " to match max fragment size " << max_frag_size
2029  << " for kernel per fragment execution path.";
2030  throw CompilationRetryNewScanLimit(max_frag_size);
2031  }
2032  }
2033 
2034  size_t frag_list_idx{0};
2035  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2036  &execution_kernels,
2037  &column_fetcher,
2038  &eo,
2039  &frag_list_idx,
2040  &device_type,
2041  &query_comp_desc,
2042  &query_mem_desc,
2043  render_info](const int device_id,
2044  const FragmentsList& frag_list,
2045  const int64_t rowid_lookup_key) {
2046  if (!frag_list.size()) {
2047  return;
2048  }
2049  CHECK_GE(device_id, 0);
2050 
2051  execution_kernels.emplace_back(
2052  std::make_unique<ExecutionKernel>(ra_exe_unit,
2053  device_type,
2054  device_id,
2055  eo,
2056  column_fetcher,
2057  query_comp_desc,
2058  query_mem_desc,
2059  frag_list,
2060  ExecutorDispatchMode::KernelPerFragment,
2061  render_info,
2062  rowid_lookup_key));
2063  ++frag_list_idx;
2064  };
2065 
2066  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2067  ra_exe_unit);
2068  }
2069 
2070  return execution_kernels;
2071 }
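// Dispatch sketch (illustrative, simplified): the kernel granularity chosen above boils
// down to
//   const bool multifrag = device_type == ExecutorDeviceType::GPU && eo.allow_multifrag &&
//                          (!uses_lazy_fetch || is_agg);
// With multifrag, one ExecutionKernel per GPU receives that device's whole FragmentsList;
// otherwise one ExecutionKernel is built per (device, fragment) assignment, which keeps
// lazily fetched columns addressable one fragment at a time.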
2072 
2073 template <typename THREAD_POOL>
2074 void Executor::launchKernels(SharedKernelContext& shared_context,
2075  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels) {
2076  auto clock_begin = timer_start();
2077  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2078  kernel_queue_time_ms_ += timer_stop(clock_begin);
2079 
2080  THREAD_POOL thread_pool;
2081  VLOG(1) << "Launching " << kernels.size() << " kernels for query.";
2082  for (auto& kernel : kernels) {
2083  thread_pool.spawn(
2084  [this, &shared_context, parent_thread_id = logger::thread_id()](
2085  ExecutionKernel* kernel) {
2086  CHECK(kernel);
2087  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
2088  kernel->run(this, shared_context);
2089  },
2090  kernel.get());
2091  }
2092  thread_pool.join();
2093 }
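// Usage sketch (hypothetical pool name, shown only to illustrate the required interface):
// any THREAD_POOL exposing spawn(callable, args...) and join() can instantiate this
// template, e.g.
//   launchKernels<SomeFuturesThreadPool>(shared_context, std::move(kernels));
// Each kernel then runs on its own pool thread and reports results through shared_context.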
2094 
2095 std::vector<size_t> Executor::getTableFragmentIndices(
2096  const RelAlgExecutionUnit& ra_exe_unit,
2097  const ExecutorDeviceType device_type,
2098  const size_t table_idx,
2099  const size_t outer_frag_idx,
2100  std::map<int, const TableFragments*>& selected_tables_fragments,
2101  const std::unordered_map<int, const Analyzer::BinOper*>&
2102  inner_table_id_to_join_condition) {
2103  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2104  auto table_frags_it = selected_tables_fragments.find(table_id);
2105  CHECK(table_frags_it != selected_tables_fragments.end());
2106  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2107  const auto outer_table_fragments_it =
2108  selected_tables_fragments.find(outer_input_desc.getTableId());
2109  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2110  const auto outer_table_fragments = outer_table_fragments_it->second;
2111  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2112  if (!table_idx) {
2113  return {outer_frag_idx};
2114  }
2115  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2116  auto& inner_frags = table_frags_it->second;
2117  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2118  std::vector<size_t> all_frag_ids;
2119  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2120  ++inner_frag_idx) {
2121  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2122  if (skipFragmentPair(outer_fragment_info,
2123  inner_frag_info,
2124  table_idx,
2125  inner_table_id_to_join_condition,
2126  ra_exe_unit,
2127  device_type)) {
2128  continue;
2129  }
2130  all_frag_ids.push_back(inner_frag_idx);
2131  }
2132  return all_frag_ids;
2133 }
2134 
2135 // Returns true iff the join between two fragments cannot yield any results, per
2136 // shard information. The pair can be skipped to avoid full broadcast.
2137 bool Executor::skipFragmentPair(
2138  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2139  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2140  const int table_idx,
2141  const std::unordered_map<int, const Analyzer::BinOper*>&
2142  inner_table_id_to_join_condition,
2143  const RelAlgExecutionUnit& ra_exe_unit,
2144  const ExecutorDeviceType device_type) {
2145  if (device_type != ExecutorDeviceType::GPU) {
2146  return false;
2147  }
2148  CHECK(table_idx >= 0 &&
2149  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2150  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2151  // Both tables need to be sharded the same way.
2152  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2153  outer_fragment_info.shard == inner_fragment_info.shard) {
2154  return false;
2155  }
2156  const Analyzer::BinOper* join_condition{nullptr};
2157  if (ra_exe_unit.join_quals.empty()) {
2158  CHECK(!inner_table_id_to_join_condition.empty());
2159  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2160  CHECK(condition_it != inner_table_id_to_join_condition.end());
2161  join_condition = condition_it->second;
2162  CHECK(join_condition);
2163  } else {
2164  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2165  plan_state_->join_info_.join_hash_tables_.size());
2166  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2167  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2168  table_idx) {
2169  CHECK(!join_condition);
2170  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2171  }
2172  }
2173  }
2174  if (!join_condition) {
2175  return false;
2176  }
2177  // TODO(adb): support fragment skipping based on the overlaps operator
2178  if (join_condition->is_overlaps_oper()) {
2179  return false;
2180  }
2181  size_t shard_count{0};
2182  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2183  join_condition->get_left_operand())) {
2184  auto inner_outer_pairs =
2185  normalize_column_pairs(join_condition, *getCatalog(), getTemporaryTables());
2186  shard_count = BaselineJoinHashTable::getShardCountForCondition(
2187  join_condition, this, inner_outer_pairs);
2188  } else {
2189  shard_count = get_shard_count(join_condition, this);
2190  }
2191  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2192  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2193  }
2194  return shard_count;
2195 }
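// Worked example (illustrative): with both tables sharded four ways on the join key, an
// outer fragment holding shard 2 can never match an inner fragment holding shard 3, so the
// pair is skipped. Pairs where the shards match, or where either side reports shard == -1,
// fall through and are joined normally.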
2196 
2197 namespace {
2198 
2199 const ColumnDescriptor* try_get_column_descriptor(const InputColDescriptor* col_desc,
2200  const Catalog_Namespace::Catalog& cat) {
2201  const int table_id = col_desc->getScanDesc().getTableId();
2202  const int col_id = col_desc->getColId();
2203  return get_column_descriptor_maybe(col_id, table_id, cat);
2204 }
2205 
2206 } // namespace
2207 
2208 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2209  const std::vector<InputDescriptor>& input_descs,
2210  const std::map<int, const TableFragments*>& all_tables_fragments) {
2211  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2212  for (auto& desc : input_descs) {
2213  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2214  CHECK(fragments_it != all_tables_fragments.end());
2215  const auto& fragments = *fragments_it->second;
2216  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2217  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2218  frag_offsets[i] = off;
2219  off += fragments[i].getNumTuples();
2220  }
2221  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2222  }
2223  return tab_id_to_frag_offsets;
2224 }
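// Illustrative example: a table whose three fragments hold 100, 250 and 75 rows yields
// frag_offsets = {0, 100, 350}; entry i is the number of rows preceding fragment i, which
// is the prefix sum the global row-id arithmetic relies on downstream.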
2225 
2226 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2227 Executor::getRowCountAndOffsetForAllFrags(
2228  const RelAlgExecutionUnit& ra_exe_unit,
2229  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2230  const std::vector<InputDescriptor>& input_descs,
2231  const std::map<int, const TableFragments*>& all_tables_fragments) {
2232  std::vector<std::vector<int64_t>> all_num_rows;
2233  std::vector<std::vector<uint64_t>> all_frag_offsets;
2234  const auto tab_id_to_frag_offsets =
2235  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2236  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2237  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2238  std::vector<int64_t> num_rows;
2239  std::vector<uint64_t> frag_offsets;
2240  if (!ra_exe_unit.union_all) {
2241  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2242  }
2243  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2244  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2245  const auto fragments_it =
2246  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2247  CHECK(fragments_it != all_tables_fragments.end());
2248  const auto& fragments = *fragments_it->second;
2249  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2250  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2251  const auto& fragment = fragments[frag_id];
2252  num_rows.push_back(fragment.getNumTuples());
2253  } else {
2254  size_t total_row_count{0};
2255  for (const auto& fragment : fragments) {
2256  total_row_count += fragment.getNumTuples();
2257  }
2258  num_rows.push_back(total_row_count);
2259  }
2260  const auto frag_offsets_it =
2261  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2262  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2263  const auto& offsets = frag_offsets_it->second;
2264  CHECK_LT(frag_id, offsets.size());
2265  frag_offsets.push_back(offsets[frag_id]);
2266  }
2267  all_num_rows.push_back(num_rows);
2268  // Fragment offsets of the outer table should only be used by rowid for now.
2269  all_frag_offsets.push_back(frag_offsets);
2270  }
2271  return {all_num_rows, all_frag_offsets};
2272 }
2273 
2274 // Determines whether a column of a hash-joined inner fact table, whose fetch is not
2275 // deferred, needs to be fetched from all of the table's fragments.
2276 bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
2277  const RelAlgExecutionUnit& ra_exe_unit,
2278  const FragmentsList& selected_fragments) const {
2279  const auto& input_descs = ra_exe_unit.input_descs;
2280  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2281  if (nest_level < 1 ||
2282  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2283  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2284  (ra_exe_unit.join_quals.empty() &&
2285  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2286  return false;
2287  }
2288  const int table_id = inner_col_desc.getScanDesc().getTableId();
2289  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2290  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2291  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2292  return fragments.size() > 1;
2293 }
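// Example (illustrative): when a kernel is handed more than one fragment of the inner side
// of a hash join, a non-lazily-fetched inner column is materialized once via
// getAllTableColumnFragments() (a single linearized buffer) instead of fragment by
// fragment; the `fragments.size() > 1` check above is what flips that decision.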
2294 
2295 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2296  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2297  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2298  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2299 }
2300 
2301 FetchResult Executor::fetchChunks(
2302  const ColumnFetcher& column_fetcher,
2303  const RelAlgExecutionUnit& ra_exe_unit,
2304  const int device_id,
2305  const Data_Namespace::MemoryLevel memory_level,
2306  const std::map<int, const TableFragments*>& all_tables_fragments,
2307  const FragmentsList& selected_fragments,
2308  const Catalog_Namespace::Catalog& cat,
2309  std::list<ChunkIter>& chunk_iterators,
2310  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2311  DeviceAllocator* device_allocator) {
2312  auto timer = DEBUG_TIMER(__func__);
2314  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2315  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2316  std::vector<size_t> local_col_to_frag_pos;
2317  buildSelectedFragsMapping(selected_fragments_crossjoin,
2318  local_col_to_frag_pos,
2319  col_global_ids,
2320  selected_fragments,
2321  ra_exe_unit);
2322 
2323  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2324  selected_fragments_crossjoin);
2325 
2326  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2327  std::vector<std::vector<int64_t>> all_num_rows;
2328  std::vector<std::vector<uint64_t>> all_frag_offsets;
2329 
2330  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2331  std::vector<const int8_t*> frag_col_buffers(
2332  plan_state_->global_to_local_col_ids_.size());
2333  for (const auto& col_id : col_global_ids) {
2334  // check whether the interrupt flag is turned on (non kernel-time query interrupt)
2335  if (g_enable_runtime_query_interrupt &&
2336  interrupted_.load()) {
2337  resetInterrupt();
2338  throw QueryExecutionError(ERR_INTERRUPTED);
2339  }
2340  CHECK(col_id);
2341  const int table_id = col_id->getScanDesc().getTableId();
2342  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2343  if (cd && cd->isVirtualCol) {
2344  CHECK_EQ("rowid", cd->columnName);
2345  continue;
2346  }
2347  const auto fragments_it = all_tables_fragments.find(table_id);
2348  CHECK(fragments_it != all_tables_fragments.end());
2349  const auto fragments = fragments_it->second;
2350  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2351  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2352  CHECK_LT(static_cast<size_t>(it->second),
2353  plan_state_->global_to_local_col_ids_.size());
2354  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2355  if (!fragments->size()) {
2356  return {};
2357  }
2358  CHECK_LT(frag_id, fragments->size());
2359  auto memory_level_for_column = memory_level;
2360  if (plan_state_->columns_to_fetch_.find(
2361  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2362  plan_state_->columns_to_fetch_.end()) {
2363  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2364  }
2365  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2366  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2367  col_id.get(), memory_level_for_column, device_id, device_allocator);
2368  } else {
2369  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2370  frag_col_buffers[it->second] =
2371  column_fetcher.getAllTableColumnFragments(table_id,
2372  col_id->getColId(),
2373  all_tables_fragments,
2374  memory_level_for_column,
2375  device_id,
2376  device_allocator);
2377  } else {
2378  frag_col_buffers[it->second] =
2379  column_fetcher.getOneTableColumnFragment(table_id,
2380  frag_id,
2381  col_id->getColId(),
2382  all_tables_fragments,
2383  chunks,
2384  chunk_iterators,
2385  memory_level_for_column,
2386  device_id,
2387  device_allocator);
2388  }
2389  }
2390  }
2391  all_frag_col_buffers.push_back(frag_col_buffers);
2392  }
2393  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2394  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2395  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2396 }
2397 
2398 // fetchChunks() is written under the assumption that multiple inputs imply a JOIN.
2399 // fetchUnionChunks() is written under the assumption that multiple inputs imply a UNION ALL.
2400 FetchResult Executor::fetchUnionChunks(
2401  const ColumnFetcher& column_fetcher,
2402  const RelAlgExecutionUnit& ra_exe_unit,
2403  const int device_id,
2404  const Data_Namespace::MemoryLevel memory_level,
2405  const std::map<int, const TableFragments*>& all_tables_fragments,
2406  const FragmentsList& selected_fragments,
2407  const Catalog_Namespace::Catalog& cat,
2408  std::list<ChunkIter>& chunk_iterators,
2409  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2410  DeviceAllocator* device_allocator) {
2411  auto timer = DEBUG_TIMER(__func__);
2413 
2414  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2415  std::vector<std::vector<int64_t>> all_num_rows;
2416  std::vector<std::vector<uint64_t>> all_frag_offsets;
2417 
2418  CHECK(!selected_fragments.empty());
2419  CHECK_LE(2u, ra_exe_unit.input_descs.size());
2420  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
2421  using TableId = int;
2422  TableId const selected_table_id = selected_fragments.front().table_id;
2423  bool const input_descs_index =
2424  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2425  if (!input_descs_index) {
2426  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
2427  }
2428  bool const input_col_descs_index =
2429  selected_table_id ==
2430  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2431  if (!input_col_descs_index) {
2432  CHECK_EQ(selected_table_id,
2433  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
2434  }
2435  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2436  << " selected_table_id=" << selected_table_id
2437  << " input_descs_index=" << int(input_descs_index)
2438  << " input_col_descs_index=" << int(input_col_descs_index)
2439  << " ra_exe_unit.input_descs="
2440  << shared::printContainer(ra_exe_unit.input_descs)
2441  << " ra_exe_unit.input_col_descs="
2442  << shared::printContainer(ra_exe_unit.input_col_descs);
2443 
2444  // Partition col_global_ids by table_id
2445  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2446  table_id_to_input_col_descs;
2447  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
2448  TableId const table_id = input_col_desc->getScanDesc().getTableId();
2449  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2450  }
2451  for (auto const& pair : table_id_to_input_col_descs) {
2452  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2453  std::vector<size_t> local_col_to_frag_pos;
2454 
2455  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
2456  local_col_to_frag_pos,
2457  pair.second,
2458  selected_fragments,
2459  ra_exe_unit);
2460 
2461  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2462  selected_fragments_crossjoin);
2463 
2464  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2465  std::vector<const int8_t*> frag_col_buffers(
2466  plan_state_->global_to_local_col_ids_.size());
2467  for (const auto& col_id : pair.second) {
2468  CHECK(col_id);
2469  const int table_id = col_id->getScanDesc().getTableId();
2470  CHECK_EQ(table_id, pair.first);
2471  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2472  if (cd && cd->isVirtualCol) {
2473  CHECK_EQ("rowid", cd->columnName);
2474  continue;
2475  }
2476  const auto fragments_it = all_tables_fragments.find(table_id);
2477  CHECK(fragments_it != all_tables_fragments.end());
2478  const auto fragments = fragments_it->second;
2479  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2480  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2481  CHECK_LT(static_cast<size_t>(it->second),
2482  plan_state_->global_to_local_col_ids_.size());
2483  const size_t frag_id = ra_exe_unit.union_all
2484  ? 0
2485  : selected_frag_ids[local_col_to_frag_pos[it->second]];
2486  if (!fragments->size()) {
2487  return {};
2488  }
2489  CHECK_LT(frag_id, fragments->size());
2490  auto memory_level_for_column = memory_level;
2491  if (plan_state_->columns_to_fetch_.find(
2492  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2493  plan_state_->columns_to_fetch_.end()) {
2494  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2495  }
2496  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2497  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2498  col_id.get(), memory_level_for_column, device_id, device_allocator);
2499  } else {
2500  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2501  frag_col_buffers[it->second] =
2502  column_fetcher.getAllTableColumnFragments(table_id,
2503  col_id->getColId(),
2504  all_tables_fragments,
2505  memory_level_for_column,
2506  device_id,
2507  device_allocator);
2508  } else {
2509  frag_col_buffers[it->second] =
2510  column_fetcher.getOneTableColumnFragment(table_id,
2511  frag_id,
2512  col_id->getColId(),
2513  all_tables_fragments,
2514  chunks,
2515  chunk_iterators,
2516  memory_level_for_column,
2517  device_id,
2518  device_allocator);
2519  }
2520  }
2521  }
2522  all_frag_col_buffers.push_back(frag_col_buffers);
2523  }
2524  std::vector<std::vector<int64_t>> num_rows;
2525  std::vector<std::vector<uint64_t>> frag_offsets;
2526  std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags(
2527  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2528  all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
2529  all_frag_offsets.insert(
2530  all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
2531  }
2532  // UNION ALL hacks.
2533  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers);
2534  for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
2535  all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
2536  }
2537  if (input_descs_index == input_col_descs_index) {
2538  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2539  }
2540 
2541  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers)
2542  << " all_num_rows=" << shared::printContainer(all_num_rows)
2543  << " all_frag_offsets=" << shared::printContainer(all_frag_offsets)
2544  << " input_col_descs_index=" << input_col_descs_index;
2545  return {{all_frag_col_buffers[input_descs_index]},
2546  {{all_num_rows[0][input_descs_index]}},
2547  {{all_frag_offsets[0][input_descs_index]}}};
2548 }
2549 
2550 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2551  const size_t scan_idx,
2552  const RelAlgExecutionUnit& ra_exe_unit) {
2553  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2554  scan_idx > 0 &&
2555  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2556  !selected_fragments[scan_idx].fragment_ids.empty()) {
2557  // Fetch all fragments
2558  return {size_t(0)};
2559  }
2560 
2561  return selected_fragments[scan_idx].fragment_ids;
2562 }
2563 
2564 void Executor::buildSelectedFragsMapping(
2565  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2566  std::vector<size_t>& local_col_to_frag_pos,
2567  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2568  const FragmentsList& selected_fragments,
2569  const RelAlgExecutionUnit& ra_exe_unit) {
2570  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2571  size_t frag_pos{0};
2572  const auto& input_descs = ra_exe_unit.input_descs;
2573  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2574  const int table_id = input_descs[scan_idx].getTableId();
2575  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2576  selected_fragments_crossjoin.push_back(
2577  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2578  for (const auto& col_id : col_global_ids) {
2579  CHECK(col_id);
2580  const auto& input_desc = col_id->getScanDesc();
2581  if (input_desc.getTableId() != table_id ||
2582  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2583  continue;
2584  }
2585  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2586  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2587  CHECK_LT(static_cast<size_t>(it->second),
2588  plan_state_->global_to_local_col_ids_.size());
2589  local_col_to_frag_pos[it->second] = frag_pos;
2590  }
2591  ++frag_pos;
2592  }
2593 }
2594 
2595 void Executor::buildSelectedFragsMappingForUnion(
2596  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2597  std::vector<size_t>& local_col_to_frag_pos,
2598  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2599  const FragmentsList& selected_fragments,
2600  const RelAlgExecutionUnit& ra_exe_unit) {
2601  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2602  size_t frag_pos{0};
2603  const auto& input_descs = ra_exe_unit.input_descs;
2604  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2605  const int table_id = input_descs[scan_idx].getTableId();
2606  // selected_fragments here is from assignFragsToKernelDispatch
2607  // execution_kernel.fragments
2608  if (selected_fragments[0].table_id != table_id) { // TODO 0
2609  continue;
2610  }
2611  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2612  selected_fragments_crossjoin.push_back(
2613  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2614  {size_t(1)}); // TODO
2615  for (const auto& col_id : col_global_ids) {
2616  CHECK(col_id);
2617  const auto& input_desc = col_id->getScanDesc();
2618  if (input_desc.getTableId() != table_id ||
2619  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2620  continue;
2621  }
2622  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2623  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2624  CHECK_LT(static_cast<size_t>(it->second),
2625  plan_state_->global_to_local_col_ids_.size());
2626  local_col_to_frag_pos[it->second] = frag_pos;
2627  }
2628  ++frag_pos;
2629  }
2630 }
2631 
2632 namespace {
2633 
2634 class OutVecOwner {
2635  public:
2636  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2637  ~OutVecOwner() {
2638  for (auto out : out_vec_) {
2639  delete[] out;
2640  }
2641  }
2642 
2643  private:
2644  std::vector<int64_t*> out_vec_;
2645 };
2646 } // namespace
2647 
2648 int32_t Executor::executePlanWithoutGroupBy(
2649  const RelAlgExecutionUnit& ra_exe_unit,
2650  const CompilationResult& compilation_result,
2651  const bool hoist_literals,
2652  ResultSetPtr& results,
2653  const std::vector<Analyzer::Expr*>& target_exprs,
2654  const ExecutorDeviceType device_type,
2655  std::vector<std::vector<const int8_t*>>& col_buffers,
2656  QueryExecutionContext* query_exe_context,
2657  const std::vector<std::vector<int64_t>>& num_rows,
2658  const std::vector<std::vector<uint64_t>>& frag_offsets,
2659  Data_Namespace::DataMgr* data_mgr,
2660  const int device_id,
2661  const uint32_t start_rowid,
2662  const uint32_t num_tables,
2663  RenderInfo* render_info) {
2665  auto timer = DEBUG_TIMER(__func__);
2666  CHECK(!results);
2667  if (col_buffers.empty()) {
2668  return 0;
2669  }
2670 
2671  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2672  if (render_info) {
2673  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2674  // here, we are in non-insitu mode.
2675  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2676  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2677  "currently unsupported.";
2678  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2679  }
2680 
2681  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2682  std::vector<int64_t*> out_vec;
2683  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2684  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2685  std::unique_ptr<OutVecOwner> output_memory_scope;
2686  if (g_enable_runtime_query_interrupt &&
2687  interrupted_.load()) {
2688  resetInterrupt();
2689  return ERR_INTERRUPTED;
2690  }
2691  if (device_type == ExecutorDeviceType::CPU) {
2692  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2693  compilation_result.generated_code);
2694  CHECK(cpu_generated_code);
2695  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2696  cpu_generated_code.get(),
2697  hoist_literals,
2698  hoist_buf,
2699  col_buffers,
2700  num_rows,
2701  frag_offsets,
2702  0,
2703  &error_code,
2704  num_tables,
2705  join_hash_table_ptrs);
2706  output_memory_scope.reset(new OutVecOwner(out_vec));
2707  } else {
2708  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2709  compilation_result.generated_code);
2710  CHECK(gpu_generated_code);
2711  try {
2712  out_vec = query_exe_context->launchGpuCode(
2713  ra_exe_unit,
2714  gpu_generated_code.get(),
2715  hoist_literals,
2716  hoist_buf,
2717  col_buffers,
2718  num_rows,
2719  frag_offsets,
2720  0,
2721  data_mgr,
2722  blockSize(),
2723  gridSize(),
2724  device_id,
2725  compilation_result.gpu_smem_context.getSharedMemorySize(),
2726  &error_code,
2727  num_tables,
2728  join_hash_table_ptrs,
2729  render_allocator_map_ptr);
2730  output_memory_scope.reset(new OutVecOwner(out_vec));
2731  } catch (const OutOfMemory&) {
2732  return ERR_OUT_OF_GPU_MEM;
2733  } catch (const std::exception& e) {
2734  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2735  }
2736  }
2737  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2738  error_code == Executor::ERR_DIV_BY_ZERO ||
2739  error_code == Executor::ERR_OUT_OF_TIME ||
2740  error_code == Executor::ERR_INTERRUPTED ||
2741  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
2742  error_code == Executor::ERR_GEOS) {
2743  return error_code;
2744  }
2745  if (ra_exe_unit.estimator) {
2746  CHECK(!error_code);
2747  results =
2748  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2749  return 0;
2750  }
2751  std::vector<int64_t> reduced_outs;
2752  const auto num_frags = col_buffers.size();
2753  const size_t entry_count =
2754  device_type == ExecutorDeviceType::GPU
2755  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
2756  ? 1
2757  : blockSize() * gridSize() * num_frags)
2758  : num_frags;
2759  if (size_t(1) == entry_count) {
2760  for (auto out : out_vec) {
2761  CHECK(out);
2762  reduced_outs.push_back(*out);
2763  }
2764  } else {
2765  size_t out_vec_idx = 0;
2766 
2767  for (const auto target_expr : target_exprs) {
2768  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2769  CHECK(agg_info.is_agg);
2770 
2771  const int num_iterations = agg_info.sql_type.is_geometry()
2772  ? agg_info.sql_type.get_physical_coord_cols()
2773  : 1;
2774 
2775  for (int i = 0; i < num_iterations; i++) {
2776  int64_t val1;
2777  const bool float_argument_input = takes_float_argument(agg_info);
2778  if (is_distinct_target(agg_info)) {
2779  CHECK(agg_info.agg_kind == kCOUNT ||
2780  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT);
2781  val1 = out_vec[out_vec_idx][0];
2782  error_code = 0;
2783  } else {
2784  const auto chosen_bytes = static_cast<size_t>(
2785  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2786  std::tie(val1, error_code) = Executor::reduceResults(
2787  agg_info.agg_kind,
2788  agg_info.sql_type,
2789  query_exe_context->getAggInitValForIndex(out_vec_idx),
2790  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2791  out_vec[out_vec_idx],
2792  entry_count,
2793  false,
2794  float_argument_input);
2795  }
2796  if (error_code) {
2797  break;
2798  }
2799  reduced_outs.push_back(val1);
2800  if (agg_info.agg_kind == kAVG ||
2801  (agg_info.agg_kind == kSAMPLE &&
2802  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
2803  const auto chosen_bytes = static_cast<size_t>(
2804  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
2805  1));
2806  int64_t val2;
2807  std::tie(val2, error_code) = Executor::reduceResults(
2808  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
2809  agg_info.sql_type,
2810  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
2811  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2812  out_vec[out_vec_idx + 1],
2813  entry_count,
2814  false,
2815  false);
2816  if (error_code) {
2817  break;
2818  }
2819  reduced_outs.push_back(val2);
2820  ++out_vec_idx;
2821  }
2822  ++out_vec_idx;
2823  }
2824  }
2825  }
2826 
2827  if (error_code) {
2828  return error_code;
2829  }
2830 
2831  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
2832  auto rows_ptr = std::shared_ptr<ResultSet>(
2833  query_exe_context->query_buffers_->result_sets_[0].release());
2834  rows_ptr->fillOneEntry(reduced_outs);
2835  results = std::move(rows_ptr);
2836  return error_code;
2837 }
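// Reduction sketch (illustrative): on GPU, unless the shared-memory path produced a single
// slot, each target owns blockSize() * gridSize() * num_frags partial values. For a SUM
// over two fragments with block 1024 and grid 16 that is 32768 partials per target, and
// reduceResults() folds them down to the one value pushed into reduced_outs; kAVG also
// folds the adjacent COUNT column, which is why out_vec_idx advances a second time there.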
2838 
2839 namespace {
2840 
2841 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
2842  CHECK(scan_limit);
2843  return results && results->rowCount() < scan_limit;
2844 }
2845 
2846 } // namespace
2847 
2848 int32_t Executor::executePlanWithGroupBy(
2849  const RelAlgExecutionUnit& ra_exe_unit,
2850  const CompilationResult& compilation_result,
2851  const bool hoist_literals,
2852  ResultSetPtr& results,
2853  const ExecutorDeviceType device_type,
2854  std::vector<std::vector<const int8_t*>>& col_buffers,
2855  const std::vector<size_t> outer_tab_frag_ids,
2856  QueryExecutionContext* query_exe_context,
2857  const std::vector<std::vector<int64_t>>& num_rows,
2858  const std::vector<std::vector<uint64_t>>& frag_offsets,
2859  Data_Namespace::DataMgr* data_mgr,
2860  const int device_id,
2861  const int outer_table_id,
2862  const int64_t scan_limit,
2863  const uint32_t start_rowid,
2864  const uint32_t num_tables,
2865  RenderInfo* render_info) {
2866  auto timer = DEBUG_TIMER(__func__);
2868  CHECK(!results);
2869  if (col_buffers.empty()) {
2870  return 0;
2871  }
2872  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
2873  // TODO(alex):
2874  // 1. Optimize size (make keys more compact).
2875  // 2. Resize on overflow.
2876  // 3. Optimize runtime.
2877  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2878  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2879  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2880  if (g_enable_runtime_query_interrupt &&
2881  interrupted_.load()) {
2882  return ERR_INTERRUPTED;
2883  }
2884 
2885  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2886  if (render_info && render_info->useCudaBuffers()) {
2887  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2888  }
2889 
2890  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
2891  << " ra_exe_unit.input_descs="
2892  << shared::printContainer(ra_exe_unit.input_descs)
2893  << " ra_exe_unit.input_col_descs="
2894  << shared::printContainer(ra_exe_unit.input_col_descs)
2895  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
2896  << " num_rows=" << shared::printContainer(num_rows)
2897  << " frag_offsets=" << shared::printContainer(frag_offsets)
2898  << " query_exe_context->query_buffers_->num_rows_="
2899  << query_exe_context->query_buffers_->num_rows_
2900  << " query_exe_context->query_mem_desc_.getEntryCount()="
2901  << query_exe_context->query_mem_desc_.getEntryCount()
2902  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
2903  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
2904  << " num_tables=" << num_tables;
2905 
2906  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
2907  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
2908  // with outer_table_id.
2909  if (ra_exe_unit_copy.union_all) {
2910  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
2911  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
2912  ra_exe_unit_copy.input_descs.end(),
2913  [outer_table_id](auto const& a, auto const& b) {
2914  return a.getTableId() == outer_table_id &&
2915  b.getTableId() != outer_table_id;
2916  });
2917  while (!ra_exe_unit_copy.input_descs.empty() &&
2918  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
2919  ra_exe_unit_copy.input_descs.pop_back();
2920  }
2921  // Filter ra_exe_unit_copy.input_col_descs.
2922  ra_exe_unit_copy.input_col_descs.remove_if(
2923  [outer_table_id](auto const& input_col_desc) {
2924  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
2925  });
2926  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
2927  }
2928 
2929  if (device_type == ExecutorDeviceType::CPU) {
2930  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2931  compilation_result.generated_code);
2932  CHECK(cpu_generated_code);
2933  query_exe_context->launchCpuCode(
2934  ra_exe_unit_copy,
2935  cpu_generated_code.get(),
2936  hoist_literals,
2937  hoist_buf,
2938  col_buffers,
2939  num_rows,
2940  frag_offsets,
2941  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
2942  &error_code,
2943  num_tables,
2944  join_hash_table_ptrs);
2945  } else {
2946  try {
2947  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2948  compilation_result.generated_code);
2949  CHECK(gpu_generated_code);
2950  query_exe_context->launchGpuCode(
2951  ra_exe_unit_copy,
2952  gpu_generated_code.get(),
2953  hoist_literals,
2954  hoist_buf,
2955  col_buffers,
2956  num_rows,
2957  frag_offsets,
2958  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
2959  data_mgr,
2960  blockSize(),
2961  gridSize(),
2962  device_id,
2963  compilation_result.gpu_smem_context.getSharedMemorySize(),
2964  &error_code,
2965  num_tables,
2966  join_hash_table_ptrs,
2967  render_allocator_map_ptr);
2968  } catch (const OutOfMemory&) {
2969  return ERR_OUT_OF_GPU_MEM;
2970  } catch (const OutOfRenderMemory&) {
2971  return ERR_OUT_OF_RENDER_MEM;
2972  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
2973  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
2974  } catch (const std::exception& e) {
2975  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2976  }
2977  }
2978 
2979  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2980  error_code == Executor::ERR_DIV_BY_ZERO ||
2981  error_code == Executor::ERR_OUT_OF_TIME ||
2982  error_code == Executor::ERR_INTERRUPTED ||
2983  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
2984  error_code == Executor::ERR_GEOS) {
2985  return error_code;
2986  }
2987 
2988  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
2989  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
2990  results = query_exe_context->getRowSet(ra_exe_unit_copy,
2991  query_exe_context->query_mem_desc_);
2992  CHECK(results);
2993  VLOG(2) << "results->rowCount()=" << results->rowCount();
2994  results->holdLiterals(hoist_buf);
2995  }
2996  if (error_code < 0 && render_allocator_map_ptr) {
2997  auto const adjusted_scan_limit =
2998  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
2999  // More rows passed the filter than available slots. We don't have a count to check,
3000  // so assume we met the limit if a scan limit is set.
3001  if (adjusted_scan_limit != 0) {
3002  return 0;
3003  } else {
3004  return error_code;
3005  }
3006  }
3007  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
3008  return error_code; // unlucky, not enough results and we ran out of slots
3009  }
3010 
3011  return 0;
3012 }
3013 
3014 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3015  const int device_id) {
3016  std::vector<int64_t> table_ptrs;
3017  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3018  for (auto hash_table : join_hash_tables) {
3019  if (!hash_table) {
3020  CHECK(table_ptrs.empty());
3021  return {};
3022  }
3023  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3024  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3025  }
3026  return table_ptrs;
3027 }
3028 
3029 void Executor::nukeOldState(const bool allow_lazy_fetch,
3030  const std::vector<InputTableInfo>& query_infos,
3031  const PlanState::DeletedColumnsMap& deleted_cols_map,
3032  const RelAlgExecutionUnit* ra_exe_unit) {
3035  const bool contains_left_deep_outer_join =
3036  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3037  ra_exe_unit->join_quals.end(),
3038  [](const JoinCondition& join_condition) {
3039  return join_condition.type == JoinType::LEFT;
3040  }) != ra_exe_unit->join_quals.end();
3041  cgen_state_.reset(new CgenState(query_infos.size(), contains_left_deep_outer_join));
3042  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3043  query_infos,
3044  deleted_cols_map,
3045  this));
3046 }
3047 
3048 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3049  const std::vector<InputTableInfo>& query_infos) {
3051  const auto ld_count = input_descs.size();
3052  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3053  for (size_t i = 0; i < ld_count; ++i) {
3054  CHECK_LT(i, query_infos.size());
3055  const auto frag_count = query_infos[i].info.fragments.size();
3056  if (i > 0) {
3057  cgen_state_->frag_offsets_.push_back(nullptr);
3058  } else {
3059  if (frag_count > 1) {
3060  cgen_state_->frag_offsets_.push_back(
3061  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
3062  } else {
3063  cgen_state_->frag_offsets_.push_back(nullptr);
3064  }
3065  }
3066  }
3067 }
3068 
3069 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
3070  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3071  const std::vector<InputTableInfo>& query_infos,
3072  const MemoryLevel memory_level,
3073  const JoinHashTableInterface::HashType preferred_hash_type,
3074  ColumnCacheMap& column_cache) {
3075  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3076  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3077  }
3078  // check whether the interrupt flag is turned on (non kernel-time query interrupt)
3079  if (g_enable_runtime_query_interrupt &&
3080  interrupted_.load()) {
3081  resetInterrupt();
3082  throw QueryExecutionError(ERR_INTERRUPTED);
3083  }
3084  try {
3085  auto tbl =
3086  JoinHashTableInterface::getInstance(qual_bin_oper,
3087  query_infos,
3088  memory_level,
3089  preferred_hash_type,
3090  deviceCountForMemoryLevel(memory_level),
3091  column_cache,
3092  this);
3093  return {tbl, ""};
3094  } catch (const HashJoinFail& e) {
3095  return {nullptr, e.what()};
3096  }
3097 }
3098 
3099 int8_t Executor::warpSize() const {
3100  CHECK(catalog_);
3101  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3102  CHECK(cuda_mgr);
3103  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3104  CHECK(!dev_props.empty());
3105  return dev_props.front().warpSize;
3106 }
3107 
3108 unsigned Executor::gridSize() const {
3109  CHECK(catalog_);
3110  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3111  CHECK(cuda_mgr);
3112  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3113 }
3114 
3115 unsigned Executor::numBlocksPerMP() const {
3116  CHECK(catalog_);
3117  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3118  CHECK(cuda_mgr);
3119  return grid_size_x_ ? std::ceil(grid_size_x_ / cuda_mgr->getMinNumMPsForAllDevices())
3120  : 2;
3121 }
3122 
3123 unsigned Executor::blockSize() const {
3124  CHECK(catalog_);
3125  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3126  CHECK(cuda_mgr);
3127  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3128  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3129 }
3130 
3131 size_t Executor::maxGpuSlabSize() const {
3132  return max_gpu_slab_size_;
3133 }
3134 
3135 int64_t Executor::deviceCycles(int milliseconds) const {
3136  CHECK(catalog_);
3137  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3138  CHECK(cuda_mgr);
3139  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3140  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3141 }
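// Example (illustrative): a device clocked at 1'500'000 kHz (1.5 GHz) gives
// deviceCycles(10) == 15'000'000, i.e. the number of GPU cycles that fit in a 10 ms
// budget, which is useful for expressing cycle-based timeouts.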
3142 
3143 llvm::Value* Executor::castToFP(llvm::Value* val) {
3145  if (!val->getType()->isIntegerTy()) {
3146  return val;
3147  }
3148 
3149  auto val_width = static_cast<llvm::IntegerType*>(val->getType())->getBitWidth();
3150  llvm::Type* dest_ty{nullptr};
3151  switch (val_width) {
3152  case 32:
3153  dest_ty = llvm::Type::getFloatTy(cgen_state_->context_);
3154  break;
3155  case 64:
3156  dest_ty = llvm::Type::getDoubleTy(cgen_state_->context_);
3157  break;
3158  default:
3159  LOG(FATAL) << "Unsupported FP width: " << std::to_string(val_width);
3160  }
3161  return cgen_state_->ir_builder_.CreateSIToFP(val, dest_ty);
3162 }
3163 
3164 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3166  CHECK(val->getType()->isPointerTy());
3167 
3168  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3169  const auto val_type = val_ptr_type->getElementType();
3170  size_t val_width = 0;
3171  if (val_type->isIntegerTy()) {
3172  val_width = val_type->getIntegerBitWidth();
3173  } else {
3174  if (val_type->isFloatTy()) {
3175  val_width = 32;
3176  } else {
3177  CHECK(val_type->isDoubleTy());
3178  val_width = 64;
3179  }
3180  }
3181  CHECK_LT(size_t(0), val_width);
3182  if (bitWidth == val_width) {
3183  return val;
3184  }
3185  return cgen_state_->ir_builder_.CreateBitCast(
3186  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3187 }
3188 
3189 #define EXECUTE_INCLUDE
3190 #include "ArrayOps.cpp"
3191 #include "DateAdd.cpp"
3192 #include "StringFunctions.cpp"
3193 #undef EXECUTE_INCLUDE
3194 
3195 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3196  const RelAlgExecutionUnit& ra_exe_unit,
3197  const CompilationOptions& co) {
3198  if (!co.filter_on_deleted_column) {
3199  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3200  }
3201  auto ra_exe_unit_with_deleted = ra_exe_unit;
3202  PlanState::DeletedColumnsMap deleted_cols_map;
3203  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3204  if (input_table.getSourceType() != InputSourceType::TABLE) {
3205  continue;
3206  }
3207  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3208  CHECK(td);
3209  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3210  if (!deleted_cd) {
3211  continue;
3212  }
3213  CHECK(deleted_cd->columnType.is_boolean());
3214  // check deleted column is not already present
3215  bool found = false;
3216  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3217  if (input_col.get()->getColId() == deleted_cd->columnId &&
3218  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3219  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3220  found = true;
3221  }
3222  }
3223  if (!found) {
3224  // add deleted column
3225  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3226  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3227  auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3228  if (deleted_cols_it == deleted_cols_map.end()) {
3229  CHECK(deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd))
3230  .second);
3231  } else {
3232  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3233  }
3234  }
3235  }
3236  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3237 }
3238 
3239 namespace {
3240 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` returns `true` for a safe
3241 // scaled epoch value and `false` on overflow/underflow as the first element of the
3242 // returned tuple.
3243 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3244  const int64_t chunk_min,
3245  const int64_t chunk_max,
3246  const SQLTypeInfo& lhs_type,
3247  const SQLTypeInfo& rhs_type) {
3248  const int32_t ldim = lhs_type.get_dimension();
3249  const int32_t rdim = rhs_type.get_dimension();
3250  CHECK(ldim != rdim);
3251  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
3252  if (ldim > rdim) {
3253  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
3254  return {true, chunk_min / scale, chunk_max / scale};
3255  }
3256 
3257  using checked_int64_t = boost::multiprecision::number<
3258  boost::multiprecision::cpp_int_backend<64,
3259  64,
3260  boost::multiprecision::signed_magnitude,
3261  boost::multiprecision::checked,
3262  void>>;
3263 
3264  try {
3265  auto ret =
3266  std::make_tuple(true,
3267  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
3268  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
3269  return ret;
3270  } catch (const std::overflow_error& e) {
3271  // noop
3272  }
3273  return std::make_tuple(false, chunk_min, chunk_max);
3274 }
3275 
3276 } // namespace
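
// Worked example (not from the source): if the lhs column is TIMESTAMP(0) (seconds) and
// the rhs constant is TIMESTAMP(3) (milliseconds), then ldim = 0, rdim = 3, and
// scale = 10^3, so the chunk bounds are multiplied by 1000 using checked 64-bit
// arithmetic. A product that does not fit in int64_t is reported via a `false` first
// tuple element, and the caller gives up on skipping the fragment. A minimal sketch of
// the same checked multiplication, assuming Boost.Multiprecision is available and using
// a hypothetical helper name:
#include <boost/multiprecision/cpp_int.hpp>
#include <cstdint>
#include <stdexcept>

inline bool hpt_scale_is_safe(const int64_t chunk_bound, const int64_t scale) {
  using checked_int64_t = boost::multiprecision::number<
      boost::multiprecision::cpp_int_backend<64,
                                             64,
                                             boost::multiprecision::signed_magnitude,
                                             boost::multiprecision::checked,
                                             void>>;
  try {
    // Throws std::overflow_error if the scaled value cannot be represented in 64 bits.
    (void)int64_t(checked_int64_t(chunk_bound) * checked_int64_t(scale));
    return true;
  } catch (const std::overflow_error&) {
    return false;
  }
}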
3277 
3278 std::pair<bool, int64_t> Executor::skipFragment(
3279  const InputDescriptor& table_desc,
3280  const Fragmenter_Namespace::FragmentInfo& fragment,
3281  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3282  const std::vector<uint64_t>& frag_offsets,
3283  const size_t frag_idx) {
3284  const int table_id = table_desc.getTableId();
3285  for (const auto& simple_qual : simple_quals) {
3286  const auto comp_expr =
3287  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3288  if (!comp_expr) {
3289  // is this possible?
3290  return {false, -1};
3291  }
3292  const auto lhs = comp_expr->get_left_operand();
3293  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3294  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3295  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3296  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3297  if (lhs_uexpr) {
3298  CHECK(lhs_uexpr->get_optype() ==
3299  kCAST); // We should have only been passed a cast expression
3300  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3301  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3302  continue;
3303  }
3304  } else {
3305  continue;
3306  }
3307  }
3308  const auto rhs = comp_expr->get_right_operand();
3309  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3310  if (!rhs_const) {
3311  // is this possible?
3312  return {false, -1};
3313  }
3314  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3315  continue;
3316  }
3317  const int col_id = lhs_col->get_column_id();
3318  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3319  int64_t chunk_min{0};
3320  int64_t chunk_max{0};
3321  bool is_rowid{false};
3322  size_t start_rowid{0};
3323  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3324  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3325  if (cd->isVirtualCol) {
3326  CHECK(cd->columnName == "rowid");
3327  const auto& table_generation = getTableGeneration(table_id);
3328  start_rowid = table_generation.start_rowid;
3329  chunk_min = frag_offsets[frag_idx] + start_rowid;
3330  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3331  is_rowid = true;
3332  }
3333  } else {
3334  const auto& chunk_type = lhs_col->get_type_info();
3335  chunk_min = extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3336  chunk_max = extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3337  }
3338  if (chunk_min > chunk_max) {
3339  // invalid metadata range, do not skip fragment
3340  return {false, -1};
3341  }
3342  if (lhs->get_type_info().is_timestamp() &&
3343  (lhs_col->get_type_info().get_dimension() !=
3344  rhs_const->get_type_info().get_dimension()) &&
3345  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3346  rhs_const->get_type_info().is_high_precision_timestamp())) {
3347  // If the original timestamp lhs col has a different precision,
3348  // the column metadata holds values in the original precision;
3349  // therefore adjust the rhs value to match the lhs precision.
3350 
3351  // Note(Wamsi): We adjust rhs const value instead of lhs value to not
3352  // artificially limit the lhs column range. RHS overflow/underflow has already
3353  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
3354  bool is_valid;
3355  std::tie(is_valid, chunk_min, chunk_max) =
3356  get_hpt_overflow_underflow_safe_scaled_values(
3357  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
3358  if (!is_valid) {
3359  VLOG(4) << "Overflow/Underflow detected in fragment skipping logic.\nChunk min "
3360  "value: "
3361  << std::to_string(chunk_min)
3362  << "\nChunk max value: " << std::to_string(chunk_max)
3363  << "\nLHS col precision is: "
3364  << std::to_string(lhs_col->get_type_info().get_dimension())
3365  << "\nRHS precision is: "
3366  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
3367  return {false, -1};
3368  }
3369  }
3370  llvm::LLVMContext local_context;
3371  CgenState local_cgen_state(local_context);
3372  CodeGenerator code_generator(&local_cgen_state, nullptr);
3373 
3374  const auto rhs_val =
3375  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
3376 
3377  switch (comp_expr->get_optype()) {
3378  case kGE:
3379  if (chunk_max < rhs_val) {
3380  return {true, -1};
3381  }
3382  break;
3383  case kGT:
3384  if (chunk_max <= rhs_val) {
3385  return {true, -1};
3386  }
3387  break;
3388  case kLE:
3389  if (chunk_min > rhs_val) {
3390  return {true, -1};
3391  }
3392  break;
3393  case kLT:
3394  if (chunk_min >= rhs_val) {
3395  return {true, -1};
3396  }
3397  break;
3398  case kEQ:
3399  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3400  return {true, -1};
3401  } else if (is_rowid) {
3402  return {false, rhs_val - start_rowid};
3403  }
3404  break;
3405  default:
3406  break;
3407  }
3408  }
3409  return {false, -1};
3410 }
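
// Illustrative sketch (not part of Execute.cpp): the pruning decision above, isolated
// from the metadata lookup. Given a fragment's chunk statistics [chunk_min, chunk_max]
// for the filtered column and a predicate `col <op> rhs`, the fragment can be skipped
// whenever the constant lies entirely outside the stats range. The enum and helper name
// below are made up.
#include <cstdint>

enum class CompOp { kGE, kGT, kLE, kLT, kEQ };

inline bool can_skip_fragment(const CompOp op,
                              const int64_t chunk_min,
                              const int64_t chunk_max,
                              const int64_t rhs_val) {
  switch (op) {
    case CompOp::kGE:  // col >= rhs: skip if every value in the fragment is below rhs
      return chunk_max < rhs_val;
    case CompOp::kGT:  // col > rhs: skip if every value is at most rhs
      return chunk_max <= rhs_val;
    case CompOp::kLE:  // col <= rhs: skip if every value is above rhs
      return chunk_min > rhs_val;
    case CompOp::kLT:  // col < rhs: skip if every value is at least rhs
      return chunk_min >= rhs_val;
    case CompOp::kEQ:  // col = rhs: skip if rhs falls outside [chunk_min, chunk_max]
      return chunk_min > rhs_val || chunk_max < rhs_val;
  }
  return false;
}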
3411 
3412 /*
3413  * skipFragmentInnerJoins processes all quals stored in the execution unit's
3414  * join_quals and gathers all the ones that meet the "simple_qual" characteristics
3415  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
3416  * to decide whether the fragment should be skipped or not. The fragment will be skipped
3417  * if at least one of these skipFragment calls returns true as its first value.
3418  * - The code depends on skipFragment's output having a meaningful (anything but -1)
3419  * second value only if its first value is "false".
3420  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3421  * i.e., we only skip if none of the quals trigger the code to update the
3422  * rowid_lookup_key.
3423  * - Only AND operations are valid and considered:
3424  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3425  * the skip
3426  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3427  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3428  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3429  * implemented differently, which causes the first one to skip correctly, while the
3430  * second one will not skip.
3431  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3432  * possible
3433  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3434  * projection, no skipping
3435  */
3436 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3437  const InputDescriptor& table_desc,
3438  const RelAlgExecutionUnit& ra_exe_unit,
3439  const Fragmenter_Namespace::FragmentInfo& fragment,
3440  const std::vector<uint64_t>& frag_offsets,
3441  const size_t frag_idx) {
3442  std::pair<bool, int64_t> skip_frag{false, -1};
3443  for (auto& inner_join : ra_exe_unit.join_quals) {
3444  if (inner_join.type != JoinType::INNER) {
3445  continue;
3446  }
3447 
3448  // extracting all the conjunctive simple_quals from the quals stored for the inner
3449  // join
3450  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3451  for (auto& qual : inner_join.quals) {
3452  auto temp_qual = qual_to_conjunctive_form(qual);
3453  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3454  temp_qual.simple_quals.begin(),
3455  temp_qual.simple_quals.end());
3456  }
3457  auto temp_skip_frag = skipFragment(
3458  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3459  if (temp_skip_frag.second != -1) {
3460  skip_frag.second = temp_skip_frag.second;
3461  return skip_frag;
3462  } else {
3463  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3464  }
3465  }
3466  return skip_frag;
3467 }
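
// Illustrative sketch (not part of Execute.cpp): how the per-qual results combine, per
// the comment above. A meaningful rowid lookup key (second value != -1) is returned as
// soon as it is found; otherwise the fragment is skipped if any inner-join qual allows
// it. The helper name is made up and operates on the same {skip, rowid_key} pairs that
// skipFragment() returns.
#include <cstdint>
#include <utility>
#include <vector>

inline std::pair<bool, int64_t> combine_skip_results(
    const std::vector<std::pair<bool, int64_t>>& per_qual_results) {
  std::pair<bool, int64_t> combined{false, -1};
  for (const auto& result : per_qual_results) {
    if (result.second != -1) {
      // A rowid lookup key takes priority over any pending skip decision.
      combined.second = result.second;
      return combined;
    }
    combined.first = combined.first || result.first;
  }
  return combined;
}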
3468 
3469 AggregatedColRange Executor::computeColRangesCache(
3470  const std::unordered_set<PhysicalInput>& phys_inputs) {
3471  AggregatedColRange agg_col_range_cache;
3472  CHECK(catalog_);
3473  std::unordered_set<int> phys_table_ids;
3474  for (const auto& phys_input : phys_inputs) {
3475  phys_table_ids.insert(phys_input.table_id);
3476  }
3477  std::vector<InputTableInfo> query_infos;
3478  for (const int table_id : phys_table_ids) {
3479  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3480  }
3481  for (const auto& phys_input : phys_inputs) {
3482  const auto cd =
3483  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3484  CHECK(cd);
3485  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3486  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3487  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3488  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3489  agg_col_range_cache.setColRange(phys_input, col_range);
3490  }
3491  }
3492  return agg_col_range_cache;
3493 }
3494 
3495 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3496  const std::unordered_set<PhysicalInput>& phys_inputs) {
3497  StringDictionaryGenerations string_dictionary_generations;
3498  CHECK(catalog_);
3499  for (const auto& phys_input : phys_inputs) {
3500  const auto cd =
3501  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3502  CHECK(cd);
3503  const auto& col_ti =
3504  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3505  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3506  const int dict_id = col_ti.get_comp_param();
3507  const auto dd = catalog_->getMetadataForDict(dict_id);
3508  CHECK(dd && dd->stringDict);
3509  string_dictionary_generations.setGeneration(dict_id,
3510  dd->stringDict->storageEntryCount());
3511  }
3512  }
3513  return string_dictionary_generations;
3514 }
3515 
3516 TableGenerations Executor::computeTableGenerations(
3517  std::unordered_set<int> phys_table_ids) {
3518  TableGenerations table_generations;
3519  for (const int table_id : phys_table_ids) {
3520  const auto table_info = getTableInfo(table_id);
3521  table_generations.setGeneration(
3522  table_id,
3523  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
3524  }
3525  return table_generations;
3526 }
3527 
3528 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3529  const std::unordered_set<int>& phys_table_ids) {
3530  CHECK(catalog_);
3531  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize());
3532  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3533  string_dictionary_generations_ = computeStringDictionaryGenerations(phys_inputs);
3534  table_generations_ = computeTableGenerations(phys_table_ids);
3535 }
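
// Hedged usage sketch (not from the source): priming the executor-side caches for a
// query that scans columns 1 and 2 of table 10. The helper name, the executor handle,
// and the literal ids are assumptions for illustration only.
void prime_executor_caches(Executor* executor) {
  std::unordered_set<PhysicalInput> phys_inputs;
  for (const int col_id : {1, 2}) {
    PhysicalInput phys_input;
    phys_input.col_id = col_id;
    phys_input.table_id = 10;
    phys_inputs.insert(phys_input);
  }
  executor->setupCaching(phys_inputs, /*phys_table_ids=*/{10});
  // Subsequent column range, string dictionary generation, and table generation lookups
  // can then be served from the caches populated above.
}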
3536 
3537 mapd_shared_mutex& Executor::getSessionLock() {
3538  return executor_session_mutex_;
3539 }
3540 
3541 template <>
3542 void Executor::setCurrentQuerySession(const std::string& query_session,
3543  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3544  if (current_query_session_ == "") {
3545  // only set the query session if it currently has an invalid session
3546  current_query_session_ = query_session;
3547  if (queries_session_map_.count(query_session)) {
3548  queries_session_map_.at(query_session).setQueryStatusAsRunning();
3549  }
3550  }
3551 }
3552 
3553 template <>
3554 std::string& Executor::getCurrentQuerySession(
3555  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3556  return current_query_session_;
3557 }
3558 
3559 template <>
3560 bool Executor::checkCurrentQuerySession(const std::string& candidate_query_session,
3561  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3562  // we consider the candidate session valid only if it matches the
3563  // current_query_session_
3564  return (current_query_session_ == candidate_query_session);
3565 }
3566 
3567 template <>
3568 void Executor::invalidateRunningQuerySession(
3569  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3570  current_query_session_ = "";
3571 }
3572 
3573 template <>
3574 bool Executor::addToQuerySessionList(const std::string& query_session,
3575  const std::string& query_str,
3576  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3577  queries_session_map_.emplace(
3578  query_session,
3579  QuerySessionStatus(query_session, query_str, std::chrono::system_clock::now()));
3580  return queries_interrupt_flag_.emplace(query_session, false).second;
3581 }
3582 
3583 template <>
3584 bool Executor::removeFromQuerySessionList(
3585  const std::string& query_session,
3586  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3587  queries_session_map_.erase(query_session);
3588  return queries_interrupt_flag_.erase(query_session);
3589 }
3590 
3591 template <>
3592 void Executor::setQuerySessionAsInterrupted(
3593  const std::string& query_session,
3594  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3595  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
3596  queries_interrupt_flag_[query_session] = true;
3597  }
3598 }
3599 
3600 template <>
3601 bool Executor::checkIsQuerySessionInterrupted(
3602  const std::string& query_session,
3603  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3604  auto flag_it = queries_interrupt_flag_.find(query_session);
3605  return flag_it != queries_interrupt_flag_.end() && flag_it->second;
3606 }
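
// Illustrative sketch (not part of Execute.cpp): the interrupt-flag life cycle the
// specializations above implement, reduced to a plain map guarded by a shared mutex.
// Writers (registering a session, flagging an interrupt) take a unique lock while the
// per-kernel interrupt check only takes a shared lock. All names below are made up.
#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>

class SessionInterruptFlags {
 public:
  void registerSession(const std::string& session) {
    std::unique_lock<std::shared_timed_mutex> write_lock(mutex_);
    flags_.emplace(session, false);
  }
  void interrupt(const std::string& session) {
    std::unique_lock<std::shared_timed_mutex> write_lock(mutex_);
    auto it = flags_.find(session);
    if (it != flags_.end()) {
      it->second = true;
    }
  }
  bool isInterrupted(const std::string& session) const {
    std::shared_lock<std::shared_timed_mutex> read_lock(mutex_);
    auto it = flags_.find(session);
    return it != flags_.end() && it->second;
  }

 private:
  mutable std::shared_timed_mutex mutex_;
  std::map<std::string, bool> flags_;  // mirrors the InterruptFlagMap used above
};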
3607 
3608 void Executor::enableRuntimeQueryInterrupt(const unsigned interrupt_freq) const {
3609  // The only scenario in which we intentionally call this function is
3610  // to allow runtime query interrupt in QueryRunner for test cases.
3611  // Because the test machine's default setting does not allow runtime query
3612  // interrupt, we have to turn it on within test code when necessary.
3613  g_enable_runtime_query_interrupt = true;
3614  g_runtime_query_interrupt_frequency = interrupt_freq;
3615 }
3616 
3617 void Executor::addToCardinalityCache(const std::string& cache_key,
3618  const size_t cache_value) {
3619  if (g_use_estimator_result_cache) {
3620  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
3621  cardinality_cache_[cache_key] = cache_value;
3622  VLOG(1) << "Put estimated cardinality to the cache";
3623  }
3624 }
3625 
3626 Executor::CachedCardinality Executor::getCachedCardinality(const std::string& cache_key) {
3627  mapd_shared_lock<mapd_shared_mutex> lock(recycler_mutex_);
3628  if (g_use_estimator_result_cache &&
3629  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
3630  VLOG(1) << "Reuse cached cardinality";
3631  return {true, cardinality_cache_[cache_key]};
3632  }
3633  return {false, -1};
3634 }
3635 
3636 std::optional<QuerySessionStatus> Executor::getQuerySessionInfo(
3637  const std::string& query_session,
3638  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3639  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
3640  auto& query_info = queries_session_map_.at(query_session);
3641  const std::string query_status =
3642  getCurrentQuerySession(read_lock) == query_session ? "Running" : "Pending";
3643  return QuerySessionStatus(query_session,
3644  query_info.getQueryStr(),
3645  query_info.getQuerySubmittedTime(),
3646  query_status);
3647  }
3648  return std::nullopt;
3649 }
3650 
3651 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
3652 
3653 std::atomic_flag Executor::execute_spin_lock_ = ATOMIC_FLAG_INIT;
3654 std::string Executor::current_query_session_{""};
3655 InterruptFlagMap Executor::queries_interrupt_flag_;
3656 QuerySessionMap Executor::queries_session_map_;
3657 mapd_shared_mutex Executor::executor_session_mutex_;
3658 
3659 mapd_shared_mutex Executor::execute_mutex_;
3660 mapd_shared_mutex Executor::executors_cache_mutex_;
3661 
3662 std::mutex Executor::gpu_active_modules_mutex_;
3663 uint32_t Executor::gpu_active_modules_device_mask_{0x0};
3664 void* Executor::gpu_active_modules_[max_gpu_count];
3665 std::atomic<bool> Executor::interrupted_{false};
3666 
3667 std::mutex Executor::compilation_mutex_;
3668 std::mutex Executor::kernel_mutex_;
3669 
3670 mapd_shared_mutex Executor::recycler_mutex_;
3671 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;