OmniSciDB 17c254d2f8
Execute.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Execute.h"
18 
19 #include "AggregateUtils.h"
20 #include "BaselineJoinHashTable.h"
21 #include "CodeGenerator.h"
22 #include "ColumnFetcher.h"
25 #include "DynamicWatchdog.h"
26 #include "EquiJoinCondition.h"
27 #include "ErrorHandling.h"
28 #include "ExpressionRewrite.h"
30 #include "GpuMemUtils.h"
31 #include "InPlaceSort.h"
32 #include "JsonAccessors.h"
34 #include "OverlapsJoinHashTable.h"
35 #include "QueryRewrite.h"
36 #include "QueryTemplateGenerator.h"
37 #include "ResultSetReductionJIT.h"
38 #include "RuntimeFunctions.h"
39 #include "SpeculativeTopN.h"
40 
43 
44 #include "CudaMgr/CudaMgr.h"
46 #include "Parser/ParserNode.h"
49 #include "Shared/checked_alloc.h"
50 #include "Shared/measure.h"
51 #include "Shared/misc.h"
52 #include "Shared/scope.h"
53 #include "Shared/shard_key.h"
55 #include "Shared/threadpool.h"
56 
57 #include "AggregatedColRange.h"
59 
60 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
61 #include <boost/filesystem/operations.hpp>
62 #include <boost/filesystem/path.hpp>
63 
64 #ifdef HAVE_CUDA
65 #include <cuda.h>
66 #endif // HAVE_CUDA
67 #include <future>
68 #include <memory>
69 #include <numeric>
70 #include <set>
71 #include <thread>
72 
73 bool g_enable_watchdog{false};
75 bool g_use_tbb_pool{false};
77 bool g_allow_cpu_retry{true};
78 bool g_null_div_by_zero{false};
82 extern bool g_enable_smem_group_by;
83 extern std::unique_ptr<llvm::Module> udf_gpu_module;
84 extern std::unique_ptr<llvm::Module> udf_cpu_module;
92 bool g_cache_string_hash{false};
93 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
96 size_t g_big_group_threshold{20000};
99 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
101  256}; // minimum memory allocation required for projection query output buffer
102  // without pre-flight count
110  4096}; // GPU shared memory threshold (in bytes), if larger buffer sizes are required
111  // we do not use GPU shared memory optimizations
113  true}; // enable optimizations for using GPU shared memory in implementation of
114  // non-grouped aggregates
115 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
116  // limits the allocation for the output buffer arena
117 
118 int const Executor::max_gpu_count;
119 
121 
122 Executor::Executor(const int db_id,
123  const size_t block_size_x,
124  const size_t grid_size_x,
125  const std::string& debug_dir,
126  const std::string& debug_file)
127  : cgen_state_(new CgenState({}, false))
129  , interrupted_(false)
130  , cpu_code_cache_(code_cache_size)
131  , gpu_code_cache_(code_cache_size)
132  , block_size_x_(block_size_x)
133  , grid_size_x_(grid_size_x)
134  , debug_dir_(debug_dir)
135  , debug_file_(debug_file)
136  , db_id_(db_id)
137  , catalog_(nullptr)
138  , temporary_tables_(nullptr)
140 
141 std::shared_ptr<Executor> Executor::getExecutor(
142  const int db_id,
143  const std::string& debug_dir,
144  const std::string& debug_file,
145  const SystemParameters system_parameters) {
146  INJECT_TIMER(getExecutor);
147  const auto executor_key = db_id;
148  {
149  mapd_shared_lock<mapd_shared_mutex> read_lock(executors_cache_mutex_);
150  auto it = executors_.find(executor_key);
151  if (it != executors_.end()) {
152  return it->second;
153  }
154  }
155  {
156  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
157  auto it = executors_.find(executor_key);
158  if (it != executors_.end()) {
159  return it->second;
160  }
161  auto executor = std::make_shared<Executor>(db_id,
162  system_parameters.cuda_block_size,
163  system_parameters.cuda_grid_size,
164  debug_dir,
165  debug_file);
166  auto it_ok = executors_.insert(std::make_pair(executor_key, executor));
167  CHECK(it_ok.second);
168  return executor;
169  }
170 }
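// getExecutor() caches one Executor per database id using a double-checked pattern:
// a shared (read) lock first probes executors_, and only on a miss is a unique
// (write) lock taken and the map re-checked before a new Executor is constructed, so
// concurrent callers for the same db_id end up sharing a single instance.
// Illustrative call site (empty debug paths, default-constructed SystemParameters):
//   auto executor = Executor::getExecutor(db_id, "", "", SystemParameters{});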
171 
173  switch (memory_level) {
176  std::lock_guard<std::mutex> flush_lock(
177  execute_mutex_); // Don't flush memory while queries are running
178 
180  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
181  // The hash table cache uses CPU memory not managed by the buffer manager. In the
182  // future, we should manage these allocations with the buffer manager directly.
183  // For now, assume the user wants to purge the hash table cache when they clear
184  // CPU memory (currently used in ExecuteTest to lower memory pressure)
186  }
187  break;
188  }
189  default: {
190  throw std::runtime_error(
191  "Clearing memory levels other than the CPU level or GPU level is not "
192  "supported.");
193  }
194  }
195 }
196 
198  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
199 }
200 
202  const int dict_id_in,
203  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
204  const bool with_generation) const {
205  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
206  CHECK(catalog_);
207  const auto dd = catalog_->getMetadataForDict(dict_id);
208  std::lock_guard<std::mutex> lock(str_dict_mutex_);
209  if (dd) {
210  CHECK(dd->stringDict);
211  CHECK_LE(dd->dictNBits, 32);
212  CHECK(row_set_mem_owner);
213  const auto generation = with_generation
214  ? string_dictionary_generations_.getGeneration(dict_id)
215  : ssize_t(-1);
216  return row_set_mem_owner->addStringDict(dd->stringDict, dict_id, generation);
217  }
218  CHECK_EQ(0, dict_id);
219  if (!lit_str_dict_proxy_) {
220  std::shared_ptr<StringDictionary> tsd =
221  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
222  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
223  }
224  return lit_str_dict_proxy_.get();
225 }
226 
227 bool Executor::isCPUOnly() const {
228  CHECK(catalog_);
229  return !catalog_->getDataMgr().getCudaMgr();
230 }
231 
233  const Analyzer::ColumnVar* col_var) const {
235  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
236 }
237 
239  const Analyzer::ColumnVar* col_var,
240  int n) const {
241  const auto cd = getColumnDescriptor(col_var);
242  if (!cd || n > cd->columnType.get_physical_cols()) {
243  return nullptr;
244  }
246  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
247 }
248 
250  return catalog_;
251 }
252 
254  catalog_ = catalog;
255 }
256 
257 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
258  return row_set_mem_owner_;
259 }
260 
262  return temporary_tables_;
263 }
264 
266  return input_table_info_cache_.getTableInfo(table_id);
267 }
268 
269 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
270  return table_generations_.getGeneration(table_id);
271 }
272 
274  return agg_col_range_cache_.getColRange(phys_input);
275 }
276 
278  size_t num_bytes = 0;
279  if (!plan_state_) {
280  return 0;
281  }
282  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
283  if (fetched_col_pair.first < 0) {
284  num_bytes += 8;
285  } else {
286  const auto cd =
287  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
288  const auto& ti = cd->columnType;
289  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
290  ? 4
291  : ti.get_size();
292  if (sz < 0) {
293  // for varlen types, only account for the pointer/size for each row, for now
294  num_bytes += 16;
295  } else {
296  num_bytes += sz;
297  }
298  }
299  }
300  return num_bytes;
301 }
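// Sizing rules implemented by getNumBytesForFetchedRow() above: columns from
// temporary tables (negative table ids) count a fixed 8 bytes, dictionary-encoded
// TEXT counts the 4-byte dictionary id, variable-length types count 16 bytes
// (pointer plus length), and all other types count their fixed size. For example
// (illustrative), BIGINT + dictionary-encoded TEXT + one varlen column adds
// 8 + 4 + 16 = 28 bytes per fetched row.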
302 
303 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
304  const std::vector<Analyzer::Expr*>& target_exprs) const {
305  CHECK(plan_state_);
306  CHECK(catalog_);
307  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
308  for (const auto target_expr : target_exprs) {
309  if (!plan_state_->isLazyFetchColumn(target_expr)) {
310  col_lazy_fetch_info.emplace_back(
311  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
312  } else {
313  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
314  CHECK(col_var);
315  auto col_id = col_var->get_column_id();
316  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
317  auto cd = (col_var->get_table_id() > 0)
318  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
319  : nullptr;
320  if (cd && IS_GEO(cd->columnType.get_type())) {
321  // Geo coords cols will be processed in sequence. So we only need to track the
322  // first coords col in lazy fetch info.
323  {
324  auto cd0 =
325  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
326  auto col0_ti = cd0->columnType;
327  CHECK(!cd0->isVirtualCol);
328  auto col0_var = makeExpr<Analyzer::ColumnVar>(
329  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
330  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
331  col_lazy_fetch_info.emplace_back(
332  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
333  }
334  } else {
335  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
336  const auto& col_ti = col_var->get_type_info();
337  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
338  }
339  }
340  }
341  return col_lazy_fetch_info;
342 }
343 
345  input_table_info_cache_.clear();
346  agg_col_range_cache_.clear();
347  string_dictionary_generations_.clear();
348  table_generations_.clear();
349 }
350 
351 std::vector<int8_t> Executor::serializeLiterals(
352  const std::unordered_map<int, CgenState::LiteralValues>& literals,
353  const int device_id) {
354  if (literals.empty()) {
355  return {};
356  }
357  const auto dev_literals_it = literals.find(device_id);
358  CHECK(dev_literals_it != literals.end());
359  const auto& dev_literals = dev_literals_it->second;
360  size_t lit_buf_size{0};
361  std::vector<std::string> real_strings;
362  std::vector<std::vector<double>> double_array_literals;
363  std::vector<std::vector<int8_t>> align64_int8_array_literals;
364  std::vector<std::vector<int32_t>> int32_array_literals;
365  std::vector<std::vector<int8_t>> align32_int8_array_literals;
366  std::vector<std::vector<int8_t>> int8_array_literals;
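  // Serialization is done in two passes over the device's literals: the pass below
  // accumulates the aligned size of the inline literal area and collects the
  // variable-length payloads (strings and array literals); offsets for each payload
  // region are then computed, and a second pass writes the inline values together
  // with a packed 32-bit (offset << 16 | length) descriptor for each variable-length
  // literal.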
367  for (const auto& lit : dev_literals) {
368  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
369  if (lit.which() == 7) {
370  const auto p = boost::get<std::string>(&lit);
371  CHECK(p);
372  real_strings.push_back(*p);
373  } else if (lit.which() == 8) {
374  const auto p = boost::get<std::vector<double>>(&lit);
375  CHECK(p);
376  double_array_literals.push_back(*p);
377  } else if (lit.which() == 9) {
378  const auto p = boost::get<std::vector<int32_t>>(&lit);
379  CHECK(p);
380  int32_array_literals.push_back(*p);
381  } else if (lit.which() == 10) {
382  const auto p = boost::get<std::vector<int8_t>>(&lit);
383  CHECK(p);
384  int8_array_literals.push_back(*p);
385  } else if (lit.which() == 11) {
386  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
387  CHECK(p);
388  if (p->second == 64) {
389  align64_int8_array_literals.push_back(p->first);
390  } else if (p->second == 32) {
391  align32_int8_array_literals.push_back(p->first);
392  } else {
393  CHECK(false);
394  }
395  }
396  }
397  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
398  throw TooManyLiterals();
399  }
400  int16_t crt_real_str_off = lit_buf_size;
401  for (const auto& real_str : real_strings) {
402  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
403  lit_buf_size += real_str.size();
404  }
405  if (double_array_literals.size() > 0) {
406  lit_buf_size = align(lit_buf_size, sizeof(double));
407  }
408  int16_t crt_double_arr_lit_off = lit_buf_size;
409  for (const auto& double_array_literal : double_array_literals) {
410  CHECK_LE(double_array_literal.size(),
411  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
412  lit_buf_size += double_array_literal.size() * sizeof(double);
413  }
414  if (align64_int8_array_literals.size() > 0) {
415  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
416  }
417  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
418  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
419  CHECK_LE(align64_int8_array_literal.size(),
420  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
421  lit_buf_size += align64_int8_array_literal.size();
422  }
423  if (int32_array_literals.size() > 0) {
424  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
425  }
426  int16_t crt_int32_arr_lit_off = lit_buf_size;
427  for (const auto& int32_array_literal : int32_array_literals) {
428  CHECK_LE(int32_array_literal.size(),
429  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
430  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
431  }
432  if (align32_int8_array_literals.size() > 0) {
433  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
434  }
435  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
436  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
437  CHECK_LE(align32_int8_array_literal.size(),
438  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
439  lit_buf_size += align32_int8_array_literal.size();
440  }
441  int16_t crt_int8_arr_lit_off = lit_buf_size;
442  for (const auto& int8_array_literal : int8_array_literals) {
443  CHECK_LE(int8_array_literal.size(),
444  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
445  lit_buf_size += int8_array_literal.size();
446  }
447  unsigned crt_real_str_idx = 0;
448  unsigned crt_double_arr_lit_idx = 0;
449  unsigned crt_align64_int8_arr_lit_idx = 0;
450  unsigned crt_int32_arr_lit_idx = 0;
451  unsigned crt_align32_int8_arr_lit_idx = 0;
452  unsigned crt_int8_arr_lit_idx = 0;
453  std::vector<int8_t> serialized(lit_buf_size);
454  size_t off{0};
455  for (const auto& lit : dev_literals) {
456  const auto lit_bytes = CgenState::literalBytes(lit);
457  off = CgenState::addAligned(off, lit_bytes);
458  switch (lit.which()) {
459  case 0: {
460  const auto p = boost::get<int8_t>(&lit);
461  CHECK(p);
462  serialized[off - lit_bytes] = *p;
463  break;
464  }
465  case 1: {
466  const auto p = boost::get<int16_t>(&lit);
467  CHECK(p);
468  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
469  break;
470  }
471  case 2: {
472  const auto p = boost::get<int32_t>(&lit);
473  CHECK(p);
474  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
475  break;
476  }
477  case 3: {
478  const auto p = boost::get<int64_t>(&lit);
479  CHECK(p);
480  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
481  break;
482  }
483  case 4: {
484  const auto p = boost::get<float>(&lit);
485  CHECK(p);
486  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
487  break;
488  }
489  case 5: {
490  const auto p = boost::get<double>(&lit);
491  CHECK(p);
492  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
493  break;
494  }
495  case 6: {
496  const auto p = boost::get<std::pair<std::string, int>>(&lit);
497  CHECK(p);
498  const auto str_id =
500  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
501  ->getOrAddTransient(p->first)
502  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
503  ->getIdOfString(p->first);
504  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
505  break;
506  }
507  case 7: {
508  const auto p = boost::get<std::string>(&lit);
509  CHECK(p);
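        // The descriptor packs the 16-bit buffer offset into the high half and the
        // 16-bit payload length into the low half; e.g. offset 40 with length 5
        // yields 0x00280005 (values purely illustrative).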
510  int32_t off_and_len = crt_real_str_off << 16;
511  const auto& crt_real_str = real_strings[crt_real_str_idx];
512  off_and_len |= static_cast<int16_t>(crt_real_str.size());
513  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
514  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
515  ++crt_real_str_idx;
516  crt_real_str_off += crt_real_str.size();
517  break;
518  }
519  case 8: {
520  const auto p = boost::get<std::vector<double>>(&lit);
521  CHECK(p);
522  int32_t off_and_len = crt_double_arr_lit_off << 16;
523  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
524  int32_t len = crt_double_arr_lit.size();
525  CHECK_EQ((len >> 16), 0);
526  off_and_len |= static_cast<int16_t>(len);
527  int32_t double_array_bytesize = len * sizeof(double);
528  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
529  memcpy(&serialized[crt_double_arr_lit_off],
530  crt_double_arr_lit.data(),
531  double_array_bytesize);
532  ++crt_double_arr_lit_idx;
533  crt_double_arr_lit_off += double_array_bytesize;
534  break;
535  }
536  case 9: {
537  const auto p = boost::get<std::vector<int32_t>>(&lit);
538  CHECK(p);
539  int32_t off_and_len = crt_int32_arr_lit_off << 16;
540  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
541  int32_t len = crt_int32_arr_lit.size();
542  CHECK_EQ((len >> 16), 0);
543  off_and_len |= static_cast<int16_t>(len);
544  int32_t int32_array_bytesize = len * sizeof(int32_t);
545  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
546  memcpy(&serialized[crt_int32_arr_lit_off],
547  crt_int32_arr_lit.data(),
548  int32_array_bytesize);
549  ++crt_int32_arr_lit_idx;
550  crt_int32_arr_lit_off += int32_array_bytesize;
551  break;
552  }
553  case 10: {
554  const auto p = boost::get<std::vector<int8_t>>(&lit);
555  CHECK(p);
556  int32_t off_and_len = crt_int8_arr_lit_off << 16;
557  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
558  int32_t len = crt_int8_arr_lit.size();
559  CHECK_EQ((len >> 16), 0);
560  off_and_len |= static_cast<int16_t>(len);
561  int32_t int8_array_bytesize = len;
562  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
563  memcpy(&serialized[crt_int8_arr_lit_off],
564  crt_int8_arr_lit.data(),
565  int8_array_bytesize);
566  ++crt_int8_arr_lit_idx;
567  crt_int8_arr_lit_off += int8_array_bytesize;
568  break;
569  }
570  case 11: {
571  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
572  CHECK(p);
573  if (p->second == 64) {
574  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
575  const auto& crt_align64_int8_arr_lit =
576  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
577  int32_t len = crt_align64_int8_arr_lit.size();
578  CHECK_EQ((len >> 16), 0);
579  off_and_len |= static_cast<int16_t>(len);
580  int32_t align64_int8_array_bytesize = len;
581  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
582  memcpy(&serialized[crt_align64_int8_arr_lit_off],
583  crt_align64_int8_arr_lit.data(),
584  align64_int8_array_bytesize);
585  ++crt_align64_int8_arr_lit_idx;
586  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
587  } else if (p->second == 32) {
588  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
589  const auto& crt_align32_int8_arr_lit =
590  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
591  int32_t len = crt_align32_int8_arr_lit.size();
592  CHECK_EQ((len >> 16), 0);
593  off_and_len |= static_cast<int16_t>(len);
594  int32_t align32_int8_array_bytesize = len;
595  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
596  memcpy(&serialized[crt_align32_int8_arr_lit_off],
597  crt_align32_int8_arr_lit.data(),
598  align32_int8_array_bytesize);
599  ++crt_align32_int8_arr_lit_idx;
600  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
601  } else {
602  CHECK(false);
603  }
604  break;
605  }
606  default:
607  CHECK(false);
608  }
609  }
610  return serialized;
611 }
612 
613 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
614  if (device_type == ExecutorDeviceType::GPU) {
615  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
616  CHECK(cuda_mgr);
617  return cuda_mgr->getDeviceCount();
618  } else {
619  return 1;
620  }
621 }
622 
624  const Data_Namespace::MemoryLevel memory_level) const {
625  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
626  : deviceCount(ExecutorDeviceType::CPU);
627 }
628 
629 // TODO(alex): remove or split
630 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
631  const SQLTypeInfo& ti,
632  const int64_t agg_init_val,
633  const int8_t out_byte_width,
634  const int64_t* out_vec,
635  const size_t out_vec_sz,
636  const bool is_group_by,
637  const bool float_argument_input) {
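  // out_vec holds the per-device partial aggregate values encoded as 64-bit bins;
  // floating-point partials are reinterpreted from the stored bit pattern at the
  // width given by out_byte_width. Every path below returns the reduced value paired
  // with 0 as the second element.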
638  switch (agg) {
639  case kAVG:
640  case kSUM:
641  if (0 != agg_init_val) {
642  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
643  int64_t agg_result = agg_init_val;
644  for (size_t i = 0; i < out_vec_sz; ++i) {
645  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
646  }
647  return {agg_result, 0};
648  } else {
649  CHECK(ti.is_fp());
650  switch (out_byte_width) {
651  case 4: {
652  int agg_result = static_cast<int32_t>(agg_init_val);
653  for (size_t i = 0; i < out_vec_sz; ++i) {
655  &agg_result,
656  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
657  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
658  }
659  const int64_t converted_bin =
660  float_argument_input
661  ? static_cast<int64_t>(agg_result)
662  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
663  return {converted_bin, 0};
664  break;
665  }
666  case 8: {
667  int64_t agg_result = agg_init_val;
668  for (size_t i = 0; i < out_vec_sz; ++i) {
670  &agg_result,
671  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
672  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
673  }
674  return {agg_result, 0};
675  break;
676  }
677  default:
678  CHECK(false);
679  }
680  }
681  }
682  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
683  int64_t agg_result = 0;
684  for (size_t i = 0; i < out_vec_sz; ++i) {
685  agg_result += out_vec[i];
686  }
687  return {agg_result, 0};
688  } else {
689  CHECK(ti.is_fp());
690  switch (out_byte_width) {
691  case 4: {
692  float r = 0.;
693  for (size_t i = 0; i < out_vec_sz; ++i) {
694  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
695  }
696  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
697  const int64_t converted_bin =
698  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
699  return {converted_bin, 0};
700  }
701  case 8: {
702  double r = 0.;
703  for (size_t i = 0; i < out_vec_sz; ++i) {
704  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
705  }
706  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
707  }
708  default:
709  CHECK(false);
710  }
711  }
712  break;
713  case kCOUNT: {
714  uint64_t agg_result = 0;
715  for (size_t i = 0; i < out_vec_sz; ++i) {
716  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
717  agg_result += out;
718  }
719  return {static_cast<int64_t>(agg_result), 0};
720  }
721  case kMIN: {
722  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
723  int64_t agg_result = agg_init_val;
724  for (size_t i = 0; i < out_vec_sz; ++i) {
725  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
726  }
727  return {agg_result, 0};
728  } else {
729  switch (out_byte_width) {
730  case 4: {
731  int32_t agg_result = static_cast<int32_t>(agg_init_val);
732  for (size_t i = 0; i < out_vec_sz; ++i) {
734  &agg_result,
735  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
736  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
737  }
738  const int64_t converted_bin =
739  float_argument_input
740  ? static_cast<int64_t>(agg_result)
741  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
742  return {converted_bin, 0};
743  }
744  case 8: {
745  int64_t agg_result = agg_init_val;
746  for (size_t i = 0; i < out_vec_sz; ++i) {
748  &agg_result,
749  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
750  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
751  }
752  return {agg_result, 0};
753  }
754  default:
755  CHECK(false);
756  }
757  }
758  }
759  case kMAX:
760  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
761  int64_t agg_result = agg_init_val;
762  for (size_t i = 0; i < out_vec_sz; ++i) {
763  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
764  }
765  return {agg_result, 0};
766  } else {
767  switch (out_byte_width) {
768  case 4: {
769  int32_t agg_result = static_cast<int32_t>(agg_init_val);
770  for (size_t i = 0; i < out_vec_sz; ++i) {
772  &agg_result,
773  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
774  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
775  }
776  const int64_t converted_bin =
777  float_argument_input ? static_cast<int64_t>(agg_result)
778  : float_to_double_bin(agg_result, !ti.get_notnull());
779  return {converted_bin, 0};
780  }
781  case 8: {
782  int64_t agg_result = agg_init_val;
783  for (size_t i = 0; i < out_vec_sz; ++i) {
785  &agg_result,
786  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
787  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
788  }
789  return {agg_result, 0};
790  }
791  default:
792  CHECK(false);
793  }
794  }
795  case kSINGLE_VALUE: {
796  int64_t agg_result = agg_init_val;
797  for (size_t i = 0; i < out_vec_sz; ++i) {
798  if (out_vec[i] != agg_init_val) {
799  if (agg_result == agg_init_val) {
800  agg_result = out_vec[i];
801  } else if (out_vec[i] != agg_result) {
803  }
804  }
805  }
806  return {agg_result, 0};
807  }
808  case kSAMPLE: {
809  int64_t agg_result = agg_init_val;
810  for (size_t i = 0; i < out_vec_sz; ++i) {
811  if (out_vec[i] != agg_init_val) {
812  agg_result = out_vec[i];
813  break;
814  }
815  }
816  return {agg_result, 0};
817  }
818  default:
819  CHECK(false);
820  }
821  abort();
822 }
823 
824 namespace {
825 
827  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
828  auto& first = results_per_device.front().first;
829  CHECK(first);
830  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
831  const auto& next = results_per_device[dev_idx].first;
832  CHECK(next);
833  first->append(*next);
834  }
835  return std::move(first);
836 }
837 
838 } // namespace
839 
841  auto& results_per_device = execution_dispatch.getFragmentResults();
842  if (results_per_device.empty()) {
843  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
844  std::vector<TargetInfo> targets;
845  for (const auto target_expr : ra_exe_unit.target_exprs) {
846  targets.push_back(get_target_info(target_expr, g_bigint_count));
847  }
848  return std::make_shared<ResultSet>(targets,
851  row_set_mem_owner_,
852  this);
853  }
854  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
855  std::sort(results_per_device.begin(),
856  results_per_device.end(),
857  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
858  CHECK_GE(lhs.second.size(), size_t(1));
859  CHECK_GE(rhs.second.size(), size_t(1));
860  return lhs.second.front() < rhs.second.front();
861  });
862 
863  return get_merged_result(results_per_device);
864 }
865 
867  const RelAlgExecutionUnit& ra_exe_unit,
868  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
869  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
870  const QueryMemoryDescriptor& query_mem_desc) const {
871  auto timer = DEBUG_TIMER(__func__);
872  if (ra_exe_unit.estimator) {
873  return reduce_estimator_results(ra_exe_unit, results_per_device);
874  }
875 
876  if (results_per_device.empty()) {
877  std::vector<TargetInfo> targets;
878  for (const auto target_expr : ra_exe_unit.target_exprs) {
879  targets.push_back(get_target_info(target_expr, g_bigint_count));
880  }
881  return std::make_shared<ResultSet>(
882  targets, ExecutorDeviceType::CPU, QueryMemoryDescriptor(), nullptr, this);
883  }
884 
885  return reduceMultiDeviceResultSets(
886  results_per_device,
887  row_set_mem_owner,
888  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
889 }
890 
892  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
893  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
894  const QueryMemoryDescriptor& query_mem_desc) const {
895  auto timer = DEBUG_TIMER(__func__);
896  std::shared_ptr<ResultSet> reduced_results;
897 
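  // For the group-by layout checked below, results from multiple devices are combined
  // by moving the first result set's entries into a freshly allocated ResultSet sized
  // to the combined entry count; the remaining device results are then reduced into it
  // with code generated by ResultSetReductionJIT. Otherwise the first result set is
  // reduced into directly.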
898  const auto& first = results_per_device.front().first;
899 
900  if (query_mem_desc.getQueryDescriptionType() ==
902  results_per_device.size() > 1) {
903  const auto total_entry_count = std::accumulate(
904  results_per_device.begin(),
905  results_per_device.end(),
906  size_t(0),
907  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
908  const auto& r = rs.first;
909  return init + r->getQueryMemDesc().getEntryCount();
910  });
911  CHECK(total_entry_count);
912  auto query_mem_desc = first->getQueryMemDesc();
913  query_mem_desc.setEntryCount(total_entry_count);
914  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
917  row_set_mem_owner,
918  this);
919  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
920  reduced_results->initializeStorage();
921  switch (query_mem_desc.getEffectiveKeyWidth()) {
922  case 4:
923  first->getStorage()->moveEntriesToBuffer<int32_t>(
924  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
925  break;
926  case 8:
927  first->getStorage()->moveEntriesToBuffer<int64_t>(
928  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
929  break;
930  default:
931  CHECK(false);
932  }
933  } else {
934  reduced_results = first;
935  }
936 
937  const auto& this_result_set = results_per_device[0].first;
938  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
939  this_result_set->getTargetInfos(),
940  this_result_set->getTargetInitVals());
941  const auto reduction_code = reduction_jit.codegen();
942  for (size_t i = 1; i < results_per_device.size(); ++i) {
943  reduced_results->getStorage()->reduce(
944  *(results_per_device[i].first->getStorage()), {}, reduction_code);
945  }
946 
947  return reduced_results;
948 }
949 
951  const RelAlgExecutionUnit& ra_exe_unit,
952  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
953  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
954  const QueryMemoryDescriptor& query_mem_desc) const {
955  if (results_per_device.size() == 1) {
956  return std::move(results_per_device.front().first);
957  }
958  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
960  for (const auto& result : results_per_device) {
961  auto rows = result.first;
962  CHECK(rows);
963  if (!rows) {
964  continue;
965  }
966  SpeculativeTopNMap that(
967  *rows,
968  ra_exe_unit.target_exprs,
969  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
970  m.reduce(that);
971  }
972  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
973  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
974  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
975 }
976 
977 std::unordered_set<int> get_available_gpus(const Catalog_Namespace::Catalog& cat) {
978  std::unordered_set<int> available_gpus;
979  if (cat.getDataMgr().gpusPresent()) {
980  int gpu_count = cat.getDataMgr().getCudaMgr()->getDeviceCount();
981  CHECK_GT(gpu_count, 0);
982  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
983  available_gpus.insert(gpu_id);
984  }
985  }
986  return available_gpus;
987 }
988 
989 size_t get_context_count(const ExecutorDeviceType device_type,
990  const size_t cpu_count,
991  const size_t gpu_count) {
992  return device_type == ExecutorDeviceType::GPU ? gpu_count
993  : static_cast<size_t>(cpu_count);
994 }
995 
996 namespace {
997 
998 // Compute a very conservative guess for the output buffer entry count, using no
999 // information other than the number of tuples in each table and multiplying them
1000 // together.
1001 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1003  // Check for overflows since we're multiplying potentially big table sizes.
1004  using checked_size_t = boost::multiprecision::number<
1005  boost::multiprecision::cpp_int_backend<64,
1006  64,
1007  boost::multiprecision::unsigned_magnitude,
1008  boost::multiprecision::checked,
1009  void>>;
1010  checked_size_t max_groups_buffer_entry_guess = 1;
1011  for (const auto& query_info : query_infos) {
1012  CHECK(!query_info.info.fragments.empty());
1013  auto it = std::max_element(query_info.info.fragments.begin(),
1014  query_info.info.fragments.end(),
1015  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1016  return f1.getNumTuples() < f2.getNumTuples();
1017  });
1018  max_groups_buffer_entry_guess *= it->getNumTuples();
1019  }
1020  // Cap the rough approximation to 100M entries; it's unlikely we can do a great job
1021  // for the baseline group layout with that many entries anyway.
1022  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1023  try {
1024  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1025  max_groups_buffer_entry_guess_cap);
1026  } catch (...) {
1027  return max_groups_buffer_entry_guess_cap;
1028  }
1029 }
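// Illustrative example (numbers made up): two inputs whose largest fragments hold
// 2,000,000 and 300,000 tuples produce a guess of 6e11, which is clamped to the 100M
// cap above. The checked multiprecision type turns a multiplication overflow into an
// exception, which the catch block likewise maps to the cap.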
1030 
1031 std::string get_table_name(const InputDescriptor& input_desc,
1033  const auto source_type = input_desc.getSourceType();
1034  if (source_type == InputSourceType::TABLE) {
1035  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1036  CHECK(td);
1037  return td->tableName;
1038  } else {
1039  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1040  }
1041 }
1042 
1043 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1044  const int device_count) {
1045  if (device_type == ExecutorDeviceType::GPU) {
1046  return device_count * Executor::high_scan_limit;
1047  }
1049 }
1050 
1052  const std::vector<InputTableInfo>& table_infos,
1054  const ExecutorDeviceType device_type,
1055  const int device_count) {
1056  for (const auto target_expr : ra_exe_unit.target_exprs) {
1057  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1058  return;
1059  }
1060  }
1061  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1062  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1063  // Allow a query with no scan limit to run on small tables
1064  return;
1065  }
1066  if (ra_exe_unit.use_bump_allocator) {
1067  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1068  // relative to the size of the input), so we bypass this check for now
1069  return;
1070  }
1071  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1072  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1073  (!ra_exe_unit.scan_limit ||
1074  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1075  std::vector<std::string> table_names;
1076  const auto& input_descs = ra_exe_unit.input_descs;
1077  for (const auto& input_desc : input_descs) {
1078  table_names.push_back(get_table_name(input_desc, cat));
1079  }
1080  if (!ra_exe_unit.scan_limit) {
1081  throw WatchdogException(
1082  "Projection query would require a scan without a limit on table(s): " +
1083  boost::algorithm::join(table_names, ", "));
1084  } else {
1085  throw WatchdogException(
1086  "Projection query output result set on table(s): " +
1087  boost::algorithm::join(table_names, ", ") + " would contain " +
1088  std::to_string(ra_exe_unit.scan_limit) +
1089  " rows, which is more than the current system limit of " +
1090  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1091  }
1092  }
1093 }
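// Summary of the check above: queries with an aggregate target, unlimited scans over a
// single small table, and bump-allocator queries are always allowed; any other
// non-grouped projection that is not a streaming top-n must carry a scan limit no
// larger than getDeviceBasedScanLimit(), otherwise a WatchdogException naming the
// input tables is thrown.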
1094 
1095 } // namespace
1096 
1097 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1098  const RelAlgExecutionUnit& ra_exe_unit) {
1099  if (ra_exe_unit.input_descs.size() < 2) {
1100  return false;
1101  }
1102 
1103  // We only support loop join at the end of folded joins
1104  // where ra_exe_unit.input_descs.size() > 2 for now.
1105  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1106 
1107  ssize_t inner_table_idx = -1;
1108  for (size_t i = 0; i < query_infos.size(); ++i) {
1109  if (query_infos[i].table_id == inner_table_id) {
1110  inner_table_idx = i;
1111  break;
1112  }
1113  }
1114  CHECK_NE(ssize_t(-1), inner_table_idx);
1115  return query_infos[inner_table_idx].info.getNumTuples() <=
1117 }
1118 
1119 namespace {
1120 
1121 template <typename T>
1122 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1123  std::vector<std::string> expr_strs;
1124  for (const auto& expr : expr_container) {
1125  if (!expr) {
1126  expr_strs.emplace_back("NULL");
1127  } else {
1128  expr_strs.emplace_back(expr->toString());
1129  }
1130  }
1131  return expr_strs;
1132 }
1133 
1134 template <>
1135 std::vector<std::string> expr_container_to_string(
1136  const std::list<Analyzer::OrderEntry>& expr_container) {
1137  std::vector<std::string> expr_strs;
1138  for (const auto& expr : expr_container) {
1139  expr_strs.emplace_back(expr.toString());
1140  }
1141  return expr_strs;
1142 }
1143 
1144 std::string join_type_to_string(const JoinType type) {
1145  switch (type) {
1146  case JoinType::INNER:
1147  return "INNER";
1148  case JoinType::LEFT:
1149  return "LEFT";
1150  case JoinType::INVALID:
1151  return "INVALID";
1152  }
1153  UNREACHABLE();
1154  return "";
1155 }
1156 
1157 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1158  switch (algorithm) {
1160  return "ResultSet";
1162  return "Speculative Top N";
1164  return "Streaming Top N";
1165  }
1166  UNREACHABLE();
1167  return "";
1168 }
1169 
1170 } // namespace
1171 
1172 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1173  os << "\n\tTable/Col/Levels: ";
1174  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1175  const auto& scan_desc = input_col_desc->getScanDesc();
1176  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1177  << scan_desc.getNestLevel() << ") ";
1178  }
1179  if (!ra_exe_unit.simple_quals.empty()) {
1180  os << "\n\tSimple Quals: "
1182  ", ");
1183  }
1184  if (!ra_exe_unit.quals.empty()) {
1185  os << "\n\tQuals: "
1186  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1187  }
1188  if (!ra_exe_unit.join_quals.empty()) {
1189  os << "\n\tJoin Quals: ";
1190  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1191  const auto& join_condition = ra_exe_unit.join_quals[i];
1192  os << "\t\t" << std::to_string(i) << " "
1193  << join_type_to_string(join_condition.type);
1194  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1195  }
1196  }
1197  if (!ra_exe_unit.groupby_exprs.empty()) {
1198  os << "\n\tGroup By: "
1200  ", ");
1201  }
1202  os << "\n\tProjected targets: "
1204  os << "\n\tHas Estimator: " << bool_to_string(ra_exe_unit.estimator != nullptr);
1205  os << "\n\tSort Info: ";
1206  const auto& sort_info = ra_exe_unit.sort_info;
1207  os << "\n\t Order Entries: "
1208  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1209  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1210  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1211  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1212  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1213  os << "\n\tBump Allocator: " << bool_to_string(ra_exe_unit.use_bump_allocator);
1214  if (ra_exe_unit.union_all) {
1215  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1216  }
1217  return os;
1218 }
1219 
1220 namespace {
1221 
1223  const size_t new_scan_limit) {
1224  return {ra_exe_unit_in.input_descs,
1225  ra_exe_unit_in.input_col_descs,
1226  ra_exe_unit_in.simple_quals,
1227  ra_exe_unit_in.quals,
1228  ra_exe_unit_in.join_quals,
1229  ra_exe_unit_in.groupby_exprs,
1230  ra_exe_unit_in.target_exprs,
1231  ra_exe_unit_in.estimator,
1232  ra_exe_unit_in.sort_info,
1233  new_scan_limit,
1234  ra_exe_unit_in.use_bump_allocator,
1235  ra_exe_unit_in.union_all,
1236  ra_exe_unit_in.query_state};
1237 }
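// replace_scan_limit() rebuilds the execution unit with every field unchanged except
// scan_limit; executeWorkUnit() uses it to retry a work unit when compilation requests
// a new scan limit via CompilationRetryNewScanLimit.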
1238 
1239 } // namespace
1240 
1242  size_t& max_groups_buffer_entry_guess,
1243  const bool is_agg,
1244  const std::vector<InputTableInfo>& query_infos,
1245  const RelAlgExecutionUnit& ra_exe_unit_in,
1246  const CompilationOptions& co,
1247  const ExecutionOptions& eo,
1249  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1250  RenderInfo* render_info,
1251  const bool has_cardinality_estimation,
1252  ColumnCacheMap& column_cache) {
1253  VLOG(1) << "Executing work unit:" << ra_exe_unit_in;
1254 
1255  try {
1256  return executeWorkUnitImpl(max_groups_buffer_entry_guess,
1257  is_agg,
1258  true,
1259  query_infos,
1260  ra_exe_unit_in,
1261  co,
1262  eo,
1263  cat,
1264  row_set_mem_owner,
1265  render_info,
1266  has_cardinality_estimation,
1267  column_cache);
1268  } catch (const CompilationRetryNewScanLimit& e) {
1269  return executeWorkUnitImpl(max_groups_buffer_entry_guess,
1270  is_agg,
1271  false,
1272  query_infos,
1273  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1274  co,
1275  eo,
1276  cat,
1277  row_set_mem_owner,
1278  render_info,
1279  has_cardinality_estimation,
1280  column_cache);
1281  }
1282 }
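// The retry above re-runs executeWorkUnitImpl() with the scan limit requested by the
// CompilationRetryNewScanLimit exception and with the single-fragment-table
// optimization (third argument) disabled.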
1283 
1285  size_t& max_groups_buffer_entry_guess,
1286  const bool is_agg,
1287  const bool allow_single_frag_table_opt,
1288  const std::vector<InputTableInfo>& query_infos,
1289  const RelAlgExecutionUnit& ra_exe_unit_in,
1290  const CompilationOptions& co,
1291  const ExecutionOptions& eo,
1293  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1294  RenderInfo* render_info,
1295  const bool has_cardinality_estimation,
1296  ColumnCacheMap& column_cache) {
1297  INJECT_TIMER(Exec_executeWorkUnit);
1298  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in, co);
1299  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1300  CHECK(!query_infos.empty());
1301  if (!max_groups_buffer_entry_guess) {
1302  // The query has failed the first execution attempt because of running out
1303  // of group by slots. Make the conservative choice: allocate fragment size
1304  // slots and run on the CPU.
1305  CHECK(device_type == ExecutorDeviceType::CPU);
1306  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1307  }
1308 
1309  int8_t crt_min_byte_width{get_min_byte_width()};
1310  do {
1311  ExecutionDispatch execution_dispatch(
1312  this, ra_exe_unit, query_infos, cat, row_set_mem_owner, render_info);
1313  ColumnFetcher column_fetcher(this, column_cache);
1314  std::unique_ptr<QueryCompilationDescriptor> query_comp_desc_owned;
1315  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1316  if (eo.executor_type == ExecutorType::Native) {
1317  try {
1318  INJECT_TIMER(execution_dispatch_comp);
1319  std::tie(query_comp_desc_owned, query_mem_desc_owned) =
1320  execution_dispatch.compile(max_groups_buffer_entry_guess,
1321  crt_min_byte_width,
1322  {device_type,
1323  co.hoist_literals,
1324  co.opt_level,
1326  co.allow_lazy_fetch,
1327  co.add_delete_column,
1328  co.explain_type,
1330  eo,
1331  column_fetcher,
1332  has_cardinality_estimation);
1333  CHECK(query_comp_desc_owned);
1334  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1335  } catch (CompilationRetryNoCompaction&) {
1336  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1337  continue;
1338  }
1339  } else {
1340  plan_state_.reset(new PlanState(false, this));
1341  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1342  CHECK(!query_comp_desc_owned);
1343  query_comp_desc_owned.reset(new QueryCompilationDescriptor());
1344  CHECK(!query_mem_desc_owned);
1345  query_mem_desc_owned.reset(
1347  }
1348  if (eo.just_explain) {
1349  return executeExplain(*query_comp_desc_owned);
1350  }
1351 
1352  for (const auto target_expr : ra_exe_unit.target_exprs) {
1353  plan_state_->target_exprs_.push_back(target_expr);
1354  }
1355 
1356  auto dispatch = [&execution_dispatch,
1357  &column_fetcher,
1358  &eo,
1359  parent_thread_id = logger::thread_id()](
1360  const ExecutorDeviceType chosen_device_type,
1361  int chosen_device_id,
1362  const QueryCompilationDescriptor& query_comp_desc,
1364  const FragmentsList& frag_list,
1365  const ExecutorDispatchMode kernel_dispatch_mode,
1366  const int64_t rowid_lookup_key) {
1367  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
1368  INJECT_TIMER(execution_dispatch_run);
1369  execution_dispatch.run(chosen_device_type,
1370  chosen_device_id,
1371  eo,
1372  column_fetcher,
1373  query_comp_desc,
1374  query_mem_desc,
1375  frag_list,
1376  kernel_dispatch_mode,
1377  rowid_lookup_key);
1378  };
1379 
1380  QueryFragmentDescriptor fragment_descriptor(
1381  ra_exe_unit,
1382  query_infos,
1383  query_comp_desc_owned->getDeviceType() == ExecutorDeviceType::GPU
1385  : std::vector<Data_Namespace::MemoryInfo>{},
1386  eo.gpu_input_mem_limit_percent,
1387  eo.outer_fragment_indices);
1388 
1389  if (!eo.just_validate) {
1390  int available_cpus = cpu_threads();
1391  auto available_gpus = get_available_gpus(cat);
1392 
1393  const auto context_count =
1394  get_context_count(device_type, available_cpus, available_gpus.size());
1395  try {
1396  if (g_use_tbb_pool) {
1397 #ifdef HAVE_TBB
1398  VLOG(1) << "Using TBB thread pool for kernel dispatch.";
1399  dispatchFragments<threadpool::TbbThreadPool<void>>(dispatch,
1400  execution_dispatch,
1401  query_infos,
1402  eo,
1403  is_agg,
1404  allow_single_frag_table_opt,
1405  context_count,
1406  *query_comp_desc_owned,
1407  *query_mem_desc_owned,
1408  fragment_descriptor,
1409  available_gpus,
1410  available_cpus);
1411 #else
1412  throw std::runtime_error(
1413  "This build is not TBB enabled. Restart the server with "
1414  "\"enable-modern-thread-pool\" disabled.");
1415 #endif
1416  } else {
1417  dispatchFragments<threadpool::FuturesThreadPool<void>>(
1418  dispatch,
1419  execution_dispatch,
1420  query_infos,
1421  eo,
1422  is_agg,
1423  allow_single_frag_table_opt,
1424  context_count,
1425  *query_comp_desc_owned,
1426  *query_mem_desc_owned,
1427  fragment_descriptor,
1428  available_gpus,
1429  available_cpus);
1430  }
1431  } catch (QueryExecutionError& e) {
1432  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1433  e.getErrorCode() == ERR_OUT_OF_TIME) {
1434  resetInterrupt();
1435  throw QueryExecutionError(ERR_INTERRUPTED);
1436  }
1437  if (eo.allow_runtime_query_interrupt && interrupted_.load()) {
1438  resetInterrupt();
1439  throw QueryExecutionError(ERR_INTERRUPTED);
1440  }
1441  cat.getDataMgr().freeAllBuffers();
1442  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1443  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1444  crt_min_byte_width <<= 1;
1445  continue;
1446  }
1447  throw;
1448  }
1449  }
1450  cat.getDataMgr().freeAllBuffers();
1451  if (is_agg) {
1452  try {
1453  return collectAllDeviceResults(execution_dispatch,
1454  ra_exe_unit.target_exprs,
1455  *query_mem_desc_owned,
1456  query_comp_desc_owned->getDeviceType(),
1457  row_set_mem_owner);
1458  } catch (ReductionRanOutOfSlots&) {
1459  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1460  } catch (OverflowOrUnderflow&) {
1461  crt_min_byte_width <<= 1;
1462  continue;
1463  }
1464  }
1465  return resultsUnion(execution_dispatch);
1466 
1467  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1468 
1469  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1472  nullptr,
1473  this);
1474 }
1475 
1477  const InputTableInfo& table_info,
1478  const CompilationOptions& co,
1479  const ExecutionOptions& eo,
1481  PerFragmentCallBack& cb) {
1482  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in, co);
1483  ColumnCacheMap column_cache;
1484 
1485  std::vector<InputTableInfo> table_infos{table_info};
1486  // TODO(adb): ensure this is under a try / catch
1487  ExecutionDispatch execution_dispatch(
1488  this, ra_exe_unit, table_infos, cat, row_set_mem_owner_, nullptr);
1489  ColumnFetcher column_fetcher(this, column_cache);
1490  std::unique_ptr<QueryCompilationDescriptor> query_comp_desc_owned;
1491  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1492  std::tie(query_comp_desc_owned, query_mem_desc_owned) =
1493  execution_dispatch.compile(0, 8, co, eo, column_fetcher, false);
1494  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1495  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1496  const auto& outer_fragments = table_info.info.fragments;
1497  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1498  ++fragment_index) {
1499  // We may want to consider in the future allowing this to execute on devices other
1500  // than CPU
1501  execution_dispatch.run(co.device_type,
1502  0,
1503  eo,
1504  column_fetcher,
1505  *query_comp_desc_owned,
1506  *query_mem_desc_owned,
1507  {{table_id, {fragment_index}}},
1509  -1);
1510  }
1511 
1512  const auto& all_fragment_results = execution_dispatch.getFragmentResults();
1513 
1514  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1515  ++fragment_index) {
1516  const auto fragment_results = all_fragment_results[fragment_index];
1517  cb(fragment_results.first, outer_fragments[fragment_index]);
1518  }
1519 }
1520 
1522  const TableFunctionExecutionUnit exe_unit,
1523  const std::vector<InputTableInfo>& table_infos,
1524  const CompilationOptions& co,
1525  const ExecutionOptions& eo,
1527  INJECT_TIMER(Exec_executeTableFunction);
1528  nukeOldState(false, table_infos, nullptr);
1529 
1530  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1531  // framework, we may want to move this up a level
1532 
1533  ColumnFetcher column_fetcher(this, column_cache);
1534  TableFunctionCompilationContext compilation_context;
1535  compilation_context.compile(exe_unit, co, this);
1536 
1537  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
1538  CHECK_EQ(table_infos.size(), size_t(1));
1539  return exe_context.execute(exe_unit,
1540  table_infos.front(),
1541  &compilation_context,
1542  column_fetcher,
1543  co.device_type,
1544  this);
1545 }
1546 
1548  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1549 }
1550 
1551 // Looks at the targets and returns a feasible device type. We only punt
1552 // to CPU for count distinct and we should probably fix it and remove this.
1554  const RelAlgExecutionUnit& ra_exe_unit,
1555  const ExecutorDeviceType requested_device_type) {
1556  for (const auto target_expr : ra_exe_unit.target_exprs) {
1557  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1558  if (!ra_exe_unit.groupby_exprs.empty() &&
1559  !isArchPascalOrLater(requested_device_type)) {
1560  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1561  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1562  return ExecutorDeviceType::CPU;
1563  }
1564  }
1565  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1566  return ExecutorDeviceType::CPU;
1567  }
1568  }
1569  return requested_device_type;
1570 }
1571 
1572 namespace {
1573 
1574 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1575  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1576  if (ti.is_fp()) {
1577  if (float_argument_input && ti.get_type() == kFLOAT) {
1578  int64_t float_null_val = 0;
1579  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1580  static_cast<float>(inline_fp_null_val(ti));
1581  return float_null_val;
1582  }
1583  const auto double_null_val = inline_fp_null_val(ti);
1584  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1585  }
1586  return inline_int_null_val(ti);
1587 }
1588 
1589 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1590  std::vector<int64_t>& entry,
1591  const std::vector<Analyzer::Expr*>& target_exprs,
1593  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1594  const auto target_expr = target_exprs[target_idx];
1595  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1596  CHECK(agg_info.is_agg);
1597  target_infos.push_back(agg_info);
1598  if (g_cluster) {
1599  const auto executor = query_mem_desc.getExecutor();
1600  CHECK(executor);
1601  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1602  CHECK(row_set_mem_owner);
1603  const auto& count_distinct_desc =
1604  query_mem_desc.getCountDistinctDescriptor(target_idx);
1605  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1606  auto count_distinct_buffer = static_cast<int8_t*>(
1607  checked_calloc(count_distinct_desc.bitmapPaddedSizeBytes(), 1));
1608  CHECK(row_set_mem_owner);
1609  row_set_mem_owner->addCountDistinctBuffer(
1610  count_distinct_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), true);
1611  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1612  continue;
1613  }
1614  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1615  auto count_distinct_set = new std::set<int64_t>();
1616  CHECK(row_set_mem_owner);
1617  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1618  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1619  continue;
1620  }
1621  }
1622  const bool float_argument_input = takes_float_argument(agg_info);
1623  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1624  entry.push_back(0);
1625  } else if (agg_info.agg_kind == kAVG) {
1626  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1627  entry.push_back(0);
1628  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
1629  if (agg_info.sql_type.is_geometry()) {
1630  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1631  entry.push_back(0);
1632  }
1633  } else if (agg_info.sql_type.is_varlen()) {
1634  entry.push_back(0);
1635  entry.push_back(0);
1636  } else {
1637  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1638  }
1639  } else {
1640  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1641  }
1642  }
1643 }
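// fill_entries_for_empty_input() synthesizes the single row returned for an empty
// input: COUNT and APPROX_COUNT_DISTINCT targets get 0, AVG gets a null value plus a
// zero count slot, SAMPLE/SINGLE_VALUE get zeroed slots for geometry and varlen types,
// and every other aggregate gets the inline null sentinel for its type. In distributed
// mode (g_cluster) count-distinct targets also receive a freshly allocated bitmap or
// std::set owned by the row set memory owner.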
1644 
1646  const std::vector<Analyzer::Expr*>& target_exprs_in,
1648  const ExecutorDeviceType device_type) {
1649  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1650  std::vector<Analyzer::Expr*> target_exprs;
1651  for (const auto target_expr : target_exprs_in) {
1652  const auto target_expr_copy =
1653  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1654  CHECK(target_expr_copy);
1655  auto ti = target_expr->get_type_info();
1656  ti.set_notnull(false);
1657  target_expr_copy->set_type_info(ti);
1658  if (target_expr_copy->get_arg()) {
1659  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1660  arg_ti.set_notnull(false);
1661  target_expr_copy->get_arg()->set_type_info(arg_ti);
1662  }
1663  target_exprs_owned_copies.push_back(target_expr_copy);
1664  target_exprs.push_back(target_expr_copy.get());
1665  }
1666  std::vector<TargetInfo> target_infos;
1667  std::vector<int64_t> entry;
1668  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1669  const auto executor = query_mem_desc.getExecutor();
1670  CHECK(executor);
1671  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1672  CHECK(row_set_mem_owner);
1673  auto rs = std::make_shared<ResultSet>(
1674  target_infos, device_type, query_mem_desc, row_set_mem_owner, executor);
1675  rs->allocateStorage();
1676  rs->fillOneEntry(entry);
1677  return rs;
1678 }
1679 
1680 } // namespace
1681 
1683  ExecutionDispatch& execution_dispatch,
1684  const std::vector<Analyzer::Expr*>& target_exprs,
1686  const ExecutorDeviceType device_type,
1687  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1688  auto timer = DEBUG_TIMER(__func__);
1689  auto& result_per_device = execution_dispatch.getFragmentResults();
1690  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1691  QueryDescriptionType::NonGroupedAggregate) {
1692  return build_row_for_empty_input(target_exprs, query_mem_desc, device_type);
1693  }
1694  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1695  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1696  try {
1697  return reduceSpeculativeTopN(
1698  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1699  } catch (const std::bad_alloc&) {
1700  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1701  }
1702  }
1703  const auto shard_count =
1704  device_type == ExecutorDeviceType::GPU
1705  ? shard_count_for_top_groups(ra_exe_unit, *catalog_)
1706  : 0;
1707 
1708  if (shard_count && !result_per_device.empty()) {
1709  return collectAllDeviceShardedTopResults(execution_dispatch);
1710  }
1711  return reduceMultiDeviceResults(
1712  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1713 }
1714 
1715 namespace {
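// Permutes rows of a columnar result-set storage into the output storage following
// top_permutation (the sorted row order): group-by key columns and aggregate slots are
// copied separately, since they live in different regions of the columnar buffer.
// Returns the output row index after the last copied row.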
1726 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1727  const QueryMemoryDescriptor& input_query_mem_desc,
1728  const ResultSetStorage* output_storage,
1729  size_t output_row_index,
1730  const QueryMemoryDescriptor& output_query_mem_desc,
1731  const std::vector<uint32_t>& top_permutation) {
1732  const auto output_buffer = output_storage->getUnderlyingBuffer();
1733  const auto input_buffer = input_storage->getUnderlyingBuffer();
1734  for (const auto sorted_idx : top_permutation) {
1735  // permuting all group-columns in this result set into the final buffer:
1736  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1737  group_idx++) {
1738  const auto input_column_ptr =
1739  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1740  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1741  const auto output_column_ptr =
1742  output_buffer +
1743  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1744  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1745  memcpy(output_column_ptr,
1746  input_column_ptr,
1747  output_query_mem_desc.groupColWidth(group_idx));
1748  }
1749  // permuting all agg-columns in this result set into the final buffer:
1750  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1751  slot_idx++) {
1752  const auto input_column_ptr =
1753  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1754  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1755  const auto output_column_ptr =
1756  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1757  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1758  memcpy(output_column_ptr,
1759  input_column_ptr,
1760  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1761  }
1762  ++output_row_index;
1763  }
1764  return output_row_index;
1765 }
1766 
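// Row-wise counterpart of permute_storage_columnar: copies whole rows (getRowSize()
// bytes each) in the order given by top_permutation and returns the next output row
// index.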
1776 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
1777  const ResultSetStorage* output_storage,
1778  size_t output_row_index,
1779  const QueryMemoryDescriptor& output_query_mem_desc,
1780  const std::vector<uint32_t>& top_permutation) {
1781  const auto output_buffer = output_storage->getUnderlyingBuffer();
1782  const auto input_buffer = input_storage->getUnderlyingBuffer();
1783  for (const auto sorted_idx : top_permutation) {
1784  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
1785  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
1786  row_ptr,
1787  output_query_mem_desc.getRowSize());
1788  ++output_row_index;
1789  }
1790  return output_row_index;
1791 }
1792 } // namespace
1793 
1794 // Collect top results from each device, stitch them together and sort. Partial
1795 // results from each device are guaranteed to be disjoint because we only go on
1796 // this path when one of the columns involved is a shard key.
1797 ResultSetPtr Executor::collectAllDeviceShardedTopResults(
1798  ExecutionDispatch& execution_dispatch) const {
1799  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1800  auto& result_per_device = execution_dispatch.getFragmentResults();
1801  const auto first_result_set = result_per_device.front().first;
1802  CHECK(first_result_set);
1803  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1804  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1805  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1806  top_query_mem_desc.setEntryCount(0);
1807  for (auto& result : result_per_device) {
1808  const auto result_set = result.first;
1809  CHECK(result_set);
1810  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n);
1811  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1812  top_query_mem_desc.setEntryCount(new_entry_cnt);
1813  }
1814  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1815  first_result_set->getDeviceType(),
1816  top_query_mem_desc,
1817  first_result_set->getRowSetMemOwner(),
1818  this);
1819  auto top_storage = top_result_set->allocateStorage();
1820  size_t top_output_row_idx{0};
1821  for (auto& result : result_per_device) {
1822  const auto result_set = result.first;
1823  CHECK(result_set);
1824  const auto& top_permutation = result_set->getPermutationBuffer();
1825  CHECK_LE(top_permutation.size(), top_n);
1826  if (top_query_mem_desc.didOutputColumnar()) {
1827  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
1828  result_set->getQueryMemDesc(),
1829  top_storage,
1830  top_output_row_idx,
1831  top_query_mem_desc,
1832  top_permutation);
1833  } else {
1834  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
1835  top_storage,
1836  top_output_row_idx,
1837  top_query_mem_desc,
1838  top_permutation);
1839  }
1840  }
1841  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
1842  return top_result_set;
1843 }
1844 
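// Maps each inner table id to the equi-join condition ("tautology") recorded when its
// join hash table was built; presumably consumed by the shard-based fragment-skipping
// logic further below.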
1845 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
1846  const {
1847  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
1848  const auto& join_info = plan_state_->join_info_;
1849  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
1850  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
1851  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
1852  id_to_cond.insert(
1853  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
1854  }
1855  return id_to_cond;
1856 }
1857 
1858 namespace {
1859 
1860 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
1861  for (const auto& col : fetched_cols) {
1862  if (col.is_lazily_fetched) {
1863  return true;
1864  }
1865  }
1866  return false;
1867 }
1868 
1869 } // namespace
1870 
1871 template <typename THREAD_POOL>
1872 void Executor::dispatchFragments(
1873  const std::function<void(const ExecutorDeviceType chosen_device_type,
1874  int chosen_device_id,
1875  const QueryCompilationDescriptor& query_comp_desc,
1876  const QueryMemoryDescriptor& query_mem_desc,
1877  const FragmentsList& frag_list,
1878  const ExecutorDispatchMode kernel_dispatch_mode,
1879  const int64_t rowid_lookup_key)> dispatch,
1880  const ExecutionDispatch& execution_dispatch,
1881  const std::vector<InputTableInfo>& table_infos,
1882  const ExecutionOptions& eo,
1883  const bool is_agg,
1884  const bool allow_single_frag_table_opt,
1885  const size_t context_count,
1886  const QueryCompilationDescriptor& query_comp_desc,
1887  const QueryMemoryDescriptor& query_mem_desc,
1888  QueryFragmentDescriptor& fragment_descriptor,
1889  std::unordered_set<int>& available_gpus,
1890  int& available_cpus) {
1891  THREAD_POOL query_threads;
1892  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1893  CHECK(!ra_exe_unit.input_descs.empty());
1894 
1895  const auto device_type = query_comp_desc.getDeviceType();
1896  const bool uses_lazy_fetch =
1897  plan_state_->allow_lazy_fetch_ &&
1898  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
1899  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
1900  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
1901 
1902  const auto device_count = deviceCount(device_type);
1903  CHECK_GT(device_count, 0);
1904 
1905  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
1906  execution_dispatch.getFragOffsets(),
1907  device_count,
1908  device_type,
1909  use_multifrag_kernel,
1910  g_inner_join_fragment_skipping,
1911  this);
1912  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
1913  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
1914  }
1915 
1916  if (use_multifrag_kernel) {
1917  VLOG(1) << "Dispatching multifrag kernels";
1918  VLOG(1) << query_mem_desc.toString();
1919 
1920  // NB: We should never be on this path when the query is retried because of running
1921  // out of group by slots; also, for scan only queries on CPU we want the
1922  // high-granularity, fragment by fragment execution instead. For scan only queries on
1923  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
1924  // buffer per fragment.
1925  auto multifrag_kernel_dispatch =
1926  [&query_threads, &dispatch, query_comp_desc, query_mem_desc](
1927  const int device_id,
1928  const FragmentsList& frag_list,
1929  const int64_t rowid_lookup_key) {
1930  query_threads.append(dispatch,
1931  ExecutorDeviceType::GPU,
1932  device_id,
1933  query_comp_desc,
1934  query_mem_desc,
1935  frag_list,
1936  ExecutorDispatchMode::MultifragmentKernel,
1937  rowid_lookup_key);
1938  };
1939  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
1940  } else {
1941  VLOG(1) << "Dispatching kernel per fragment";
1942  VLOG(1) << query_mem_desc.toString();
1943 
1944  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
1945  query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection &&
1946  table_infos.size() == 1 && table_infos.front().table_id > 0) {
1947  const auto max_frag_size =
1948  table_infos.front().info.getFragmentNumTuplesUpperBound();
1949  if (max_frag_size < query_mem_desc.getEntryCount()) {
1950  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
1951  << " to match max fragment size " << max_frag_size
1952  << " for kernel per fragment execution path.";
1953  throw CompilationRetryNewScanLimit(max_frag_size);
1954  }
1955  }
1956 
1957  size_t frag_list_idx{0};
1958  auto fragment_per_kernel_dispatch = [&query_threads,
1959  &dispatch,
1960  &frag_list_idx,
1961  &device_type,
1962  query_comp_desc,
1963  query_mem_desc](const int device_id,
1964  const FragmentsList& frag_list,
1965  const int64_t rowid_lookup_key) {
1966  if (!frag_list.size()) {
1967  return;
1968  }
1969  CHECK_GE(device_id, 0);
1970 
1971  query_threads.append(dispatch,
1972  device_type,
1973  device_id,
1974  query_comp_desc,
1975  query_mem_desc,
1976  frag_list,
1977  ExecutorDispatchMode::KernelPerFragment,
1978  rowid_lookup_key);
1979 
1980  ++frag_list_idx;
1981  };
1982 
1983  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
1984  ra_exe_unit);
1985  }
1986  query_threads.join();
1987 }
1988 
1989 std::vector<size_t> Executor::getTableFragmentIndices(
1990  const RelAlgExecutionUnit& ra_exe_unit,
1991  const ExecutorDeviceType device_type,
1992  const size_t table_idx,
1993  const size_t outer_frag_idx,
1994  std::map<int, const TableFragments*>& selected_tables_fragments,
1995  const std::unordered_map<int, const Analyzer::BinOper*>&
1996  inner_table_id_to_join_condition) {
1997  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
1998  auto table_frags_it = selected_tables_fragments.find(table_id);
1999  CHECK(table_frags_it != selected_tables_fragments.end());
2000  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2001  const auto outer_table_fragments_it =
2002  selected_tables_fragments.find(outer_input_desc.getTableId());
2003  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2004  const auto outer_table_fragments = outer_table_fragments_it->second;
2005  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2006  if (!table_idx) {
2007  return {outer_frag_idx};
2008  }
2009  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2010  auto& inner_frags = table_frags_it->second;
2011  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2012  std::vector<size_t> all_frag_ids;
2013  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2014  ++inner_frag_idx) {
2015  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2016  if (skipFragmentPair(outer_fragment_info,
2017  inner_frag_info,
2018  table_idx,
2019  inner_table_id_to_join_condition,
2020  ra_exe_unit,
2021  device_type)) {
2022  continue;
2023  }
2024  all_frag_ids.push_back(inner_frag_idx);
2025  }
2026  return all_frag_ids;
2027 }
2028 
2029 // Returns true iff the join between two fragments cannot yield any results, per
2030 // shard information. The pair can be skipped to avoid full broadcast.
2031 bool Executor::skipFragmentPair(
2032  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2033  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2034  const int table_idx,
2035  const std::unordered_map<int, const Analyzer::BinOper*>&
2036  inner_table_id_to_join_condition,
2037  const RelAlgExecutionUnit& ra_exe_unit,
2038  const ExecutorDeviceType device_type) {
2039  if (device_type != ExecutorDeviceType::GPU) {
2040  return false;
2041  }
2042  CHECK(table_idx >= 0 &&
2043  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2044  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2045  // Both tables need to be sharded the same way.
2046  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2047  outer_fragment_info.shard == inner_fragment_info.shard) {
2048  return false;
2049  }
2050  const Analyzer::BinOper* join_condition{nullptr};
2051  if (ra_exe_unit.join_quals.empty()) {
2052  CHECK(!inner_table_id_to_join_condition.empty());
2053  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2054  CHECK(condition_it != inner_table_id_to_join_condition.end());
2055  join_condition = condition_it->second;
2056  CHECK(join_condition);
2057  } else {
2058  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2059  plan_state_->join_info_.join_hash_tables_.size());
2060  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2061  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2062  table_idx) {
2063  CHECK(!join_condition);
2064  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2065  }
2066  }
2067  }
2068  if (!join_condition) {
2069  return false;
2070  }
2071  // TODO(adb): support fragment skipping based on the overlaps operator
2072  if (join_condition->is_overlaps_oper()) {
2073  return false;
2074  }
2075  size_t shard_count{0};
2076  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2077  join_condition->get_left_operand())) {
2078  auto inner_outer_pairs =
2079  normalize_column_pairs(join_condition, *getCatalog(), getTemporaryTables());
2080  shard_count = BaselineJoinHashTable::getShardCountForCondition(
2081  join_condition, this, inner_outer_pairs);
2082  } else {
2083  shard_count = get_shard_count(join_condition, this);
2084  }
2085  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2086  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2087  }
2088  return shard_count;
2089 }
2090 
2091 namespace {
2092 
2093 const ColumnDescriptor* try_get_column_descriptor(const InputColDescriptor* col_desc,
2094  const Catalog_Namespace::Catalog& cat) {
2095  const int table_id = col_desc->getScanDesc().getTableId();
2096  const int col_id = col_desc->getColId();
2097  return get_column_descriptor_maybe(col_id, table_id, cat);
2098 }
2099 
2100 } // namespace
2101 
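// For every input table, computes the exclusive prefix sum of its fragments' row counts,
// i.e. the global row offset at which each fragment starts.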
2102 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2103  const std::vector<InputDescriptor>& input_descs,
2104  const std::map<int, const TableFragments*>& all_tables_fragments) {
2105  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2106  for (auto& desc : input_descs) {
2107  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2108  CHECK(fragments_it != all_tables_fragments.end());
2109  const auto& fragments = *fragments_it->second;
2110  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2111  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2112  frag_offsets[i] = off;
2113  off += fragments[i].getNumTuples();
2114  }
2115  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2116  }
2117  return tab_id_to_frag_offsets;
2118 }
2119 
2120 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2121 Executor::getRowCountAndOffsetForAllFrags(
2122  const RelAlgExecutionUnit& ra_exe_unit,
2123  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2124  const std::vector<InputDescriptor>& input_descs,
2125  const std::map<int, const TableFragments*>& all_tables_fragments) {
2126  std::vector<std::vector<int64_t>> all_num_rows;
2127  std::vector<std::vector<uint64_t>> all_frag_offsets;
2128  const auto tab_id_to_frag_offsets =
2129  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2130  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2131  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2132  std::vector<int64_t> num_rows;
2133  std::vector<uint64_t> frag_offsets;
2134  if (!ra_exe_unit.union_all) {
2135  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2136  }
2137  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2138  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2139  const auto fragments_it =
2140  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2141  CHECK(fragments_it != all_tables_fragments.end());
2142  const auto& fragments = *fragments_it->second;
2143  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2144  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2145  const auto& fragment = fragments[frag_id];
2146  num_rows.push_back(fragment.getNumTuples());
2147  } else {
2148  size_t total_row_count{0};
2149  for (const auto& fragment : fragments) {
2150  total_row_count += fragment.getNumTuples();
2151  }
2152  num_rows.push_back(total_row_count);
2153  }
2154  const auto frag_offsets_it =
2155  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2156  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2157  const auto& offsets = frag_offsets_it->second;
2158  CHECK_LT(frag_id, offsets.size());
2159  frag_offsets.push_back(offsets[frag_id]);
2160  }
2161  all_num_rows.push_back(num_rows);
2162  // Fragment offsets of the outer table should ONLY be used by rowid for now.
2163  all_frag_offsets.push_back(frag_offsets);
2164  }
2165  return {all_num_rows, all_frag_offsets};
2166 }
2167 
2168 // Returns whether a column of a hash-joined inner fact table has to be fetched from all
2169 // of the table's fragments; columns whose fetch is deferred (lazy) are excluded.
2170 bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
2171  const RelAlgExecutionUnit& ra_exe_unit,
2172  const FragmentsList& selected_fragments) const {
2173  const auto& input_descs = ra_exe_unit.input_descs;
2174  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2175  if (nest_level < 1 ||
2176  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2177  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2178  (ra_exe_unit.join_quals.empty() &&
2179  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2180  return false;
2181  }
2182  const int table_id = inner_col_desc.getScanDesc().getTableId();
2183  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2184  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2185  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2186  return fragments.size() > 1;
2187 }
2188 
2189 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2190  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2191  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2192  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2193 }
2194 
2195 FetchResult Executor::fetchChunks(
2196  const ColumnFetcher& column_fetcher,
2197  const RelAlgExecutionUnit& ra_exe_unit,
2198  const int device_id,
2199  const Data_Namespace::MemoryLevel memory_level,
2200  const std::map<int, const TableFragments*>& all_tables_fragments,
2201  const FragmentsList& selected_fragments,
2202  const Catalog_Namespace::Catalog& cat,
2203  std::list<ChunkIter>& chunk_iterators,
2204  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
2205  auto timer = DEBUG_TIMER(__func__);
2206  INJECT_TIMER(fetchChunks);
2207  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2208  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2209  std::vector<size_t> local_col_to_frag_pos;
2210  buildSelectedFragsMapping(selected_fragments_crossjoin,
2211  local_col_to_frag_pos,
2212  col_global_ids,
2213  selected_fragments,
2214  ra_exe_unit);
2215 
2216  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2217  selected_fragments_crossjoin);
2218 
2219  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2220  std::vector<std::vector<int64_t>> all_num_rows;
2221  std::vector<std::vector<uint64_t>> all_frag_offsets;
2222 
2223  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2224  std::vector<const int8_t*> frag_col_buffers(
2225  plan_state_->global_to_local_col_ids_.size());
2226  for (const auto& col_id : col_global_ids) {
2227  // check whether the interrupt flag turns on (non kernel-time query interrupt)
2228  if (g_enable_runtime_query_interrupt &&
2229  interrupted_.load()) {
2230  resetInterrupt();
2231  throw QueryExecutionError(ERR_INTERRUPTED);
2232  }
2233  CHECK(col_id);
2234  const int table_id = col_id->getScanDesc().getTableId();
2235  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2236  if (cd && cd->isVirtualCol) {
2237  CHECK_EQ("rowid", cd->columnName);
2238  continue;
2239  }
2240  const auto fragments_it = all_tables_fragments.find(table_id);
2241  CHECK(fragments_it != all_tables_fragments.end());
2242  const auto fragments = fragments_it->second;
2243  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2244  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2245  CHECK_LT(static_cast<size_t>(it->second),
2246  plan_state_->global_to_local_col_ids_.size());
2247  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2248  if (!fragments->size()) {
2249  return {};
2250  }
2251  CHECK_LT(frag_id, fragments->size());
2252  auto memory_level_for_column = memory_level;
2253  if (plan_state_->columns_to_fetch_.find(
2254  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2255  plan_state_->columns_to_fetch_.end()) {
2256  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2257  }
2258  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2259  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2260  col_id.get(), memory_level_for_column, device_id);
2261  } else {
2262  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2263  frag_col_buffers[it->second] =
2264  column_fetcher.getAllTableColumnFragments(table_id,
2265  col_id->getColId(),
2266  all_tables_fragments,
2267  memory_level_for_column,
2268  device_id);
2269  } else {
2270  frag_col_buffers[it->second] =
2271  column_fetcher.getOneTableColumnFragment(table_id,
2272  frag_id,
2273  col_id->getColId(),
2274  all_tables_fragments,
2275  chunks,
2276  chunk_iterators,
2277  memory_level_for_column,
2278  device_id);
2279  }
2280  }
2281  }
2282  all_frag_col_buffers.push_back(frag_col_buffers);
2283  }
2284  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2285  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2286  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2287 }
2288 
2289 // fetchChunks() is written under the assumption that multiple inputs implies a JOIN.
2290 // fetchUnionChunks() is written under the assumption that multiple inputs implies a UNION ALL.
2291 FetchResult Executor::fetchUnionChunks(
2292  const ColumnFetcher& column_fetcher,
2293  const RelAlgExecutionUnit& ra_exe_unit,
2294  const int device_id,
2295  const Data_Namespace::MemoryLevel memory_level,
2296  const std::map<int, const TableFragments*>& all_tables_fragments,
2297  const FragmentsList& selected_fragments,
2298  const Catalog_Namespace::Catalog& cat,
2299  std::list<ChunkIter>& chunk_iterators,
2300  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
2301  auto timer = DEBUG_TIMER(__func__);
2302  INJECT_TIMER(fetchUnionChunks);
2303 
2304  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2305  std::vector<std::vector<int64_t>> all_num_rows;
2306  std::vector<std::vector<uint64_t>> all_frag_offsets;
2307 
2308  CHECK(!selected_fragments.empty());
2309  CHECK_LE(2u, ra_exe_unit.input_descs.size());
2310  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
2311  using TableId = int;
2312  TableId const selected_table_id = selected_fragments.front().table_id;
2313  bool const input_descs_index =
2314  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2315  if (!input_descs_index) {
2316  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
2317  }
2318  bool const input_col_descs_index =
2319  selected_table_id ==
2320  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2321  if (!input_col_descs_index) {
2322  CHECK_EQ(selected_table_id,
2323  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
2324  }
2325  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2326  << " selected_table_id=" << selected_table_id
2327  << " input_descs_index=" << int(input_descs_index)
2328  << " input_col_descs_index=" << int(input_col_descs_index)
2329  << " ra_exe_unit.input_descs="
2330  << shared::printContainer(ra_exe_unit.input_descs)
2331  << " ra_exe_unit.input_col_descs="
2332  << shared::printContainer(ra_exe_unit.input_col_descs);
2333 
2334  // Partition col_global_ids by table_id
2335  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2336  table_id_to_input_col_descs;
2337  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
2338  TableId const table_id = input_col_desc->getScanDesc().getTableId();
2339  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2340  }
2341  for (auto const& pair : table_id_to_input_col_descs) {
2342  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2343  std::vector<size_t> local_col_to_frag_pos;
2344 
2345  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
2346  local_col_to_frag_pos,
2347  pair.second,
2348  selected_fragments,
2349  ra_exe_unit);
2350 
2351  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2352  selected_fragments_crossjoin);
2353 
2354  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2355  std::vector<const int8_t*> frag_col_buffers(
2356  plan_state_->global_to_local_col_ids_.size());
2357  for (const auto& col_id : pair.second) {
2358  CHECK(col_id);
2359  const int table_id = col_id->getScanDesc().getTableId();
2360  CHECK_EQ(table_id, pair.first);
2361  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2362  if (cd && cd->isVirtualCol) {
2363  CHECK_EQ("rowid", cd->columnName);
2364  continue;
2365  }
2366  const auto fragments_it = all_tables_fragments.find(table_id);
2367  CHECK(fragments_it != all_tables_fragments.end());
2368  const auto fragments = fragments_it->second;
2369  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2370  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2371  CHECK_LT(static_cast<size_t>(it->second),
2372  plan_state_->global_to_local_col_ids_.size());
2373  const size_t frag_id = ra_exe_unit.union_all
2374  ? 0
2375  : selected_frag_ids[local_col_to_frag_pos[it->second]];
2376  if (!fragments->size()) {
2377  return {};
2378  }
2379  CHECK_LT(frag_id, fragments->size());
2380  auto memory_level_for_column = memory_level;
2381  if (plan_state_->columns_to_fetch_.find(
2382  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2383  plan_state_->columns_to_fetch_.end()) {
2384  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2385  }
2386  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2387  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2388  col_id.get(), memory_level_for_column, device_id);
2389  } else {
2390  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2391  frag_col_buffers[it->second] =
2392  column_fetcher.getAllTableColumnFragments(table_id,
2393  col_id->getColId(),
2394  all_tables_fragments,
2395  memory_level_for_column,
2396  device_id);
2397  } else {
2398  frag_col_buffers[it->second] =
2399  column_fetcher.getOneTableColumnFragment(table_id,
2400  frag_id,
2401  col_id->getColId(),
2402  all_tables_fragments,
2403  chunks,
2404  chunk_iterators,
2405  memory_level_for_column,
2406  device_id);
2407  }
2408  }
2409  }
2410  all_frag_col_buffers.push_back(frag_col_buffers);
2411  }
2412  std::vector<std::vector<int64_t>> num_rows;
2413  std::vector<std::vector<uint64_t>> frag_offsets;
2414  std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags(
2415  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2416  all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
2417  all_frag_offsets.insert(
2418  all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
2419  }
2420  // UNION ALL hacks.
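  // For each local column index i, the loop below copies the buffer pointer from the
  // paired index (i ^ 1) within the fragment-buffer list selected by i & 1, which appears
  // to make each input's column available at the slot used by the other input's
  // corresponding column.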
2421  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers);
2422  for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
2423  all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
2424  }
2425  if (input_descs_index == input_col_descs_index) {
2426  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2427  }
2428 
2429  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers)
2430  << " all_num_rows=" << shared::printContainer(all_num_rows)
2431  << " all_frag_offsets=" << shared::printContainer(all_frag_offsets)
2432  << " input_col_descs_index=" << input_col_descs_index;
2433  return {{all_frag_col_buffers[input_descs_index]},
2434  {{all_num_rows[0][input_descs_index]}},
2435  {{all_frag_offsets[0][input_descs_index]}}};
2436 }
2437 
2438 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2439  const size_t scan_idx,
2440  const RelAlgExecutionUnit& ra_exe_unit) {
2441  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2442  scan_idx > 0 &&
2443  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2444  !selected_fragments[scan_idx].fragment_ids.empty()) {
2445  // Fetch all fragments
2446  return {size_t(0)};
2447  }
2448 
2449  return selected_fragments[scan_idx].fragment_ids;
2450 }
2451 
2453  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2454  std::vector<size_t>& local_col_to_frag_pos,
2455  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2456  const FragmentsList& selected_fragments,
2457  const RelAlgExecutionUnit& ra_exe_unit) {
2458  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2459  size_t frag_pos{0};
2460  const auto& input_descs = ra_exe_unit.input_descs;
2461  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2462  const int table_id = input_descs[scan_idx].getTableId();
2463  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2464  selected_fragments_crossjoin.push_back(
2465  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2466  for (const auto& col_id : col_global_ids) {
2467  CHECK(col_id);
2468  const auto& input_desc = col_id->getScanDesc();
2469  if (input_desc.getTableId() != table_id ||
2470  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2471  continue;
2472  }
2473  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2474  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2475  CHECK_LT(static_cast<size_t>(it->second),
2476  plan_state_->global_to_local_col_ids_.size());
2477  local_col_to_frag_pos[it->second] = frag_pos;
2478  }
2479  ++frag_pos;
2480  }
2481 }
2482 
2484  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2485  std::vector<size_t>& local_col_to_frag_pos,
2486  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2487  const FragmentsList& selected_fragments,
2488  const RelAlgExecutionUnit& ra_exe_unit) {
2489  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2490  size_t frag_pos{0};
2491  const auto& input_descs = ra_exe_unit.input_descs;
2492  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2493  const int table_id = input_descs[scan_idx].getTableId();
2494  // selected_fragments here is from assignFragsToKernelDispatch
2495  // execution_kernel.fragments
2496  if (selected_fragments[0].table_id != table_id) { // TODO 0
2497  continue;
2498  }
2499  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2500  selected_fragments_crossjoin.push_back(
2501  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2502  {size_t(1)}); // TODO
2503  for (const auto& col_id : col_global_ids) {
2504  CHECK(col_id);
2505  const auto& input_desc = col_id->getScanDesc();
2506  if (input_desc.getTableId() != table_id ||
2507  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2508  continue;
2509  }
2510  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2511  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2512  CHECK_LT(static_cast<size_t>(it->second),
2513  plan_state_->global_to_local_col_ids_.size());
2514  local_col_to_frag_pos[it->second] = frag_pos;
2515  }
2516  ++frag_pos;
2517  }
2518 }
2519 
2520 namespace {
2521 
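// RAII helper that owns the raw per-target output vectors returned by launchCpuCode()
// and launchGpuCode() in the non-grouped path and frees them when it goes out of scope.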
2522 class OutVecOwner {
2523  public:
2524  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2525  ~OutVecOwner() {
2526  for (auto out : out_vec_) {
2527  delete[] out;
2528  }
2529  }
2530 
2531  private:
2532  std::vector<int64_t*> out_vec_;
2533 };
2534 } // namespace
2535 
2536 int32_t Executor::executePlanWithoutGroupBy(
2537  const RelAlgExecutionUnit& ra_exe_unit,
2538  const CompilationResult& compilation_result,
2539  const bool hoist_literals,
2540  ResultSetPtr& results,
2541  const std::vector<Analyzer::Expr*>& target_exprs,
2542  const ExecutorDeviceType device_type,
2543  std::vector<std::vector<const int8_t*>>& col_buffers,
2544  QueryExecutionContext* query_exe_context,
2545  const std::vector<std::vector<int64_t>>& num_rows,
2546  const std::vector<std::vector<uint64_t>>& frag_offsets,
2547  Data_Namespace::DataMgr* data_mgr,
2548  const int device_id,
2549  const uint32_t start_rowid,
2550  const uint32_t num_tables,
2551  RenderInfo* render_info) {
2552  INJECT_TIMER(executePlanWithoutGroupBy);
2553  auto timer = DEBUG_TIMER(__func__);
2554  CHECK(!results);
2555  if (col_buffers.empty()) {
2556  return 0;
2557  }
2558 
2559  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2560  if (render_info) {
2561  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2562  // here, we are in non-insitu mode.
2563  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2564  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2565  "currently unsupported.";
2566  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2567  }
2568 
2569  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2570  std::vector<int64_t*> out_vec;
2571  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2572  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2573  std::unique_ptr<OutVecOwner> output_memory_scope;
2574  if (g_enable_runtime_query_interrupt &&
2575  interrupted_.load()) {
2576  resetInterrupt();
2577  throw QueryExecutionError(ERR_INTERRUPTED);
2578  }
2579  if (device_type == ExecutorDeviceType::CPU) {
2580  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2581  compilation_result.native_functions,
2582  hoist_literals,
2583  hoist_buf,
2584  col_buffers,
2585  num_rows,
2586  frag_offsets,
2587  0,
2588  &error_code,
2589  num_tables,
2590  join_hash_table_ptrs);
2591  output_memory_scope.reset(new OutVecOwner(out_vec));
2592  } else {
2593  try {
2594  out_vec = query_exe_context->launchGpuCode(
2595  ra_exe_unit,
2596  compilation_result.native_functions,
2597  hoist_literals,
2598  hoist_buf,
2599  col_buffers,
2600  num_rows,
2601  frag_offsets,
2602  0,
2603  data_mgr,
2604  blockSize(),
2605  gridSize(),
2606  device_id,
2607  compilation_result.gpu_smem_context.getSharedMemorySize(),
2608  &error_code,
2609  num_tables,
2610  join_hash_table_ptrs,
2611  render_allocator_map_ptr);
2612  output_memory_scope.reset(new OutVecOwner(out_vec));
2613  } catch (const OutOfMemory&) {
2614  return ERR_OUT_OF_GPU_MEM;
2615  } catch (const std::exception& e) {
2616  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2617  }
2618  }
2619  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2620  error_code == Executor::ERR_DIV_BY_ZERO ||
2621  error_code == Executor::ERR_OUT_OF_TIME ||
2622  error_code == Executor::ERR_INTERRUPTED ||
2623  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
2624  return error_code;
2625  }
2626  if (ra_exe_unit.estimator) {
2627  CHECK(!error_code);
2628  results =
2629  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2630  return 0;
2631  }
2632  std::vector<int64_t> reduced_outs;
2633  const auto num_frags = col_buffers.size();
2634  const size_t entry_count =
2635  device_type == ExecutorDeviceType::GPU
2636  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
2637  ? 1
2638  : blockSize() * gridSize() * num_frags)
2639  : num_frags;
2640  if (size_t(1) == entry_count) {
2641  for (auto out : out_vec) {
2642  CHECK(out);
2643  reduced_outs.push_back(*out);
2644  }
2645  } else {
2646  size_t out_vec_idx = 0;
2647 
2648  for (const auto target_expr : target_exprs) {
2649  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2650  CHECK(agg_info.is_agg);
2651 
2652  const int num_iterations = agg_info.sql_type.is_geometry()
2653  ? agg_info.sql_type.get_physical_coord_cols()
2654  : 1;
2655 
2656  for (int i = 0; i < num_iterations; i++) {
2657  int64_t val1;
2658  const bool float_argument_input = takes_float_argument(agg_info);
2659  if (is_distinct_target(agg_info)) {
2660  CHECK(agg_info.agg_kind == kCOUNT ||
2661  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT);
2662  val1 = out_vec[out_vec_idx][0];
2663  error_code = 0;
2664  } else {
2665  const auto chosen_bytes = static_cast<size_t>(
2666  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2667  std::tie(val1, error_code) = Executor::reduceResults(
2668  agg_info.agg_kind,
2669  agg_info.sql_type,
2670  query_exe_context->getAggInitValForIndex(out_vec_idx),
2671  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2672  out_vec[out_vec_idx],
2673  entry_count,
2674  false,
2675  float_argument_input);
2676  }
2677  if (error_code) {
2678  break;
2679  }
2680  reduced_outs.push_back(val1);
2681  if (agg_info.agg_kind == kAVG ||
2682  (agg_info.agg_kind == kSAMPLE &&
2683  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
2684  const auto chosen_bytes = static_cast<size_t>(
2685  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
2686  1));
2687  int64_t val2;
2688  std::tie(val2, error_code) = Executor::reduceResults(
2689  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
2690  agg_info.sql_type,
2691  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
2692  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2693  out_vec[out_vec_idx + 1],
2694  entry_count,
2695  false,
2696  false);
2697  if (error_code) {
2698  break;
2699  }
2700  reduced_outs.push_back(val2);
2701  ++out_vec_idx;
2702  }
2703  ++out_vec_idx;
2704  }
2705  }
2706  }
2707 
2708  if (error_code) {
2709  return error_code;
2710  }
2711 
2712  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
2713  auto rows_ptr = std::shared_ptr<ResultSet>(
2714  query_exe_context->query_buffers_->result_sets_[0].release());
2715  rows_ptr->fillOneEntry(reduced_outs);
2716  results = std::move(rows_ptr);
2717  return error_code;
2718 }
2719 
2720 namespace {
2721 
2722 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
2723  CHECK(scan_limit);
2724  return results && results->rowCount() < scan_limit;
2725 }
2726 
2727 } // namespace
2728 
2729 int32_t Executor::executePlanWithGroupBy(
2730  const RelAlgExecutionUnit& ra_exe_unit,
2731  const CompilationResult& compilation_result,
2732  const bool hoist_literals,
2733  ResultSetPtr& results,
2734  const ExecutorDeviceType device_type,
2735  std::vector<std::vector<const int8_t*>>& col_buffers,
2736  const std::vector<size_t> outer_tab_frag_ids,
2737  QueryExecutionContext* query_exe_context,
2738  const std::vector<std::vector<int64_t>>& num_rows,
2739  const std::vector<std::vector<uint64_t>>& frag_offsets,
2740  Data_Namespace::DataMgr* data_mgr,
2741  const int device_id,
2742  const int outer_table_id,
2743  const int64_t scan_limit,
2744  const uint32_t start_rowid,
2745  const uint32_t num_tables,
2746  RenderInfo* render_info) {
2747  auto timer = DEBUG_TIMER(__func__);
2748  INJECT_TIMER(executePlanWithGroupBy);
2749  CHECK(!results);
2750  if (col_buffers.empty()) {
2751  return 0;
2752  }
2753  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
2754  // TODO(alex):
2755  // 1. Optimize size (make keys more compact).
2756  // 2. Resize on overflow.
2757  // 3. Optimize runtime.
2758  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2759  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2760  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2761  if (g_enable_runtime_query_interrupt &&
2762  interrupted_.load()) {
2763  return ERR_INTERRUPTED;
2764  }
2765 
2766  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2767  if (render_info && render_info->useCudaBuffers()) {
2768  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2769  }
2770 
2771  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
2772  << " ra_exe_unit.input_descs="
2773  << shared::printContainer(ra_exe_unit.input_descs)
2774  << " ra_exe_unit.input_col_descs="
2775  << shared::printContainer(ra_exe_unit.input_col_descs)
2776  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
2777  << " num_rows=" << shared::printContainer(num_rows)
2778  << " frag_offsets=" << shared::printContainer(frag_offsets)
2779  << " query_exe_context->query_buffers_->num_rows_="
2780  << query_exe_context->query_buffers_->num_rows_
2781  << " query_exe_context->query_mem_desc_.getEntryCount()="
2782  << query_exe_context->query_mem_desc_.getEntryCount()
2783  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
2784  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
2785  << " num_tables=" << num_tables;
2786 
2787  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
2788  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
2789  // with outer_table_id.
2790  if (ra_exe_unit_copy.union_all) {
2791  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
2792  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
2793  ra_exe_unit_copy.input_descs.end(),
2794  [outer_table_id](auto const& a, auto const& b) {
2795  return a.getTableId() == outer_table_id &&
2796  b.getTableId() != outer_table_id;
2797  });
2798  while (!ra_exe_unit_copy.input_descs.empty() &&
2799  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
2800  ra_exe_unit_copy.input_descs.pop_back();
2801  }
2802  // Filter ra_exe_unit_copy.input_col_descs.
2803  ra_exe_unit_copy.input_col_descs.remove_if(
2804  [outer_table_id](auto const& input_col_desc) {
2805  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
2806  });
2807  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
2808  }
2809 
2810  if (device_type == ExecutorDeviceType::CPU) {
2811  query_exe_context->launchCpuCode(
2812  ra_exe_unit_copy,
2813  compilation_result.native_functions,
2814  hoist_literals,
2815  hoist_buf,
2816  col_buffers,
2817  num_rows,
2818  frag_offsets,
2819  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
2820  &error_code,
2821  num_tables,
2822  join_hash_table_ptrs);
2823  } else {
2824  try {
2825  query_exe_context->launchGpuCode(
2826  ra_exe_unit_copy,
2827  compilation_result.native_functions,
2828  hoist_literals,
2829  hoist_buf,
2830  col_buffers,
2831  num_rows,
2832  frag_offsets,
2833  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
2834  data_mgr,
2835  blockSize(),
2836  gridSize(),
2837  device_id,
2838  compilation_result.gpu_smem_context.getSharedMemorySize(),
2839  &error_code,
2840  num_tables,
2841  join_hash_table_ptrs,
2842  render_allocator_map_ptr);
2843  } catch (const OutOfMemory&) {
2844  return ERR_OUT_OF_GPU_MEM;
2845  } catch (const OutOfRenderMemory&) {
2846  return ERR_OUT_OF_RENDER_MEM;
2847  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
2848  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
2849  } catch (const std::exception& e) {
2850  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2851  }
2852  }
2853 
2854  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2855  error_code == Executor::ERR_DIV_BY_ZERO ||
2856  error_code == Executor::ERR_OUT_OF_TIME ||
2857  error_code == Executor::ERR_INTERRUPTED ||
2858  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
2859  return error_code;
2860  }
2861 
2862  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
2863  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
2864  results = query_exe_context->getRowSet(ra_exe_unit_copy,
2865  query_exe_context->query_mem_desc_);
2866  CHECK(results);
2867  VLOG(2) << "results->rowCount()=" << results->rowCount();
2868  results->holdLiterals(hoist_buf);
2869  }
2870  if (error_code < 0 && render_allocator_map_ptr) {
2871  auto const adjusted_scan_limit =
2872  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
2873  // More rows passed the filter than available slots. We don't have a count to check,
2874  // so assume we met the limit if a scan limit is set
2875  if (adjusted_scan_limit != 0) {
2876  return 0;
2877  } else {
2878  return error_code;
2879  }
2880  }
2881  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
2882  return error_code; // unlucky, not enough results and we ran out of slots
2883  }
2884 
2885  return 0;
2886 }
2887 
2888 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
2889  const int device_id) {
2890  std::vector<int64_t> table_ptrs;
2891  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
2892  for (auto hash_table : join_hash_tables) {
2893  if (!hash_table) {
2894  CHECK(table_ptrs.empty());
2895  return {};
2896  }
2897  table_ptrs.push_back(hash_table->getJoinHashBuffer(
2898  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
2899  }
2900  return table_ptrs;
2901 }
2902 
2903 void Executor::nukeOldState(const bool allow_lazy_fetch,
2904  const std::vector<InputTableInfo>& query_infos,
2905  const RelAlgExecutionUnit* ra_exe_unit) {
2906  const bool contains_left_deep_outer_join =
2907  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
2908  ra_exe_unit->join_quals.end(),
2909  [](const JoinCondition& join_condition) {
2910  return join_condition.type == JoinType::LEFT;
2911  }) != ra_exe_unit->join_quals.end();
2912  cgen_state_.reset(new CgenState(query_infos, contains_left_deep_outer_join));
2913  plan_state_.reset(
2914  new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join, this));
2915 }
2916 
2917 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
2918  const std::vector<InputTableInfo>& query_infos) {
2919  const auto ld_count = input_descs.size();
2920  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
2921  for (size_t i = 0; i < ld_count; ++i) {
2922  CHECK_LT(i, query_infos.size());
2923  const auto frag_count = query_infos[i].info.fragments.size();
2924  if (i > 0) {
2925  cgen_state_->frag_offsets_.push_back(nullptr);
2926  } else {
2927  if (frag_count > 1) {
2928  cgen_state_->frag_offsets_.push_back(
2929  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
2930  } else {
2931  cgen_state_->frag_offsets_.push_back(nullptr);
2932  }
2933  }
2934  }
2935 }
2936 
2937 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
2938  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
2939  const std::vector<InputTableInfo>& query_infos,
2940  const MemoryLevel memory_level,
2941  const JoinHashTableInterface::HashType preferred_hash_type,
2942  ColumnCacheMap& column_cache) {
2943  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
2944  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
2945  }
2946  // check whether the interrupt flag turns on (non kernel-time query interrupt)
2947  if (g_enable_runtime_query_interrupt &&
2948  interrupted_.load()) {
2949  resetInterrupt();
2950  throw QueryExecutionError(ERR_INTERRUPTED);
2951  }
2952  try {
2953  auto tbl =
2954  JoinHashTableInterface::getInstance(qual_bin_oper,
2955  query_infos,
2956  memory_level,
2957  preferred_hash_type,
2958  deviceCountForMemoryLevel(memory_level),
2959  column_cache,
2960  this);
2961  return {tbl, ""};
2962  } catch (const HashJoinFail& e) {
2963  return {nullptr, e.what()};
2964  }
2965 }
2966 
2967 int8_t Executor::warpSize() const {
2968  CHECK(catalog_);
2969  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2970  CHECK(cuda_mgr);
2971  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2972  CHECK(!dev_props.empty());
2973  return dev_props.front().warpSize;
2974 }
2975 
2976 unsigned Executor::gridSize() const {
2977  CHECK(catalog_);
2978  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2979  CHECK(cuda_mgr);
2980  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2981  return grid_size_x_ ? grid_size_x_ : 2 * dev_props.front().numMPs;
2982 }
2983 
2984 unsigned Executor::blockSize() const {
2985  CHECK(catalog_);
2986  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2987  CHECK(cuda_mgr);
2988  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2989  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
2990 }
2991 
2992 int64_t Executor::deviceCycles(int milliseconds) const {
2993  CHECK(catalog_);
2994  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2995  CHECK(cuda_mgr);
2996  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
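  // 1 kHz is one cycle per millisecond, so clockKhz * milliseconds yields elapsed cycles.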
2997  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
2998 }
2999 
3000 llvm::Value* Executor::castToFP(llvm::Value* val) {
3001  if (!val->getType()->isIntegerTy()) {
3002  return val;
3003  }
3004 
3005  auto val_width = static_cast<llvm::IntegerType*>(val->getType())->getBitWidth();
3006  llvm::Type* dest_ty{nullptr};
3007  switch (val_width) {
3008  case 32:
3009  dest_ty = llvm::Type::getFloatTy(cgen_state_->context_);
3010  break;
3011  case 64:
3012  dest_ty = llvm::Type::getDoubleTy(cgen_state_->context_);
3013  break;
3014  default:
3015  LOG(FATAL) << "Unsupported FP width: " << std::to_string(val_width);
3016  }
3017  return cgen_state_->ir_builder_.CreateSIToFP(val, dest_ty);
3018 }
3019 
3020 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3021  CHECK(val->getType()->isPointerTy());
3022 
3023  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3024  const auto val_type = val_ptr_type->getElementType();
3025  size_t val_width = 0;
3026  if (val_type->isIntegerTy()) {
3027  val_width = val_type->getIntegerBitWidth();
3028  } else {
3029  if (val_type->isFloatTy()) {
3030  val_width = 32;
3031  } else {
3032  CHECK(val_type->isDoubleTy());
3033  val_width = 64;
3034  }
3035  }
3036  CHECK_LT(size_t(0), val_width);
3037  if (bitWidth == val_width) {
3038  return val;
3039  }
3040  return cgen_state_->ir_builder_.CreateBitCast(
3041  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3042 }
3043 
3044 #define EXECUTE_INCLUDE
3045 #include "ArrayOps.cpp"
3046 #include "DateAdd.cpp"
3047 #include "StringFunctions.cpp"
3048 #undef EXECUTE_INCLUDE
3049 
3050 RelAlgExecutionUnit Executor::addDeletedColumn(const RelAlgExecutionUnit& ra_exe_unit,
3051  const CompilationOptions& co) {
3052  if (!co.add_delete_column) {
3053  return ra_exe_unit;
3054  }
3055  auto ra_exe_unit_with_deleted = ra_exe_unit;
3056  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3057  if (input_table.getSourceType() != InputSourceType::TABLE) {
3058  continue;
3059  }
3060  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3061  CHECK(td);
3062  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3063  if (!deleted_cd) {
3064  continue;
3065  }
3066  CHECK(deleted_cd->columnType.is_boolean());
3067  // check deleted column is not already present
3068  bool found = false;
3069  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3070  if (input_col.get()->getColId() == deleted_cd->columnId &&
3071  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3072  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3073  found = true;
3074  }
3075  }
3076  if (!found) {
3077  // add deleted column
3078  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3079  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3080  }
3081  }
3082  return ra_exe_unit_with_deleted;
3083 }
3084 
3085 namespace {
3086 
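// Rescales a high-precision timestamp value between precisions: when the source dimension
// (ldim) exceeds the target (rdim) the value is divided by the corresponding power-of-ten
// scale, otherwise it is multiplied by it.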
3087 int64_t get_hpt_scaled_value(const int64_t& val,
3088  const int32_t& ldim,
3089  const int32_t& rdim) {
3090  CHECK(ldim != rdim);
3091  return ldim > rdim ? val / DateTimeUtils::get_timestamp_precision_scale(ldim - rdim)
3092  : val * DateTimeUtils::get_timestamp_precision_scale(rdim - ldim);
3093 }
3094 
3095 } // namespace
3096 
3097 std::pair<bool, int64_t> Executor::skipFragment(
3098  const InputDescriptor& table_desc,
3099  const Fragmenter_Namespace::FragmentInfo& fragment,
3100  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3101  const std::vector<uint64_t>& frag_offsets,
3102  const size_t frag_idx) {
3103  const int table_id = table_desc.getTableId();
3104  for (const auto& simple_qual : simple_quals) {
3105  const auto comp_expr =
3106  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3107  if (!comp_expr) {
3108  // is this possible?
3109  return {false, -1};
3110  }
3111  const auto lhs = comp_expr->get_left_operand();
3112  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3113  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3114  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3115  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3116  if (lhs_uexpr) {
3117  CHECK(lhs_uexpr->get_optype() ==
3118  kCAST); // We should have only been passed a cast expression
3119  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3120  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3121  continue;
3122  }
3123  } else {
3124  continue;
3125  }
3126  }
3127  const auto rhs = comp_expr->get_right_operand();
3128  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3129  if (!rhs_const) {
3130  // is this possible?
3131  return {false, -1};
3132  }
3133  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3134  continue;
3135  }
3136  const int col_id = lhs_col->get_column_id();
3137  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3138  int64_t chunk_min{0};
3139  int64_t chunk_max{0};
3140  bool is_rowid{false};
3141  size_t start_rowid{0};
3142  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3143  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3144  if (cd->isVirtualCol) {
3145  CHECK(cd->columnName == "rowid");
3146  const auto& table_generation = getTableGeneration(table_id);
3147  start_rowid = table_generation.start_rowid;
3148  chunk_min = frag_offsets[frag_idx] + start_rowid;
3149  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3150  is_rowid = true;
3151  }
3152  } else {
3153  const auto& chunk_type = lhs_col->get_type_info();
3154  chunk_min = extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3155  chunk_max = extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3156  }
3157  if (lhs->get_type_info().is_timestamp() &&
3158  (lhs_col->get_type_info().get_dimension() !=
3159  rhs_const->get_type_info().get_dimension()) &&
3160  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3161  rhs_const->get_type_info().is_high_precision_timestamp())) {
3163  // If the original timestamp lhs col has a different precision,
3164  // the column metadata holds the value in the original precision;
3165  // therefore adjust the value to match the rhs precision.
3165  const auto lhs_dimen = lhs_col->get_type_info().get_dimension();
3166  const auto rhs_dimen = rhs_const->get_type_info().get_dimension();
3167  chunk_min = get_hpt_scaled_value(chunk_min, lhs_dimen, rhs_dimen);
3168  chunk_max = get_hpt_scaled_value(chunk_max, lhs_dimen, rhs_dimen);
3169  }
3170  CodeGenerator code_generator(this);
3171  const auto rhs_val = code_generator.codegenIntConst(rhs_const)->getSExtValue();
3172  switch (comp_expr->get_optype()) {
3173  case kGE:
3174  if (chunk_max < rhs_val) {
3175  return {true, -1};
3176  }
3177  break;
3178  case kGT:
3179  if (chunk_max <= rhs_val) {
3180  return {true, -1};
3181  }
3182  break;
3183  case kLE:
3184  if (chunk_min > rhs_val) {
3185  return {true, -1};
3186  }
3187  break;
3188  case kLT:
3189  if (chunk_min >= rhs_val) {
3190  return {true, -1};
3191  }
3192  break;
3193  case kEQ:
3194  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3195  return {true, -1};
3196  } else if (is_rowid) {
3197  return {false, rhs_val - start_rowid};
3198  }
3199  break;
3200  default:
3201  break;
3202  }
3203  }
3204  return {false, -1};
3205 }
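// Standalone sketch (simplified; not the Executor API): the fragment-skipping rule
// implemented by the switch above, restated over plain integers. Given the chunk
// metadata range [chunk_min, chunk_max] for the qual's column, a fragment can be
// skipped when no value in that range can satisfy `col <op> rhs_val`.

#include <cassert>
#include <cstdint>

enum class Op { GE, GT, LE, LT, EQ };

bool can_skip_fragment(Op op, int64_t chunk_min, int64_t chunk_max, int64_t rhs_val) {
  switch (op) {
    case Op::GE:
      return chunk_max < rhs_val;
    case Op::GT:
      return chunk_max <= rhs_val;
    case Op::LE:
      return chunk_min > rhs_val;
    case Op::LT:
      return chunk_min >= rhs_val;
    case Op::EQ:
      return chunk_min > rhs_val || chunk_max < rhs_val;
  }
  return false;
}

int main() {
  // a fragment whose column values span 10..20
  assert(can_skip_fragment(Op::GT, 10, 20, 25));   // col > 25  -> no row can match, skip
  assert(!can_skip_fragment(Op::GE, 10, 20, 15));  // col >= 15 -> rows may match, scan
  assert(can_skip_fragment(Op::EQ, 10, 20, 42));   // col = 42  -> outside range, skip
  return 0;
}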
3206 
3207 /*
3208  * The skipFragmentInnerJoins function processes all quals stored in the execution unit's
3209  * join_quals and gathers all the ones that meet the "simple_qual" characteristics
3210  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
3211  * to decide whether the fragment should be skipped or not. The fragment will be skipped
3212  * if at least one of these skipFragment calls returns true in its first value.
3213  * - The code depends on skipFragment's output having a meaningful (anything but -1)
3214  * second value only if its first value is "false".
3215  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3216  * i.e., we only skip if none of the quals trigger the code to update the
3217  * rowid_lookup_key
3218  * - Only AND operations are valid and considered:
3219  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3220  * the skip
3221  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3222  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3223  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3224  * implemented differently, which causes the first one to skip correctly, but the second
3225  * one will not skip.
3226  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3227  * possible
3228  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3229  * projection, no skipping
3230  */
3231 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3232  const InputDescriptor& table_desc,
3233  const RelAlgExecutionUnit& ra_exe_unit,
3234  const Fragmenter_Namespace::FragmentInfo& fragment,
3235  const std::vector<uint64_t>& frag_offsets,
3236  const size_t frag_idx) {
3237  std::pair<bool, int64_t> skip_frag{false, -1};
3238  for (auto& inner_join : ra_exe_unit.join_quals) {
3239  if (inner_join.type != JoinType::INNER) {
3240  continue;
3241  }
3242 
3243  // extracting all the conjunctive simple_quals from the quals stored for the inner
3244  // join
3245  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3246  for (auto& qual : inner_join.quals) {
3247  auto temp_qual = qual_to_conjunctive_form(qual);
3248  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3249  temp_qual.simple_quals.begin(),
3250  temp_qual.simple_quals.end());
3251  }
3252  auto temp_skip_frag = skipFragment(
3253  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3254  if (temp_skip_frag.second != -1) {
3255  skip_frag.second = temp_skip_frag.second;
3256  return skip_frag;
3257  } else {
3258  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3259  }
3260  }
3261  return skip_frag;
3262 }
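// Minimal sketch (assumed standalone helper, not the Executor API) of how the per-qual
// skipFragment results are folded in the loop above: a result carrying a rowid lookup
// key (second != -1) short-circuits and is returned immediately, otherwise the skip
// flags from the individual quals are OR-ed together.

#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using SkipResult = std::pair<bool, int64_t>;  // {skip_fragment, rowid_lookup_key}

SkipResult fold_skip_results(const std::vector<SkipResult>& per_qual_results) {
  SkipResult folded{false, -1};
  for (const auto& result : per_qual_results) {
    if (result.second != -1) {
      // a rowid lookup key takes priority; mirror the early return above
      folded.second = result.second;
      return folded;
    }
    folded.first = folded.first || result.first;
  }
  return folded;
}

int main() {
  // one qual says skip, none provide a rowid lookup key -> the fragment is skipped
  assert(fold_skip_results({{true, -1}, {false, -1}}) == (SkipResult{true, -1}));
  // a qual provides a rowid lookup key -> it wins and no skip is reported
  assert(fold_skip_results({{false, -1}, {false, 7}}) == (SkipResult{false, 7}));
  return 0;
}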
3263 
3264 AggregatedColRange Executor::computeColRangesCache(
3265  const std::unordered_set<PhysicalInput>& phys_inputs) {
3266  AggregatedColRange agg_col_range_cache;
3267  CHECK(catalog_);
3268  std::unordered_set<int> phys_table_ids;
3269  for (const auto& phys_input : phys_inputs) {
3270  phys_table_ids.insert(phys_input.table_id);
3271  }
3272  std::vector<InputTableInfo> query_infos;
3273  for (const int table_id : phys_table_ids) {
3274  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3275  }
3276  for (const auto& phys_input : phys_inputs) {
3277  const auto cd =
3278  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3279  CHECK(cd);
3280  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3281  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3282  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3283  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3284  agg_col_range_cache.setColRange(phys_input, col_range);
3285  }
3286  }
3287  return agg_col_range_cache;
3288 }
3289 
3290 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3291  const std::unordered_set<PhysicalInput>& phys_inputs) {
3292  StringDictionaryGenerations string_dictionary_generations;
3293  CHECK(catalog_);
3294  for (const auto& phys_input : phys_inputs) {
3295  const auto cd =
3296  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3297  CHECK(cd);
3298  const auto& col_ti =
3299  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3300  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3301  const int dict_id = col_ti.get_comp_param();
3302  const auto dd = catalog_->getMetadataForDict(dict_id);
3303  CHECK(dd && dd->stringDict);
3304  string_dictionary_generations.setGeneration(dict_id,
3305  dd->stringDict->storageEntryCount());
3306  }
3307  }
3308  return string_dictionary_generations;
3309 }
3310 
3311 TableGenerations Executor::computeTableGenerations(
3312  std::unordered_set<int> phys_table_ids) {
3313  TableGenerations table_generations;
3314  for (const int table_id : phys_table_ids) {
3315  const auto table_info = getTableInfo(table_id);
3316  table_generations.setGeneration(
3317  table_id, TableGeneration{table_info.getPhysicalNumTuples(), 0});
3318  }
3319  return table_generations;
3320 }
3321 
3322 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3323  const std::unordered_set<int>& phys_table_ids) {
3324  CHECK(catalog_);
3325  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize());
3326  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3327  string_dictionary_generations_ = computeStringDictionaryGenerations(phys_inputs);
3328  table_generations_ = computeTableGenerations(phys_table_ids);
3329 }
3330 
3331 mapd_shared_mutex& Executor::getSessionLock() {
3332  return executor_session_mutex_;
3333 }
3334 
3335 template <>
3336 void Executor::setCurrentQuerySession(const std::string& query_session,
3337  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3338  if (current_query_session_ == "") {
3339  // only set the query session if it currently has an invalid session
3340  current_query_session_ = query_session;
3341  }
3342 }
3343 
3344 template <>
3345 std::string& Executor::getCurrentQuerySession(
3346  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3347  return current_query_session_;
3348 }
3349 
3350 template <>
3351 bool Executor::checkCurrentQuerySession(const std::string& candidate_query_session,
3352  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3353  // the check passes only if the candidate_query_session matches
3354  // the current_query_session
3355  return (current_query_session_ == candidate_query_session);
3356 }
3357 
3358 template <>
3359 void Executor::invalidateQuerySession(mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3360  current_query_session_ = "";
3361 }
3362 
3363 template <>
3364 bool Executor::addToQuerySessionList(const std::string& query_session,
3365  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3366  return queries_interrupt_flag_.emplace(query_session, false).second;
3367 }
3368 
3369 template <>
3370 bool Executor::removeFromQuerySessionList(
3371  const std::string& query_session,
3372  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3373  return queries_interrupt_flag_.erase(query_session);
3374 }
3375 
3376 template <>
3377 void Executor::setQuerySessionAsInterrupted(
3378  const std::string& query_session,
3379  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3380  queries_interrupt_flag_[query_session] = true;
3381 }
3382 
3383 template <>
3384 bool Executor::checkIsQuerySessionInterrupted(
3385  const std::string& query_session,
3386  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3387  auto flag_it = queries_interrupt_flag_.find(query_session);
3388  return flag_it != queries_interrupt_flag_.end() && flag_it->second;
3389 }
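// Simplified sketch (hypothetical standalone class, not Executor's API) of the
// session-keyed interrupt-flag pattern used by the specializations above: a shared
// mutex guards a map from query session id to an "interrupted" flag; writers take a
// unique lock and readers a shared lock.

#include <cassert>
#include <map>
#include <shared_mutex>
#include <string>

class SessionInterruptMap {
 public:
  bool add(const std::string& session) {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    return flags_.emplace(session, false).second;
  }
  void interrupt(const std::string& session) {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    flags_[session] = true;
  }
  bool isInterrupted(const std::string& session) const {
    std::shared_lock<std::shared_mutex> read_lock(mutex_);
    const auto it = flags_.find(session);
    return it != flags_.end() && it->second;
  }
  bool remove(const std::string& session) {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    return flags_.erase(session) > 0;
  }

 private:
  mutable std::shared_mutex mutex_;
  std::map<std::string, bool> flags_;
};

int main() {
  SessionInterruptMap sessions;
  assert(sessions.add("session-1"));
  assert(!sessions.isInterrupted("session-1"));
  sessions.interrupt("session-1");
  assert(sessions.isInterrupted("session-1"));
  assert(sessions.remove("session-1"));
  return 0;
}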
3390 
3391 void Executor::enableRuntimeQueryInterrupt(const unsigned interrupt_freq) const {
3392  // The only scenario in which we intentionally call this function is
3393  // to allow runtime query interrupt in QueryRunner for test cases.
3394  // Test machines do not allow runtime query interrupt by default,
3395  // so we have to turn it on within test code when necessary.
3396  g_enable_runtime_query_interrupt = true;
3397  g_runtime_query_interrupt_frequency = interrupt_freq;
3398 }
3399 
3400 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
3401 std::mutex Executor::execute_mutex_;
3403 std::atomic_flag Executor::execute_spin_lock_ = ATOMIC_FLAG_INIT;
3404 std::string Executor::current_query_session_{""};