OmniSciDB  29e35f4d58
Execute.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Execute.h"
18 
19 #include "AggregateUtils.h"
20 #include "BaselineJoinHashTable.h"
21 #include "CodeGenerator.h"
22 #include "ColumnFetcher.h"
25 #include "DynamicWatchdog.h"
26 #include "EquiJoinCondition.h"
27 #include "ErrorHandling.h"
28 #include "ExpressionRewrite.h"
30 #include "GpuMemUtils.h"
31 #include "InPlaceSort.h"
32 #include "JsonAccessors.h"
34 #include "OverlapsJoinHashTable.h"
35 #include "QueryRewrite.h"
36 #include "QueryTemplateGenerator.h"
37 #include "ResultSetReductionJIT.h"
38 #include "RuntimeFunctions.h"
39 #include "SpeculativeTopN.h"
40 
43 
44 #include "CudaMgr/CudaMgr.h"
46 #include "Parser/ParserNode.h"
48 #include "Shared/MapDParameters.h"
50 #include "Shared/checked_alloc.h"
51 #include "Shared/measure.h"
52 #include "Shared/scope.h"
53 #include "Shared/shard_key.h"
54 
55 #include "AggregatedColRange.h"
57 
58 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
59 #include <boost/filesystem/operations.hpp>
60 #include <boost/filesystem/path.hpp>
61 
62 #ifdef HAVE_CUDA
63 #include <cuda.h>
64 #endif // HAVE_CUDA
65 #include <future>
66 #include <memory>
67 #include <numeric>
68 #include <set>
69 #include <thread>
70 
71 bool g_enable_watchdog{false};
74 bool g_allow_cpu_retry{true};
75 bool g_null_div_by_zero{false};
79 extern bool g_enable_smem_group_by;
80 extern std::unique_ptr<llvm::Module> udf_gpu_module;
81 extern std::unique_ptr<llvm::Module> udf_cpu_module;
88 bool g_cache_string_hash{false};
89 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
92 size_t g_big_group_threshold{20000};
95 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
96 size_t g_min_memory_allocation_size{
97  256}; // minimum memory allocation required for projection query output buffer
98  // without pre-flight count
103 
104 int const Executor::max_gpu_count;
105 
107 
108 Executor::Executor(const int db_id,
109  const size_t block_size_x,
110  const size_t grid_size_x,
111  const std::string& debug_dir,
112  const std::string& debug_file)
113  : cgen_state_(new CgenState({}, false))
115  , interrupted_(false)
118  , block_size_x_(block_size_x)
119  , grid_size_x_(grid_size_x)
120  , debug_dir_(debug_dir)
121  , debug_file_(debug_file)
122  , db_id_(db_id)
123  , catalog_(nullptr)
124  , temporary_tables_(nullptr)
125  , input_table_info_cache_(this) {}
126 
127 std::shared_ptr<Executor> Executor::getExecutor(const int db_id,
128  const std::string& debug_dir,
129  const std::string& debug_file,
130  const MapDParameters mapd_parameters) {
132  const auto executor_key = db_id;
133  {
134  mapd_shared_lock<mapd_shared_mutex> read_lock(executors_cache_mutex_);
135  auto it = executors_.find(executor_key);
136  if (it != executors_.end()) {
137  return it->second;
138  }
139  }
140  {
141  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
142  auto it = executors_.find(executor_key);
143  if (it != executors_.end()) {
144  return it->second;
145  }
146  auto executor = std::make_shared<Executor>(db_id,
147  mapd_parameters.cuda_block_size,
148  mapd_parameters.cuda_grid_size,
149  debug_dir,
150  debug_file);
151  auto it_ok = executors_.insert(std::make_pair(executor_key, executor));
152  CHECK(it_ok.second);
153  return executor;
154  }
155 }
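// A minimal sketch (hypothetical helpers, not OmniSciDB API) of the double-checked
// locking pattern getExecutor() uses above: take a shared lock for the fast read path,
// then re-check under a unique lock before inserting. The cache, mutex and value type
// below are stand-ins for executors_ / executors_cache_mutex_, shown with
// std::shared_mutex instead of the mapd_shared_mutex alias.
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

static std::unordered_map<int, std::shared_ptr<int>> g_example_cache;
static std::shared_mutex g_example_cache_mutex;

static std::shared_ptr<int> example_get_or_create(const int key) {
  {
    std::shared_lock<std::shared_mutex> read_lock(g_example_cache_mutex);  // fast path
    auto it = g_example_cache.find(key);
    if (it != g_example_cache.end()) {
      return it->second;
    }
  }
  std::unique_lock<std::shared_mutex> write_lock(g_example_cache_mutex);  // slow path
  auto it = g_example_cache.find(key);  // re-check: another thread may have inserted
  if (it != g_example_cache.end()) {
    return it->second;
  }
  auto value = std::make_shared<int>(key);
  g_example_cache.emplace(key, value);
  return value;
}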
156 
158  switch (memory_level) {
161  std::lock_guard<std::mutex> flush_lock(
162  execute_mutex_); // Don't flush memory while queries are running
163 
165  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
166  // The hash table cache uses CPU memory not managed by the buffer manager. In the
167  // future, we should manage these allocations with the buffer manager directly.
168  // For now, assume the user wants to purge the hash table cache when they clear
169  // CPU memory (currently used in ExecuteTest to lower memory pressure)
171  }
172  break;
173  }
174  default: {
175  throw std::runtime_error(
176  "Clearing memory levels other than the CPU level or GPU level is not "
177  "supported.");
178  }
179  }
180 }
181 
183  const int dict_id_in,
184  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
185  const bool with_generation) const {
186  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
187  CHECK(catalog_);
188  const auto dd = catalog_->getMetadataForDict(dict_id);
189  std::lock_guard<std::mutex> lock(str_dict_mutex_);
190  if (dd) {
191  CHECK(dd->stringDict);
192  CHECK_LE(dd->dictNBits, 32);
193  CHECK(row_set_mem_owner);
194  const auto generation = with_generation
196  : ssize_t(-1);
197  return row_set_mem_owner->addStringDict(dd->stringDict, dict_id, generation);
198  }
199  CHECK_EQ(0, dict_id);
200  if (!lit_str_dict_proxy_) {
201  std::shared_ptr<StringDictionary> tsd =
202  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
203  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
204  }
205  return lit_str_dict_proxy_.get();
206 }
207 
208 bool Executor::isCPUOnly() const {
209  CHECK(catalog_);
210  return !catalog_->getDataMgr().getCudaMgr();
211 }
212 
214  const Analyzer::ColumnVar* col_var) const {
216  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
217 }
218 
220  const Analyzer::ColumnVar* col_var,
221  int n) const {
222  const auto cd = getColumnDescriptor(col_var);
223  if (!cd || n > cd->columnType.get_physical_cols()) {
224  return nullptr;
225  }
227  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
228 }
229 
231  return catalog_;
232 }
233 
235  catalog_ = catalog;
236 }
237 
238 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
239  return row_set_mem_owner_;
240 }
241 
243  return temporary_tables_;
244 }
245 
247  return input_table_info_cache_.getTableInfo(table_id);
248 }
249 
250 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
251  return table_generations_.getGeneration(table_id);
252 }
253 
255  return agg_col_range_cache_.getColRange(phys_input);
256 }
257 
259  size_t num_bytes = 0;
260  if (!plan_state_) {
261  return 0;
262  }
263  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
264  if (fetched_col_pair.first < 0) {
265  num_bytes += 8;
266  } else {
267  const auto cd =
268  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
269  const auto& ti = cd->columnType;
270  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
271  ? 4
272  : ti.get_size();
273  if (sz < 0) {
274  // for varlen types, only account for the pointer/size for each row, for now
275  num_bytes += 16;
276  } else {
277  num_bytes += sz;
278  }
279  }
280  }
281  return num_bytes;
282 }
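// Worked example (hypothetical schema): a query fetching a dict-encoded TEXT column
// (counted as 4 bytes), a BIGINT column (8 bytes), a none-encoded TEXT column (varlen,
// counted as a 16-byte pointer/size pair) and the virtual rowid (negative column id,
// 8 bytes) yields getNumBytesForFetchedRow() == 4 + 8 + 16 + 8 == 36 bytes per row.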
283 
284 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
285  const std::vector<Analyzer::Expr*>& target_exprs) const {
287  CHECK(catalog_);
288  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
289  for (const auto target_expr : target_exprs) {
290  if (!plan_state_->isLazyFetchColumn(target_expr)) {
291  col_lazy_fetch_info.emplace_back(
292  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
293  } else {
294  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
295  CHECK(col_var);
296  auto col_id = col_var->get_column_id();
297  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
298  auto cd = (col_var->get_table_id() > 0)
299  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
300  : nullptr;
301  if (cd && IS_GEO(cd->columnType.get_type())) {
302  // Geo coords cols will be processed in sequence. So we only need to track the
303  // first coords col in lazy fetch info.
304  {
305  auto cd0 =
306  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
307  auto col0_ti = cd0->columnType;
308  CHECK(!cd0->isVirtualCol);
309  auto col0_var = makeExpr<Analyzer::ColumnVar>(
310  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
311  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
312  col_lazy_fetch_info.emplace_back(
313  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
314  }
315  } else {
316  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
317  const auto& col_ti = col_var->get_type_info();
318  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
319  }
320  }
321  }
322  return col_lazy_fetch_info;
323 }
324 
330 }
331 
332 std::vector<int8_t> Executor::serializeLiterals(
333  const std::unordered_map<int, CgenState::LiteralValues>& literals,
334  const int device_id) {
335  if (literals.empty()) {
336  return {};
337  }
338  const auto dev_literals_it = literals.find(device_id);
339  CHECK(dev_literals_it != literals.end());
340  const auto& dev_literals = dev_literals_it->second;
341  size_t lit_buf_size{0};
342  std::vector<std::string> real_strings;
343  std::vector<std::vector<double>> double_array_literals;
344  std::vector<std::vector<int8_t>> align64_int8_array_literals;
345  std::vector<std::vector<int32_t>> int32_array_literals;
346  std::vector<std::vector<int8_t>> align32_int8_array_literals;
347  std::vector<std::vector<int8_t>> int8_array_literals;
348  for (const auto& lit : dev_literals) {
349  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
350  if (lit.which() == 7) {
351  const auto p = boost::get<std::string>(&lit);
352  CHECK(p);
353  real_strings.push_back(*p);
354  } else if (lit.which() == 8) {
355  const auto p = boost::get<std::vector<double>>(&lit);
356  CHECK(p);
357  double_array_literals.push_back(*p);
358  } else if (lit.which() == 9) {
359  const auto p = boost::get<std::vector<int32_t>>(&lit);
360  CHECK(p);
361  int32_array_literals.push_back(*p);
362  } else if (lit.which() == 10) {
363  const auto p = boost::get<std::vector<int8_t>>(&lit);
364  CHECK(p);
365  int8_array_literals.push_back(*p);
366  } else if (lit.which() == 11) {
367  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
368  CHECK(p);
369  if (p->second == 64) {
370  align64_int8_array_literals.push_back(p->first);
371  } else if (p->second == 32) {
372  align32_int8_array_literals.push_back(p->first);
373  } else {
374  CHECK(false);
375  }
376  }
377  }
378  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
379  throw TooManyLiterals();
380  }
381  int16_t crt_real_str_off = lit_buf_size;
382  for (const auto& real_str : real_strings) {
383  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
384  lit_buf_size += real_str.size();
385  }
386  if (double_array_literals.size() > 0) {
387  lit_buf_size = align(lit_buf_size, sizeof(double));
388  }
389  int16_t crt_double_arr_lit_off = lit_buf_size;
390  for (const auto& double_array_literal : double_array_literals) {
391  CHECK_LE(double_array_literal.size(),
392  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
393  lit_buf_size += double_array_literal.size() * sizeof(double);
394  }
395  if (align64_int8_array_literals.size() > 0) {
396  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
397  }
398  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
399  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
400  CHECK_LE(align64_int8_array_literal.size(),
401  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
402  lit_buf_size += align64_int8_array_literal.size();
403  }
404  if (int32_array_literals.size() > 0) {
405  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
406  }
407  int16_t crt_int32_arr_lit_off = lit_buf_size;
408  for (const auto& int32_array_literal : int32_array_literals) {
409  CHECK_LE(int32_array_literal.size(),
410  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
411  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
412  }
413  if (align32_int8_array_literals.size() > 0) {
414  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
415  }
416  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
417  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
418  CHECK_LE(align32_int8_array_literal.size(),
419  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
420  lit_buf_size += align32_int8_array_literal.size();
421  }
422  int16_t crt_int8_arr_lit_off = lit_buf_size;
423  for (const auto& int8_array_literal : int8_array_literals) {
424  CHECK_LE(int8_array_literal.size(),
425  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
426  lit_buf_size += int8_array_literal.size();
427  }
428  unsigned crt_real_str_idx = 0;
429  unsigned crt_double_arr_lit_idx = 0;
430  unsigned crt_align64_int8_arr_lit_idx = 0;
431  unsigned crt_int32_arr_lit_idx = 0;
432  unsigned crt_align32_int8_arr_lit_idx = 0;
433  unsigned crt_int8_arr_lit_idx = 0;
434  std::vector<int8_t> serialized(lit_buf_size);
435  size_t off{0};
436  for (const auto& lit : dev_literals) {
437  const auto lit_bytes = CgenState::literalBytes(lit);
438  off = CgenState::addAligned(off, lit_bytes);
439  switch (lit.which()) {
440  case 0: {
441  const auto p = boost::get<int8_t>(&lit);
442  CHECK(p);
443  serialized[off - lit_bytes] = *p;
444  break;
445  }
446  case 1: {
447  const auto p = boost::get<int16_t>(&lit);
448  CHECK(p);
449  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
450  break;
451  }
452  case 2: {
453  const auto p = boost::get<int32_t>(&lit);
454  CHECK(p);
455  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
456  break;
457  }
458  case 3: {
459  const auto p = boost::get<int64_t>(&lit);
460  CHECK(p);
461  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
462  break;
463  }
464  case 4: {
465  const auto p = boost::get<float>(&lit);
466  CHECK(p);
467  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
468  break;
469  }
470  case 5: {
471  const auto p = boost::get<double>(&lit);
472  CHECK(p);
473  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
474  break;
475  }
476  case 6: {
477  const auto p = boost::get<std::pair<std::string, int>>(&lit);
478  CHECK(p);
479  const auto str_id =
481  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
482  ->getOrAddTransient(p->first)
483  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
484  ->getIdOfString(p->first);
485  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
486  break;
487  }
488  case 7: {
489  const auto p = boost::get<std::string>(&lit);
490  CHECK(p);
491  int32_t off_and_len = crt_real_str_off << 16;
492  const auto& crt_real_str = real_strings[crt_real_str_idx];
493  off_and_len |= static_cast<int16_t>(crt_real_str.size());
494  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
495  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
496  ++crt_real_str_idx;
497  crt_real_str_off += crt_real_str.size();
498  break;
499  }
500  case 8: {
501  const auto p = boost::get<std::vector<double>>(&lit);
502  CHECK(p);
503  int32_t off_and_len = crt_double_arr_lit_off << 16;
504  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
505  int32_t len = crt_double_arr_lit.size();
506  CHECK_EQ((len >> 16), 0);
507  off_and_len |= static_cast<int16_t>(len);
508  int32_t double_array_bytesize = len * sizeof(double);
509  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
510  memcpy(&serialized[crt_double_arr_lit_off],
511  crt_double_arr_lit.data(),
512  double_array_bytesize);
513  ++crt_double_arr_lit_idx;
514  crt_double_arr_lit_off += double_array_bytesize;
515  break;
516  }
517  case 9: {
518  const auto p = boost::get<std::vector<int32_t>>(&lit);
519  CHECK(p);
520  int32_t off_and_len = crt_int32_arr_lit_off << 16;
521  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
522  int32_t len = crt_int32_arr_lit.size();
523  CHECK_EQ((len >> 16), 0);
524  off_and_len |= static_cast<int16_t>(len);
525  int32_t int32_array_bytesize = len * sizeof(int32_t);
526  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
527  memcpy(&serialized[crt_int32_arr_lit_off],
528  crt_int32_arr_lit.data(),
529  int32_array_bytesize);
530  ++crt_int32_arr_lit_idx;
531  crt_int32_arr_lit_off += int32_array_bytesize;
532  break;
533  }
534  case 10: {
535  const auto p = boost::get<std::vector<int8_t>>(&lit);
536  CHECK(p);
537  int32_t off_and_len = crt_int8_arr_lit_off << 16;
538  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
539  int32_t len = crt_int8_arr_lit.size();
540  CHECK_EQ((len >> 16), 0);
541  off_and_len |= static_cast<int16_t>(len);
542  int32_t int8_array_bytesize = len;
543  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
544  memcpy(&serialized[crt_int8_arr_lit_off],
545  crt_int8_arr_lit.data(),
546  int8_array_bytesize);
547  ++crt_int8_arr_lit_idx;
548  crt_int8_arr_lit_off += int8_array_bytesize;
549  break;
550  }
551  case 11: {
552  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
553  CHECK(p);
554  if (p->second == 64) {
555  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
556  const auto& crt_align64_int8_arr_lit =
557  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
558  int32_t len = crt_align64_int8_arr_lit.size();
559  CHECK_EQ((len >> 16), 0);
560  off_and_len |= static_cast<int16_t>(len);
561  int32_t align64_int8_array_bytesize = len;
562  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
563  memcpy(&serialized[crt_align64_int8_arr_lit_off],
564  crt_align64_int8_arr_lit.data(),
565  align64_int8_array_bytesize);
566  ++crt_align64_int8_arr_lit_idx;
567  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
568  } else if (p->second == 32) {
569  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
570  const auto& crt_align32_int8_arr_lit =
571  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
572  int32_t len = crt_align32_int8_arr_lit.size();
573  CHECK_EQ((len >> 16), 0);
574  off_and_len |= static_cast<int16_t>(len);
575  int32_t align32_int8_array_bytesize = len;
576  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
577  memcpy(&serialized[crt_align32_int8_arr_lit_off],
578  crt_align32_int8_arr_lit.data(),
579  align32_int8_array_bytesize);
580  ++crt_align32_int8_arr_lit_idx;
581  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
582  } else {
583  CHECK(false);
584  }
585  break;
586  }
587  default:
588  CHECK(false);
589  }
590  }
591  return serialized;
592 }
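// A minimal sketch (hypothetical helpers, not part of the Executor API) of the 32-bit
// "off_and_len" words serializeLiterals() writes for string and array literals: the
// upper 16 bits hold the payload offset within the literal buffer and the lower 16 bits
// hold the payload length, which is why both are capped at int16_t's maximum above.
#include <cstdint>

inline int32_t example_pack_off_and_len(const int16_t off, const int16_t len) {
  return (static_cast<int32_t>(off) << 16) | static_cast<uint16_t>(len);
}

inline void example_unpack_off_and_len(const int32_t off_and_len,
                                       int16_t& off,
                                       int16_t& len) {
  off = static_cast<int16_t>(off_and_len >> 16);
  len = static_cast<int16_t>(off_and_len & 0xFFFF);
}
// e.g. example_pack_off_and_len(128, 11) describes an 11-byte payload that starts at
// byte 128 of the serialized literal buffer.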
593 
594 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
595  if (device_type == ExecutorDeviceType::GPU) {
596  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
597  CHECK(cuda_mgr);
598  return cuda_mgr->getDeviceCount();
599  } else {
600  return 1;
601  }
602 }
603 
605  const Data_Namespace::MemoryLevel memory_level) const {
606  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
608 }
609 
610 // TODO(alex): remove or split
611 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
612  const SQLTypeInfo& ti,
613  const int64_t agg_init_val,
614  const int8_t out_byte_width,
615  const int64_t* out_vec,
616  const size_t out_vec_sz,
617  const bool is_group_by,
618  const bool float_argument_input) {
619  switch (agg) {
620  case kAVG:
621  case kSUM:
622  if (0 != agg_init_val) {
623  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
624  int64_t agg_result = agg_init_val;
625  for (size_t i = 0; i < out_vec_sz; ++i) {
626  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
627  }
628  return {agg_result, 0};
629  } else {
630  CHECK(ti.is_fp());
631  switch (out_byte_width) {
632  case 4: {
633  int agg_result = static_cast<int32_t>(agg_init_val);
634  for (size_t i = 0; i < out_vec_sz; ++i) {
636  &agg_result,
637  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
638  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
639  }
640  const int64_t converted_bin =
641  float_argument_input
642  ? static_cast<int64_t>(agg_result)
643  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
644  return {converted_bin, 0};
645  break;
646  }
647  case 8: {
648  int64_t agg_result = agg_init_val;
649  for (size_t i = 0; i < out_vec_sz; ++i) {
651  &agg_result,
652  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
653  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
654  }
655  return {agg_result, 0};
656  break;
657  }
658  default:
659  CHECK(false);
660  }
661  }
662  }
663  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
664  int64_t agg_result = 0;
665  for (size_t i = 0; i < out_vec_sz; ++i) {
666  agg_result += out_vec[i];
667  }
668  return {agg_result, 0};
669  } else {
670  CHECK(ti.is_fp());
671  switch (out_byte_width) {
672  case 4: {
673  float r = 0.;
674  for (size_t i = 0; i < out_vec_sz; ++i) {
675  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
676  }
677  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
678  const int64_t converted_bin =
679  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
680  return {converted_bin, 0};
681  }
682  case 8: {
683  double r = 0.;
684  for (size_t i = 0; i < out_vec_sz; ++i) {
685  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
686  }
687  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
688  }
689  default:
690  CHECK(false);
691  }
692  }
693  break;
694  case kCOUNT: {
695  uint64_t agg_result = 0;
696  for (size_t i = 0; i < out_vec_sz; ++i) {
697  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
698  agg_result += out;
699  }
700  return {static_cast<int64_t>(agg_result), 0};
701  }
702  case kMIN: {
703  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
704  int64_t agg_result = agg_init_val;
705  for (size_t i = 0; i < out_vec_sz; ++i) {
706  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
707  }
708  return {agg_result, 0};
709  } else {
710  switch (out_byte_width) {
711  case 4: {
712  int32_t agg_result = static_cast<int32_t>(agg_init_val);
713  for (size_t i = 0; i < out_vec_sz; ++i) {
715  &agg_result,
716  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
717  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
718  }
719  const int64_t converted_bin =
720  float_argument_input
721  ? static_cast<int64_t>(agg_result)
722  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
723  return {converted_bin, 0};
724  }
725  case 8: {
726  int64_t agg_result = agg_init_val;
727  for (size_t i = 0; i < out_vec_sz; ++i) {
729  &agg_result,
730  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
731  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
732  }
733  return {agg_result, 0};
734  }
735  default:
736  CHECK(false);
737  }
738  }
739  }
740  case kMAX:
741  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
742  int64_t agg_result = agg_init_val;
743  for (size_t i = 0; i < out_vec_sz; ++i) {
744  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
745  }
746  return {agg_result, 0};
747  } else {
748  switch (out_byte_width) {
749  case 4: {
750  int32_t agg_result = static_cast<int32_t>(agg_init_val);
751  for (size_t i = 0; i < out_vec_sz; ++i) {
753  &agg_result,
754  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
755  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
756  }
757  const int64_t converted_bin =
758  float_argument_input ? static_cast<int64_t>(agg_result)
759  : float_to_double_bin(agg_result, !ti.get_notnull());
760  return {converted_bin, 0};
761  }
762  case 8: {
763  int64_t agg_result = agg_init_val;
764  for (size_t i = 0; i < out_vec_sz; ++i) {
766  &agg_result,
767  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
768  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
769  }
770  return {agg_result, 0};
771  }
772  default:
773  CHECK(false);
774  }
775  }
776  case kSINGLE_VALUE: {
777  int64_t agg_result = agg_init_val;
778  for (size_t i = 0; i < out_vec_sz; ++i) {
779  if (out_vec[i] != agg_init_val) {
780  if (agg_result == agg_init_val) {
781  agg_result = out_vec[i];
782  } else if (out_vec[i] != agg_result) {
784  }
785  }
786  }
787  return {agg_result, 0};
788  }
789  case kSAMPLE: {
790  int64_t agg_result = agg_init_val;
791  for (size_t i = 0; i < out_vec_sz; ++i) {
792  if (out_vec[i] != agg_init_val) {
793  agg_result = out_vec[i];
794  break;
795  }
796  }
797  return {agg_result, 0};
798  }
799  default:
800  CHECK(false);
801  }
802  abort();
803 }
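// A minimal sketch (hypothetical helpers) of the slot-reinterpretation idiom used
// throughout reduceResults(): partial aggregates arrive as int64_t slots, and
// floating-point partials are stored bit-wise in those slots, so the reducer must
// reinterpret the slot bytes (as the may_alias_ptr casts above do) rather than
// convert the integer value.
#include <cstdint>
#include <cstring>

inline double example_read_double_slot(const int64_t slot) {
  double d;
  std::memcpy(&d, &slot, sizeof(double));  // view the 8 slot bytes as a double
  return d;
}

inline int64_t example_write_double_slot(const double d) {
  int64_t slot;
  std::memcpy(&slot, &d, sizeof(double));  // store the double's bit pattern verbatim
  return slot;
}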
804 
805 namespace {
806 
808  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
809  auto& first = results_per_device.front().first;
810  CHECK(first);
811  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
812  const auto& next = results_per_device[dev_idx].first;
813  CHECK(next);
814  first->append(*next);
815  }
816  return std::move(first);
817 }
818 
819 } // namespace
820 
822  auto& results_per_device = execution_dispatch.getFragmentResults();
823  if (results_per_device.empty()) {
824  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
825  std::vector<TargetInfo> targets;
826  for (const auto target_expr : ra_exe_unit.target_exprs) {
827  targets.push_back(get_target_info(target_expr, g_bigint_count));
828  }
829  return std::make_shared<ResultSet>(targets,
833  this);
834  }
835  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
836  std::sort(results_per_device.begin(),
837  results_per_device.end(),
838  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
839  CHECK_GE(lhs.second.size(), size_t(1));
840  CHECK_GE(rhs.second.size(), size_t(1));
841  return lhs.second.front() < rhs.second.front();
842  });
843 
844  return get_merged_result(results_per_device);
845 }
846 
848  const RelAlgExecutionUnit& ra_exe_unit,
849  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
850  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
851  const QueryMemoryDescriptor& query_mem_desc) const {
852  if (ra_exe_unit.estimator) {
853  return reduce_estimator_results(ra_exe_unit, results_per_device);
854  }
855 
856  if (results_per_device.empty()) {
857  std::vector<TargetInfo> targets;
858  for (const auto target_expr : ra_exe_unit.target_exprs) {
859  targets.push_back(get_target_info(target_expr, g_bigint_count));
860  }
861  return std::make_shared<ResultSet>(
862  targets, ExecutorDeviceType::CPU, QueryMemoryDescriptor(), nullptr, this);
863  }
864 
866  results_per_device,
867  row_set_mem_owner,
868  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
869 }
870 
872  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
873  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
874  const QueryMemoryDescriptor& query_mem_desc) const {
875  auto timer = DEBUG_TIMER(__func__);
876  std::shared_ptr<ResultSet> reduced_results;
877 
878  const auto& first = results_per_device.front().first;
879 
880  if (query_mem_desc.getQueryDescriptionType() ==
882  results_per_device.size() > 1) {
883  const auto total_entry_count = std::accumulate(
884  results_per_device.begin(),
885  results_per_device.end(),
886  size_t(0),
887  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
888  const auto& r = rs.first;
889  return init + r->getQueryMemDesc().getEntryCount();
890  });
891  CHECK(total_entry_count);
892  auto query_mem_desc = first->getQueryMemDesc();
893  query_mem_desc.setEntryCount(total_entry_count);
894  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
896  query_mem_desc,
897  row_set_mem_owner,
898  this);
899  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
900  reduced_results->initializeStorage();
901  switch (query_mem_desc.getEffectiveKeyWidth()) {
902  case 4:
903  first->getStorage()->moveEntriesToBuffer<int32_t>(
904  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
905  break;
906  case 8:
907  first->getStorage()->moveEntriesToBuffer<int64_t>(
908  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
909  break;
910  default:
911  CHECK(false);
912  }
913  } else {
914  reduced_results = first;
915  }
916 
917  const auto& this_result_set = results_per_device[0].first;
918  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
919  this_result_set->getTargetInfos(),
920  this_result_set->getTargetInitVals());
921  const auto reduction_code = reduction_jit.codegen();
922  for (size_t i = 1; i < results_per_device.size(); ++i) {
923  reduced_results->getStorage()->reduce(
924  *(results_per_device[i].first->getStorage()), {}, reduction_code);
925  }
926 
927  return reduced_results;
928 }
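// Worked example (hypothetical sizes): reducing three per-device baseline-hash result
// sets with entry counts 1024, 2048 and 512 first materializes a 3584-entry output
// buffer, moves the first device's entries into it, then folds the remaining two
// storages in with the JIT-generated reduction code.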
929 
931  const RelAlgExecutionUnit& ra_exe_unit,
932  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
933  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
934  const QueryMemoryDescriptor& query_mem_desc) const {
935  if (results_per_device.size() == 1) {
936  return std::move(results_per_device.front().first);
937  }
938  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
940  for (const auto& result : results_per_device) {
941  auto rows = result.first;
942  CHECK(rows);
943  if (!rows) {
944  continue;
945  }
946  SpeculativeTopNMap that(
947  *rows,
948  ra_exe_unit.target_exprs,
949  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
950  m.reduce(that);
951  }
952  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
953  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
954  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
955 }
956 
957 std::unordered_set<int> get_available_gpus(const Catalog_Namespace::Catalog& cat) {
958  std::unordered_set<int> available_gpus;
959  if (cat.getDataMgr().gpusPresent()) {
960  int gpu_count = cat.getDataMgr().getCudaMgr()->getDeviceCount();
961  CHECK_GT(gpu_count, 0);
962  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
963  available_gpus.insert(gpu_id);
964  }
965  }
966  return available_gpus;
967 }
968 
969 size_t get_context_count(const ExecutorDeviceType device_type,
970  const size_t cpu_count,
971  const size_t gpu_count) {
972  return device_type == ExecutorDeviceType::GPU ? gpu_count
973  : static_cast<size_t>(cpu_count);
974 }
975 
976 namespace {
977 
978 // Compute a very conservative guess for the output buffer entry count, using no
979 // information other than the number of tuples in each table and multiplying them
980 // together.
981 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
983  // Check for overflows since we're multiplying potentially big table sizes.
984  using checked_size_t = boost::multiprecision::number<
985  boost::multiprecision::cpp_int_backend<64,
986  64,
987  boost::multiprecision::unsigned_magnitude,
988  boost::multiprecision::checked,
989  void>>;
990  checked_size_t max_groups_buffer_entry_guess = 1;
991  for (const auto& query_info : query_infos) {
992  CHECK(!query_info.info.fragments.empty());
993  auto it = std::max_element(query_info.info.fragments.begin(),
994  query_info.info.fragments.end(),
995  [](const FragmentInfo& f1, const FragmentInfo& f2) {
996  return f1.getNumTuples() < f2.getNumTuples();
997  });
998  max_groups_buffer_entry_guess *= it->getNumTuples();
999  }
1000  // Cap the rough approximation to 100M entries; it's unlikely we can do a great job
1001  // for the baseline group layout with that many entries anyway.
1002  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1003  try {
1004  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1005  max_groups_buffer_entry_guess_cap);
1006  } catch (...) {
1007  return max_groups_buffer_entry_guess_cap;
1008  }
1009 }
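// Worked example (hypothetical tables): if the largest fragments of the two input
// tables hold 3,000,000 and 50,000 tuples, the raw guess is 3,000,000 * 50,000 =
// 150,000,000,000 entries, which is then clamped to the 100,000,000-entry cap
// (max_groups_buffer_entry_guess_cap) before being returned.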
1010 
1011 std::string get_table_name(const InputDescriptor& input_desc,
1012  const Catalog_Namespace::Catalog& cat) {
1013  const auto source_type = input_desc.getSourceType();
1014  if (source_type == InputSourceType::TABLE) {
1015  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1016  CHECK(td);
1017  return td->tableName;
1018  } else {
1019  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1020  }
1021 }
1022 
1023 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1024  const int device_count) {
1025  if (device_type == ExecutorDeviceType::GPU) {
1026  return device_count * Executor::high_scan_limit;
1027  }
1029 }
1030 
1032  const std::vector<InputTableInfo>& table_infos,
1033  const Catalog_Namespace::Catalog& cat,
1034  const ExecutorDeviceType device_type,
1035  const int device_count) {
1036  for (const auto target_expr : ra_exe_unit.target_exprs) {
1037  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1038  return;
1039  }
1040  }
1041  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1042  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1043  // Allow a query with no scan limit to run on small tables
1044  return;
1045  }
1046  if (ra_exe_unit.use_bump_allocator) {
1047  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1048  // relative to the size of the input), so we bypass this check for now
1049  return;
1050  }
1051  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1052  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1053  (!ra_exe_unit.scan_limit ||
1054  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1055  std::vector<std::string> table_names;
1056  const auto& input_descs = ra_exe_unit.input_descs;
1057  for (const auto& input_desc : input_descs) {
1058  table_names.push_back(get_table_name(input_desc, cat));
1059  }
1060  if (!ra_exe_unit.scan_limit) {
1061  throw WatchdogException(
1062  "Projection query would require a scan without a limit on table(s): " +
1063  boost::algorithm::join(table_names, ", "));
1064  } else {
1065  throw WatchdogException(
1066  "Projection query output result set on table(s): " +
1067  boost::algorithm::join(table_names, ", ") + " would contain " +
1068  std::to_string(ra_exe_unit.scan_limit) +
1069  " rows, which is more than the current system limit of " +
1070  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1071  }
1072  }
1073 }
1074 
1075 } // namespace
1076 
1077 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1078  const RelAlgExecutionUnit& ra_exe_unit) {
1079  if (ra_exe_unit.input_descs.size() < 2) {
1080  return false;
1081  }
1082 
1083  // We only support loop join at the end of folded joins
1084  // where ra_exe_unit.input_descs.size() > 2 for now.
1085  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1086 
1087  ssize_t inner_table_idx = -1;
1088  for (size_t i = 0; i < query_infos.size(); ++i) {
1089  if (query_infos[i].table_id == inner_table_id) {
1090  inner_table_idx = i;
1091  break;
1092  }
1093  }
1094  CHECK_NE(ssize_t(-1), inner_table_idx);
1095  return query_infos[inner_table_idx].info.getNumTuples() <=
1097 }
1098 
1099 namespace {
1100 
1102  const size_t new_scan_limit) {
1103  return {ra_exe_unit_in.input_descs,
1104  ra_exe_unit_in.input_col_descs,
1105  ra_exe_unit_in.simple_quals,
1106  ra_exe_unit_in.quals,
1107  ra_exe_unit_in.join_quals,
1108  ra_exe_unit_in.groupby_exprs,
1109  ra_exe_unit_in.target_exprs,
1110  ra_exe_unit_in.estimator,
1111  ra_exe_unit_in.sort_info,
1112  new_scan_limit,
1113  ra_exe_unit_in.query_features,
1114  ra_exe_unit_in.use_bump_allocator};
1115 }
1116 
1117 } // namespace
1118 
1120  size_t& max_groups_buffer_entry_guess,
1121  const bool is_agg,
1122  const std::vector<InputTableInfo>& query_infos,
1123  const RelAlgExecutionUnit& ra_exe_unit_in,
1124  const CompilationOptions& co,
1125  const ExecutionOptions& eo,
1126  const Catalog_Namespace::Catalog& cat,
1127  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1128  RenderInfo* render_info,
1129  const bool has_cardinality_estimation,
1130  ColumnCacheMap& column_cache) {
1131  try {
1132  return executeWorkUnitImpl(max_groups_buffer_entry_guess,
1133  is_agg,
1134  true,
1135  query_infos,
1136  ra_exe_unit_in,
1137  co,
1138  eo,
1139  cat,
1140  row_set_mem_owner,
1141  render_info,
1142  has_cardinality_estimation,
1143  column_cache);
1144  } catch (const CompilationRetryNewScanLimit& e) {
1145  return executeWorkUnitImpl(max_groups_buffer_entry_guess,
1146  is_agg,
1147  false,
1148  query_infos,
1149  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1150  co,
1151  eo,
1152  cat,
1153  row_set_mem_owner,
1154  render_info,
1155  has_cardinality_estimation,
1156  column_cache);
1157  }
1158 }
1159 
1161  size_t& max_groups_buffer_entry_guess,
1162  const bool is_agg,
1163  const bool allow_single_frag_table_opt,
1164  const std::vector<InputTableInfo>& query_infos,
1165  const RelAlgExecutionUnit& ra_exe_unit_in,
1166  const CompilationOptions& co,
1167  const ExecutionOptions& eo,
1168  const Catalog_Namespace::Catalog& cat,
1169  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1170  RenderInfo* render_info,
1171  const bool has_cardinality_estimation,
1172  ColumnCacheMap& column_cache) {
1173  INJECT_TIMER(Exec_executeWorkUnit);
1174  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in);
1175  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type_);
1176  CHECK(!query_infos.empty());
1177  if (!max_groups_buffer_entry_guess) {
1178  // The query has failed the first execution attempt because of running out
1179  // of group by slots. Make the conservative choice: allocate fragment size
1180  // slots and run on the CPU.
1181  CHECK(device_type == ExecutorDeviceType::CPU);
1182  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1183  }
1184 
1185  int8_t crt_min_byte_width{get_min_byte_width()};
1186  do {
1187  ExecutionDispatch execution_dispatch(
1188  this, ra_exe_unit, query_infos, cat, row_set_mem_owner, render_info);
1189  ColumnFetcher column_fetcher(this, column_cache);
1190  std::unique_ptr<QueryCompilationDescriptor> query_comp_desc_owned;
1191  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1192  try {
1193  INJECT_TIMER(execution_dispatch_comp);
1194  std::tie(query_comp_desc_owned, query_mem_desc_owned) =
1195  execution_dispatch.compile(max_groups_buffer_entry_guess,
1196  crt_min_byte_width,
1197  {device_type,
1198  co.hoist_literals_,
1199  co.opt_level_,
1201  co.explain_type_,
1203  eo,
1204  column_fetcher,
1205  has_cardinality_estimation);
1206  CHECK(query_comp_desc_owned);
1207  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1208  } catch (CompilationRetryNoCompaction&) {
1209  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1210  continue;
1211  }
1212  if (eo.just_explain) {
1213  return executeExplain(*query_comp_desc_owned);
1214  }
1215 
1216  for (const auto target_expr : ra_exe_unit.target_exprs) {
1217  plan_state_->target_exprs_.push_back(target_expr);
1218  }
1219 
1220  auto dispatch = [&execution_dispatch,
1221  &column_fetcher,
1222  &eo,
1223  parent_thread_id = std::this_thread::get_id()](
1224  const ExecutorDeviceType chosen_device_type,
1225  int chosen_device_id,
1226  const QueryCompilationDescriptor& query_comp_desc,
1227  const QueryMemoryDescriptor& query_mem_desc,
1228  const FragmentsList& frag_list,
1229  const ExecutorDispatchMode kernel_dispatch_mode,
1230  const int64_t rowid_lookup_key) {
1231  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
1232  INJECT_TIMER(execution_dispatch_run);
1233  execution_dispatch.run(chosen_device_type,
1234  chosen_device_id,
1235  eo,
1236  column_fetcher,
1237  query_comp_desc,
1238  query_mem_desc,
1239  frag_list,
1240  kernel_dispatch_mode,
1241  rowid_lookup_key);
1242  };
1243 
1244  QueryFragmentDescriptor fragment_descriptor(
1245  ra_exe_unit,
1246  query_infos,
1247  query_comp_desc_owned->getDeviceType() == ExecutorDeviceType::GPU
1249  : std::vector<Data_Namespace::MemoryInfo>{},
1250  eo.gpu_input_mem_limit_percent);
1251 
1252  if (!eo.just_validate) {
1253  int available_cpus = cpu_threads();
1254  auto available_gpus = get_available_gpus(cat);
1255 
1256  const auto context_count =
1257  get_context_count(device_type, available_cpus, available_gpus.size());
1258  try {
1259  dispatchFragments(dispatch,
1260  execution_dispatch,
1261  query_infos,
1262  eo,
1263  is_agg,
1264  allow_single_frag_table_opt,
1265  context_count,
1266  *query_comp_desc_owned,
1267  *query_mem_desc_owned,
1268  fragment_descriptor,
1269  available_gpus,
1270  available_cpus);
1271  } catch (QueryExecutionError& e) {
1272  if (eo.with_dynamic_watchdog && interrupted_ &&
1273  e.getErrorCode() == ERR_OUT_OF_TIME) {
1275  }
1276  cat.getDataMgr().freeAllBuffers();
1278  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1279  crt_min_byte_width <<= 1;
1280  continue;
1281  }
1282  throw;
1283  }
1284  }
1285  cat.getDataMgr().freeAllBuffers();
1286  if (is_agg) {
1287  try {
1288  return collectAllDeviceResults(execution_dispatch,
1289  ra_exe_unit.target_exprs,
1290  *query_mem_desc_owned,
1291  query_comp_desc_owned->getDeviceType(),
1292  row_set_mem_owner);
1293  } catch (ReductionRanOutOfSlots&) {
1295  } catch (OverflowOrUnderflow&) {
1296  crt_min_byte_width <<= 1;
1297  continue;
1298  }
1299  }
1300  return resultsUnion(execution_dispatch);
1301 
1302  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1303 
1304  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1307  nullptr,
1308  this);
1309 }
1310 
1312  const InputTableInfo& table_info,
1313  const CompilationOptions& co,
1314  const ExecutionOptions& eo,
1315  const Catalog_Namespace::Catalog& cat,
1316  PerFragmentCallBack& cb) {
1317  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in);
1318  ColumnCacheMap column_cache;
1319 
1320  std::vector<InputTableInfo> table_infos{table_info};
1321  // TODO(adb): ensure this is under a try / catch
1322  ExecutionDispatch execution_dispatch(
1323  this, ra_exe_unit, table_infos, cat, row_set_mem_owner_, nullptr);
1324  ColumnFetcher column_fetcher(this, column_cache);
1325  std::unique_ptr<QueryCompilationDescriptor> query_comp_desc_owned;
1326  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1327  std::tie(query_comp_desc_owned, query_mem_desc_owned) =
1328  execution_dispatch.compile(0, 8, co, eo, column_fetcher, false);
1329  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1330  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1331  const auto& outer_fragments = table_info.info.fragments;
1332  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1333  ++fragment_index) {
1334  // We may want to consider in the future allowing this to execute on devices other
1335  // than CPU
1336  execution_dispatch.run(co.device_type_,
1337  0,
1338  eo,
1339  column_fetcher,
1340  *query_comp_desc_owned,
1341  *query_mem_desc_owned,
1342  {{table_id, {fragment_index}}},
1344  -1);
1345  }
1346 
1347  const auto& all_fragment_results = execution_dispatch.getFragmentResults();
1348 
1349  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1350  ++fragment_index) {
1351  const auto fragment_results = all_fragment_results[fragment_index];
1352  cb(fragment_results.first, outer_fragments[fragment_index]);
1353  }
1354 }
1355 
1357  const TableFunctionExecutionUnit exe_unit,
1358  const std::vector<InputTableInfo>& table_infos,
1359  const CompilationOptions& co,
1360  const ExecutionOptions& eo,
1361  const Catalog_Namespace::Catalog& cat) {
1362  INJECT_TIMER(Exec_executeTableFunction);
1363  nukeOldState(false, table_infos, nullptr);
1364 
1365  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1366  // framework, we may want to move this up a level
1367 
1368  ColumnFetcher column_fetcher(this, column_cache);
1369  TableFunctionCompilationContext compilation_context;
1370  compilation_context.compile(exe_unit, co, this);
1371 
1373  CHECK_EQ(table_infos.size(), size_t(1));
1374  return exe_context.execute(exe_unit,
1375  table_infos.front(),
1376  &compilation_context,
1377  column_fetcher,
1378  co.device_type_,
1379  this);
1380 }
1381 
1383  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1384 }
1385 
1386 // Looks at the targets and returns a feasible device type. We only punt
1387 // to CPU for count distinct and we should probably fix it and remove this.
1389  const RelAlgExecutionUnit& ra_exe_unit,
1390  const ExecutorDeviceType requested_device_type) {
1391  for (const auto target_expr : ra_exe_unit.target_exprs) {
1392  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1393  if (!ra_exe_unit.groupby_exprs.empty() &&
1394  !isArchPascalOrLater(requested_device_type)) {
1395  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1396  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1397  return ExecutorDeviceType::CPU;
1398  }
1399  }
1400  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1401  return ExecutorDeviceType::CPU;
1402  }
1403  }
1404  return requested_device_type;
1405 }
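// Example (hypothetical query): on a pre-Pascal GPU, a grouped query computing
// AVG(double_col) or SUM(double_col) is redirected to CPU by the check above,
// presumably because older architectures lack native 64-bit floating-point atomics;
// any REGEXP target forces CPU on every device.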
1406 
1407 namespace {
1408 
1409 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1410  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1411  if (ti.is_fp()) {
1412  if (float_argument_input && ti.get_type() == kFLOAT) {
1413  int64_t float_null_val = 0;
1414  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1415  static_cast<float>(inline_fp_null_val(ti));
1416  return float_null_val;
1417  }
1418  const auto double_null_val = inline_fp_null_val(ti);
1419  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1420  }
1421  return inline_int_null_val(ti);
1422 }
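// A minimal sketch (hypothetical helper) of the float-in-int64 packing performed by
// inline_null_val() when the aggregate takes a float argument: the 32-bit sentinel's
// bit pattern is written into the first four bytes of the 64-bit slot value.
#include <cstdint>
#include <cstring>

inline int64_t example_pack_float_null(const float null_sentinel) {
  int64_t slot = 0;
  std::memcpy(&slot, &null_sentinel, sizeof(float));  // copy the float's bit pattern
  return slot;
}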
1423 
1424 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1425  std::vector<int64_t>& entry,
1426  const std::vector<Analyzer::Expr*>& target_exprs,
1427  const QueryMemoryDescriptor& query_mem_desc) {
1428  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1429  const auto target_expr = target_exprs[target_idx];
1430  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1431  CHECK(agg_info.is_agg);
1432  target_infos.push_back(agg_info);
1433  if (g_cluster) {
1434  const auto executor = query_mem_desc.getExecutor();
1435  CHECK(executor);
1436  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1437  CHECK(row_set_mem_owner);
1438  const auto& count_distinct_desc =
1439  query_mem_desc.getCountDistinctDescriptor(target_idx);
1440  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1441  auto count_distinct_buffer = static_cast<int8_t*>(
1442  checked_calloc(count_distinct_desc.bitmapPaddedSizeBytes(), 1));
1443  CHECK(row_set_mem_owner);
1444  row_set_mem_owner->addCountDistinctBuffer(
1445  count_distinct_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), true);
1446  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1447  continue;
1448  }
1449  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1450  auto count_distinct_set = new std::set<int64_t>();
1451  CHECK(row_set_mem_owner);
1452  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1453  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1454  continue;
1455  }
1456  }
1457  const bool float_argument_input = takes_float_argument(agg_info);
1458  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1459  entry.push_back(0);
1460  } else if (agg_info.agg_kind == kAVG) {
1461  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1462  entry.push_back(0);
1463  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
1464  if (agg_info.sql_type.is_geometry()) {
1465  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1466  entry.push_back(0);
1467  }
1468  } else if (agg_info.sql_type.is_varlen()) {
1469  entry.push_back(0);
1470  entry.push_back(0);
1471  } else {
1472  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1473  }
1474  } else {
1475  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1476  }
1477  }
1478 }
1479 
1481  const std::vector<Analyzer::Expr*>& target_exprs_in,
1482  const QueryMemoryDescriptor& query_mem_desc,
1483  const ExecutorDeviceType device_type) {
1484  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1485  std::vector<Analyzer::Expr*> target_exprs;
1486  for (const auto target_expr : target_exprs_in) {
1487  const auto target_expr_copy =
1488  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1489  CHECK(target_expr_copy);
1490  auto ti = target_expr->get_type_info();
1491  ti.set_notnull(false);
1492  target_expr_copy->set_type_info(ti);
1493  if (target_expr_copy->get_arg()) {
1494  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1495  arg_ti.set_notnull(false);
1496  target_expr_copy->get_arg()->set_type_info(arg_ti);
1497  }
1498  target_exprs_owned_copies.push_back(target_expr_copy);
1499  target_exprs.push_back(target_expr_copy.get());
1500  }
1501  std::vector<TargetInfo> target_infos;
1502  std::vector<int64_t> entry;
1503  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1504  const auto executor = query_mem_desc.getExecutor();
1505  CHECK(executor);
1506  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1507  CHECK(row_set_mem_owner);
1508  auto rs = std::make_shared<ResultSet>(
1509  target_infos, device_type, query_mem_desc, row_set_mem_owner, executor);
1510  rs->allocateStorage();
1511  rs->fillOneEntry(entry);
1512  return rs;
1513 }
1514 
1515 } // namespace
1516 
1518  ExecutionDispatch& execution_dispatch,
1519  const std::vector<Analyzer::Expr*>& target_exprs,
1520  const QueryMemoryDescriptor& query_mem_desc,
1521  const ExecutorDeviceType device_type,
1522  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1523  auto timer = DEBUG_TIMER(__func__);
1524  auto& result_per_device = execution_dispatch.getFragmentResults();
1525  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1527  return build_row_for_empty_input(target_exprs, query_mem_desc, device_type);
1528  }
1529  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1530  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1531  return reduceSpeculativeTopN(
1532  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1533  }
1534  const auto shard_count =
1535  device_type == ExecutorDeviceType::GPU
1537  : 0;
1538 
1539  if (shard_count && !result_per_device.empty()) {
1540  return collectAllDeviceShardedTopResults(execution_dispatch);
1541  }
1542  return reduceMultiDeviceResults(
1543  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1544 }
1545 
1546 namespace {
1557 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1558  const QueryMemoryDescriptor& input_query_mem_desc,
1559  const ResultSetStorage* output_storage,
1560  size_t output_row_index,
1561  const QueryMemoryDescriptor& output_query_mem_desc,
1562  const std::vector<uint32_t>& top_permutation) {
1563  const auto output_buffer = output_storage->getUnderlyingBuffer();
1564  const auto input_buffer = input_storage->getUnderlyingBuffer();
1565  for (const auto sorted_idx : top_permutation) {
1566  // permuting all group-columns in this result set into the final buffer:
1567  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1568  group_idx++) {
1569  const auto input_column_ptr =
1570  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1571  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1572  const auto output_column_ptr =
1573  output_buffer +
1574  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1575  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1576  memcpy(output_column_ptr,
1577  input_column_ptr,
1578  output_query_mem_desc.groupColWidth(group_idx));
1579  }
1580  // permuting all agg-columns in this result set into the final buffer:
1581  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1582  slot_idx++) {
1583  const auto input_column_ptr =
1584  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1585  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1586  const auto output_column_ptr =
1587  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1588  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1589  memcpy(output_column_ptr,
1590  input_column_ptr,
1591  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1592  }
1593  ++output_row_index;
1594  }
1595  return output_row_index;
1596 }
1597 
1607 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
1608  const ResultSetStorage* output_storage,
1609  size_t output_row_index,
1610  const QueryMemoryDescriptor& output_query_mem_desc,
1611  const std::vector<uint32_t>& top_permutation) {
1612  const auto output_buffer = output_storage->getUnderlyingBuffer();
1613  const auto input_buffer = input_storage->getUnderlyingBuffer();
1614  for (const auto sorted_idx : top_permutation) {
1615  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
1616  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
1617  row_ptr,
1618  output_query_mem_desc.getRowSize());
1619  ++output_row_index;
1620  }
1621  return output_row_index;
1622 }
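// Example (hypothetical permutation): with top_permutation == {7, 2, 5}, rows 7, 2 and
// 5 of the input buffer are copied, in that order, into consecutive output rows
// starting at output_row_index, and the updated output_row_index is returned.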
1623 } // namespace
1624 
1625 // Collect top results from each device, stitch them together and sort. Partial
1626 // results from each device are guaranteed to be disjoint because we only go down
1627 // this path when one of the columns involved is a shard key.
1629  ExecutionDispatch& execution_dispatch) const {
1630  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1631  auto& result_per_device = execution_dispatch.getFragmentResults();
1632  const auto first_result_set = result_per_device.front().first;
1633  CHECK(first_result_set);
1634  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1635  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1636  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1637  top_query_mem_desc.setEntryCount(0);
1638  for (auto& result : result_per_device) {
1639  const auto result_set = result.first;
1640  CHECK(result_set);
1641  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n);
1642  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1643  top_query_mem_desc.setEntryCount(new_entry_cnt);
1644  }
1645  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1646  first_result_set->getDeviceType(),
1647  top_query_mem_desc,
1648  first_result_set->getRowSetMemOwner(),
1649  this);
1650  auto top_storage = top_result_set->allocateStorage();
1651  size_t top_output_row_idx{0};
1652  for (auto& result : result_per_device) {
1653  const auto result_set = result.first;
1654  CHECK(result_set);
1655  const auto& top_permutation = result_set->getPermutationBuffer();
1656  CHECK_LE(top_permutation.size(), top_n);
1657  if (top_query_mem_desc.didOutputColumnar()) {
1658  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
1659  result_set->getQueryMemDesc(),
1660  top_storage,
1661  top_output_row_idx,
1662  top_query_mem_desc,
1663  top_permutation);
1664  } else {
1665  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
1666  top_storage,
1667  top_output_row_idx,
1668  top_query_mem_desc,
1669  top_permutation);
1670  }
1671  }
1672  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
1673  return top_result_set;
1674 }
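// A minimal standalone sketch of the stitch-and-permute pattern above, with plain
// std::vector rows standing in for ResultSet storage and its permutation buffer;
// the real code copies raw columnar or row-wise buffers via the permute_storage_*
// helpers rather than vectors.
#include <algorithm>
#include <cstdint>
#include <numeric>
#include <vector>

using Row = std::vector<int64_t>;

std::vector<Row> stitch_top_results(const std::vector<std::vector<Row>>& per_device,
                                    const size_t top_n) {
  std::vector<Row> combined;
  for (const auto& partial : per_device) {
    // Sort each partial result and keep the permutation of its top rows, mirroring
    // ResultSet::sort() followed by getPermutationBuffer().
    std::vector<uint32_t> permutation(partial.size());
    std::iota(permutation.begin(), permutation.end(), uint32_t(0));
    std::sort(permutation.begin(), permutation.end(), [&](uint32_t a, uint32_t b) {
      return partial[a] < partial[b];
    });
    if (permutation.size() > top_n) {
      permutation.resize(top_n);
    }
    // Copy this device's selected rows into the shared output buffer.
    for (const auto sorted_idx : permutation) {
      combined.push_back(partial[sorted_idx]);
    }
  }
  // One final sort of the stitched buffer yields the global top-n order.
  std::sort(combined.begin(), combined.end());
  if (combined.size() > top_n) {
    combined.resize(top_n);
  }
  return combined;
}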
1675 
1676 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
1677  const {
1678  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
1679  const auto& join_info = plan_state_->join_info_;
1680  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
1681  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
1682  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
1683  id_to_cond.insert(
1684  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
1685  }
1686  return id_to_cond;
1687 }
1688 
1689 namespace {
1690 
1691 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
1692  for (const auto& col : fetched_cols) {
1693  if (col.is_lazily_fetched) {
1694  return true;
1695  }
1696  }
1697  return false;
1698 }
1699 
1700 } // namespace
1701 
1702 void Executor::dispatchFragments(
1703  const std::function<void(const ExecutorDeviceType chosen_device_type,
1704  int chosen_device_id,
1705  const QueryCompilationDescriptor& query_comp_desc,
1706  const QueryMemoryDescriptor& query_mem_desc,
1707  const FragmentsList& frag_list,
1708  const ExecutorDispatchMode kernel_dispatch_mode,
1709  const int64_t rowid_lookup_key)> dispatch,
1710  const ExecutionDispatch& execution_dispatch,
1711  const std::vector<InputTableInfo>& table_infos,
1712  const ExecutionOptions& eo,
1713  const bool is_agg,
1714  const bool allow_single_frag_table_opt,
1715  const size_t context_count,
1716  const QueryCompilationDescriptor& query_comp_desc,
1717  const QueryMemoryDescriptor& query_mem_desc,
1718  QueryFragmentDescriptor& fragment_descriptor,
1719  std::unordered_set<int>& available_gpus,
1720  int& available_cpus) {
1721  std::vector<std::future<void>> query_threads;
1722  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1723  CHECK(!ra_exe_unit.input_descs.empty());
1724 
1725  const auto device_type = query_comp_desc.getDeviceType();
1726  const bool uses_lazy_fetch =
1727  plan_state_->allow_lazy_fetch_ &&
1728  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
1729  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
1730  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
1731 
1732  const auto device_count = deviceCount(device_type);
1733  CHECK_GT(device_count, 0);
1734 
1735  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
1736  execution_dispatch.getFragOffsets(),
1737  device_count,
1738  device_type,
1739  use_multifrag_kernel,
1740  g_inner_join_fragment_skipping,
1741  this);
1742  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
1743  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
1744  }
1745 
1746  if (use_multifrag_kernel) {
1747  VLOG(1) << "Dispatching multifrag kernels";
1748  VLOG(1) << query_mem_desc.toString();
1749 
1750  // NB: We should never be on this path when the query is retried because of running
1751  // out of group by slots; also, for scan only queries on CPU we want the
1752  // high-granularity, fragment by fragment execution instead. For scan only queries on
1753  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
1754  // buffer per fragment.
1755  auto multifrag_kernel_dispatch =
1756  [&query_threads, &dispatch, query_comp_desc, query_mem_desc](
1757  const int device_id,
1758  const FragmentsList& frag_list,
1759  const int64_t rowid_lookup_key) {
1760  query_threads.push_back(std::async(std::launch::async,
1761  dispatch,
1762  ExecutorDeviceType::GPU,
1763  device_id,
1764  query_comp_desc,
1765  query_mem_desc,
1766  frag_list,
1767  ExecutorDispatchMode::MultifragmentKernel,
1768  rowid_lookup_key));
1769  };
1770  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
1771  } else {
1772  VLOG(1) << "Dispatching kernel per fragment";
1773  VLOG(1) << query_mem_desc.toString();
1774 
1775  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
1776  (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
1777  table_infos.size() == 1 && table_infos.front().table_id > 0) {
1778  const auto max_frag_size =
1779  table_infos.front().info.getFragmentNumTuplesUpperBound();
1780  if (max_frag_size < query_mem_desc.getEntryCount()) {
1781  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
1782  << " to match max fragment size " << max_frag_size
1783  << " for kernel per fragment execution path.";
1784  throw CompilationRetryNewScanLimit(max_frag_size);
1785  }
1786  }
1787 
1788  size_t frag_list_idx{0};
1789  auto fragment_per_kernel_dispatch = [&query_threads,
1790  &dispatch,
1791  &frag_list_idx,
1792  &device_type,
1793  query_comp_desc,
1794  query_mem_desc](const int device_id,
1795  const FragmentsList& frag_list,
1796  const int64_t rowid_lookup_key) {
1797  if (!frag_list.size()) {
1798  return;
1799  }
1800  CHECK_GE(device_id, 0);
1801 
1802  query_threads.push_back(std::async(std::launch::async,
1803  dispatch,
1804  device_type,
1805  device_id,
1806  query_comp_desc,
1807  query_mem_desc,
1808  frag_list,
1809  ExecutorDispatchMode::KernelPerFragment,
1810  rowid_lookup_key));
1811 
1812  ++frag_list_idx;
1813  };
1814 
1815  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
1816  ra_exe_unit);
1817  }
1818  for (auto& child : query_threads) {
1819  child.wait();
1820  }
1821  for (auto& child : query_threads) {
1822  child.get();
1823  }
1824 }
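// A plausible reading of the wait-then-get idiom at the end of the function above,
// as a standalone sketch: wait() lets every launched kernel task run to completion
// first, and only then does get() rethrow any stored exception, so no worker is
// still touching the shared captures when an error propagates.
#include <functional>
#include <future>
#include <vector>

void run_all(const std::vector<std::function<void()>>& tasks) {
  std::vector<std::future<void>> futures;
  for (const auto& task : tasks) {
    futures.push_back(std::async(std::launch::async, task));
  }
  for (auto& f : futures) {
    f.wait();  // block until every task has finished
  }
  for (auto& f : futures) {
    f.get();  // surface the first exception thrown by any task
  }
}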
1825 
1826 std::vector<size_t> Executor::getTableFragmentIndices(
1827  const RelAlgExecutionUnit& ra_exe_unit,
1828  const ExecutorDeviceType device_type,
1829  const size_t table_idx,
1830  const size_t outer_frag_idx,
1831  std::map<int, const TableFragments*>& selected_tables_fragments,
1832  const std::unordered_map<int, const Analyzer::BinOper*>&
1833  inner_table_id_to_join_condition) {
1834  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
1835  auto table_frags_it = selected_tables_fragments.find(table_id);
1836  CHECK(table_frags_it != selected_tables_fragments.end());
1837  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
1838  const auto outer_table_fragments_it =
1839  selected_tables_fragments.find(outer_input_desc.getTableId());
1840  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
1841  const auto outer_table_fragments = outer_table_fragments_it->second;
1842  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
1843  if (!table_idx) {
1844  return {outer_frag_idx};
1845  }
1846  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
1847  auto& inner_frags = table_frags_it->second;
1848  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
1849  std::vector<size_t> all_frag_ids;
1850  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
1851  ++inner_frag_idx) {
1852  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
1853  if (skipFragmentPair(outer_fragment_info,
1854  inner_frag_info,
1855  table_idx,
1856  inner_table_id_to_join_condition,
1857  ra_exe_unit,
1858  device_type)) {
1859  continue;
1860  }
1861  all_frag_ids.push_back(inner_frag_idx);
1862  }
1863  return all_frag_ids;
1864 }
1865 
1866 // Returns true iff the join between two fragments cannot yield any results, per
1867 // shard information. The pair can be skipped to avoid full broadcast.
1868 bool Executor::skipFragmentPair(
1869  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
1870  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
1871  const int table_idx,
1872  const std::unordered_map<int, const Analyzer::BinOper*>&
1873  inner_table_id_to_join_condition,
1874  const RelAlgExecutionUnit& ra_exe_unit,
1875  const ExecutorDeviceType device_type) {
1876  if (device_type != ExecutorDeviceType::GPU) {
1877  return false;
1878  }
1879  CHECK(table_idx >= 0 &&
1880  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
1881  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
1882  // Both tables need to be sharded the same way.
1883  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
1884  outer_fragment_info.shard == inner_fragment_info.shard) {
1885  return false;
1886  }
1887  const Analyzer::BinOper* join_condition{nullptr};
1888  if (ra_exe_unit.join_quals.empty()) {
1889  CHECK(!inner_table_id_to_join_condition.empty());
1890  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
1891  CHECK(condition_it != inner_table_id_to_join_condition.end());
1892  join_condition = condition_it->second;
1893  CHECK(join_condition);
1894  } else {
1895  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
1896  plan_state_->join_info_.join_hash_tables_.size());
1897  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
1898  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
1899  table_idx) {
1900  CHECK(!join_condition);
1901  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
1902  }
1903  }
1904  }
1905  if (!join_condition) {
1906  return false;
1907  }
1908  // TODO(adb): support fragment skipping based on the overlaps operator
1909  if (join_condition->is_overlaps_oper()) {
1910  return false;
1911  }
1912  size_t shard_count{0};
1913  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
1914  join_condition->get_left_operand())) {
1915  auto inner_outer_pairs =
1916  normalize_column_pairs(join_condition, *getCatalog(), getTemporaryTables());
1917  shard_count = BaselineJoinHashTable::getShardCountForCondition(
1918  join_condition, this, inner_outer_pairs);
1919  } else {
1920  shard_count = get_shard_count(join_condition, this);
1921  }
1922  if (shard_count && !ra_exe_unit.join_quals.empty()) {
1923  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
1924  }
1925  return shard_count;
1926 }
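// A minimal sketch of the shard-based skip rule above, with a hypothetical FragInfo
// struct standing in for Fragmenter_Namespace::FragmentInfo: when both sides are
// sharded on the join key, fragments from different shards cannot produce matches.
struct FragInfo {
  int shard;  // -1 when the fragment's table is not sharded
};

bool can_skip_pair(const FragInfo& outer,
                   const FragInfo& inner,
                   const bool join_is_on_shard_key) {
  if (outer.shard == -1 || inner.shard == -1) {
    return false;  // at least one side is unsharded: no shard info to exploit
  }
  return join_is_on_shard_key && outer.shard != inner.shard;
}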
1927 
1928 namespace {
1929 
1930 const ColumnDescriptor* try_get_column_descriptor(const InputColDescriptor* col_desc,
1931  const Catalog_Namespace::Catalog& cat) {
1932  const int table_id = col_desc->getScanDesc().getTableId();
1933  const int col_id = col_desc->getColId();
1934  return get_column_descriptor_maybe(col_id, table_id, cat);
1935 }
1936 
1937 } // namespace
1938 
1939 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
1940  const std::vector<InputDescriptor>& input_descs,
1941  const std::map<int, const TableFragments*>& all_tables_fragments) {
1942  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
1943  for (auto& desc : input_descs) {
1944  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
1945  CHECK(fragments_it != all_tables_fragments.end());
1946  const auto& fragments = *fragments_it->second;
1947  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
1948  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
1949  frag_offsets[i] = off;
1950  off += fragments[i].getNumTuples();
1951  }
1952  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
1953  }
1954  return tab_id_to_frag_offsets;
1955 }
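// The helper above computes, per table, the exclusive prefix sum of fragment tuple
// counts: counts {100, 250, 50} become starting offsets {0, 100, 350}. A sketch of
// the same computation using std::exclusive_scan (C++17):
#include <cstdint>
#include <numeric>
#include <vector>

std::vector<uint64_t> frag_start_offsets(const std::vector<uint64_t>& tuple_counts) {
  std::vector<uint64_t> offsets(tuple_counts.size(), 0);
  std::exclusive_scan(
      tuple_counts.begin(), tuple_counts.end(), offsets.begin(), uint64_t(0));
  return offsets;
}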
1956 
1957 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
1958 Executor::getRowCountAndOffsetForAllFrags(
1959  const RelAlgExecutionUnit& ra_exe_unit,
1960  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
1961  const std::vector<InputDescriptor>& input_descs,
1962  const std::map<int, const TableFragments*>& all_tables_fragments) {
1963  std::vector<std::vector<int64_t>> all_num_rows;
1964  std::vector<std::vector<uint64_t>> all_frag_offsets;
1965  const auto tab_id_to_frag_offsets =
1966  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
1967  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
1968  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
1969  std::vector<int64_t> num_rows;
1970  std::vector<uint64_t> frag_offsets;
1971  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
1972  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
1973  const auto frag_id = selected_frag_ids[tab_idx];
1974  const auto fragments_it =
1975  all_tables_fragments.find(input_descs[tab_idx].getTableId());
1976  CHECK(fragments_it != all_tables_fragments.end());
1977  const auto& fragments = *fragments_it->second;
1978  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
1979  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
1980  const auto& fragment = fragments[frag_id];
1981  num_rows.push_back(fragment.getNumTuples());
1982  } else {
1983  size_t total_row_count{0};
1984  for (const auto& fragment : fragments) {
1985  total_row_count += fragment.getNumTuples();
1986  }
1987  num_rows.push_back(total_row_count);
1988  }
1989  const auto frag_offsets_it =
1990  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
1991  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
1992  const auto& offsets = frag_offsets_it->second;
1993  CHECK_LT(frag_id, offsets.size());
1994  frag_offsets.push_back(offsets[frag_id]);
1995  }
1996  all_num_rows.push_back(num_rows);
1997  // Fragment offsets of outer table should be ONLY used by rowid for now.
1998  all_frag_offsets.push_back(frag_offsets);
1999  }
2000  return {all_num_rows, all_frag_offsets};
2001 }
2002 
2003 // Fetch columns of a hash-joined inner fact table from all of the table's fragments,
2004 // but only when their fetch is not deferred.
2005 bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
2006  const RelAlgExecutionUnit& ra_exe_unit,
2007  const FragmentsList& selected_fragments) const {
2008  const auto& input_descs = ra_exe_unit.input_descs;
2009  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2010  if (nest_level < 1 ||
2011  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2012  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2013  (ra_exe_unit.join_quals.empty() &&
2014  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2015  return false;
2016  }
2017  const int table_id = inner_col_desc.getScanDesc().getTableId();
2018  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2019  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2020  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2021  return fragments.size() > 1;
2022 }
2023 
2024 Executor::FetchResult Executor::fetchChunks(
2025  const ColumnFetcher& column_fetcher,
2026  const RelAlgExecutionUnit& ra_exe_unit,
2027  const int device_id,
2028  const Data_Namespace::MemoryLevel memory_level,
2029  const std::map<int, const TableFragments*>& all_tables_fragments,
2030  const FragmentsList& selected_fragments,
2031  const Catalog_Namespace::Catalog& cat,
2032  std::list<ChunkIter>& chunk_iterators,
2033  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
2034  auto timer = DEBUG_TIMER(__func__);
2036  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2037  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2038  std::vector<size_t> local_col_to_frag_pos;
2039  buildSelectedFragsMapping(selected_fragments_crossjoin,
2040  local_col_to_frag_pos,
2041  col_global_ids,
2042  selected_fragments,
2043  ra_exe_unit);
2044 
2045  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2046  selected_fragments_crossjoin);
2047 
2048  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2049  std::vector<std::vector<int64_t>> all_num_rows;
2050  std::vector<std::vector<uint64_t>> all_frag_offsets;
2051 
2052  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2053  std::vector<const int8_t*> frag_col_buffers(
2054  plan_state_->global_to_local_col_ids_.size());
2055  for (const auto& col_id : col_global_ids) {
2056  CHECK(col_id);
2057  const int table_id = col_id->getScanDesc().getTableId();
2058  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2059  if (cd && cd->isVirtualCol) {
2060  CHECK_EQ("rowid", cd->columnName);
2061  continue;
2062  }
2063  const auto fragments_it = all_tables_fragments.find(table_id);
2064  CHECK(fragments_it != all_tables_fragments.end());
2065  const auto fragments = fragments_it->second;
2066  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2067  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2068  CHECK_LT(static_cast<size_t>(it->second),
2069  plan_state_->global_to_local_col_ids_.size());
2070  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2071  if (!fragments->size()) {
2072  return {};
2073  }
2074  CHECK_LT(frag_id, fragments->size());
2075  auto memory_level_for_column = memory_level;
2076  if (plan_state_->columns_to_fetch_.find(
2077  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2078  plan_state_->columns_to_fetch_.end()) {
2079  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2080  }
2081  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2082  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2083  col_id.get(), memory_level_for_column, device_id);
2084  } else {
2085  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2086  frag_col_buffers[it->second] =
2087  column_fetcher.getAllTableColumnFragments(table_id,
2088  col_id->getColId(),
2089  all_tables_fragments,
2090  memory_level_for_column,
2091  device_id);
2092  } else {
2093  frag_col_buffers[it->second] =
2094  column_fetcher.getOneTableColumnFragment(table_id,
2095  frag_id,
2096  col_id->getColId(),
2097  all_tables_fragments,
2098  chunks,
2099  chunk_iterators,
2100  memory_level_for_column,
2101  device_id);
2102  }
2103  }
2104  }
2105  all_frag_col_buffers.push_back(frag_col_buffers);
2106  }
2107  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2108  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2109  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2110 }
2111 
2112 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2113  const size_t scan_idx,
2114  const RelAlgExecutionUnit& ra_exe_unit) {
2115  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2116  scan_idx > 0 &&
2117  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2118  !selected_fragments[scan_idx].fragment_ids.empty()) {
2119  // Fetch all fragments
2120  return {size_t(0)};
2121  }
2122 
2123  return selected_fragments[scan_idx].fragment_ids;
2124 }
2125 
2126 void Executor::buildSelectedFragsMapping(
2127  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2128  std::vector<size_t>& local_col_to_frag_pos,
2129  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2130  const FragmentsList& selected_fragments,
2131  const RelAlgExecutionUnit& ra_exe_unit) {
2132  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2133  size_t frag_pos{0};
2134  const auto& input_descs = ra_exe_unit.input_descs;
2135  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2136  const int table_id = input_descs[scan_idx].getTableId();
2137  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2138  selected_fragments_crossjoin.push_back(
2139  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2140  for (const auto& col_id : col_global_ids) {
2141  CHECK(col_id);
2142  const auto& input_desc = col_id->getScanDesc();
2143  if (input_desc.getTableId() != table_id ||
2144  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2145  continue;
2146  }
2147  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2148  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2149  CHECK_LT(static_cast<size_t>(it->second),
2150  plan_state_->global_to_local_col_ids_.size());
2151  local_col_to_frag_pos[it->second] = frag_pos;
2152  }
2153  ++frag_pos;
2154  }
2155 }
2156 
2157 namespace {
2158 
2159 class OutVecOwner {
2160  public:
2161  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2162  ~OutVecOwner() {
2163  for (auto out : out_vec_) {
2164  delete[] out;
2165  }
2166  }
2167 
2168  private:
2169  std::vector<int64_t*> out_vec_;
2170 };
2171 } // namespace
2172 
2173 template <typename META_TYPE_CLASS>
2174 class AggregateReductionEgress {
2175  public:
2176  using ReturnType = void;
2177 
2178  // TODO: Avoid parameter struct indirection and forward directly
2179  ReturnType operator()(int const entry_count,
2180  int& error_code,
2181  TargetInfo const& agg_info,
2182  size_t& out_vec_idx,
2183  std::vector<int64_t*>& out_vec,
2184  std::vector<int64_t>& reduced_outs,
2185  QueryExecutionContext* query_exe_context) {
2186  int64_t val1;
2187  const bool float_argument_input = takes_float_argument(agg_info);
2188  if (is_distinct_target(agg_info)) {
2189  CHECK(agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT);
2190  val1 = out_vec[out_vec_idx][0];
2191  error_code = 0;
2192  } else {
2193  const auto chosen_bytes = static_cast<size_t>(
2194  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2195  std::tie(val1, error_code) =
2196  Executor::reduceResults(agg_info.agg_kind,
2197  agg_info.sql_type,
2198  query_exe_context->getAggInitValForIndex(out_vec_idx),
2199  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2200  out_vec[out_vec_idx],
2201  entry_count,
2202  false,
2203  float_argument_input);
2204  }
2205  if (error_code) {
2206  return;
2207  }
2208  reduced_outs.push_back(val1);
2209  if (agg_info.agg_kind == kAVG ||
2210  (agg_info.agg_kind == kSAMPLE &&
2211  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
2212  const auto chosen_bytes = static_cast<size_t>(
2213  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx + 1));
2214  int64_t val2;
2215  std::tie(val2, error_code) = Executor::reduceResults(
2216  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
2217  agg_info.sql_type,
2218  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
2219  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2220  out_vec[out_vec_idx + 1],
2221  entry_count,
2222  false,
2223  false);
2224  if (error_code) {
2225  return;
2226  }
2227  reduced_outs.push_back(val2);
2228  ++out_vec_idx;
2229  }
2230  ++out_vec_idx;
2231  }
2232 };
2233 
2234 // Handles reduction for geo-types
2235 template <>
2236 class AggregateReductionEgress<Experimental::MetaTypeClass<Experimental::Geometry>> {
2237  public:
2238  using ReturnType = void;
2239 
2240  ReturnType operator()(int const entry_count,
2241  int& error_code,
2242  TargetInfo const& agg_info,
2243  size_t& out_vec_idx,
2244  std::vector<int64_t*>& out_vec,
2245  std::vector<int64_t>& reduced_outs,
2246  QueryExecutionContext* query_exe_context) {
2247  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2248  int64_t val1;
2249  const auto chosen_bytes = static_cast<size_t>(
2250  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2251  std::tie(val1, error_code) =
2252  Executor::reduceResults(agg_info.agg_kind,
2253  agg_info.sql_type,
2254  query_exe_context->getAggInitValForIndex(out_vec_idx),
2255  chosen_bytes,
2256  out_vec[out_vec_idx],
2257  entry_count,
2258  false,
2259  false);
2260  if (error_code) {
2261  return;
2262  }
2263  reduced_outs.push_back(val1);
2264  out_vec_idx++;
2265  }
2266  }
2267 };
2268 
2269 int32_t Executor::executePlanWithoutGroupBy(
2270  const RelAlgExecutionUnit& ra_exe_unit,
2271  const CompilationResult& compilation_result,
2272  const bool hoist_literals,
2273  ResultSetPtr& results,
2274  const std::vector<Analyzer::Expr*>& target_exprs,
2275  const ExecutorDeviceType device_type,
2276  std::vector<std::vector<const int8_t*>>& col_buffers,
2277  QueryExecutionContext* query_exe_context,
2278  const std::vector<std::vector<int64_t>>& num_rows,
2279  const std::vector<std::vector<uint64_t>>& frag_offsets,
2280  Data_Namespace::DataMgr* data_mgr,
2281  const int device_id,
2282  const uint32_t start_rowid,
2283  const uint32_t num_tables,
2284  RenderInfo* render_info) {
2286  auto timer = DEBUG_TIMER(__func__);
2287  CHECK(!results);
2288  if (col_buffers.empty()) {
2289  return 0;
2290  }
2291 
2292  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2293  if (render_info) {
2294  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2295  // here, we are in non-insitu mode.
2296  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2297  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2298  "currently unsupported.";
2299  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2300  }
2301 
2302  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2303  std::vector<int64_t*> out_vec;
2304  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2305  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2306  std::unique_ptr<OutVecOwner> output_memory_scope;
2307  if (g_enable_dynamic_watchdog && interrupted_) {
2308  return ERR_INTERRUPTED;
2309  }
2310  if (device_type == ExecutorDeviceType::CPU) {
2311  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2312  compilation_result.native_functions,
2313  hoist_literals,
2314  hoist_buf,
2315  col_buffers,
2316  num_rows,
2317  frag_offsets,
2318  0,
2319  &error_code,
2320  num_tables,
2321  join_hash_table_ptrs);
2322  output_memory_scope.reset(new OutVecOwner(out_vec));
2323  } else {
2324  try {
2325  out_vec = query_exe_context->launchGpuCode(ra_exe_unit,
2326  compilation_result.native_functions,
2327  hoist_literals,
2328  hoist_buf,
2329  col_buffers,
2330  num_rows,
2331  frag_offsets,
2332  0,
2333  data_mgr,
2334  blockSize(),
2335  gridSize(),
2336  device_id,
2337  &error_code,
2338  num_tables,
2339  join_hash_table_ptrs,
2340  render_allocator_map_ptr);
2341  output_memory_scope.reset(new OutVecOwner(out_vec));
2342  } catch (const OutOfMemory&) {
2343  return ERR_OUT_OF_GPU_MEM;
2344  } catch (const std::exception& e) {
2345  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2346  }
2347  }
2348  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2349  error_code == Executor::ERR_DIV_BY_ZERO ||
2350  error_code == Executor::ERR_OUT_OF_TIME ||
2351  error_code == Executor::ERR_INTERRUPTED ||
2352  error_code == Executor::ERR_SINGLE_ROW_TOO_LARGE) {
2353  return error_code;
2354  }
2355  if (ra_exe_unit.estimator) {
2356  CHECK(!error_code);
2357  results =
2358  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2359  return 0;
2360  }
2361  std::vector<int64_t> reduced_outs;
2362  const auto num_frags = col_buffers.size();
2363  const size_t entry_count = device_type == ExecutorDeviceType::GPU
2364  ? num_frags * blockSize() * gridSize()
2365  : num_frags;
2366  if (size_t(1) == entry_count) {
2367  for (auto out : out_vec) {
2368  CHECK(out);
2369  reduced_outs.push_back(*out);
2370  }
2371  } else {
2372  size_t out_vec_idx = 0;
2373 
2374  for (const auto target_expr : target_exprs) {
2375  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2376  CHECK(agg_info.is_agg);
2377 
2378  auto meta_class(
2379  Experimental::GeoMetaTypeClassFactory::getMetaTypeClass(agg_info.sql_type));
2380  auto agg_reduction_impl =
2381  Experimental::GeoVsNonGeoClassHandler<AggregateReductionEgress>();
2382  agg_reduction_impl(meta_class,
2383  entry_count,
2384  error_code,
2385  agg_info,
2386  out_vec_idx,
2387  out_vec,
2388  reduced_outs,
2389  query_exe_context);
2390  if (error_code) {
2391  break;
2392  }
2393  }
2394  }
2395 
2396  if (error_code) {
2397  return error_code;
2398  }
2399 
2400  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
2401  auto rows_ptr = std::shared_ptr<ResultSet>(
2402  query_exe_context->query_buffers_->result_sets_[0].release());
2403  rows_ptr->fillOneEntry(reduced_outs);
2404  results = std::move(rows_ptr);
2405  return error_code;
2406 }
2407 
2408 namespace {
2409 
2410 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
2411  CHECK(scan_limit);
2412  return results && results->rowCount() < scan_limit;
2413 }
2414 
2415 } // namespace
2416 
2417 int32_t Executor::executePlanWithGroupBy(
2418  const RelAlgExecutionUnit& ra_exe_unit,
2419  const CompilationResult& compilation_result,
2420  const bool hoist_literals,
2421  ResultSetPtr& results,
2422  const ExecutorDeviceType device_type,
2423  std::vector<std::vector<const int8_t*>>& col_buffers,
2424  const std::vector<size_t> outer_tab_frag_ids,
2425  QueryExecutionContext* query_exe_context,
2426  const std::vector<std::vector<int64_t>>& num_rows,
2427  const std::vector<std::vector<uint64_t>>& frag_offsets,
2428  Data_Namespace::DataMgr* data_mgr,
2429  const int device_id,
2430  const int64_t scan_limit,
2431  const uint32_t start_rowid,
2432  const uint32_t num_tables,
2433  RenderInfo* render_info) {
2434  auto timer = DEBUG_TIMER(__func__);
2436  CHECK(!results);
2437  if (col_buffers.empty()) {
2438  return 0;
2439  }
2440  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
2441  // TODO(alex):
2442  // 1. Optimize size (make keys more compact).
2443  // 2. Resize on overflow.
2444  // 3. Optimize runtime.
2445  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2446  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2447  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2448  if (g_enable_dynamic_watchdog && interrupted_) {
2449  return ERR_INTERRUPTED;
2450  }
2451 
2452  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2453  if (render_info && render_info->useCudaBuffers()) {
2454  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2455  }
2456 
2457  if (device_type == ExecutorDeviceType::CPU) {
2458  query_exe_context->launchCpuCode(ra_exe_unit,
2459  compilation_result.native_functions,
2460  hoist_literals,
2461  hoist_buf,
2462  col_buffers,
2463  num_rows,
2464  frag_offsets,
2465  scan_limit,
2466  &error_code,
2467  num_tables,
2468  join_hash_table_ptrs);
2469  } else {
2470  try {
2471  query_exe_context->launchGpuCode(ra_exe_unit,
2472  compilation_result.native_functions,
2473  hoist_literals,
2474  hoist_buf,
2475  col_buffers,
2476  num_rows,
2477  frag_offsets,
2478  scan_limit,
2479  data_mgr,
2480  blockSize(),
2481  gridSize(),
2482  device_id,
2483  &error_code,
2484  num_tables,
2485  join_hash_table_ptrs,
2486  render_allocator_map_ptr);
2487  } catch (const OutOfMemory&) {
2488  return ERR_OUT_OF_GPU_MEM;
2489  } catch (const OutOfRenderMemory&) {
2490  return ERR_OUT_OF_RENDER_MEM;
2491  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
2492  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
2493  } catch (const std::bad_alloc&) {
2494  return ERR_SPECULATIVE_TOP_OOM;
2495  } catch (const std::exception& e) {
2496  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2497  }
2498  }
2499 
2500  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2501  error_code == Executor::ERR_DIV_BY_ZERO ||
2502  error_code == Executor::ERR_OUT_OF_TIME ||
2503  error_code == Executor::ERR_INTERRUPTED ||
2504  error_code == Executor::ERR_SINGLE_ROW_TOO_LARGE) {
2505  return error_code;
2506  }
2507 
2508  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
2509  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
2510  results =
2511  query_exe_context->getRowSet(ra_exe_unit, query_exe_context->query_mem_desc_);
2512  CHECK(results);
2513  results->holdLiterals(hoist_buf);
2514  }
2515  if (error_code < 0 && render_allocator_map_ptr) {
2516  // More rows passed the filter than available slots. We don't have a count to check,
2517  // so assume we met the limit if a scan limit is set
2518  if (scan_limit != 0) {
2519  return 0;
2520  } else {
2521  return error_code;
2522  }
2523  }
2524  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
2525  return error_code; // unlucky, not enough results and we ran out of slots
2526  }
2527 
2528  return 0;
2529 }
2530 
2531 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
2532  const int device_id) {
2533  std::vector<int64_t> table_ptrs;
2534  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
2535  for (auto hash_table : join_hash_tables) {
2536  if (!hash_table) {
2537  CHECK(table_ptrs.empty());
2538  return {};
2539  }
2540  table_ptrs.push_back(hash_table->getJoinHashBuffer(
2541  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
2542  }
2543  return table_ptrs;
2544 }
2545 
2546 namespace {
2547 
2548 template <class T>
2549 int64_t insert_one_dict_str(T* col_data,
2550  const std::string& columnName,
2551  const SQLTypeInfo& columnType,
2552  const Analyzer::Constant* col_cv,
2553  const Catalog_Namespace::Catalog& catalog) {
2554  if (col_cv->get_is_null()) {
2555  *col_data = inline_fixed_encoding_null_val(columnType);
2556  } else {
2557  const int dict_id = columnType.get_comp_param();
2558  const auto col_datum = col_cv->get_constval();
2559  const auto& str = *col_datum.stringval;
2560  const auto dd = catalog.getMetadataForDict(dict_id);
2561  CHECK(dd && dd->stringDict);
2562  int32_t str_id = dd->stringDict->getOrAdd(str);
2563  if (!dd->dictIsTemp) {
2564  const auto checkpoint_ok = dd->stringDict->checkpoint();
2565  if (!checkpoint_ok) {
2566  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2567  columnName);
2568  }
2569  }
2570  const bool invalid = str_id > max_valid_int_value<T>();
2571  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2572  if (invalid) {
2573  LOG(ERROR) << "Could not encode string: " << str
2574  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2575  << " bits. Will store NULL instead.";
2576  }
2577  str_id = inline_fixed_encoding_null_val(columnType);
2578  }
2579  *col_data = str_id;
2580  }
2581  return *col_data;
2582 }
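// A standalone sketch of the dictionary-encoded insert above, with a toy
// unordered_map standing in for StringDictionary and a caller-supplied sentinel
// instead of inline_fixed_encoding_null_val(); shown for a 16-bit column slot.
#include <cstdint>
#include <limits>
#include <optional>
#include <string>
#include <unordered_map>

struct ToyDictionary {
  std::unordered_map<std::string, int32_t> ids;
  int32_t getOrAdd(const std::string& str) {
    const auto it = ids.find(str);
    if (it != ids.end()) {
      return it->second;
    }
    const int32_t new_id = static_cast<int32_t>(ids.size());
    ids.emplace(str, new_id);
    return new_id;
  }
};

// Returns the value actually written to the column slot.
int16_t insert_dict_encoded(int16_t* col_data,
                            const std::optional<std::string>& value,
                            ToyDictionary& dict,
                            const int16_t null_sentinel) {
  if (!value) {
    *col_data = null_sentinel;
    return *col_data;
  }
  const int32_t str_id = dict.getOrAdd(*value);
  // Ids that overflow the narrow physical type degrade to NULL, as above.
  *col_data = str_id > std::numeric_limits<int16_t>::max()
                  ? null_sentinel
                  : static_cast<int16_t>(str_id);
  return *col_data;
}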
2583 
2584 template <class T>
2585 int64_t insert_one_dict_str(T* col_data,
2586  const ColumnDescriptor* cd,
2587  const Analyzer::Constant* col_cv,
2588  const Catalog_Namespace::Catalog& catalog) {
2589  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2590 }
2591 
2592 } // namespace
2593 
2594 namespace Importer_NS {
2595 
2596 int8_t* appendDatum(int8_t* buf, Datum d, const SQLTypeInfo& ti);
2597 
2598 } // namespace Importer_NS
2599 
2600 void Executor::executeSimpleInsert(const Planner::RootPlan* root_plan) {
2601  const auto plan = root_plan->get_plan();
2602  CHECK(plan);
2603  const auto values_plan = dynamic_cast<const Planner::ValuesScan*>(plan);
2604  if (!values_plan) {
2605  throw std::runtime_error(
2606  "Only simple INSERT of immediate tuples is currently supported");
2607  }
2608  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
2609  const auto& targets = values_plan->get_targetlist();
2610  const int table_id = root_plan->get_result_table_id();
2611  const auto& col_id_list = root_plan->get_result_col_list();
2612  std::vector<const ColumnDescriptor*> col_descriptors;
2613  std::vector<int> col_ids;
2614  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2615  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2616  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2617  auto& cat = root_plan->getCatalog();
2618  const auto table_descriptor = cat.getMetadataForTable(table_id);
2619  const auto shard_tables = cat.getPhysicalTablesDescriptors(table_descriptor);
2620  const TableDescriptor* shard{nullptr};
2621  for (const int col_id : col_id_list) {
2622  const auto cd = get_column_descriptor(col_id, table_id, cat);
2623  const auto col_enc = cd->columnType.get_compression();
2624  if (cd->columnType.is_string()) {
2625  switch (col_enc) {
2626  case kENCODING_NONE: {
2627  auto it_ok =
2628  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2629  CHECK(it_ok.second);
2630  break;
2631  }
2632  case kENCODING_DICT: {
2633  const auto dd = cat.getMetadataForDict(cd->columnType.get_comp_param());
2634  CHECK(dd);
2635  const auto it_ok = col_buffers.emplace(
2636  col_id, std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_size()]));
2637  CHECK(it_ok.second);
2638  break;
2639  }
2640  default:
2641  CHECK(false);
2642  }
2643  } else if (cd->columnType.is_geometry()) {
2644  auto it_ok =
2645  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2646  CHECK(it_ok.second);
2647  } else if (cd->columnType.is_array()) {
2648  auto it_ok =
2649  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2650  CHECK(it_ok.second);
2651  } else {
2652  const auto it_ok = col_buffers.emplace(
2653  col_id,
2654  std::unique_ptr<uint8_t[]>(
2655  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2656  // the buffer
2657  CHECK(it_ok.second);
2658  }
2659  col_descriptors.push_back(cd);
2660  col_ids.push_back(col_id);
2661  }
2662  size_t col_idx = 0;
2663  Fragmenter_Namespace::InsertData insert_data;
2664  insert_data.databaseId = cat.getCurrentDB().dbId;
2665  insert_data.tableId = table_id;
2666  int64_t int_col_val{0};
2667  for (auto target_entry : targets) {
2668  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2669  if (!col_cv) {
2670  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2671  CHECK(col_cast);
2672  CHECK_EQ(kCAST, col_cast->get_optype());
2673  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2674  }
2675  CHECK(col_cv);
2676  const auto cd = col_descriptors[col_idx];
2677  auto col_datum = col_cv->get_constval();
2678  auto col_type = cd->columnType.get_type();
2679  uint8_t* col_data_bytes{nullptr};
2680  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2681  (!cd->columnType.is_string() ||
2682  cd->columnType.get_compression() == kENCODING_DICT)) {
2683  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2684  CHECK(col_data_bytes_it != col_buffers.end());
2685  col_data_bytes = col_data_bytes_it->second.get();
2686  }
2687  switch (col_type) {
2688  case kBOOLEAN: {
2689  auto col_data = col_data_bytes;
2690  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2691  : (col_datum.boolval ? 1 : 0);
2692  break;
2693  }
2694  case kTINYINT: {
2695  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2696  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2697  : col_datum.tinyintval;
2698  int_col_val = col_datum.tinyintval;
2699  break;
2700  }
2701  case kSMALLINT: {
2702  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2703  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2704  : col_datum.smallintval;
2705  int_col_val = col_datum.smallintval;
2706  break;
2707  }
2708  case kINT: {
2709  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2710  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2711  : col_datum.intval;
2712  int_col_val = col_datum.intval;
2713  break;
2714  }
2715  case kBIGINT:
2716  case kDECIMAL:
2717  case kNUMERIC: {
2718  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2719  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2720  : col_datum.bigintval;
2721  int_col_val = col_datum.bigintval;
2722  break;
2723  }
2724  case kFLOAT: {
2725  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2726  *col_data = col_datum.floatval;
2727  break;
2728  }
2729  case kDOUBLE: {
2730  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2731  *col_data = col_datum.doubleval;
2732  break;
2733  }
2734  case kTEXT:
2735  case kVARCHAR:
2736  case kCHAR: {
2737  switch (cd->columnType.get_compression()) {
2738  case kENCODING_NONE:
2739  str_col_buffers[col_ids[col_idx]].push_back(
2740  col_datum.stringval ? *col_datum.stringval : "");
2741  break;
2742  case kENCODING_DICT: {
2743  switch (cd->columnType.get_size()) {
2744  case 1:
2745  int_col_val = insert_one_dict_str(
2746  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat);
2747  break;
2748  case 2:
2749  int_col_val = insert_one_dict_str(
2750  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat);
2751  break;
2752  case 4:
2753  int_col_val = insert_one_dict_str(
2754  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat);
2755  break;
2756  default:
2757  CHECK(false);
2758  }
2759  break;
2760  }
2761  default:
2762  CHECK(false);
2763  }
2764  break;
2765  }
2766  case kTIME:
2767  case kTIMESTAMP:
2768  case kDATE: {
2769  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2770  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2771  : col_datum.bigintval;
2772  break;
2773  }
2774  case kARRAY: {
2775  const auto is_null = col_cv->get_is_null();
2776  const auto size = cd->columnType.get_size();
2777  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2778  if (is_null) {
2779  if (size > 0) {
2780  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2781  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2782  throw std::runtime_error("Column " + cd->columnName +
2783  " doesn't accept NULL values");
2784  }
2785  int8_t* buf = (int8_t*)checked_malloc(size);
2786  put_null_array(static_cast<void*>(buf), elem_ti, "");
2787  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2788  p += elem_ti.get_size()) {
2789  put_null(static_cast<void*>(p), elem_ti, "");
2790  }
2791  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2792  } else {
2793  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2794  }
2795  break;
2796  }
2797  const auto l = col_cv->get_value_list();
2798  size_t len = l.size() * elem_ti.get_size();
2799  if (size > 0 && static_cast<size_t>(size) != len) {
2800  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2801  std::to_string(size / elem_ti.get_size()) +
2802  " values, " + "received " + std::to_string(l.size()));
2803  }
2804  if (elem_ti.is_string()) {
2805  CHECK(kENCODING_DICT == elem_ti.get_compression());
2806  CHECK(4 == elem_ti.get_size());
2807 
2808  int8_t* buf = (int8_t*)checked_malloc(len);
2809  int32_t* p = reinterpret_cast<int32_t*>(buf);
2810 
2811  int elemIndex = 0;
2812  for (auto& e : l) {
2813  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2814  CHECK(c);
2815 
2816  int_col_val =
2817  insert_one_dict_str(&p[elemIndex], cd->columnName, elem_ti, c.get(), cat);
2818 
2819  elemIndex++;
2820  }
2821  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2822 
2823  } else {
2824  int8_t* buf = (int8_t*)checked_malloc(len);
2825  int8_t* p = buf;
2826  for (auto& e : l) {
2827  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2828  CHECK(c);
2829  p = Importer_NS::appendDatum(p, c->get_constval(), elem_ti);
2830  }
2831  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2832  }
2833  break;
2834  }
2835  case kPOINT:
2836  case kLINESTRING:
2837  case kPOLYGON:
2838  case kMULTIPOLYGON:
2839  str_col_buffers[col_ids[col_idx]].push_back(
2840  col_datum.stringval ? *col_datum.stringval : "");
2841  break;
2842  default:
2843  CHECK(false);
2844  }
2845  ++col_idx;
2846  if (col_idx == static_cast<size_t>(table_descriptor->shardedColumnId)) {
2847  const auto shard_count = shard_tables.size();
2848  const size_t shard_idx = SHARD_FOR_KEY(int_col_val, shard_count);
2849  shard = shard_tables[shard_idx];
2850  }
2851  }
2852  for (const auto& kv : col_buffers) {
2853  insert_data.columnIds.push_back(kv.first);
2854  DataBlockPtr p;
2855  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2856  insert_data.data.push_back(p);
2857  }
2858  for (auto& kv : str_col_buffers) {
2859  insert_data.columnIds.push_back(kv.first);
2860  DataBlockPtr p;
2861  p.stringsPtr = &kv.second;
2862  insert_data.data.push_back(p);
2863  }
2864  for (auto& kv : arr_col_buffers) {
2865  insert_data.columnIds.push_back(kv.first);
2866  DataBlockPtr p;
2867  p.arraysPtr = &kv.second;
2868  insert_data.data.push_back(p);
2869  }
2870  insert_data.numRows = 1;
2871  if (shard) {
2872  shard->fragmenter->insertData(insert_data);
2873  } else {
2874  table_descriptor->fragmenter->insertData(insert_data);
2875  }
2876 }
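// Sketch of the shard routing step above for a single inserted row: the shard-key
// column's integer value picks one physical shard table. Plain non-negative modulo
// is an assumption here; the real mapping is whatever the SHARD_FOR_KEY macro
// (Shared/shard_key.h) implements.
#include <cstddef>
#include <cstdint>

size_t shard_for_key(const int64_t key_value, const size_t shard_count) {
  // shard_count must be > 0, as it is when the table is sharded.
  const int64_t n = static_cast<int64_t>(shard_count);
  return static_cast<size_t>(((key_value % n) + n) % n);
}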
2877 
2878 void Executor::nukeOldState(const bool allow_lazy_fetch,
2879  const std::vector<InputTableInfo>& query_infos,
2880  const RelAlgExecutionUnit* ra_exe_unit) {
2881  const bool contains_left_deep_outer_join =
2882  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
2883  ra_exe_unit->join_quals.end(),
2884  [](const JoinCondition& join_condition) {
2885  return join_condition.type == JoinType::LEFT;
2886  }) != ra_exe_unit->join_quals.end();
2887  cgen_state_.reset(new CgenState(query_infos, contains_left_deep_outer_join));
2888  plan_state_.reset(
2889  new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join, this));
2890 }
2891 
2892 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
2893  const std::vector<InputTableInfo>& query_infos) {
2894  const auto ld_count = input_descs.size();
2895  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
2896  for (size_t i = 0; i < ld_count; ++i) {
2897  CHECK_LT(i, query_infos.size());
2898  const auto frag_count = query_infos[i].info.fragments.size();
2899  if (i > 0) {
2900  cgen_state_->frag_offsets_.push_back(nullptr);
2901  } else {
2902  if (frag_count > 1) {
2903  cgen_state_->frag_offsets_.push_back(
2904  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
2905  } else {
2906  cgen_state_->frag_offsets_.push_back(nullptr);
2907  }
2908  }
2909  }
2910 }
2911 
2912 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
2913  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
2914  const std::vector<InputTableInfo>& query_infos,
2915  const MemoryLevel memory_level,
2916  const JoinHashTableInterface::HashType preferred_hash_type,
2917  ColumnCacheMap& column_cache) {
2918  std::shared_ptr<JoinHashTableInterface> join_hash_table;
2919  const int device_count = deviceCountForMemoryLevel(memory_level);
2920  CHECK_GT(device_count, 0);
2921  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
2922  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
2923  }
2924  try {
2925  if (qual_bin_oper->is_overlaps_oper()) {
2926  join_hash_table = OverlapsJoinHashTable::getInstance(
2927  qual_bin_oper, query_infos, memory_level, device_count, column_cache, this);
2928  } else if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2929  qual_bin_oper->get_left_operand())) {
2930  join_hash_table = BaselineJoinHashTable::getInstance(qual_bin_oper,
2931  query_infos,
2932  memory_level,
2933  preferred_hash_type,
2934  device_count,
2935  column_cache,
2936  this);
2937  } else {
2938  try {
2939  join_hash_table = JoinHashTable::getInstance(qual_bin_oper,
2940  query_infos,
2941  memory_level,
2942  preferred_hash_type,
2943  device_count,
2944  column_cache,
2945  this);
2946  } catch (TooManyHashEntries&) {
2947  const auto join_quals = coalesce_singleton_equi_join(qual_bin_oper);
2948  CHECK_EQ(join_quals.size(), size_t(1));
2949  const auto join_qual =
2950  std::dynamic_pointer_cast<Analyzer::BinOper>(join_quals.front());
2951  join_hash_table = BaselineJoinHashTable::getInstance(join_qual,
2952  query_infos,
2953  memory_level,
2954  preferred_hash_type,
2955  device_count,
2956  column_cache,
2957  this);
2958  }
2959  }
2960  CHECK(join_hash_table);
2961  return {join_hash_table, ""};
2962  } catch (const HashJoinFail& e) {
2963  return {nullptr, e.what()};
2964  }
2965  CHECK(false);
2966  return {nullptr, ""};
2967 }
2968 
2969 int8_t Executor::warpSize() const {
2970  CHECK(catalog_);
2971  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2972  CHECK(cuda_mgr);
2973  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2974  CHECK(!dev_props.empty());
2975  return dev_props.front().warpSize;
2976 }
2977 
2978 unsigned Executor::gridSize() const {
2979  CHECK(catalog_);
2980  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2981  CHECK(cuda_mgr);
2982  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2983  return grid_size_x_ ? grid_size_x_ : 2 * dev_props.front().numMPs;
2984 }
2985 
2986 unsigned Executor::blockSize() const {
2987  CHECK(catalog_);
2988  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2989  CHECK(cuda_mgr);
2990  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2991  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
2992 }
2993 
2994 int64_t Executor::deviceCycles(int milliseconds) const {
2995  CHECK(catalog_);
2996  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2997  CHECK(cuda_mgr);
2998  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2999  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3000 }
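// Sketch of how the launch parameters above fall back to device properties when no
// explicit block/grid size was configured; ToyDeviceProps is a stand-in for the
// CudaMgr device property struct, and the cycle budget simply multiplies the clock
// rate in kHz (cycles per millisecond) by the millisecond budget.
struct ToyDeviceProps {
  int numMPs;              // number of streaming multiprocessors
  int maxThreadsPerBlock;  // hardware block size limit
  int clockKhz;            // core clock, i.e. cycles per millisecond
};

unsigned default_grid_size(const unsigned configured, const ToyDeviceProps& props) {
  return configured ? configured : 2 * static_cast<unsigned>(props.numMPs);
}

unsigned default_block_size(const unsigned configured, const ToyDeviceProps& props) {
  return configured ? configured : static_cast<unsigned>(props.maxThreadsPerBlock);
}

long long cycle_budget(const ToyDeviceProps& props, const int milliseconds) {
  // e.g. 1,455,000 kHz (~1.46 GHz) over a 100 ms budget gives 145,500,000 cycles.
  return static_cast<long long>(props.clockKhz) * milliseconds;
}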
3001 
3002 llvm::Value* Executor::castToFP(llvm::Value* val) {
3003  if (!val->getType()->isIntegerTy()) {
3004  return val;
3005  }
3006 
3007  auto val_width = static_cast<llvm::IntegerType*>(val->getType())->getBitWidth();
3008  llvm::Type* dest_ty{nullptr};
3009  switch (val_width) {
3010  case 32:
3011  dest_ty = llvm::Type::getFloatTy(cgen_state_->context_);
3012  break;
3013  case 64:
3014  dest_ty = llvm::Type::getDoubleTy(cgen_state_->context_);
3015  break;
3016  default:
3017  CHECK(false);
3018  }
3019  return cgen_state_->ir_builder_.CreateSIToFP(val, dest_ty);
3020 }
3021 
3022 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3023  CHECK(val->getType()->isPointerTy());
3024 
3025  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3026  const auto val_type = val_ptr_type->getElementType();
3027  size_t val_width = 0;
3028  if (val_type->isIntegerTy()) {
3029  val_width = val_type->getIntegerBitWidth();
3030  } else {
3031  if (val_type->isFloatTy()) {
3032  val_width = 32;
3033  } else {
3034  CHECK(val_type->isDoubleTy());
3035  val_width = 64;
3036  }
3037  }
3038  CHECK_LT(size_t(0), val_width);
3039  if (bitWidth == val_width) {
3040  return val;
3041  }
3042  return cgen_state_->ir_builder_.CreateBitCast(
3043  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3044 }
3045 
3046 #define EXECUTE_INCLUDE
3047 #include "ArrayOps.cpp"
3048 #include "DateAdd.cpp"
3049 #include "StringFunctions.cpp"
3050 #undef EXECUTE_INCLUDE
3051 
3052 RelAlgExecutionUnit Executor::addDeletedColumn(const RelAlgExecutionUnit& ra_exe_unit) {
3053  auto ra_exe_unit_with_deleted = ra_exe_unit;
3054  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3055  if (input_table.getSourceType() != InputSourceType::TABLE) {
3056  continue;
3057  }
3058  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3059  CHECK(td);
3060  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3061  if (!deleted_cd) {
3062  continue;
3063  }
3064  CHECK(deleted_cd->columnType.is_boolean());
3065  // check deleted column is not already present
3066  bool found = false;
3067  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3068  if (input_col.get()->getColId() == deleted_cd->columnId &&
3069  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3070  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3071  found = true;
3072  }
3073  }
3074  if (!found) {
3075  // add deleted column
3076  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3077  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3078  }
3079  }
3080  return ra_exe_unit_with_deleted;
3081 }
3082 
3083 namespace {
3084 
3085 int64_t get_hpt_scaled_value(const int64_t& val,
3086  const int32_t& ldim,
3087  const int32_t& rdim) {
3088  CHECK(ldim != rdim);
3089  return ldim > rdim ? val / DateTimeUtils::get_timestamp_precision_scale(ldim - rdim)
3090  : val * DateTimeUtils::get_timestamp_precision_scale(rdim - ldim);
3091 }
3092 
3093 } // namespace
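// Worked example of the precision rescaling above: comparing a TIMESTAMP(9)
// (nanosecond) chunk statistic against a TIMESTAMP(3) (millisecond) literal divides
// by 10^(9-3) = 1,000,000; the opposite direction multiplies. The pow10_int helper
// is a stand-in for DateTimeUtils::get_timestamp_precision_scale().
#include <cstdint>

int64_t pow10_int(int32_t exp) {
  int64_t result = 1;
  while (exp-- > 0) {
    result *= 10;
  }
  return result;
}

int64_t scale_high_precision_ts(const int64_t val,
                                const int32_t ldim,
                                const int32_t rdim) {
  return ldim > rdim ? val / pow10_int(ldim - rdim) : val * pow10_int(rdim - ldim);
}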
3094 
3095 std::pair<bool, int64_t> Executor::skipFragment(
3096  const InputDescriptor& table_desc,
3097  const Fragmenter_Namespace::FragmentInfo& fragment,
3098  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3099  const std::vector<uint64_t>& frag_offsets,
3100  const size_t frag_idx) {
3101  const int table_id = table_desc.getTableId();
3102  for (const auto simple_qual : simple_quals) {
3103  const auto comp_expr =
3104  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3105  if (!comp_expr) {
3106  // is this possible?
3107  return {false, -1};
3108  }
3109  const auto lhs = comp_expr->get_left_operand();
3110  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3111  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3112  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3113  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3114  if (lhs_uexpr) {
3115  CHECK(lhs_uexpr->get_optype() ==
3116  kCAST); // We should have only been passed a cast expression
3117  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3118  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3119  continue;
3120  }
3121  } else {
3122  continue;
3123  }
3124  }
3125  const auto rhs = comp_expr->get_right_operand();
3126  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3127  if (!rhs_const) {
3128  // is this possible?
3129  return {false, -1};
3130  }
3131  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3132  continue;
3133  }
3134  const int col_id = lhs_col->get_column_id();
3135  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3136  int64_t chunk_min{0};
3137  int64_t chunk_max{0};
3138  bool is_rowid{false};
3139  size_t start_rowid{0};
3140  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3141  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3142  if (cd->isVirtualCol) {
3143  CHECK(cd->columnName == "rowid");
3144  const auto& table_generation = getTableGeneration(table_id);
3145  start_rowid = table_generation.start_rowid;
3146  chunk_min = frag_offsets[frag_idx] + start_rowid;
3147  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3148  is_rowid = true;
3149  }
3150  } else {
3151  const auto& chunk_type = lhs_col->get_type_info();
3152  chunk_min = extract_min_stat(chunk_meta_it->second.chunkStats, chunk_type);
3153  chunk_max = extract_max_stat(chunk_meta_it->second.chunkStats, chunk_type);
3154  }
3155  if (lhs->get_type_info().is_timestamp() &&
3156  (lhs_col->get_type_info().get_dimension() !=
3157  rhs_const->get_type_info().get_dimension()) &&
3158  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3159  rhs_const->get_type_info().is_high_precision_timestamp())) {
3160  // If original timestamp lhs col has different precision,
3161  // column metadata holds value in original precision
3162  // therefore adjust value to match rhs precision
3163  const auto lhs_dimen = lhs_col->get_type_info().get_dimension();
3164  const auto rhs_dimen = rhs_const->get_type_info().get_dimension();
3165  chunk_min = get_hpt_scaled_value(chunk_min, lhs_dimen, rhs_dimen);
3166  chunk_max = get_hpt_scaled_value(chunk_max, lhs_dimen, rhs_dimen);
3167  }
3168  CodeGenerator code_generator(this);
3169  const auto rhs_val = code_generator.codegenIntConst(rhs_const)->getSExtValue();
3170  switch (comp_expr->get_optype()) {
3171  case kGE:
3172  if (chunk_max < rhs_val) {
3173  return {true, -1};
3174  }
3175  break;
3176  case kGT:
3177  if (chunk_max <= rhs_val) {
3178  return {true, -1};
3179  }
3180  break;
3181  case kLE:
3182  if (chunk_min > rhs_val) {
3183  return {true, -1};
3184  }
3185  break;
3186  case kLT:
3187  if (chunk_min >= rhs_val) {
3188  return {true, -1};
3189  }
3190  break;
3191  case kEQ:
3192  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3193  return {true, -1};
3194  } else if (is_rowid) {
3195  return {false, rhs_val - start_rowid};
3196  }
3197  break;
3198  default:
3199  break;
3200  }
3201  }
3202  return {false, -1};
3203 }
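// A minimal sketch of the min/max pruning above: a fragment's chunk statistics for
// the filtered column can prove that no row satisfies a simple comparison against a
// constant, so the whole fragment is skipped. The enum below is a local stand-in
// for the SQL comparison operators handled in the switch above.
#include <cstdint>

enum class CmpOp { GE, GT, LE, LT, EQ };

bool fragment_cannot_match(const int64_t chunk_min,
                           const int64_t chunk_max,
                           const CmpOp op,
                           const int64_t rhs) {
  switch (op) {
    case CmpOp::GE:
      return chunk_max < rhs;  // every value is below the required lower bound
    case CmpOp::GT:
      return chunk_max <= rhs;
    case CmpOp::LE:
      return chunk_min > rhs;  // every value is above the required upper bound
    case CmpOp::LT:
      return chunk_min >= rhs;
    case CmpOp::EQ:
      return chunk_min > rhs || chunk_max < rhs;  // constant outside [min, max]
  }
  return false;
}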
3204 
3205 /*
3206  * The skipFragmentInnerJoins function processes all quals stored in the execution unit's
3207  * join_quals and gathers all the ones that meet the "simple_qual" characteristics
3208  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
3209  * to decide whether the fragment should be skipped or not. The fragment will be skipped
3210  * if at least one of these skipFragment calls returns true in its first value.
3211  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
3212  * second value only if its first value is "false".
3213  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3214  * i.e., we only skip if none of the quals trigger the code to update the
3215  * rowid_lookup_key
3216  * - Only AND operations are valid and considered:
3217  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3218  * the skip
3219  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3220  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3221  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3222  * implemented differently, which cause the first one to skip correctly, but the second
3223  * one will not skip.
3224  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3225  * possible
3226  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3227  * projection, no skipping
3228  */
3229 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3230  const InputDescriptor& table_desc,
3231  const RelAlgExecutionUnit& ra_exe_unit,
3232  const Fragmenter_Namespace::FragmentInfo& fragment,
3233  const std::vector<uint64_t>& frag_offsets,
3234  const size_t frag_idx) {
3235  std::pair<bool, int64_t> skip_frag{false, -1};
3236  for (auto& inner_join : ra_exe_unit.join_quals) {
3237  if (inner_join.type != JoinType::INNER) {
3238  continue;
3239  }
3240 
3241  // extracting all the conjunctive simple_quals from the quals stored for the inner
3242  // join
3243  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3244  for (auto& qual : inner_join.quals) {
3245  auto temp_qual = qual_to_conjunctive_form(qual);
3246  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3247  temp_qual.simple_quals.begin(),
3248  temp_qual.simple_quals.end());
3249  }
3250  auto temp_skip_frag = skipFragment(
3251  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3252  if (temp_skip_frag.second != -1) {
3253  skip_frag.second = temp_skip_frag.second;
3254  return skip_frag;
3255  } else {
3256  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3257  }
3258  }
3259  return skip_frag;
3260 }
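 // Standalone sketch (illustration only, not part of Execute.cpp): how the
 // per-join skipFragment() results above are combined. A rowid lookup key
 // (second != -1) wins immediately; otherwise the skip flags are OR'ed, so the
 // fragment is skipped if any inner join's simple quals allow it.
 #include <cstdint>
 #include <utility>
 #include <vector>
 
 std::pair<bool, int64_t> combine_skip_results(
     const std::vector<std::pair<bool, int64_t>>& per_join_results) {
   std::pair<bool, int64_t> skip_frag{false, -1};
   for (const auto& result : per_join_results) {
     if (result.second != -1) {
       skip_frag.second = result.second;  // rowid lookup key takes priority
       return skip_frag;
     }
     skip_frag.first = skip_frag.first || result.first;  // skip if any join says so
   }
   return skip_frag;
 }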
3261 
3262 AggregatedColRange Executor::computeColRangesCache(
3263  const std::unordered_set<PhysicalInput>& phys_inputs) {
3264  AggregatedColRange agg_col_range_cache;
3265  CHECK(catalog_);
3266  std::unordered_set<int> phys_table_ids;
3267  for (const auto& phys_input : phys_inputs) {
3268  phys_table_ids.insert(phys_input.table_id);
3269  }
3270  std::vector<InputTableInfo> query_infos;
3271  for (const int table_id : phys_table_ids) {
3272  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3273  }
3274  for (const auto& phys_input : phys_inputs) {
3275  const auto cd =
3276  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3277  CHECK(cd);
3278  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3279  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3280  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3281  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3282  agg_col_range_cache.setColRange(phys_input, col_range);
3283  }
3284  }
3285  return agg_col_range_cache;
3286 }
3287 
3288 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3289  const std::unordered_set<PhysicalInput>& phys_inputs) {
3290  StringDictionaryGenerations string_dictionary_generations;
3291  CHECK(catalog_);
3292  for (const auto& phys_input : phys_inputs) {
3293  const auto cd =
3294  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3295  CHECK(cd);
3296  const auto& col_ti =
3297  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3298  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3299  const int dict_id = col_ti.get_comp_param();
3300  const auto dd = catalog_->getMetadataForDict(dict_id);
3301  CHECK(dd && dd->stringDict);
3302  string_dictionary_generations.setGeneration(dict_id,
3303  dd->stringDict->storageEntryCount());
3304  }
3305  }
3306  return string_dictionary_generations;
3307 }
3308 
3309 TableGenerations Executor::computeTableGenerations(
3310  std::unordered_set<int> phys_table_ids) {
3311  TableGenerations table_generations;
3312  for (const int table_id : phys_table_ids) {
3313  const auto table_info = getTableInfo(table_id);
3314  table_generations.setGeneration(
3315  table_id, TableGeneration{table_info.getPhysicalNumTuples(), 0});
3316  }
3317  return table_generations;
3318 }
3319 
3320 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3321  const std::unordered_set<int>& phys_table_ids) {
3322  CHECK(catalog_);
3323  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
3324  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3325  string_dictionary_generations_ = computeStringDictionaryGenerations(phys_inputs);
3326  table_generations_ = computeTableGenerations(phys_table_ids);
3327 }
3328 
3329 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
3330 std::mutex Executor::execute_mutex_;