OmniSciDB  1dac507f6e
Execute.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Execute.h"
18 
19 #include "AggregateUtils.h"
20 #include "BaselineJoinHashTable.h"
21 #include "CodeGenerator.h"
22 #include "ColumnFetcher.h"
25 #include "DynamicWatchdog.h"
26 #include "EquiJoinCondition.h"
27 #include "ErrorHandling.h"
28 #include "ExpressionRewrite.h"
30 #include "GpuMemUtils.h"
31 #include "InPlaceSort.h"
32 #include "JsonAccessors.h"
34 #include "OverlapsJoinHashTable.h"
35 #include "QueryRewrite.h"
36 #include "QueryTemplateGenerator.h"
37 #include "ResultSetReductionJIT.h"
38 #include "RuntimeFunctions.h"
39 #include "SpeculativeTopN.h"
40 
43 
44 #include "CudaMgr/CudaMgr.h"
46 #include "Parser/ParserNode.h"
48 #include "Shared/MapDParameters.h"
50 #include "Shared/checked_alloc.h"
51 #include "Shared/measure.h"
52 #include "Shared/scope.h"
53 #include "Shared/shard_key.h"
54 
55 #include "AggregatedColRange.h"
57 
58 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
59 #include <boost/filesystem/operations.hpp>
60 #include <boost/filesystem/path.hpp>
61 
62 #ifdef HAVE_CUDA
63 #include <cuda.h>
64 #endif // HAVE_CUDA
65 #include <future>
66 #include <memory>
67 #include <numeric>
68 #include <set>
69 #include <thread>
70 
71 bool g_enable_watchdog{false};
74 bool g_allow_cpu_retry{true};
75 bool g_null_div_by_zero{false};
79 extern bool g_enable_smem_group_by;
80 extern std::unique_ptr<llvm::Module> udf_gpu_module;
81 extern std::unique_ptr<llvm::Module> udf_cpu_module;
88 bool g_cache_string_hash{false};
89 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
92 size_t g_big_group_threshold{20000};
95 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
96 size_t g_min_memory_allocation_size{
97     256};  // minimum memory allocation required for projection query output buffer
98             // without pre-flight count
103 
104 int const Executor::max_gpu_count;
105 
106 Executor::Executor(const int db_id,
107  const size_t block_size_x,
108  const size_t grid_size_x,
109  const std::string& debug_dir,
110  const std::string& debug_file,
111  ::QueryRenderer::QueryRenderManager* render_manager)
112  : cgen_state_(new CgenState({}, false))
114  , interrupted_(false)
115  , cpu_code_cache_(code_cache_size)
116  , gpu_code_cache_(code_cache_size)
117  , render_manager_(render_manager)
118  , block_size_x_(block_size_x)
119  , grid_size_x_(grid_size_x)
120  , debug_dir_(debug_dir)
121  , debug_file_(debug_file)
122  , db_id_(db_id)
123  , catalog_(nullptr)
124  , temporary_tables_(nullptr)
125  , input_table_info_cache_(this) {}
126 
127 std::shared_ptr<Executor> Executor::getExecutor(
128  const int db_id,
129  const std::string& debug_dir,
130  const std::string& debug_file,
131  const MapDParameters mapd_parameters,
132  ::QueryRenderer::QueryRenderManager* render_manager) {
133  INJECT_TIMER(getExecutor);
134  const auto executor_key = std::make_pair(db_id, render_manager);
135  {
136  mapd_shared_lock<mapd_shared_mutex> read_lock(executors_cache_mutex_);
137  auto it = executors_.find(executor_key);
138  if (it != executors_.end()) {
139  return it->second;
140  }
141  }
142  {
143  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
144  auto it = executors_.find(executor_key);
145  if (it != executors_.end()) {
146  return it->second;
147  }
148  auto executor = std::make_shared<Executor>(db_id,
149  mapd_parameters.cuda_block_size,
150  mapd_parameters.cuda_grid_size,
151  debug_dir,
152  debug_file,
153  render_manager);
154  auto it_ok = executors_.insert(std::make_pair(executor_key, executor));
155  CHECK(it_ok.second);
156  return executor;
157  }
158 }
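// Illustrative usage sketch of the cache above (assumes a default-constructed
// MapDParameters and a null render manager; db_id is a hypothetical value):
//
//   MapDParameters mapd_parameters;
//   auto executor = Executor::getExecutor(db_id, "", "", mapd_parameters, nullptr);
//   // A second call with the same (db_id, render_manager) key returns the cached
//   // shared_ptr from executors_ instead of constructing a new Executor.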
159 
160 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
161  switch (memory_level) {
162  case Data_Namespace::MemoryLevel::CPU_LEVEL:
163  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
164  std::lock_guard<std::mutex> flush_lock(
165  execute_mutex_); // Don't flush memory while queries are running
166 
167  Catalog_Namespace::SysCatalog::instance().getDataMgr().clearMemory(memory_level);
168  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
169  // The hash table cache uses CPU memory not managed by the buffer manager. In the
170  // future, we should manage these allocations with the buffer manager directly.
171  // For now, assume the user wants to purge the hash table cache when they clear
172  // CPU memory (currently used in ExecuteTest to lower memory pressure)
174  }
175  break;
176  }
177  default: {
178  throw std::runtime_error(
179  "Clearing memory levels other than the CPU level or GPU level is not "
180  "supported.");
181  }
182  }
183 }
184 
185 StringDictionaryProxy* Executor::getStringDictionaryProxy(
186  const int dict_id_in,
187  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
188  const bool with_generation) const {
189  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
190  CHECK(catalog_);
191  const auto dd = catalog_->getMetadataForDict(dict_id);
192  std::lock_guard<std::mutex> lock(str_dict_mutex_);
193  if (dd) {
194  CHECK(dd->stringDict);
195  CHECK_LE(dd->dictNBits, 32);
196  CHECK(row_set_mem_owner);
197  const auto generation = with_generation
198  ? string_dictionary_generations_.getGeneration(dict_id)
199  : ssize_t(-1);
200  return row_set_mem_owner->addStringDict(dd->stringDict, dict_id, generation);
201  }
202  CHECK_EQ(0, dict_id);
203  if (!lit_str_dict_proxy_) {
204  std::shared_ptr<StringDictionary> tsd =
205  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
206  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
207  }
208  return lit_str_dict_proxy_.get();
209 }
210 
211 bool Executor::isCPUOnly() const {
212  CHECK(catalog_);
213  return !catalog_->getDataMgr().getCudaMgr();
214 }
215 
216 const ColumnDescriptor* Executor::getColumnDescriptor(
217  const Analyzer::ColumnVar* col_var) const {
218  return get_column_descriptor_maybe(
219  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
220 }
221 
222 const ColumnDescriptor* Executor::getPhysicalColumnDescriptor(
223  const Analyzer::ColumnVar* col_var,
224  int n) const {
225  const auto cd = getColumnDescriptor(col_var);
226  if (!cd || n > cd->columnType.get_physical_cols()) {
227  return nullptr;
228  }
229  return get_column_descriptor_maybe(
230  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
231 }
232 
233 const Catalog_Namespace::Catalog* Executor::getCatalog() const {
234  return catalog_;
235 }
236 
237 void Executor::setCatalog(const Catalog_Namespace::Catalog* catalog) {
238  catalog_ = catalog;
239 }
240 
241 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
242  return row_set_mem_owner_;
243 }
244 
245 const TemporaryTables* Executor::getTemporaryTables() const {
246  return temporary_tables_;
247 }
248 
249 Fragmenter_Namespace::TableInfo Executor::getTableInfo(const int table_id) const {
250  return input_table_info_cache_.getTableInfo(table_id);
251 }
252 
253 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
254  return table_generations_.getGeneration(table_id);
255 }
256 
257 ExpressionRange Executor::getColRange(const PhysicalInput& phys_input) const {
258  return agg_col_range_cache_.getColRange(phys_input);
259 }
260 
261 size_t Executor::getNumBytesForFetchedRow() const {
262  size_t num_bytes = 0;
263  if (!plan_state_) {
264  return 0;
265  }
266  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
267  if (fetched_col_pair.first < 0) {
268  num_bytes += 8;
269  } else {
270  const auto cd =
271  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
272  const auto& ti = cd->columnType;
273  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
274  ? 4
275  : ti.get_size();
276  if (sz < 0) {
277  // for varlen types, only account for the pointer/size for each row, for now
278  num_bytes += 16;
279  } else {
280  num_bytes += sz;
281  }
282  }
283  }
284  return num_bytes;
285 }
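// Worked example of the per-row accounting above (illustrative): a plan fetching
// an INT column (4 bytes), a dictionary-encoded TEXT column (counted as 4 bytes),
// a NONE-encoded TEXT column (varlen, counted as a 16-byte pointer/length pair),
// and one synthetic column with a negative id (8 bytes) reports
// 4 + 4 + 16 + 8 = 32 bytes per fetched row.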
286 
287 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
288  const std::vector<Analyzer::Expr*>& target_exprs) const {
289  CHECK(plan_state_);
290  CHECK(catalog_);
291  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
292  for (const auto target_expr : target_exprs) {
293  if (!plan_state_->isLazyFetchColumn(target_expr)) {
294  col_lazy_fetch_info.emplace_back(
295  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
296  } else {
297  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
298  CHECK(col_var);
299  auto col_id = col_var->get_column_id();
300  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
301  auto cd = (col_var->get_table_id() > 0)
302  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
303  : nullptr;
304  if (cd && IS_GEO(cd->columnType.get_type())) {
305  // Geo coords cols will be processed in sequence. So we only need to track the
306  // first coords col in lazy fetch info.
307  {
308  auto cd0 =
309  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
310  auto col0_ti = cd0->columnType;
311  CHECK(!cd0->isVirtualCol);
312  auto col0_var = makeExpr<Analyzer::ColumnVar>(
313  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
314  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
315  col_lazy_fetch_info.emplace_back(
316  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
317  }
318  } else {
319  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
320  const auto& col_ti = col_var->get_type_info();
321  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
322  }
323  }
324  }
325  return col_lazy_fetch_info;
326 }
327 
328 void Executor::clearMetaInfoCache() {
329  input_table_info_cache_.clear();
330  agg_col_range_cache_.clear();
331  string_dictionary_generations_.clear();
332  table_generations_.clear();
333 }
334 
335 std::vector<int8_t> Executor::serializeLiterals(
336  const std::unordered_map<int, CgenState::LiteralValues>& literals,
337  const int device_id) {
338  if (literals.empty()) {
339  return {};
340  }
341  const auto dev_literals_it = literals.find(device_id);
342  CHECK(dev_literals_it != literals.end());
343  const auto& dev_literals = dev_literals_it->second;
344  size_t lit_buf_size{0};
345  std::vector<std::string> real_strings;
346  std::vector<std::vector<double>> double_array_literals;
347  std::vector<std::vector<int8_t>> align64_int8_array_literals;
348  std::vector<std::vector<int32_t>> int32_array_literals;
349  std::vector<std::vector<int8_t>> align32_int8_array_literals;
350  std::vector<std::vector<int8_t>> int8_array_literals;
351  for (const auto& lit : dev_literals) {
352  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
353  if (lit.which() == 7) {
354  const auto p = boost::get<std::string>(&lit);
355  CHECK(p);
356  real_strings.push_back(*p);
357  } else if (lit.which() == 8) {
358  const auto p = boost::get<std::vector<double>>(&lit);
359  CHECK(p);
360  double_array_literals.push_back(*p);
361  } else if (lit.which() == 9) {
362  const auto p = boost::get<std::vector<int32_t>>(&lit);
363  CHECK(p);
364  int32_array_literals.push_back(*p);
365  } else if (lit.which() == 10) {
366  const auto p = boost::get<std::vector<int8_t>>(&lit);
367  CHECK(p);
368  int8_array_literals.push_back(*p);
369  } else if (lit.which() == 11) {
370  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
371  CHECK(p);
372  if (p->second == 64) {
373  align64_int8_array_literals.push_back(p->first);
374  } else if (p->second == 32) {
375  align32_int8_array_literals.push_back(p->first);
376  } else {
377  CHECK(false);
378  }
379  }
380  }
381  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
382  throw TooManyLiterals();
383  }
384  int16_t crt_real_str_off = lit_buf_size;
385  for (const auto& real_str : real_strings) {
386  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
387  lit_buf_size += real_str.size();
388  }
389  if (double_array_literals.size() > 0) {
390  lit_buf_size = align(lit_buf_size, sizeof(double));
391  }
392  int16_t crt_double_arr_lit_off = lit_buf_size;
393  for (const auto& double_array_literal : double_array_literals) {
394  CHECK_LE(double_array_literal.size(),
395  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
396  lit_buf_size += double_array_literal.size() * sizeof(double);
397  }
398  if (align64_int8_array_literals.size() > 0) {
399  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
400  }
401  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
402  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
403  CHECK_LE(align64_int8_array_literal.size(),
404  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
405  lit_buf_size += align64_int8_array_literal.size();
406  }
407  if (int32_array_literals.size() > 0) {
408  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
409  }
410  int16_t crt_int32_arr_lit_off = lit_buf_size;
411  for (const auto& int32_array_literal : int32_array_literals) {
412  CHECK_LE(int32_array_literal.size(),
413  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
414  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
415  }
416  if (align32_int8_array_literals.size() > 0) {
417  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
418  }
419  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
420  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
421  CHECK_LE(align32_int8_array_literal.size(),
422  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
423  lit_buf_size += align32_int8_array_literal.size();
424  }
425  int16_t crt_int8_arr_lit_off = lit_buf_size;
426  for (const auto& int8_array_literal : int8_array_literals) {
427  CHECK_LE(int8_array_literal.size(),
428  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
429  lit_buf_size += int8_array_literal.size();
430  }
431  unsigned crt_real_str_idx = 0;
432  unsigned crt_double_arr_lit_idx = 0;
433  unsigned crt_align64_int8_arr_lit_idx = 0;
434  unsigned crt_int32_arr_lit_idx = 0;
435  unsigned crt_align32_int8_arr_lit_idx = 0;
436  unsigned crt_int8_arr_lit_idx = 0;
437  std::vector<int8_t> serialized(lit_buf_size);
438  size_t off{0};
439  for (const auto& lit : dev_literals) {
440  const auto lit_bytes = CgenState::literalBytes(lit);
441  off = CgenState::addAligned(off, lit_bytes);
442  switch (lit.which()) {
443  case 0: {
444  const auto p = boost::get<int8_t>(&lit);
445  CHECK(p);
446  serialized[off - lit_bytes] = *p;
447  break;
448  }
449  case 1: {
450  const auto p = boost::get<int16_t>(&lit);
451  CHECK(p);
452  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
453  break;
454  }
455  case 2: {
456  const auto p = boost::get<int32_t>(&lit);
457  CHECK(p);
458  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
459  break;
460  }
461  case 3: {
462  const auto p = boost::get<int64_t>(&lit);
463  CHECK(p);
464  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
465  break;
466  }
467  case 4: {
468  const auto p = boost::get<float>(&lit);
469  CHECK(p);
470  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
471  break;
472  }
473  case 5: {
474  const auto p = boost::get<double>(&lit);
475  CHECK(p);
476  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
477  break;
478  }
479  case 6: {
480  const auto p = boost::get<std::pair<std::string, int>>(&lit);
481  CHECK(p);
482  const auto str_id =
483  g_enable_experimental_string_functions
484  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
485  ->getOrAddTransient(p->first)
486  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
487  ->getIdOfString(p->first);
488  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
489  break;
490  }
491  case 7: {
492  const auto p = boost::get<std::string>(&lit);
493  CHECK(p);
494  int32_t off_and_len = crt_real_str_off << 16;
495  const auto& crt_real_str = real_strings[crt_real_str_idx];
496  off_and_len |= static_cast<int16_t>(crt_real_str.size());
497  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
498  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
499  ++crt_real_str_idx;
500  crt_real_str_off += crt_real_str.size();
501  break;
502  }
503  case 8: {
504  const auto p = boost::get<std::vector<double>>(&lit);
505  CHECK(p);
506  int32_t off_and_len = crt_double_arr_lit_off << 16;
507  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
508  int32_t len = crt_double_arr_lit.size();
509  CHECK_EQ((len >> 16), 0);
510  off_and_len |= static_cast<int16_t>(len);
511  int32_t double_array_bytesize = len * sizeof(double);
512  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
513  memcpy(&serialized[crt_double_arr_lit_off],
514  crt_double_arr_lit.data(),
515  double_array_bytesize);
516  ++crt_double_arr_lit_idx;
517  crt_double_arr_lit_off += double_array_bytesize;
518  break;
519  }
520  case 9: {
521  const auto p = boost::get<std::vector<int32_t>>(&lit);
522  CHECK(p);
523  int32_t off_and_len = crt_int32_arr_lit_off << 16;
524  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
525  int32_t len = crt_int32_arr_lit.size();
526  CHECK_EQ((len >> 16), 0);
527  off_and_len |= static_cast<int16_t>(len);
528  int32_t int32_array_bytesize = len * sizeof(int32_t);
529  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
530  memcpy(&serialized[crt_int32_arr_lit_off],
531  crt_int32_arr_lit.data(),
532  int32_array_bytesize);
533  ++crt_int32_arr_lit_idx;
534  crt_int32_arr_lit_off += int32_array_bytesize;
535  break;
536  }
537  case 10: {
538  const auto p = boost::get<std::vector<int8_t>>(&lit);
539  CHECK(p);
540  int32_t off_and_len = crt_int8_arr_lit_off << 16;
541  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
542  int32_t len = crt_int8_arr_lit.size();
543  CHECK_EQ((len >> 16), 0);
544  off_and_len |= static_cast<int16_t>(len);
545  int32_t int8_array_bytesize = len;
546  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
547  memcpy(&serialized[crt_int8_arr_lit_off],
548  crt_int8_arr_lit.data(),
549  int8_array_bytesize);
550  ++crt_int8_arr_lit_idx;
551  crt_int8_arr_lit_off += int8_array_bytesize;
552  break;
553  }
554  case 11: {
555  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
556  CHECK(p);
557  if (p->second == 64) {
558  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
559  const auto& crt_align64_int8_arr_lit =
560  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
561  int32_t len = crt_align64_int8_arr_lit.size();
562  CHECK_EQ((len >> 16), 0);
563  off_and_len |= static_cast<int16_t>(len);
564  int32_t align64_int8_array_bytesize = len;
565  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
566  memcpy(&serialized[crt_align64_int8_arr_lit_off],
567  crt_align64_int8_arr_lit.data(),
568  align64_int8_array_bytesize);
569  ++crt_align64_int8_arr_lit_idx;
570  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
571  } else if (p->second == 32) {
572  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
573  const auto& crt_align32_int8_arr_lit =
574  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
575  int32_t len = crt_align32_int8_arr_lit.size();
576  CHECK_EQ((len >> 16), 0);
577  off_and_len |= static_cast<int16_t>(len);
578  int32_t align32_int8_array_bytesize = len;
579  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
580  memcpy(&serialized[crt_align32_int8_arr_lit_off],
581  crt_align32_int8_arr_lit.data(),
582  align32_int8_array_bytesize);
583  ++crt_align32_int8_arr_lit_idx;
584  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
585  } else {
586  CHECK(false);
587  }
588  break;
589  }
590  default:
591  CHECK(false);
592  }
593  }
594  return serialized;
595 }
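// Sketch of the off_and_len packing used throughout serializeLiterals above
// (illustrative helper, not a function in this file): the 16-bit offset of the
// payload within the literal buffer occupies the high half of a 32-bit word and
// the payload length the low half.
//
//   inline int32_t pack_off_and_len(const int16_t off, const int16_t len) {
//     return (static_cast<int32_t>(off) << 16) | static_cast<uint16_t>(len);
//   }
//   // e.g. a real string at offset 48 with length 5 packs to (48 << 16) | 5.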
596 
597 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
598  if (device_type == ExecutorDeviceType::GPU) {
599  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
600  CHECK(cuda_mgr);
601  return cuda_mgr->getDeviceCount();
602  } else {
603  return 1;
604  }
605 }
606 
608  const Data_Namespace::MemoryLevel memory_level) const {
609  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
610  : deviceCount(ExecutorDeviceType::CPU);
611 }
612 
613 // TODO(alex): remove or split
614 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
615  const SQLTypeInfo& ti,
616  const int64_t agg_init_val,
617  const int8_t out_byte_width,
618  const int64_t* out_vec,
619  const size_t out_vec_sz,
620  const bool is_group_by,
621  const bool float_argument_input) {
622  switch (agg) {
623  case kAVG:
624  case kSUM:
625  if (0 != agg_init_val) {
626  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
627  int64_t agg_result = agg_init_val;
628  for (size_t i = 0; i < out_vec_sz; ++i) {
629  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
630  }
631  return {agg_result, 0};
632  } else {
633  CHECK(ti.is_fp());
634  switch (out_byte_width) {
635  case 4: {
636  int agg_result = static_cast<int32_t>(agg_init_val);
637  for (size_t i = 0; i < out_vec_sz; ++i) {
638  agg_sum_float_skip_val(
639  &agg_result,
640  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
641  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
642  }
643  const int64_t converted_bin =
644  float_argument_input
645  ? static_cast<int64_t>(agg_result)
646  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
647  return {converted_bin, 0};
648  break;
649  }
650  case 8: {
651  int64_t agg_result = agg_init_val;
652  for (size_t i = 0; i < out_vec_sz; ++i) {
653  agg_sum_double_skip_val(
654  &agg_result,
655  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
656  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
657  }
658  return {agg_result, 0};
659  break;
660  }
661  default:
662  CHECK(false);
663  }
664  }
665  }
666  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
667  int64_t agg_result = 0;
668  for (size_t i = 0; i < out_vec_sz; ++i) {
669  agg_result += out_vec[i];
670  }
671  return {agg_result, 0};
672  } else {
673  CHECK(ti.is_fp());
674  switch (out_byte_width) {
675  case 4: {
676  float r = 0.;
677  for (size_t i = 0; i < out_vec_sz; ++i) {
678  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
679  }
680  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
681  const int64_t converted_bin =
682  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
683  return {converted_bin, 0};
684  }
685  case 8: {
686  double r = 0.;
687  for (size_t i = 0; i < out_vec_sz; ++i) {
688  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
689  }
690  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
691  }
692  default:
693  CHECK(false);
694  }
695  }
696  break;
697  case kCOUNT: {
698  uint64_t agg_result = 0;
699  for (size_t i = 0; i < out_vec_sz; ++i) {
700  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
701  agg_result += out;
702  }
703  return {static_cast<int64_t>(agg_result), 0};
704  }
705  case kMIN: {
706  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
707  int64_t agg_result = agg_init_val;
708  for (size_t i = 0; i < out_vec_sz; ++i) {
709  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
710  }
711  return {agg_result, 0};
712  } else {
713  switch (out_byte_width) {
714  case 4: {
715  int32_t agg_result = static_cast<int32_t>(agg_init_val);
716  for (size_t i = 0; i < out_vec_sz; ++i) {
717  agg_min_float_skip_val(
718  &agg_result,
719  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
720  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
721  }
722  const int64_t converted_bin =
723  float_argument_input
724  ? static_cast<int64_t>(agg_result)
725  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
726  return {converted_bin, 0};
727  }
728  case 8: {
729  int64_t agg_result = agg_init_val;
730  for (size_t i = 0; i < out_vec_sz; ++i) {
731  agg_min_double_skip_val(
732  &agg_result,
733  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
734  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
735  }
736  return {agg_result, 0};
737  }
738  default:
739  CHECK(false);
740  }
741  }
742  }
743  case kMAX:
744  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
745  int64_t agg_result = agg_init_val;
746  for (size_t i = 0; i < out_vec_sz; ++i) {
747  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
748  }
749  return {agg_result, 0};
750  } else {
751  switch (out_byte_width) {
752  case 4: {
753  int32_t agg_result = static_cast<int32_t>(agg_init_val);
754  for (size_t i = 0; i < out_vec_sz; ++i) {
755  agg_max_float_skip_val(
756  &agg_result,
757  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
758  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
759  }
760  const int64_t converted_bin =
761  float_argument_input ? static_cast<int64_t>(agg_result)
762  : float_to_double_bin(agg_result, !ti.get_notnull());
763  return {converted_bin, 0};
764  }
765  case 8: {
766  int64_t agg_result = agg_init_val;
767  for (size_t i = 0; i < out_vec_sz; ++i) {
768  agg_max_double_skip_val(
769  &agg_result,
770  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
771  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
772  }
773  return {agg_result, 0};
774  }
775  default:
776  CHECK(false);
777  }
778  }
779  case kSAMPLE: {
780  int64_t agg_result = agg_init_val;
781  for (size_t i = 0; i < out_vec_sz; ++i) {
782  if (out_vec[i] != agg_init_val) {
783  agg_result = out_vec[i];
784  break;
785  }
786  }
787  return {agg_result, 0};
788  }
789  default:
790  CHECK(false);
791  }
792  abort();
793 }
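// Illustrative reading of the SUM branch above: out_vec holds one partial
// accumulator per output entry produced by the device kernels, so partials
// {10, 32, 0} with an agg_init_val of 0 reduce to {42, 0} (value, error code)
// on the host.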
794 
795 namespace {
796 
797 ResultSetPtr get_merged_result(
798  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
799  auto& first = results_per_device.front().first;
800  CHECK(first);
801  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
802  const auto& next = results_per_device[dev_idx].first;
803  CHECK(next);
804  first->append(*next);
805  }
806  return std::move(first);
807 }
808 
809 } // namespace
810 
811 ResultSetPtr Executor::resultsUnion(ExecutionDispatch& execution_dispatch) {
812  auto& results_per_device = execution_dispatch.getFragmentResults();
813  if (results_per_device.empty()) {
814  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
815  std::vector<TargetInfo> targets;
816  for (const auto target_expr : ra_exe_unit.target_exprs) {
817  targets.push_back(get_target_info(target_expr, g_bigint_count));
818  }
819  return std::make_shared<ResultSet>(targets,
820  ExecutorDeviceType::CPU,
821  QueryMemoryDescriptor(),
822  row_set_mem_owner_,
823  this);
824  }
825  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
826  std::sort(results_per_device.begin(),
827  results_per_device.end(),
828  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
829  CHECK_GE(lhs.second.size(), size_t(1));
830  CHECK_GE(rhs.second.size(), size_t(1));
831  return lhs.second.front() < rhs.second.front();
832  });
833 
834  return get_merged_result(results_per_device);
835 }
836 
837 ResultSetPtr Executor::reduceMultiDeviceResults(
838  const RelAlgExecutionUnit& ra_exe_unit,
839  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
840  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
841  const QueryMemoryDescriptor& query_mem_desc) const {
842  if (ra_exe_unit.estimator) {
843  return reduce_estimator_results(ra_exe_unit, results_per_device);
844  }
845 
846  if (results_per_device.empty()) {
847  std::vector<TargetInfo> targets;
848  for (const auto target_expr : ra_exe_unit.target_exprs) {
849  targets.push_back(get_target_info(target_expr, g_bigint_count));
850  }
851  return std::make_shared<ResultSet>(
852  targets, ExecutorDeviceType::CPU, QueryMemoryDescriptor(), nullptr, this);
853  }
854 
855  return reduceMultiDeviceResultSets(
856  results_per_device,
857  row_set_mem_owner,
858  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
859 }
860 
861 ResultSetPtr Executor::reduceMultiDeviceResultSets(
862  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
863  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
864  const QueryMemoryDescriptor& query_mem_desc) const {
865  auto timer = DEBUG_TIMER(__func__);
866  std::shared_ptr<ResultSet> reduced_results;
867 
868  const auto& first = results_per_device.front().first;
869 
870  if (query_mem_desc.getQueryDescriptionType() ==
871  QueryDescriptionType::NonGroupedAggregate &&
872  results_per_device.size() > 1) {
873  const auto total_entry_count = std::accumulate(
874  results_per_device.begin(),
875  results_per_device.end(),
876  size_t(0),
877  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
878  const auto& r = rs.first;
879  return init + r->getQueryMemDesc().getEntryCount();
880  });
881  CHECK(total_entry_count);
882  auto query_mem_desc = first->getQueryMemDesc();
883  query_mem_desc.setEntryCount(total_entry_count);
884  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
885  ExecutorDeviceType::CPU,
886  query_mem_desc,
887  row_set_mem_owner,
888  this);
889  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
890  reduced_results->initializeStorage();
891  switch (query_mem_desc.getEffectiveKeyWidth()) {
892  case 4:
893  first->getStorage()->moveEntriesToBuffer<int32_t>(
894  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
895  break;
896  case 8:
897  first->getStorage()->moveEntriesToBuffer<int64_t>(
898  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
899  break;
900  default:
901  CHECK(false);
902  }
903  } else {
904  reduced_results = first;
905  }
906 
907  const auto& this_result_set = results_per_device[0].first;
908  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
909  this_result_set->getTargetInfos(),
910  this_result_set->getTargetInitVals());
911  const auto reduction_code = reduction_jit.codegen();
912  for (size_t i = 1; i < results_per_device.size(); ++i) {
913  reduced_results->getStorage()->reduce(
914  *(results_per_device[i].first->getStorage()), {}, reduction_code);
915  }
916 
917  return reduced_results;
918 }
919 
920 ResultSetPtr Executor::reduceSpeculativeTopN(
921  const RelAlgExecutionUnit& ra_exe_unit,
922  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
923  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
924  const QueryMemoryDescriptor& query_mem_desc) const {
925  if (results_per_device.size() == 1) {
926  return std::move(results_per_device.front().first);
927  }
928  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
929  SpeculativeTopNMap m;
930  for (const auto& result : results_per_device) {
931  auto rows = result.first;
932  CHECK(rows);
933  if (!rows) {
934  continue;
935  }
936  SpeculativeTopNMap that(
937  *rows,
938  ra_exe_unit.target_exprs,
939  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
940  m.reduce(that);
941  }
942  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
943  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
944  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
945 }
946 
947 std::unordered_set<int> get_available_gpus(const Catalog_Namespace::Catalog& cat) {
948  std::unordered_set<int> available_gpus;
949  if (cat.getDataMgr().gpusPresent()) {
950  int gpu_count = cat.getDataMgr().getCudaMgr()->getDeviceCount();
951  CHECK_GT(gpu_count, 0);
952  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
953  available_gpus.insert(gpu_id);
954  }
955  }
956  return available_gpus;
957 }
958 
959 size_t get_context_count(const ExecutorDeviceType device_type,
960  const size_t cpu_count,
961  const size_t gpu_count) {
962  return device_type == ExecutorDeviceType::GPU ? gpu_count
963  : static_cast<size_t>(cpu_count);
964 }
965 
966 namespace {
967 
968 // Compute a very conservative entry count for the output buffer entry count using no
969 // other information than the number of tuples in each table and multiplying them
970 // together.
971 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
972  using Fragmenter_Namespace::FragmentInfo;
973  // Check for overflows since we're multiplying potentially big table sizes.
974  using checked_size_t = boost::multiprecision::number<
975  boost::multiprecision::cpp_int_backend<64,
976  64,
977  boost::multiprecision::unsigned_magnitude,
978  boost::multiprecision::checked,
979  void>>;
980  checked_size_t max_groups_buffer_entry_guess = 1;
981  for (const auto& query_info : query_infos) {
982  CHECK(!query_info.info.fragments.empty());
983  auto it = std::max_element(query_info.info.fragments.begin(),
984  query_info.info.fragments.end(),
985  [](const FragmentInfo& f1, const FragmentInfo& f2) {
986  return f1.getNumTuples() < f2.getNumTuples();
987  });
988  max_groups_buffer_entry_guess *= it->getNumTuples();
989  }
990  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
991  // baseline group layout with that many entries anyway.
992  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
993  try {
994  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
995  max_groups_buffer_entry_guess_cap);
996  } catch (...) {
997  return max_groups_buffer_entry_guess_cap;
998  }
999 }
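// Worked example (illustrative): for a two-table query whose largest fragments
// hold 50,000 and 3,000 tuples, the guess is 50,000 * 3,000 = 150,000,000, which
// is then clamped to max_groups_buffer_entry_guess_cap (100,000,000); a checked
// multiplication overflow falls back to the same cap.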
1000 
1001 std::string get_table_name(const InputDescriptor& input_desc,
1002  const Catalog_Namespace::Catalog& cat) {
1003  const auto source_type = input_desc.getSourceType();
1004  if (source_type == InputSourceType::TABLE) {
1005  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1006  CHECK(td);
1007  return td->tableName;
1008  } else {
1009  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1010  }
1011 }
1012 
1013 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1014  const int device_count) {
1015  if (device_type == ExecutorDeviceType::GPU) {
1016  return device_count * Executor::high_scan_limit;
1017  }
1018  return Executor::high_scan_limit;
1019 }
1020 
1021 void checkWorkUnitWatchdog(const RelAlgExecutionUnit& ra_exe_unit,
1022  const std::vector<InputTableInfo>& table_infos,
1023  const Catalog_Namespace::Catalog& cat,
1024  const ExecutorDeviceType device_type,
1025  const int device_count) {
1026  for (const auto target_expr : ra_exe_unit.target_exprs) {
1027  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1028  return;
1029  }
1030  }
1031  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1032  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1033  // Allow a query with no scan limit to run on small tables
1034  return;
1035  }
1036  if (ra_exe_unit.use_bump_allocator) {
1037  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1038  // relative to the size of the input), so we bypass this check for now
1039  return;
1040  }
1041  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1042  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1043  (!ra_exe_unit.scan_limit ||
1044  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1045  std::vector<std::string> table_names;
1046  const auto& input_descs = ra_exe_unit.input_descs;
1047  for (const auto& input_desc : input_descs) {
1048  table_names.push_back(get_table_name(input_desc, cat));
1049  }
1050  if (!ra_exe_unit.scan_limit) {
1051  throw WatchdogException(
1052  "Projection query would require a scan without a limit on table(s): " +
1053  boost::algorithm::join(table_names, ", "));
1054  } else {
1055  throw WatchdogException(
1056  "Projection query output result set on table(s): " +
1057  boost::algorithm::join(table_names, ", ") + " would contain " +
1058  std::to_string(ra_exe_unit.scan_limit) +
1059  " rows, which is more than the current system limit of " +
1060  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1061  }
1062  }
1063 }
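// Illustrative numbers for the watchdog check above: the per-device projection
// limit is Executor::high_scan_limit, so on a 4-GPU run a projection with a scan
// limit only trips the watchdog when that limit exceeds
// 4 * Executor::high_scan_limit (see getDeviceBasedScanLimit()); a projection
// with no limit at all on a large table trips it unconditionally.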
1064 
1065 } // namespace
1066 
1067 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1068  const RelAlgExecutionUnit& ra_exe_unit) {
1069  if (ra_exe_unit.input_descs.size() < 2) {
1070  return false;
1071  }
1072 
1073  // We only support loop join at the end of folded joins
1074  // where ra_exe_unit.input_descs.size() > 2 for now.
1075  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1076 
1077  ssize_t inner_table_idx = -1;
1078  for (size_t i = 0; i < query_infos.size(); ++i) {
1079  if (query_infos[i].table_id == inner_table_id) {
1080  inner_table_idx = i;
1081  break;
1082  }
1083  }
1084  CHECK_NE(ssize_t(-1), inner_table_idx);
1085  return query_infos[inner_table_idx].info.getNumTuples() <=
1086  g_trivial_loop_join_threshold;
1087 }
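// Illustrative reading of the check above: the loop join is treated as trivial
// only when the innermost table of the folded join is small, i.e. its tuple
// count does not exceed the configured trivial-loop-join threshold.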
1088 
1089 namespace {
1090 
1091 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
1092  const size_t new_scan_limit) {
1093  return {ra_exe_unit_in.input_descs,
1094  ra_exe_unit_in.input_col_descs,
1095  ra_exe_unit_in.simple_quals,
1096  ra_exe_unit_in.quals,
1097  ra_exe_unit_in.join_quals,
1098  ra_exe_unit_in.groupby_exprs,
1099  ra_exe_unit_in.target_exprs,
1100  ra_exe_unit_in.estimator,
1101  ra_exe_unit_in.sort_info,
1102  new_scan_limit,
1103  ra_exe_unit_in.query_features,
1104  ra_exe_unit_in.use_bump_allocator};
1105 }
1106 
1107 } // namespace
1108 
1109 ResultSetPtr Executor::executeWorkUnit(
1110  size_t& max_groups_buffer_entry_guess,
1111  const bool is_agg,
1112  const std::vector<InputTableInfo>& query_infos,
1113  const RelAlgExecutionUnit& ra_exe_unit_in,
1114  const CompilationOptions& co,
1115  const ExecutionOptions& eo,
1116  const Catalog_Namespace::Catalog& cat,
1117  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1118  RenderInfo* render_info,
1119  const bool has_cardinality_estimation,
1120  ColumnCacheMap& column_cache) {
1121  try {
1122  return executeWorkUnitImpl(max_groups_buffer_entry_guess,
1123  is_agg,
1124  true,
1125  query_infos,
1126  ra_exe_unit_in,
1127  co,
1128  eo,
1129  cat,
1130  row_set_mem_owner,
1131  render_info,
1132  has_cardinality_estimation,
1133  column_cache);
1134  } catch (const CompilationRetryNewScanLimit& e) {
1135  return executeWorkUnitImpl(max_groups_buffer_entry_guess,
1136  is_agg,
1137  false,
1138  query_infos,
1139  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1140  co,
1141  eo,
1142  cat,
1143  row_set_mem_owner,
1144  render_info,
1145  has_cardinality_estimation,
1146  column_cache);
1147  }
1148 }
1149 
1150 ResultSetPtr Executor::executeWorkUnitImpl(
1151  size_t& max_groups_buffer_entry_guess,
1152  const bool is_agg,
1153  const bool allow_single_frag_table_opt,
1154  const std::vector<InputTableInfo>& query_infos,
1155  const RelAlgExecutionUnit& ra_exe_unit_in,
1156  const CompilationOptions& co,
1157  const ExecutionOptions& eo,
1158  const Catalog_Namespace::Catalog& cat,
1159  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1160  RenderInfo* render_info,
1161  const bool has_cardinality_estimation,
1162  ColumnCacheMap& column_cache) {
1163  INJECT_TIMER(Exec_executeWorkUnit);
1164  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in);
1165  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type_);
1166  CHECK(!query_infos.empty());
1167  if (!max_groups_buffer_entry_guess) {
1168  // The query has failed the first execution attempt because of running out
1169  // of group by slots. Make the conservative choice: allocate fragment size
1170  // slots and run on the CPU.
1171  CHECK(device_type == ExecutorDeviceType::CPU);
1172  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1173  }
1174 
1175  int8_t crt_min_byte_width{get_min_byte_width()};
1176  do {
1177  ExecutionDispatch execution_dispatch(
1178  this, ra_exe_unit, query_infos, cat, row_set_mem_owner, render_info);
1179  ColumnFetcher column_fetcher(this, column_cache);
1180  std::unique_ptr<QueryCompilationDescriptor> query_comp_desc_owned;
1181  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1182  try {
1183  INJECT_TIMER(execution_dispatch_comp);
1184  std::tie(query_comp_desc_owned, query_mem_desc_owned) =
1185  execution_dispatch.compile(max_groups_buffer_entry_guess,
1186  crt_min_byte_width,
1187  {device_type,
1188  co.hoist_literals_,
1189  co.opt_level_,
1190  co.with_dynamic_watchdog_,
1191  co.explain_type_,
1192  co.register_intel_jit_listener_},
1193  eo,
1194  column_fetcher,
1195  has_cardinality_estimation);
1196  CHECK(query_comp_desc_owned);
1197  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1198  } catch (CompilationRetryNoCompaction&) {
1199  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1200  continue;
1201  }
1202  if (eo.just_explain) {
1203  return executeExplain(*query_comp_desc_owned);
1204  }
1205 
1206  for (const auto target_expr : ra_exe_unit.target_exprs) {
1207  plan_state_->target_exprs_.push_back(target_expr);
1208  }
1209 
1210  auto dispatch = [&execution_dispatch,
1211  &column_fetcher,
1212  &eo,
1213  parent_thread_id = std::this_thread::get_id()](
1214  const ExecutorDeviceType chosen_device_type,
1215  int chosen_device_id,
1216  const QueryCompilationDescriptor& query_comp_desc,
1217  const QueryMemoryDescriptor& query_mem_desc,
1218  const FragmentsList& frag_list,
1219  const ExecutorDispatchMode kernel_dispatch_mode,
1220  const int64_t rowid_lookup_key) {
1221  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
1222  INJECT_TIMER(execution_dispatch_run);
1223  execution_dispatch.run(chosen_device_type,
1224  chosen_device_id,
1225  eo,
1226  column_fetcher,
1227  query_comp_desc,
1228  query_mem_desc,
1229  frag_list,
1230  kernel_dispatch_mode,
1231  rowid_lookup_key);
1232  };
1233 
1234  QueryFragmentDescriptor fragment_descriptor(
1235  ra_exe_unit,
1236  query_infos,
1237  query_comp_desc_owned->getDeviceType() == ExecutorDeviceType::GPU
1238  ? cat.getDataMgr().getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
1239  : std::vector<Data_Namespace::MemoryInfo>{},
1240  eo.gpu_input_mem_limit_percent);
1241 
1242  if (!eo.just_validate) {
1243  int available_cpus = cpu_threads();
1244  auto available_gpus = get_available_gpus(cat);
1245 
1246  const auto context_count =
1247  get_context_count(device_type, available_cpus, available_gpus.size());
1248  try {
1249  dispatchFragments(dispatch,
1250  execution_dispatch,
1251  query_infos,
1252  eo,
1253  is_agg,
1254  allow_single_frag_table_opt,
1255  context_count,
1256  *query_comp_desc_owned,
1257  *query_mem_desc_owned,
1258  fragment_descriptor,
1259  available_gpus,
1260  available_cpus);
1261  } catch (QueryExecutionError& e) {
1262  if (eo.with_dynamic_watchdog && interrupted_ &&
1263  e.getErrorCode() == ERR_OUT_OF_TIME) {
1264  throw QueryExecutionError(ERR_INTERRUPTED);
1265  }
1266  cat.getDataMgr().freeAllBuffers();
1267  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1268  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1269  crt_min_byte_width <<= 1;
1270  continue;
1271  }
1272  throw;
1273  }
1274  }
1275  cat.getDataMgr().freeAllBuffers();
1276  if (is_agg) {
1277  try {
1278  return collectAllDeviceResults(execution_dispatch,
1279  ra_exe_unit.target_exprs,
1280  *query_mem_desc_owned,
1281  query_comp_desc_owned->getDeviceType(),
1282  row_set_mem_owner);
1283  } catch (ReductionRanOutOfSlots&) {
1284  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1285  } catch (OverflowOrUnderflow&) {
1286  crt_min_byte_width <<= 1;
1287  continue;
1288  }
1289  }
1290  return resultsUnion(execution_dispatch);
1291 
1292  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1293 
1294  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1295  ExecutorDeviceType::CPU,
1296  QueryMemoryDescriptor(),
1297  nullptr,
1298  this);
1299 }
1300 
1301 void Executor::executeWorkUnitPerFragment(const RelAlgExecutionUnit& ra_exe_unit_in,
1302  const InputTableInfo& table_info,
1303  const CompilationOptions& co,
1304  const ExecutionOptions& eo,
1305  const Catalog_Namespace::Catalog& cat,
1306  PerFragmentCallBack& cb) {
1307  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in);
1308  ColumnCacheMap column_cache;
1309 
1310  std::vector<InputTableInfo> table_infos{table_info};
1311  // TODO(adb): ensure this is under a try / catch
1312  ExecutionDispatch execution_dispatch(
1313  this, ra_exe_unit, table_infos, cat, row_set_mem_owner_, nullptr);
1314  ColumnFetcher column_fetcher(this, column_cache);
1315  std::unique_ptr<QueryCompilationDescriptor> query_comp_desc_owned;
1316  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1317  std::tie(query_comp_desc_owned, query_mem_desc_owned) =
1318  execution_dispatch.compile(0, 8, co, eo, column_fetcher, false);
1319  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1320  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1321  const auto& outer_fragments = table_info.info.fragments;
1322  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1323  ++fragment_index) {
1324  // We may want to consider in the future allowing this to execute on devices other
1325  // than CPU
1326  execution_dispatch.run(co.device_type_,
1327  0,
1328  eo,
1329  column_fetcher,
1330  *query_comp_desc_owned,
1331  *query_mem_desc_owned,
1332  {{table_id, {fragment_index}}},
1333  ExecutorDispatchMode::KernelPerFragment,
1334  -1);
1335  }
1336 
1337  const auto& all_fragment_results = execution_dispatch.getFragmentResults();
1338 
1339  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1340  ++fragment_index) {
1341  const auto fragment_results = all_fragment_results[fragment_index];
1342  cb(fragment_results.first, outer_fragments[fragment_index]);
1343  }
1344 }
1345 
1346 ResultSetPtr Executor::executeTableFunction(
1347  const TableFunctionExecutionUnit exe_unit,
1348  const std::vector<InputTableInfo>& table_infos,
1349  const CompilationOptions& co,
1350  const ExecutionOptions& eo,
1351  const Catalog_Namespace::Catalog& cat) {
1352  INJECT_TIMER(Exec_executeTableFunction);
1353  nukeOldState(false, table_infos, nullptr);
1354 
1355  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1356  // framework, we may want to move this up a level
1357 
1358  ColumnFetcher column_fetcher(this, column_cache);
1359  TableFunctionCompilationContext compilation_context;
1360  compilation_context.compile(exe_unit, co, this);
1361 
1362  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
1363  CHECK_EQ(table_infos.size(), size_t(1));
1364  return exe_context.execute(exe_unit,
1365  table_infos.front(),
1366  &compilation_context,
1367  column_fetcher,
1368  co.device_type_,
1369  this);
1370 }
1371 
1372 ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
1373  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1374 }
1375 
1376 // Looks at the targets and returns a feasible device type. We only punt
1377 // to CPU for count distinct and we should probably fix it and remove this.
1378 ExecutorDeviceType Executor::getDeviceTypeForTargets(
1379  const RelAlgExecutionUnit& ra_exe_unit,
1380  const ExecutorDeviceType requested_device_type) {
1381  for (const auto target_expr : ra_exe_unit.target_exprs) {
1382  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1383  if (!ra_exe_unit.groupby_exprs.empty() &&
1384  !isArchPascalOrLater(requested_device_type)) {
1385  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1386  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1387  return ExecutorDeviceType::CPU;
1388  }
1389  }
1390  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1391  return ExecutorDeviceType::CPU;
1392  }
1393  }
1394  return requested_device_type;
1395 }
1396 
1397 namespace {
1398 
1399 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1400  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1401  if (ti.is_fp()) {
1402  if (float_argument_input && ti.get_type() == kFLOAT) {
1403  int64_t float_null_val = 0;
1404  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1405  static_cast<float>(inline_fp_null_val(ti));
1406  return float_null_val;
1407  }
1408  const auto double_null_val = inline_fp_null_val(ti);
1409  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1410  }
1411  return inline_int_null_val(ti);
1412 }
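// Sketch of the float path above (illustrative; ti is assumed to describe a
// kFLOAT column): when the kernel consumes the argument as a 32-bit float, the
// FLOAT null sentinel is bit-copied into the low 32 bits of the 64-bit slot
// rather than widened to double, equivalent to:
//
//   int64_t slot = 0;
//   const float fnull = static_cast<float>(inline_fp_null_val(ti));
//   std::memcpy(&slot, &fnull, sizeof(float));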
1413 
1414 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1415  std::vector<int64_t>& entry,
1416  const std::vector<Analyzer::Expr*>& target_exprs,
1417  const QueryMemoryDescriptor& query_mem_desc) {
1418  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1419  const auto target_expr = target_exprs[target_idx];
1420  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1421  CHECK(agg_info.is_agg);
1422  target_infos.push_back(agg_info);
1423  if (g_cluster) {
1424  const auto executor = query_mem_desc.getExecutor();
1425  CHECK(executor);
1426  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1427  CHECK(row_set_mem_owner);
1428  const auto& count_distinct_desc =
1429  query_mem_desc.getCountDistinctDescriptor(target_idx);
1430  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1431  auto count_distinct_buffer = static_cast<int8_t*>(
1432  checked_calloc(count_distinct_desc.bitmapPaddedSizeBytes(), 1));
1433  CHECK(row_set_mem_owner);
1434  row_set_mem_owner->addCountDistinctBuffer(
1435  count_distinct_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), true);
1436  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1437  continue;
1438  }
1439  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1440  auto count_distinct_set = new std::set<int64_t>();
1441  CHECK(row_set_mem_owner);
1442  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1443  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1444  continue;
1445  }
1446  }
1447  const bool float_argument_input = takes_float_argument(agg_info);
1448  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1449  entry.push_back(0);
1450  } else if (agg_info.agg_kind == kAVG) {
1451  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1452  entry.push_back(0);
1453  } else if (agg_info.agg_kind == kSAMPLE) {
1454  if (agg_info.sql_type.is_geometry()) {
1455  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1456  entry.push_back(0);
1457  }
1458  } else if (agg_info.sql_type.is_varlen()) {
1459  entry.push_back(0);
1460  entry.push_back(0);
1461  } else {
1462  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1463  }
1464  } else {
1465  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1466  }
1467  }
1468 }
1469 
1470 ResultSetPtr build_row_for_empty_input(
1471  const std::vector<Analyzer::Expr*>& target_exprs_in,
1472  const QueryMemoryDescriptor& query_mem_desc,
1473  const ExecutorDeviceType device_type) {
1474  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1475  std::vector<Analyzer::Expr*> target_exprs;
1476  for (const auto target_expr : target_exprs_in) {
1477  const auto target_expr_copy =
1478  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1479  CHECK(target_expr_copy);
1480  auto ti = target_expr->get_type_info();
1481  ti.set_notnull(false);
1482  target_expr_copy->set_type_info(ti);
1483  if (target_expr_copy->get_arg()) {
1484  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1485  arg_ti.set_notnull(false);
1486  target_expr_copy->get_arg()->set_type_info(arg_ti);
1487  }
1488  target_exprs_owned_copies.push_back(target_expr_copy);
1489  target_exprs.push_back(target_expr_copy.get());
1490  }
1491  std::vector<TargetInfo> target_infos;
1492  std::vector<int64_t> entry;
1493  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1494  const auto executor = query_mem_desc.getExecutor();
1495  CHECK(executor);
1496  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1497  CHECK(row_set_mem_owner);
1498  auto rs = std::make_shared<ResultSet>(
1499  target_infos, device_type, query_mem_desc, row_set_mem_owner, executor);
1500  rs->allocateStorage();
1501  rs->fillOneEntry(entry);
1502  return rs;
1503 }
1504 
1505 } // namespace
1506 
1507 ResultSetPtr Executor::collectAllDeviceResults(
1508  ExecutionDispatch& execution_dispatch,
1509  const std::vector<Analyzer::Expr*>& target_exprs,
1510  const QueryMemoryDescriptor& query_mem_desc,
1511  const ExecutorDeviceType device_type,
1512  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1513  auto timer = DEBUG_TIMER(__func__);
1514  auto& result_per_device = execution_dispatch.getFragmentResults();
1515  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1516  QueryDescriptionType::NonGroupedAggregate) {
1517  return build_row_for_empty_input(target_exprs, query_mem_desc, device_type);
1518  }
1519  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1520  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1521  return reduceSpeculativeTopN(
1522  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1523  }
1524  const auto shard_count =
1525  device_type == ExecutorDeviceType::GPU
1526  ? GroupByAndAggregate::shard_count_for_top_groups(ra_exe_unit, *catalog_)
1527  : 0;
1528 
1529  if (shard_count && !result_per_device.empty()) {
1530  return collectAllDeviceShardedTopResults(execution_dispatch);
1531  }
1532  return reduceMultiDeviceResults(
1533  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1534 }
1535 
1536 namespace {
1547 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1548  const QueryMemoryDescriptor& input_query_mem_desc,
1549  const ResultSetStorage* output_storage,
1550  size_t output_row_index,
1551  const QueryMemoryDescriptor& output_query_mem_desc,
1552  const std::vector<uint32_t>& top_permutation) {
1553  const auto output_buffer = output_storage->getUnderlyingBuffer();
1554  const auto input_buffer = input_storage->getUnderlyingBuffer();
1555  for (const auto sorted_idx : top_permutation) {
1556  // permuting all group-columns in this result set into the final buffer:
1557  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1558  group_idx++) {
1559  const auto input_column_ptr =
1560  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1561  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1562  const auto output_column_ptr =
1563  output_buffer +
1564  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1565  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1566  memcpy(output_column_ptr,
1567  input_column_ptr,
1568  output_query_mem_desc.groupColWidth(group_idx));
1569  }
1570  // permuting all agg-columns in this result set into the final buffer:
1571  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1572  slot_idx++) {
1573  const auto input_column_ptr =
1574  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1575  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1576  const auto output_column_ptr =
1577  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1578  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1579  memcpy(output_column_ptr,
1580  input_column_ptr,
1581  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1582  }
1583  ++output_row_index;
1584  }
1585  return output_row_index;
1586 }
1587 
1597 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
1598  const ResultSetStorage* output_storage,
1599  size_t output_row_index,
1600  const QueryMemoryDescriptor& output_query_mem_desc,
1601  const std::vector<uint32_t>& top_permutation) {
1602  const auto output_buffer = output_storage->getUnderlyingBuffer();
1603  const auto input_buffer = input_storage->getUnderlyingBuffer();
1604  for (const auto sorted_idx : top_permutation) {
1605  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
1606  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
1607  row_ptr,
1608  output_query_mem_desc.getRowSize());
1609  ++output_row_index;
1610  }
1611  return output_row_index;
1612 }
1613 } // namespace
1614 
1615 // Collect top results from each device, stitch them together and sort. Partial
1616 // results from each device are guaranteed to be disjunct because we only go on
1617 // this path when one of the columns involved is a shard key.
1618 ResultSetPtr Executor::collectAllDeviceShardedTopResults(
1619  ExecutionDispatch& execution_dispatch) const {
1620  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1621  auto& result_per_device = execution_dispatch.getFragmentResults();
1622  const auto first_result_set = result_per_device.front().first;
1623  CHECK(first_result_set);
1624  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1625  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1626  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1627  top_query_mem_desc.setEntryCount(0);
1628  for (auto& result : result_per_device) {
1629  const auto result_set = result.first;
1630  CHECK(result_set);
1631  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n);
1632  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1633  top_query_mem_desc.setEntryCount(new_entry_cnt);
1634  }
1635  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1636  first_result_set->getDeviceType(),
1637  top_query_mem_desc,
1638  first_result_set->getRowSetMemOwner(),
1639  this);
1640  auto top_storage = top_result_set->allocateStorage();
1641  size_t top_output_row_idx{0};
1642  for (auto& result : result_per_device) {
1643  const auto result_set = result.first;
1644  CHECK(result_set);
1645  const auto& top_permutation = result_set->getPermutationBuffer();
1646  CHECK_LE(top_permutation.size(), top_n);
1647  if (top_query_mem_desc.didOutputColumnar()) {
1648  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
1649  result_set->getQueryMemDesc(),
1650  top_storage,
1651  top_output_row_idx,
1652  top_query_mem_desc,
1653  top_permutation);
1654  } else {
1655  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
1656  top_storage,
1657  top_output_row_idx,
1658  top_query_mem_desc,
1659  top_permutation);
1660  }
1661  }
1662  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
1663  return top_result_set;
1664 }
1665 
1666 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
1667  const {
1668  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
1669  const auto& join_info = plan_state_->join_info_;
1670  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
1671  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
1672  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
1673  id_to_cond.insert(
1674  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
1675  }
1676  return id_to_cond;
1677 }
1678 
1679 namespace {
1680 
1681 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
1682  for (const auto& col : fetched_cols) {
1683  if (col.is_lazily_fetched) {
1684  return true;
1685  }
1686  }
1687  return false;
1688 }
1689 
1690 } // namespace
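// Equivalent one-liner (illustrative, not part of Execute.cpp): with <algorithm>
// included, has_lazy_fetched_columns above could also be written as
//   return std::any_of(fetched_cols.begin(), fetched_cols.end(), [](const auto& col) {
//     return col.is_lazily_fetched;
//   });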
1691 
1692 void Executor::dispatchFragments(
1693  const std::function<void(const ExecutorDeviceType chosen_device_type,
1694  int chosen_device_id,
1695  const QueryCompilationDescriptor& query_comp_desc,
1696  const QueryMemoryDescriptor& query_mem_desc,
1697  const FragmentsList& frag_list,
1698  const ExecutorDispatchMode kernel_dispatch_mode,
1699  const int64_t rowid_lookup_key)> dispatch,
1700  const ExecutionDispatch& execution_dispatch,
1701  const std::vector<InputTableInfo>& table_infos,
1702  const ExecutionOptions& eo,
1703  const bool is_agg,
1704  const bool allow_single_frag_table_opt,
1705  const size_t context_count,
1706  const QueryCompilationDescriptor& query_comp_desc,
1707  const QueryMemoryDescriptor& query_mem_desc,
1708  QueryFragmentDescriptor& fragment_descriptor,
1709  std::unordered_set<int>& available_gpus,
1710  int& available_cpus) {
1711  std::vector<std::future<void>> query_threads;
1712  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1713  CHECK(!ra_exe_unit.input_descs.empty());
1714 
1715  const auto device_type = query_comp_desc.getDeviceType();
1716  const bool uses_lazy_fetch =
1717  plan_state_->allow_lazy_fetch_ &&
1718  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
1719  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
1720  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
1721 
1722  const auto device_count = deviceCount(device_type);
1723  CHECK_GT(device_count, 0);
1724 
1725  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
1726  execution_dispatch.getFragOffsets(),
1727  device_count,
1728  device_type,
1729  use_multifrag_kernel,
1731  this);
1732  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
1733  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
1734  }
1735 
1736  if (use_multifrag_kernel) {
1737  VLOG(1) << "Dispatching multifrag kernels";
1738  VLOG(1) << query_mem_desc.toString();
1739 
1740  // NB: We should never be on this path when the query is retried because of running
1741  // out of group by slots; also, for scan-only queries on CPU we want the
1742  // high-granularity, fragment-by-fragment execution instead. For scan-only queries on
1743  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
1744  // buffer per fragment.
1745  auto multifrag_kernel_dispatch =
1746  [&query_threads, &dispatch, query_comp_desc, query_mem_desc](
1747  const int device_id,
1748  const FragmentsList& frag_list,
1749  const int64_t rowid_lookup_key) {
1750  query_threads.push_back(std::async(std::launch::async,
1751  dispatch,
1753  device_id,
1754  query_comp_desc,
1755  query_mem_desc,
1756  frag_list,
1758  rowid_lookup_key));
1759  };
1760  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
1761  } else {
1762  VLOG(1) << "Dispatching kernel per fragment";
1763  VLOG(1) << query_mem_desc.toString();
1764 
1765  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
1767  table_infos.size() == 1 && table_infos.front().table_id > 0) {
1768  const auto max_frag_size =
1769  table_infos.front().info.getFragmentNumTuplesUpperBound();
1770  if (max_frag_size < query_mem_desc.getEntryCount()) {
1771  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
1772  << " to match max fragment size " << max_frag_size
1773  << " for kernel per fragment execution path.";
1774  throw CompilationRetryNewScanLimit(max_frag_size);
1775  }
1776  }
1777 
1778  size_t frag_list_idx{0};
1779  auto fragment_per_kernel_dispatch = [&query_threads,
1780  &dispatch,
1781  &frag_list_idx,
1782  &device_type,
1783  query_comp_desc,
1784  query_mem_desc](const int device_id,
1785  const FragmentsList& frag_list,
1786  const int64_t rowid_lookup_key) {
1787  if (!frag_list.size()) {
1788  return;
1789  }
1790  CHECK_GE(device_id, 0);
1791 
1792  query_threads.push_back(std::async(std::launch::async,
1793  dispatch,
1794  device_type,
1795  device_id,
1796  query_comp_desc,
1797  query_mem_desc,
1798  frag_list,
1800  rowid_lookup_key));
1801 
1802  ++frag_list_idx;
1803  };
1804 
1805  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
1806  ra_exe_unit);
1807  }
1808  for (auto& child : query_threads) {
1809  child.wait();
1810  }
1811  for (auto& child : query_threads) {
1812  child.get();
1813  }
1814 }
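// Illustrative sketch (not part of Execute.cpp): the two loops above first let every
// kernel thread run to completion (wait) and only then propagate any stored exception
// (get). A self-contained version of the same pattern (<future> and <vector> are
// already included by this file; std::function comes from <functional>); do_work is a
// hypothetical placeholder for the per-kernel dispatch:
void run_kernels_and_propagate_errors(const std::function<void(int)>& do_work,
                                      const int kernel_count) {
  std::vector<std::future<void>> tasks;
  for (int i = 0; i < kernel_count; ++i) {
    tasks.push_back(std::async(std::launch::async, do_work, i));
  }
  for (auto& task : tasks) {
    task.wait();  // every kernel finishes before any exception is rethrown
  }
  for (auto& task : tasks) {
    task.get();  // rethrows the first stored exception, if any
  }
}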
1815 
1817  const RelAlgExecutionUnit& ra_exe_unit,
1818  const ExecutorDeviceType device_type,
1819  const size_t table_idx,
1820  const size_t outer_frag_idx,
1821  std::map<int, const TableFragments*>& selected_tables_fragments,
1822  const std::unordered_map<int, const Analyzer::BinOper*>&
1823  inner_table_id_to_join_condition) {
1824  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
1825  auto table_frags_it = selected_tables_fragments.find(table_id);
1826  CHECK(table_frags_it != selected_tables_fragments.end());
1827  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
1828  const auto outer_table_fragments_it =
1829  selected_tables_fragments.find(outer_input_desc.getTableId());
1830  const auto outer_table_fragments = outer_table_fragments_it->second;
1831  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
1832  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
1833  if (!table_idx) {
1834  return {outer_frag_idx};
1835  }
1836  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
1837  auto& inner_frags = table_frags_it->second;
1838  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
1839  std::vector<size_t> all_frag_ids;
1840  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
1841  ++inner_frag_idx) {
1842  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
1843  if (skipFragmentPair(outer_fragment_info,
1844  inner_frag_info,
1845  table_idx,
1846  inner_table_id_to_join_condition,
1847  ra_exe_unit,
1848  device_type)) {
1849  continue;
1850  }
1851  all_frag_ids.push_back(inner_frag_idx);
1852  }
1853  return all_frag_ids;
1854 }
1855 
1856 // Returns true iff the join between two fragments cannot yield any results, per
1857 // shard information. The pair can be skipped to avoid full broadcast.
1858 bool Executor::skipFragmentPair(
1859  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
1860  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
1861  const int table_idx,
1862  const std::unordered_map<int, const Analyzer::BinOper*>&
1863  inner_table_id_to_join_condition,
1864  const RelAlgExecutionUnit& ra_exe_unit,
1865  const ExecutorDeviceType device_type) {
1866  if (device_type != ExecutorDeviceType::GPU) {
1867  return false;
1868  }
1869  CHECK(table_idx >= 0 &&
1870  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
1871  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
1872  // Both tables need to be sharded the same way.
1873  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
1874  outer_fragment_info.shard == inner_fragment_info.shard) {
1875  return false;
1876  }
1877  const Analyzer::BinOper* join_condition{nullptr};
1878  if (ra_exe_unit.join_quals.empty()) {
1879  CHECK(!inner_table_id_to_join_condition.empty());
1880  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
1881  CHECK(condition_it != inner_table_id_to_join_condition.end());
1882  join_condition = condition_it->second;
1883  CHECK(join_condition);
1884  } else {
1885  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
1886  plan_state_->join_info_.join_hash_tables_.size());
1887  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
1888  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
1889  table_idx) {
1890  CHECK(!join_condition);
1891  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
1892  }
1893  }
1894  }
1895  if (!join_condition) {
1896  return false;
1897  }
1898  // TODO(adb): support fragment skipping based on the overlaps operator
1899  if (join_condition->is_overlaps_oper()) {
1900  return false;
1901  }
1902  size_t shard_count{0};
1903  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
1904  join_condition->get_left_operand())) {
1905  auto inner_outer_pairs =
1906  normalize_column_pairs(join_condition, *getCatalog(), getTemporaryTables());
1908  join_condition, this, inner_outer_pairs);
1909  } else {
1910  shard_count = get_shard_count(join_condition, this);
1911  }
1912  if (shard_count && !ra_exe_unit.join_quals.empty()) {
1913  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
1914  }
1915  return shard_count;
1916 }
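// Illustrative sketch (not part of Execute.cpp): once a sharded equi-join condition has
// been found, the decision above boils down to the predicate below, where a shard value
// of -1 means the fragment is not sharded:
bool can_skip_sharded_fragment_pair(const int outer_shard, const int inner_shard) {
  // Matching rows must live on the same shard, so a pair of fragments from two
  // different shards cannot produce any join results.
  return outer_shard != -1 && inner_shard != -1 && outer_shard != inner_shard;
}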
1917 
1918 namespace {
1919 
1920 const ColumnDescriptor* try_get_column_descriptor(const InputColDescriptor* col_desc,
1921  const Catalog_Namespace::Catalog& cat) {
1922  const int table_id = col_desc->getScanDesc().getTableId();
1923  const int col_id = col_desc->getColId();
1924  return get_column_descriptor_maybe(col_id, table_id, cat);
1925 }
1926 
1927 } // namespace
1928 
1929 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
1930  const std::vector<InputDescriptor>& input_descs,
1931  const std::map<int, const TableFragments*>& all_tables_fragments) {
1932  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
1933  for (auto& desc : input_descs) {
1934  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
1935  CHECK(fragments_it != all_tables_fragments.end());
1936  const auto& fragments = *fragments_it->second;
1937  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
1938  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
1939  frag_offsets[i] = off;
1940  off += fragments[i].getNumTuples();
1941  }
1942  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
1943  }
1944  return tab_id_to_frag_offsets;
1945 }
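// Illustrative sketch (not part of Execute.cpp): the offsets built above are an
// exclusive prefix sum of the fragment row counts, e.g. fragment sizes {10, 20, 30}
// yield offsets {0, 10, 30}. A self-contained version (assumes <vector> and <cstdint>):
std::vector<uint64_t> exclusive_prefix_sum(const std::vector<uint64_t>& frag_sizes) {
  std::vector<uint64_t> offsets(frag_sizes.size(), 0);
  uint64_t running_total = 0;
  for (size_t i = 0; i < frag_sizes.size(); ++i) {
    offsets[i] = running_total;  // offset of fragment i is the number of rows before it
    running_total += frag_sizes[i];
  }
  return offsets;
}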
1946 
1947 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
1948 Executor::getRowCountAndOffsetForAllFrags(
1949  const RelAlgExecutionUnit& ra_exe_unit,
1950  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
1951  const std::vector<InputDescriptor>& input_descs,
1952  const std::map<int, const TableFragments*>& all_tables_fragments) {
1953  std::vector<std::vector<int64_t>> all_num_rows;
1954  std::vector<std::vector<uint64_t>> all_frag_offsets;
1955  const auto tab_id_to_frag_offsets =
1956  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
1957  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
1958  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
1959  std::vector<int64_t> num_rows;
1960  std::vector<uint64_t> frag_offsets;
1961  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
1962  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
1963  const auto frag_id = selected_frag_ids[tab_idx];
1964  const auto fragments_it =
1965  all_tables_fragments.find(input_descs[tab_idx].getTableId());
1966  CHECK(fragments_it != all_tables_fragments.end());
1967  const auto& fragments = *fragments_it->second;
1968  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
1969  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
1970  const auto& fragment = fragments[frag_id];
1971  num_rows.push_back(fragment.getNumTuples());
1972  } else {
1973  size_t total_row_count{0};
1974  for (const auto& fragment : fragments) {
1975  total_row_count += fragment.getNumTuples();
1976  }
1977  num_rows.push_back(total_row_count);
1978  }
1979  const auto frag_offsets_it =
1980  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
1981  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
1982  const auto& offsets = frag_offsets_it->second;
1983  CHECK_LT(frag_id, offsets.size());
1984  frag_offsets.push_back(offsets[frag_id]);
1985  }
1986  all_num_rows.push_back(num_rows);
1987  // Fragment offsets of the outer table should ONLY be used by rowid for now.
1988  all_frag_offsets.push_back(frag_offsets);
1989  }
1990  return {all_num_rows, all_frag_offsets};
1991 }
1992 
1993 // Only fetch columns of the hash-joined inner fact table from all the table's
1994 // fragments when their fetch is not deferred.
1995 bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
1996  const RelAlgExecutionUnit& ra_exe_unit,
1997  const FragmentsList& selected_fragments) const {
1998  const auto& input_descs = ra_exe_unit.input_descs;
1999  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2000  if (nest_level < 1 ||
2001  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2002  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2003  (ra_exe_unit.join_quals.empty() &&
2004  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2005  return false;
2006  }
2007  const int table_id = inner_col_desc.getScanDesc().getTableId();
2008  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2009  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2010  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2011  return fragments.size() > 1;
2012 }
2013 
2014 Executor::FetchResult Executor::fetchChunks(
2015  const ColumnFetcher& column_fetcher,
2016  const RelAlgExecutionUnit& ra_exe_unit,
2017  const int device_id,
2018  const Data_Namespace::MemoryLevel memory_level,
2019  const std::map<int, const TableFragments*>& all_tables_fragments,
2020  const FragmentsList& selected_fragments,
2021  const Catalog_Namespace::Catalog& cat,
2022  std::list<ChunkIter>& chunk_iterators,
2023  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
2024  auto timer = DEBUG_TIMER(__func__);
2025  INJECT_TIMER(fetchChunks);
2026  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2027  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2028  std::vector<size_t> local_col_to_frag_pos;
2029  buildSelectedFragsMapping(selected_fragments_crossjoin,
2030  local_col_to_frag_pos,
2031  col_global_ids,
2032  selected_fragments,
2033  ra_exe_unit);
2034 
2035  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2036  selected_fragments_crossjoin);
2037 
2038  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2039  std::vector<std::vector<int64_t>> all_num_rows;
2040  std::vector<std::vector<uint64_t>> all_frag_offsets;
2041 
2042  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2043  std::vector<const int8_t*> frag_col_buffers(
2044  plan_state_->global_to_local_col_ids_.size());
2045  for (const auto& col_id : col_global_ids) {
2046  CHECK(col_id);
2047  const int table_id = col_id->getScanDesc().getTableId();
2048  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2049  if (cd && cd->isVirtualCol) {
2050  CHECK_EQ("rowid", cd->columnName);
2051  continue;
2052  }
2053  const auto fragments_it = all_tables_fragments.find(table_id);
2054  CHECK(fragments_it != all_tables_fragments.end());
2055  const auto fragments = fragments_it->second;
2056  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2057  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2058  CHECK_LT(static_cast<size_t>(it->second),
2059  plan_state_->global_to_local_col_ids_.size());
2060  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2061  if (!fragments->size()) {
2062  return {};
2063  }
2064  CHECK_LT(frag_id, fragments->size());
2065  auto memory_level_for_column = memory_level;
2066  if (plan_state_->columns_to_fetch_.find(
2067  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2068  plan_state_->columns_to_fetch_.end()) {
2069  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2070  }
2071  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2072  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2073  col_id.get(), memory_level_for_column, device_id);
2074  } else {
2075  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2076  frag_col_buffers[it->second] =
2077  column_fetcher.getAllTableColumnFragments(table_id,
2078  col_id->getColId(),
2079  all_tables_fragments,
2080  memory_level_for_column,
2081  device_id);
2082  } else {
2083  frag_col_buffers[it->second] =
2084  column_fetcher.getOneTableColumnFragment(table_id,
2085  frag_id,
2086  col_id->getColId(),
2087  all_tables_fragments,
2088  chunks,
2089  chunk_iterators,
2090  memory_level_for_column,
2091  device_id);
2092  }
2093  }
2094  }
2095  all_frag_col_buffers.push_back(frag_col_buffers);
2096  }
2097  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2098  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2099  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2100 }
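// Illustrative sketch (not part of Execute.cpp): the frag_ids_crossjoin loop above
// visits every combination of one selected fragment per input table. For two tables
// CartesianProduct behaves like the nested loops below; it generalizes the same idea to
// any number of tables. The function name is hypothetical:
std::vector<std::vector<size_t>> cross_join_two_tables(
    const std::vector<size_t>& outer_frag_ids,
    const std::vector<size_t>& inner_frag_ids) {
  std::vector<std::vector<size_t>> combinations;
  for (const auto outer_id : outer_frag_ids) {
    for (const auto inner_id : inner_frag_ids) {
      combinations.push_back({outer_id, inner_id});  // one fragment per input table
    }
  }
  return combinations;
}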
2101 
2102 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2103  const size_t scan_idx,
2104  const RelAlgExecutionUnit& ra_exe_unit) {
2105  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2106  scan_idx > 0 &&
2107  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2108  !selected_fragments[scan_idx].fragment_ids.empty()) {
2109  // Fetch all fragments
2110  return {size_t(0)};
2111  }
2112 
2113  return selected_fragments[scan_idx].fragment_ids;
2114 }
2115 
2116 void Executor::buildSelectedFragsMapping(
2117  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2118  std::vector<size_t>& local_col_to_frag_pos,
2119  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2120  const FragmentsList& selected_fragments,
2121  const RelAlgExecutionUnit& ra_exe_unit) {
2122  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2123  size_t frag_pos{0};
2124  const auto& input_descs = ra_exe_unit.input_descs;
2125  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2126  const int table_id = input_descs[scan_idx].getTableId();
2127  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2128  selected_fragments_crossjoin.push_back(
2129  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2130  for (const auto& col_id : col_global_ids) {
2131  CHECK(col_id);
2132  const auto& input_desc = col_id->getScanDesc();
2133  if (input_desc.getTableId() != table_id ||
2134  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2135  continue;
2136  }
2137  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2138  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2139  CHECK_LT(static_cast<size_t>(it->second),
2140  plan_state_->global_to_local_col_ids_.size());
2141  local_col_to_frag_pos[it->second] = frag_pos;
2142  }
2143  ++frag_pos;
2144  }
2145 }
2146 
2147 namespace {
2148 
2149 class OutVecOwner {
2150  public:
2151  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2152  ~OutVecOwner() {
2153  for (auto out : out_vec_) {
2154  delete[] out;
2155  }
2156  }
2157 
2158  private:
2159  std::vector<int64_t*> out_vec_;
2160 };
2161 } // namespace
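// Illustrative note (not part of Execute.cpp): OutVecOwner is a small RAII guard; the
// raw int64_t* buffers handed back by the kernel launch are freed when the owner goes
// out of scope, including on the early-return error paths below. A sketch of the same
// idea with smart pointers (hypothetical, not how this file does it):
//   std::vector<std::unique_ptr<int64_t[]>> owned;
//   for (auto* out : out_vec) {
//     owned.emplace_back(out);  // delete[] runs automatically on scope exit
//   }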
2162 
2163 template <typename META_TYPE_CLASS>
2164 class AggregateReductionEgress {
2165  public:
2166  using ReturnType = void;
2167 
2168  // TODO: Avoid parameter struct indirection and forward directly
2169  ReturnType operator()(int const entry_count,
2170  int& error_code,
2171  TargetInfo const& agg_info,
2172  size_t& out_vec_idx,
2173  std::vector<int64_t*>& out_vec,
2174  std::vector<int64_t>& reduced_outs,
2175  QueryExecutionContext* query_exe_context) {
2176  int64_t val1;
2177  const bool float_argument_input = takes_float_argument(agg_info);
2178  if (is_distinct_target(agg_info)) {
2179  CHECK(agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT);
2180  val1 = out_vec[out_vec_idx][0];
2181  error_code = 0;
2182  } else {
2183  const auto chosen_bytes = static_cast<size_t>(
2184  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2185  std::tie(val1, error_code) =
2186  Executor::reduceResults(agg_info.agg_kind,
2187  agg_info.sql_type,
2188  query_exe_context->getAggInitValForIndex(out_vec_idx),
2189  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2190  out_vec[out_vec_idx],
2191  entry_count,
2192  false,
2193  float_argument_input);
2194  }
2195  if (error_code) {
2196  return;
2197  }
2198  reduced_outs.push_back(val1);
2199  if (agg_info.agg_kind == kAVG ||
2200  (agg_info.agg_kind == kSAMPLE &&
2201  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
2202  const auto chosen_bytes = static_cast<size_t>(
2203  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx + 1));
2204  int64_t val2;
2205  std::tie(val2, error_code) = Executor::reduceResults(
2206  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
2207  agg_info.sql_type,
2208  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
2209  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2210  out_vec[out_vec_idx + 1],
2211  entry_count,
2212  false,
2213  false);
2214  if (error_code) {
2215  return;
2216  }
2217  reduced_outs.push_back(val2);
2218  ++out_vec_idx;
2219  }
2220  ++out_vec_idx;
2221  }
2222 };
2223 
2224 // Handles reduction for geo-types
2225 template <>
2226 class AggregateReductionEgress<Experimental::MetaTypeClass<Experimental::Geometry>> {
2227  public:
2228  using ReturnType = void;
2229 
2230  ReturnType operator()(int const entry_count,
2231  int& error_code,
2232  TargetInfo const& agg_info,
2233  size_t& out_vec_idx,
2234  std::vector<int64_t*>& out_vec,
2235  std::vector<int64_t>& reduced_outs,
2236  QueryExecutionContext* query_exe_context) {
2237  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2238  int64_t val1;
2239  const auto chosen_bytes = static_cast<size_t>(
2240  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2241  std::tie(val1, error_code) =
2242  Executor::reduceResults(agg_info.agg_kind,
2243  agg_info.sql_type,
2244  query_exe_context->getAggInitValForIndex(out_vec_idx),
2245  chosen_bytes,
2246  out_vec[out_vec_idx],
2247  entry_count,
2248  false,
2249  false);
2250  if (error_code) {
2251  return;
2252  }
2253  reduced_outs.push_back(val1);
2254  out_vec_idx++;
2255  }
2256  }
2257 };
2258 
2259 int32_t Executor::executePlanWithoutGroupBy(
2260  const RelAlgExecutionUnit& ra_exe_unit,
2261  const CompilationResult& compilation_result,
2262  const bool hoist_literals,
2263  ResultSetPtr& results,
2264  const std::vector<Analyzer::Expr*>& target_exprs,
2265  const ExecutorDeviceType device_type,
2266  std::vector<std::vector<const int8_t*>>& col_buffers,
2267  QueryExecutionContext* query_exe_context,
2268  const std::vector<std::vector<int64_t>>& num_rows,
2269  const std::vector<std::vector<uint64_t>>& frag_offsets,
2270  Data_Namespace::DataMgr* data_mgr,
2271  const int device_id,
2272  const uint32_t start_rowid,
2273  const uint32_t num_tables,
2274  RenderInfo* render_info) {
2275  INJECT_TIMER(executePlanWithoutGroupBy);
2276  auto timer = DEBUG_TIMER(__func__);
2277  CHECK(!results);
2278  if (col_buffers.empty()) {
2279  return 0;
2280  }
2281 
2282  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2283  if (render_info) {
2284  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2285  // here, we are in non-insitu mode.
2286  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2287  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2288  "currently unsupported.";
2289  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2290  }
2291 
2292  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2293  std::vector<int64_t*> out_vec;
2294  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2295  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2296  std::unique_ptr<OutVecOwner> output_memory_scope;
2298  return ERR_INTERRUPTED;
2299  }
2300  if (device_type == ExecutorDeviceType::CPU) {
2301  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2302  compilation_result.native_functions,
2303  hoist_literals,
2304  hoist_buf,
2305  col_buffers,
2306  num_rows,
2307  frag_offsets,
2308  0,
2309  &error_code,
2310  num_tables,
2311  join_hash_table_ptrs);
2312  output_memory_scope.reset(new OutVecOwner(out_vec));
2313  } else {
2314  try {
2315  out_vec = query_exe_context->launchGpuCode(ra_exe_unit,
2316  compilation_result.native_functions,
2317  hoist_literals,
2318  hoist_buf,
2319  col_buffers,
2320  num_rows,
2321  frag_offsets,
2322  0,
2323  data_mgr,
2324  blockSize(),
2325  gridSize(),
2326  device_id,
2327  &error_code,
2328  num_tables,
2329  join_hash_table_ptrs,
2330  render_allocator_map_ptr);
2331  output_memory_scope.reset(new OutVecOwner(out_vec));
2332  } catch (const OutOfMemory&) {
2333  return ERR_OUT_OF_GPU_MEM;
2334  } catch (const std::exception& e) {
2335  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2336  }
2337  }
2338  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2339  error_code == Executor::ERR_DIV_BY_ZERO ||
2340  error_code == Executor::ERR_OUT_OF_TIME ||
2341  error_code == Executor::ERR_INTERRUPTED) {
2342  return error_code;
2343  }
2344  if (ra_exe_unit.estimator) {
2345  CHECK(!error_code);
2346  results =
2347  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2348  return 0;
2349  }
2350  std::vector<int64_t> reduced_outs;
2351  const auto num_frags = col_buffers.size();
2352  const size_t entry_count = device_type == ExecutorDeviceType::GPU
2353  ? num_frags * blockSize() * gridSize()
2354  : num_frags;
2355  if (size_t(1) == entry_count) {
2356  for (auto out : out_vec) {
2357  CHECK(out);
2358  reduced_outs.push_back(*out);
2359  }
2360  } else {
2361  size_t out_vec_idx = 0;
2362 
2363  for (const auto target_expr : target_exprs) {
2364  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2365  CHECK(agg_info.is_agg);
2366 
2367  auto meta_class(
2369  auto agg_reduction_impl =
2371  agg_reduction_impl(meta_class,
2372  entry_count,
2373  error_code,
2374  agg_info,
2375  out_vec_idx,
2376  out_vec,
2377  reduced_outs,
2378  query_exe_context);
2379  if (error_code) {
2380  break;
2381  }
2382  }
2383  }
2384 
2385  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
2386  auto rows_ptr = std::shared_ptr<ResultSet>(
2387  query_exe_context->query_buffers_->result_sets_[0].release());
2388  rows_ptr->fillOneEntry(reduced_outs);
2389  results = std::move(rows_ptr);
2390  return error_code;
2391 }
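// Illustrative sketch (not part of Execute.cpp): when entry_count is greater than one
// (one entry per fragment on CPU, one per fragment per GPU thread otherwise), each
// target's partial results have to be folded into a single value, which is what
// Executor::reduceResults does above. For a plain SUM the fold is conceptually just:
int64_t fold_sum_partials(const int64_t* partials, const size_t entry_count) {
  int64_t total = 0;
  for (size_t i = 0; i < entry_count; ++i) {
    total += partials[i];  // the real reduction also has to skip null/init sentinels
  }
  return total;
}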
2392 
2393 namespace {
2394 
2395 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
2396  CHECK(scan_limit);
2397  return results && results->rowCount() < scan_limit;
2398 }
2399 
2400 } // namespace
2401 
2402 int32_t Executor::executePlanWithGroupBy(
2403  const RelAlgExecutionUnit& ra_exe_unit,
2404  const CompilationResult& compilation_result,
2405  const bool hoist_literals,
2406  ResultSetPtr& results,
2407  const ExecutorDeviceType device_type,
2408  std::vector<std::vector<const int8_t*>>& col_buffers,
2409  const std::vector<size_t> outer_tab_frag_ids,
2410  QueryExecutionContext* query_exe_context,
2411  const std::vector<std::vector<int64_t>>& num_rows,
2412  const std::vector<std::vector<uint64_t>>& frag_offsets,
2413  Data_Namespace::DataMgr* data_mgr,
2414  const int device_id,
2415  const int64_t scan_limit,
2416  const uint32_t start_rowid,
2417  const uint32_t num_tables,
2418  RenderInfo* render_info) {
2419  auto timer = DEBUG_TIMER(__func__);
2420  INJECT_TIMER(executePlanWithGroupBy);
2421  CHECK(!results);
2422  if (col_buffers.empty()) {
2423  return 0;
2424  }
2425  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
2426  // TODO(alex):
2427  // 1. Optimize size (make keys more compact).
2428  // 2. Resize on overflow.
2429  // 3. Optimize runtime.
2430  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2431  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2432  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2434  return ERR_INTERRUPTED;
2435  }
2436 
2437  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2438  if (render_info && render_info->useCudaBuffers()) {
2439  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2440  }
2441 
2442  if (device_type == ExecutorDeviceType::CPU) {
2443  query_exe_context->launchCpuCode(ra_exe_unit,
2444  compilation_result.native_functions,
2445  hoist_literals,
2446  hoist_buf,
2447  col_buffers,
2448  num_rows,
2449  frag_offsets,
2450  scan_limit,
2451  &error_code,
2452  num_tables,
2453  join_hash_table_ptrs);
2454  } else {
2455  try {
2456  query_exe_context->launchGpuCode(ra_exe_unit,
2457  compilation_result.native_functions,
2458  hoist_literals,
2459  hoist_buf,
2460  col_buffers,
2461  num_rows,
2462  frag_offsets,
2463  scan_limit,
2464  data_mgr,
2465  blockSize(),
2466  gridSize(),
2467  device_id,
2468  &error_code,
2469  num_tables,
2470  join_hash_table_ptrs,
2471  render_allocator_map_ptr);
2472  } catch (const OutOfMemory&) {
2473  return ERR_OUT_OF_GPU_MEM;
2474  } catch (const OutOfRenderMemory&) {
2475  return ERR_OUT_OF_RENDER_MEM;
2476  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
2477  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
2478  } catch (const std::bad_alloc&) {
2479  return ERR_SPECULATIVE_TOP_OOM;
2480  } catch (const std::exception& e) {
2481  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2482  }
2483  }
2484 
2485  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2486  error_code == Executor::ERR_DIV_BY_ZERO ||
2487  error_code == Executor::ERR_OUT_OF_TIME ||
2488  error_code == Executor::ERR_INTERRUPTED) {
2489  return error_code;
2490  }
2491 
2492  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
2493  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
2494  results =
2495  query_exe_context->getRowSet(ra_exe_unit, query_exe_context->query_mem_desc_);
2496  CHECK(results);
2497  results->holdLiterals(hoist_buf);
2498  }
2499  if (error_code < 0 && render_allocator_map_ptr) {
2500  // More rows passed the filter than available slots. We don't have a count to check,
2501  // so assume we met the limit if a scan limit is set
2502  if (scan_limit != 0) {
2503  return 0;
2504  } else {
2505  return error_code;
2506  }
2507  }
2508  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
2509  return error_code; // unlucky, not enough results and we ran out of slots
2510  }
2511 
2512  return 0;
2513 }
2514 
2515 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
2516  const int device_id) {
2517  std::vector<int64_t> table_ptrs;
2518  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
2519  for (auto hash_table : join_hash_tables) {
2520  if (!hash_table) {
2521  CHECK(table_ptrs.empty());
2522  return {};
2523  }
2524  table_ptrs.push_back(hash_table->getJoinHashBuffer(
2525  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
2526  }
2527  return table_ptrs;
2528 }
2529 
2530 namespace {
2531 
2532 template <class T>
2533 int64_t insert_one_dict_str(T* col_data,
2534  const std::string& columnName,
2535  const SQLTypeInfo& columnType,
2536  const Analyzer::Constant* col_cv,
2537  const Catalog_Namespace::Catalog& catalog) {
2538  if (col_cv->get_is_null()) {
2539  *col_data = inline_fixed_encoding_null_val(columnType);
2540  } else {
2541  const int dict_id = columnType.get_comp_param();
2542  const auto col_datum = col_cv->get_constval();
2543  const auto& str = *col_datum.stringval;
2544  const auto dd = catalog.getMetadataForDict(dict_id);
2545  CHECK(dd && dd->stringDict);
2546  int32_t str_id = dd->stringDict->getOrAdd(str);
2547  const bool checkpoint_ok = dd->stringDict->checkpoint();
2548  if (!checkpoint_ok) {
2549  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2550  columnName);
2551  }
2552  const bool invalid = str_id > max_valid_int_value<T>();
2553  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2554  if (invalid) {
2555  LOG(ERROR) << "Could not encode string: " << str
2556  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2557  << " bits. Will store NULL instead.";
2558  }
2559  str_id = inline_fixed_encoding_null_val(columnType);
2560  }
2561  *col_data = str_id;
2562  }
2563  return *col_data;
2564 }
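// Illustrative sketch (not part of Execute.cpp): for narrow dictionary-encoded columns
// the helper above stores NULL instead of silently truncating an id that does not fit
// the physical column width. Conceptually (encode_or_null and its parameters are
// hypothetical names used only for this example):
template <class T>
T encode_or_null(const int64_t str_id,
                 const int64_t max_valid_id,
                 const T null_sentinel) {
  // An id beyond the widest value the narrow type can represent falls back to the
  // column's NULL sentinel, mirroring the check in insert_one_dict_str.
  return str_id > max_valid_id ? null_sentinel : static_cast<T>(str_id);
}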
2565 
2566 template <class T>
2567 int64_t insert_one_dict_str(T* col_data,
2568  const ColumnDescriptor* cd,
2569  const Analyzer::Constant* col_cv,
2570  const Catalog_Namespace::Catalog& catalog) {
2571  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2572 }
2573 
2574 } // namespace
2575 
2576 namespace Importer_NS {
2577 
2578 int8_t* appendDatum(int8_t* buf, Datum d, const SQLTypeInfo& ti);
2579 
2580 } // namespace Importer_NS
2581 
2582 void Executor::executeSimpleInsert(const Planner::RootPlan* root_plan) {
2583  const auto plan = root_plan->get_plan();
2584  CHECK(plan);
2585  const auto values_plan = dynamic_cast<const Planner::ValuesScan*>(plan);
2586  if (!values_plan) {
2587  throw std::runtime_error(
2588  "Only simple INSERT of immediate tuples is currently supported");
2589  }
2590  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
2591  const auto& targets = values_plan->get_targetlist();
2592  const int table_id = root_plan->get_result_table_id();
2593  const auto& col_id_list = root_plan->get_result_col_list();
2594  std::vector<const ColumnDescriptor*> col_descriptors;
2595  std::vector<int> col_ids;
2596  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2597  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2598  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2599  auto& cat = root_plan->getCatalog();
2600  const auto table_descriptor = cat.getMetadataForTable(table_id);
2601  const auto shard_tables = cat.getPhysicalTablesDescriptors(table_descriptor);
2602  const TableDescriptor* shard{nullptr};
2603  for (const int col_id : col_id_list) {
2604  const auto cd = get_column_descriptor(col_id, table_id, cat);
2605  const auto col_enc = cd->columnType.get_compression();
2606  if (cd->columnType.is_string()) {
2607  switch (col_enc) {
2608  case kENCODING_NONE: {
2609  auto it_ok =
2610  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2611  CHECK(it_ok.second);
2612  break;
2613  }
2614  case kENCODING_DICT: {
2615  const auto dd = cat.getMetadataForDict(cd->columnType.get_comp_param());
2616  CHECK(dd);
2617  const auto it_ok = col_buffers.emplace(
2618  col_id, std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_size()]));
2619  CHECK(it_ok.second);
2620  break;
2621  }
2622  default:
2623  CHECK(false);
2624  }
2625  } else if (cd->columnType.is_geometry()) {
2626  auto it_ok =
2627  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2628  CHECK(it_ok.second);
2629  } else if (cd->columnType.is_array()) {
2630  auto it_ok =
2631  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2632  CHECK(it_ok.second);
2633  } else {
2634  const auto it_ok = col_buffers.emplace(
2635  col_id,
2636  std::unique_ptr<uint8_t[]>(
2637  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2638  // the buffer
2639  CHECK(it_ok.second);
2640  }
2641  col_descriptors.push_back(cd);
2642  col_ids.push_back(col_id);
2643  }
2644  size_t col_idx = 0;
2645  Fragmenter_Namespace::InsertData insert_data;
2646  insert_data.databaseId = cat.getCurrentDB().dbId;
2647  insert_data.tableId = table_id;
2648  int64_t int_col_val{0};
2649  for (auto target_entry : targets) {
2650  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2651  if (!col_cv) {
2652  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2653  CHECK(col_cast);
2654  CHECK_EQ(kCAST, col_cast->get_optype());
2655  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2656  }
2657  CHECK(col_cv);
2658  const auto cd = col_descriptors[col_idx];
2659  auto col_datum = col_cv->get_constval();
2660  auto col_type = cd->columnType.get_type();
2661  uint8_t* col_data_bytes{nullptr};
2662  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2663  (!cd->columnType.is_string() ||
2664  cd->columnType.get_compression() == kENCODING_DICT)) {
2665  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2666  CHECK(col_data_bytes_it != col_buffers.end());
2667  col_data_bytes = col_data_bytes_it->second.get();
2668  }
2669  switch (col_type) {
2670  case kBOOLEAN: {
2671  auto col_data = col_data_bytes;
2672  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2673  : (col_datum.boolval ? 1 : 0);
2674  break;
2675  }
2676  case kTINYINT: {
2677  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2678  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2679  : col_datum.tinyintval;
2680  int_col_val = col_datum.tinyintval;
2681  break;
2682  }
2683  case kSMALLINT: {
2684  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2685  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2686  : col_datum.smallintval;
2687  int_col_val = col_datum.smallintval;
2688  break;
2689  }
2690  case kINT: {
2691  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2692  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2693  : col_datum.intval;
2694  int_col_val = col_datum.intval;
2695  break;
2696  }
2697  case kBIGINT:
2698  case kDECIMAL:
2699  case kNUMERIC: {
2700  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2701  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2702  : col_datum.bigintval;
2703  int_col_val = col_datum.bigintval;
2704  break;
2705  }
2706  case kFLOAT: {
2707  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2708  *col_data = col_datum.floatval;
2709  break;
2710  }
2711  case kDOUBLE: {
2712  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2713  *col_data = col_datum.doubleval;
2714  break;
2715  }
2716  case kTEXT:
2717  case kVARCHAR:
2718  case kCHAR: {
2719  switch (cd->columnType.get_compression()) {
2720  case kENCODING_NONE:
2721  str_col_buffers[col_ids[col_idx]].push_back(
2722  col_datum.stringval ? *col_datum.stringval : "");
2723  break;
2724  case kENCODING_DICT: {
2725  switch (cd->columnType.get_size()) {
2726  case 1:
2727  int_col_val = insert_one_dict_str(
2728  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat);
2729  break;
2730  case 2:
2731  int_col_val = insert_one_dict_str(
2732  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat);
2733  break;
2734  case 4:
2735  int_col_val = insert_one_dict_str(
2736  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat);
2737  break;
2738  default:
2739  CHECK(false);
2740  }
2741  break;
2742  }
2743  default:
2744  CHECK(false);
2745  }
2746  break;
2747  }
2748  case kTIME:
2749  case kTIMESTAMP:
2750  case kDATE: {
2751  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2752  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2753  : col_datum.bigintval;
2754  break;
2755  }
2756  case kARRAY: {
2757  const auto is_null = col_cv->get_is_null();
2758  const auto size = cd->columnType.get_size();
2759  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2760  if (is_null) {
2761  if (size > 0) {
2762  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2763  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2764  throw std::runtime_error("Column " + cd->columnName +
2765  " doesn't accept NULL values");
2766  }
2767  int8_t* buf = (int8_t*)checked_malloc(size);
2768  put_null_array(static_cast<void*>(buf), elem_ti, "");
2769  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2770  p += elem_ti.get_size()) {
2771  put_null(static_cast<void*>(p), elem_ti, "");
2772  }
2773  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2774  } else {
2775  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2776  }
2777  break;
2778  }
2779  const auto l = col_cv->get_value_list();
2780  size_t len = l.size() * elem_ti.get_size();
2781  if (size > 0 && static_cast<size_t>(size) != len) {
2782  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2783  std::to_string(size / elem_ti.get_size()) +
2784  " values, " + "received " + std::to_string(l.size()));
2785  }
2786  if (elem_ti.is_string()) {
2787  CHECK(kENCODING_DICT == elem_ti.get_compression());
2788  CHECK(4 == elem_ti.get_size());
2789 
2790  int8_t* buf = (int8_t*)checked_malloc(len);
2791  int32_t* p = reinterpret_cast<int32_t*>(buf);
2792 
2793  int elemIndex = 0;
2794  for (auto& e : l) {
2795  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2796  CHECK(c);
2797 
2798  int_col_val =
2799  insert_one_dict_str(&p[elemIndex], cd->columnName, elem_ti, c.get(), cat);
2800 
2801  elemIndex++;
2802  }
2803  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2804 
2805  } else {
2806  int8_t* buf = (int8_t*)checked_malloc(len);
2807  int8_t* p = buf;
2808  for (auto& e : l) {
2809  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2810  CHECK(c);
2811  p = Importer_NS::appendDatum(p, c->get_constval(), elem_ti);
2812  }
2813  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2814  }
2815  break;
2816  }
2817  case kPOINT:
2818  case kLINESTRING:
2819  case kPOLYGON:
2820  case kMULTIPOLYGON:
2821  str_col_buffers[col_ids[col_idx]].push_back(
2822  col_datum.stringval ? *col_datum.stringval : "");
2823  break;
2824  default:
2825  CHECK(false);
2826  }
2827  ++col_idx;
2828  if (col_idx == static_cast<size_t>(table_descriptor->shardedColumnId)) {
2829  const auto shard_count = shard_tables.size();
2830  const size_t shard_idx = SHARD_FOR_KEY(int_col_val, shard_count);
2831  shard = shard_tables[shard_idx];
2832  }
2833  }
2834  for (const auto& kv : col_buffers) {
2835  insert_data.columnIds.push_back(kv.first);
2836  DataBlockPtr p;
2837  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2838  insert_data.data.push_back(p);
2839  }
2840  for (auto& kv : str_col_buffers) {
2841  insert_data.columnIds.push_back(kv.first);
2842  DataBlockPtr p;
2843  p.stringsPtr = &kv.second;
2844  insert_data.data.push_back(p);
2845  }
2846  for (auto& kv : arr_col_buffers) {
2847  insert_data.columnIds.push_back(kv.first);
2848  DataBlockPtr p;
2849  p.arraysPtr = &kv.second;
2850  insert_data.data.push_back(p);
2851  }
2852  insert_data.numRows = 1;
2853  if (shard) {
2854  shard->fragmenter->insertData(insert_data);
2855  } else {
2856  table_descriptor->fragmenter->insertData(insert_data);
2857  }
2858 }
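// Illustrative sketch (not part of Execute.cpp): the physical shard chosen above comes
// from mapping the sharded column's value onto the number of physical tables. For an
// integer shard key that mapping behaves like a simple non-negative remainder; the
// helper below is a hypothetical stand-in for SHARD_FOR_KEY, not its definition:
size_t shard_for_integer_key(const int64_t key, const size_t shard_count) {
  const auto remainder = key % static_cast<int64_t>(shard_count);
  return static_cast<size_t>(
      remainder < 0 ? remainder + static_cast<int64_t>(shard_count) : remainder);
}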
2859 
2860 void Executor::nukeOldState(const bool allow_lazy_fetch,
2861  const std::vector<InputTableInfo>& query_infos,
2862  const RelAlgExecutionUnit* ra_exe_unit) {
2863  const bool contains_left_deep_outer_join =
2864  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
2865  ra_exe_unit->join_quals.end(),
2866  [](const JoinCondition& join_condition) {
2867  return join_condition.type == JoinType::LEFT;
2868  }) != ra_exe_unit->join_quals.end();
2869  cgen_state_.reset(new CgenState(query_infos, contains_left_deep_outer_join));
2870  plan_state_.reset(
2871  new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join, this));
2872 }
2873 
2874 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
2875  const std::vector<InputTableInfo>& query_infos) {
2876  const auto ld_count = input_descs.size();
2877  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
2878  for (size_t i = 0; i < ld_count; ++i) {
2879  CHECK_LT(i, query_infos.size());
2880  const auto frag_count = query_infos[i].info.fragments.size();
2881  if (i > 0) {
2882  cgen_state_->frag_offsets_.push_back(nullptr);
2883  } else {
2884  if (frag_count > 1) {
2885  cgen_state_->frag_offsets_.push_back(
2886  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
2887  } else {
2888  cgen_state_->frag_offsets_.push_back(nullptr);
2889  }
2890  }
2891  }
2892 }
2893 
2895  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
2896  const std::vector<InputTableInfo>& query_infos,
2897  const MemoryLevel memory_level,
2898  const JoinHashTableInterface::HashType preferred_hash_type,
2899  ColumnCacheMap& column_cache) {
2900  std::shared_ptr<JoinHashTableInterface> join_hash_table;
2901  const int device_count = deviceCountForMemoryLevel(memory_level);
2902  CHECK_GT(device_count, 0);
2903  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
2904  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
2905  }
2906  try {
2907  if (qual_bin_oper->is_overlaps_oper()) {
2908  join_hash_table = OverlapsJoinHashTable::getInstance(
2909  qual_bin_oper, query_infos, memory_level, device_count, column_cache, this);
2910  } else if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2911  qual_bin_oper->get_left_operand())) {
2912  join_hash_table = BaselineJoinHashTable::getInstance(qual_bin_oper,
2913  query_infos,
2914  memory_level,
2915  preferred_hash_type,
2916  device_count,
2917  column_cache,
2918  this);
2919  } else {
2920  try {
2921  join_hash_table = JoinHashTable::getInstance(qual_bin_oper,
2922  query_infos,
2923  memory_level,
2924  preferred_hash_type,
2925  device_count,
2926  column_cache,
2927  this);
2928  } catch (TooManyHashEntries&) {
2929  const auto join_quals = coalesce_singleton_equi_join(qual_bin_oper);
2930  CHECK_EQ(join_quals.size(), size_t(1));
2931  const auto join_qual =
2932  std::dynamic_pointer_cast<Analyzer::BinOper>(join_quals.front());
2933  join_hash_table = BaselineJoinHashTable::getInstance(join_qual,
2934  query_infos,
2935  memory_level,
2936  preferred_hash_type,
2937  device_count,
2938  column_cache,
2939  this);
2940  }
2941  }
2942  CHECK(join_hash_table);
2943  return {join_hash_table, ""};
2944  } catch (const HashJoinFail& e) {
2945  return {nullptr, e.what()};
2946  }
2947  CHECK(false);
2948  return {nullptr, ""};
2949 }
2950 
2951 int8_t Executor::warpSize() const {
2952  CHECK(catalog_);
2953  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2954  CHECK(cuda_mgr);
2955  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2956  CHECK(!dev_props.empty());
2957  return dev_props.front().warpSize;
2958 }
2959 
2960 unsigned Executor::gridSize() const {
2961  CHECK(catalog_);
2962  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2963  CHECK(cuda_mgr);
2964  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2965  return grid_size_x_ ? grid_size_x_ : 2 * dev_props.front().numMPs;
2966 }
2967 
2968 unsigned Executor::blockSize() const {
2969  CHECK(catalog_);
2970  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2971  CHECK(cuda_mgr);
2972  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2973  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
2974 }
2975 
2976 int64_t Executor::deviceCycles(int milliseconds) const {
2977  CHECK(catalog_);
2978  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2979  CHECK(cuda_mgr);
2980  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2981  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
2982 }
2983 
2984 llvm::Value* Executor::castToFP(llvm::Value* val) {
2985  if (!val->getType()->isIntegerTy()) {
2986  return val;
2987  }
2988 
2989  auto val_width = static_cast<llvm::IntegerType*>(val->getType())->getBitWidth();
2990  llvm::Type* dest_ty{nullptr};
2991  switch (val_width) {
2992  case 32:
2993  dest_ty = llvm::Type::getFloatTy(cgen_state_->context_);
2994  break;
2995  case 64:
2996  dest_ty = llvm::Type::getDoubleTy(cgen_state_->context_);
2997  break;
2998  default:
2999  CHECK(false);
3000  }
3001  return cgen_state_->ir_builder_.CreateSIToFP(val, dest_ty);
3002 }
3003 
3004 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3005  CHECK(val->getType()->isPointerTy());
3006 
3007  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3008  const auto val_type = val_ptr_type->getElementType();
3009  size_t val_width = 0;
3010  if (val_type->isIntegerTy()) {
3011  val_width = val_type->getIntegerBitWidth();
3012  } else {
3013  if (val_type->isFloatTy()) {
3014  val_width = 32;
3015  } else {
3016  CHECK(val_type->isDoubleTy());
3017  val_width = 64;
3018  }
3019  }
3020  CHECK_LT(size_t(0), val_width);
3021  if (bitWidth == val_width) {
3022  return val;
3023  }
3024  return cgen_state_->ir_builder_.CreateBitCast(
3025  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3026 }
3027 
3028 #define EXECUTE_INCLUDE
3029 #include "ArrayOps.cpp"
3030 #include "DateAdd.cpp"
3031 #include "StringFunctions.cpp"
3032 #undef EXECUTE_INCLUDE
3033 
3034 RelAlgExecutionUnit Executor::addDeletedColumn(const RelAlgExecutionUnit& ra_exe_unit) {
3035  auto ra_exe_unit_with_deleted = ra_exe_unit;
3036  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3037  if (input_table.getSourceType() != InputSourceType::TABLE) {
3038  continue;
3039  }
3040  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3041  CHECK(td);
3042  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3043  if (!deleted_cd) {
3044  continue;
3045  }
3046  CHECK(deleted_cd->columnType.is_boolean());
3047  // check deleted column is not already present
3048  bool found = false;
3049  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3050  if (input_col.get()->getColId() == deleted_cd->columnId &&
3051  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3052  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3053  found = true;
3054  }
3055  }
3056  if (!found) {
3057  // add deleted column
3058  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3059  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3060  }
3061  }
3062  return ra_exe_unit_with_deleted;
3063 }
3064 
3065 namespace {
3066 
3067 int64_t get_hpt_scaled_value(const int64_t& val,
3068  const int32_t& ldim,
3069  const int32_t& rdim) {
3070  CHECK(ldim != rdim);
3071  return ldim > rdim ? val / DateTimeUtils::get_timestamp_precision_scale(ldim - rdim)
3072  : val * DateTimeUtils::get_timestamp_precision_scale(rdim - ldim);
3073 }
3074 
3075 } // namespace
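// Worked example (illustrative, not part of Execute.cpp): get_hpt_scaled_value above
// rescales a value between timestamp precisions by a power of ten. Going from
// nanoseconds (dimension 9) down to milliseconds (dimension 3) divides by 10^6, and the
// opposite direction multiplies:
//   get_hpt_scaled_value(1500000000 /* ns */, 9, 3) == 1500        /* ms */
//   get_hpt_scaled_value(1500       /* ms */, 3, 9) == 1500000000  /* ns */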
3076 
3077 std::pair<bool, int64_t> Executor::skipFragment(
3078  const InputDescriptor& table_desc,
3079  const Fragmenter_Namespace::FragmentInfo& fragment,
3080  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3081  const std::vector<uint64_t>& frag_offsets,
3082  const size_t frag_idx) {
3083  const int table_id = table_desc.getTableId();
3084  for (const auto simple_qual : simple_quals) {
3085  const auto comp_expr =
3086  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3087  if (!comp_expr) {
3088  // is this possible?
3089  return {false, -1};
3090  }
3091  const auto lhs = comp_expr->get_left_operand();
3092  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3093  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3094  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3095  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3096  if (lhs_uexpr) {
3097  CHECK(lhs_uexpr->get_optype() ==
3098  kCAST); // We should have only been passed a cast expression
3099  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3100  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3101  continue;
3102  }
3103  } else {
3104  continue;
3105  }
3106  }
3107  const auto rhs = comp_expr->get_right_operand();
3108  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3109  if (!rhs_const) {
3110  // is this possible?
3111  return {false, -1};
3112  }
3113  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3114  continue;
3115  }
3116  const int col_id = lhs_col->get_column_id();
3117  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3118  int64_t chunk_min{0};
3119  int64_t chunk_max{0};
3120  bool is_rowid{false};
3121  size_t start_rowid{0};
3122  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3123  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3124  if (cd->isVirtualCol) {
3125  CHECK(cd->columnName == "rowid");
3126  const auto& table_generation = getTableGeneration(table_id);
3127  start_rowid = table_generation.start_rowid;
3128  chunk_min = frag_offsets[frag_idx] + start_rowid;
3129  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3130  is_rowid = true;
3131  }
3132  } else {
3133  const auto& chunk_type = lhs_col->get_type_info();
3134  chunk_min = extract_min_stat(chunk_meta_it->second.chunkStats, chunk_type);
3135  chunk_max = extract_max_stat(chunk_meta_it->second.chunkStats, chunk_type);
3136  }
3137  if (lhs->get_type_info().is_timestamp() &&
3138  (lhs_col->get_type_info().get_dimension() !=
3139  rhs_const->get_type_info().get_dimension()) &&
3140  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3141  rhs_const->get_type_info().is_high_precision_timestamp())) {
3142  // If the original timestamp lhs column has a different precision,
3143  // the column metadata holds the value in the original precision;
3144  // therefore adjust the value to match the rhs precision.
3145  const auto lhs_dimen = lhs_col->get_type_info().get_dimension();
3146  const auto rhs_dimen = rhs_const->get_type_info().get_dimension();
3147  chunk_min = get_hpt_scaled_value(chunk_min, lhs_dimen, rhs_dimen);
3148  chunk_max = get_hpt_scaled_value(chunk_max, lhs_dimen, rhs_dimen);
3149  }
3150  CodeGenerator code_generator(this);
3151  const auto rhs_val = code_generator.codegenIntConst(rhs_const)->getSExtValue();
3152  switch (comp_expr->get_optype()) {
3153  case kGE:
3154  if (chunk_max < rhs_val) {
3155  return {true, -1};
3156  }
3157  break;
3158  case kGT:
3159  if (chunk_max <= rhs_val) {
3160  return {true, -1};
3161  }
3162  break;
3163  case kLE:
3164  if (chunk_min > rhs_val) {
3165  return {true, -1};
3166  }
3167  break;
3168  case kLT:
3169  if (chunk_min >= rhs_val) {
3170  return {true, -1};
3171  }
3172  break;
3173  case kEQ:
3174  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3175  return {true, -1};
3176  } else if (is_rowid) {
3177  return {false, rhs_val - start_rowid};
3178  }
3179  break;
3180  default:
3181  break;
3182  }
3183  }
3184  return {false, -1};
3185 }
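// Illustrative sketch (not part of Execute.cpp): the switch above is min/max pruning on
// chunk metadata; a fragment can only be skipped when the qual's constant lies entirely
// outside the fragment's observed [chunk_min, chunk_max] range. For the equality case:
bool can_skip_fragment_for_eq(const int64_t chunk_min,
                              const int64_t chunk_max,
                              const int64_t rhs_val) {
  // No row of the fragment can satisfy `col = rhs_val` when the constant falls
  // outside the fragment's value range.
  return rhs_val < chunk_min || rhs_val > chunk_max;
}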
3186 
3187 /*
3188  * skipFragmentInnerJoins processes all the quals stored in the execution unit's
3189  * join_quals and gathers all the ones that meet the "simple_qual" criteria
3190  * (logical expressions with AND operations, etc.). It then uses the skipFragment
3191  * function to decide whether the fragment should be skipped or not. The fragment is
3192  * skipped if at least one of these skipFragment calls returns true as its first value.
3193  * - The code depends on skipFragment's output having a meaningful (anything but -1)
3194  * second value only if its first value is "false".
3195  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3196  * i.e., we only skip if none of the quals trigger the code to update the
3197  * rowid_lookup_key.
3198  * - Only AND operations are valid and considered:
3199  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3200  * the skip
3201  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3202  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3203  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3204  * implemented differently, which causes the first one to skip correctly while the
3205  * second one does not:
3206  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3207  * possible
3208  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3209  * projection, no skipping
3210  */
3211 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3212  const InputDescriptor& table_desc,
3213  const RelAlgExecutionUnit& ra_exe_unit,
3214  const Fragmenter_Namespace::FragmentInfo& fragment,
3215  const std::vector<uint64_t>& frag_offsets,
3216  const size_t frag_idx) {
3217  std::pair<bool, int64_t> skip_frag{false, -1};
3218  for (auto& inner_join : ra_exe_unit.join_quals) {
3219  if (inner_join.type != JoinType::INNER) {
3220  continue;
3221  }
3222 
3223  // extracting all the conjunctive simple_quals from the quals stored for the inner
3224  // join
3225  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3226  for (auto& qual : inner_join.quals) {
3227  auto temp_qual = qual_to_conjunctive_form(qual);
3228  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3229  temp_qual.simple_quals.begin(),
3230  temp_qual.simple_quals.end());
3231  }
3232  auto temp_skip_frag = skipFragment(
3233  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3234  if (temp_skip_frag.second != -1) {
3235  skip_frag.second = temp_skip_frag.second;
3236  return skip_frag;
3237  } else {
3238  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3239  }
3240  }
3241  return skip_frag;
3242 }
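
A minimal sketch of the precedence rule documented above (hypothetical helper, simplified to operate on precomputed skipFragment results rather than on quals): a usable rowid lookup key from any conjunct is returned immediately, otherwise the per-conjunct skip flags are OR-ed together.

#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical reduction over per-qual skipFragment() results, mirroring the
// loop in Executor::skipFragmentInnerJoins(): a meaningful rowid lookup key
// (second != -1) takes priority over skipping; otherwise any qual that proves
// the fragment empty is enough to skip it.
std::pair<bool, int64_t> combine_skip_results(
    const std::vector<std::pair<bool, int64_t>>& per_qual_results) {
  std::pair<bool, int64_t> skip_frag{false, -1};
  for (const auto& result : per_qual_results) {
    if (result.second != -1) {
      skip_frag.second = result.second;
      return skip_frag;
    }
    skip_frag.first = skip_frag.first || result.first;
  }
  return skip_frag;
}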
3243 
3244 AggregatedColRange Executor::computeColRangesCache(
3245     const std::unordered_set<PhysicalInput>& phys_inputs) {
3246  AggregatedColRange agg_col_range_cache;
3247  CHECK(catalog_);
3248  std::unordered_set<int> phys_table_ids;
3249  for (const auto& phys_input : phys_inputs) {
3250  phys_table_ids.insert(phys_input.table_id);
3251  }
3252  std::vector<InputTableInfo> query_infos;
3253  for (const int table_id : phys_table_ids) {
3254  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3255  }
3256  for (const auto& phys_input : phys_inputs) {
3257  const auto cd =
3258  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3259  CHECK(cd);
3260  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3261  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3262  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3263  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3264  agg_col_range_cache.setColRange(phys_input, col_range);
3265  }
3266  }
3267  return agg_col_range_cache;
3268 }
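
The function above builds a per-query cache keyed by physical input. A self-contained sketch of that pattern (simplified, hypothetical SimplePhysicalInput/SimpleRange types standing in for PhysicalInput and ExpressionRange) looks like this:

#include <cstdint>
#include <map>
#include <tuple>

// Hypothetical, simplified stand-ins for PhysicalInput and ExpressionRange.
struct SimplePhysicalInput {
  int col_id;
  int table_id;
  bool operator<(const SimplePhysicalInput& other) const {
    return std::tie(table_id, col_id) < std::tie(other.table_id, other.col_id);
  }
};

struct SimpleRange {
  int64_t min_val;
  int64_t max_val;
};

// One range entry per physical (table_id, col_id) input, computed once when
// the query is set up and consulted later during code generation.
class SimpleColRangeCache {
 public:
  void setColRange(const SimplePhysicalInput& input, const SimpleRange& range) {
    ranges_[input] = range;
  }
  const SimpleRange& getColRange(const SimplePhysicalInput& input) const {
    return ranges_.at(input);  // caller guarantees the entry was populated
  }

 private:
  std::map<SimplePhysicalInput, SimpleRange> ranges_;
};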
3269 
3270 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3271     const std::unordered_set<PhysicalInput>& phys_inputs) {
3272  StringDictionaryGenerations string_dictionary_generations;
3273  CHECK(catalog_);
3274  for (const auto& phys_input : phys_inputs) {
3275  const auto cd =
3276  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3277  CHECK(cd);
3278  const auto& col_ti =
3279  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3280  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3281  const int dict_id = col_ti.get_comp_param();
3282  const auto dd = catalog_->getMetadataForDict(dict_id);
3283  CHECK(dd && dd->stringDict);
3284  string_dictionary_generations.setGeneration(dict_id,
3285  dd->stringDict->storageEntryCount());
3286  }
3287  }
3288  return string_dictionary_generations;
3289 }
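
The generation stored here is just the dictionary's entry count at query setup time. A minimal sketch of how such a watermark could be used (hypothetical SimpleDictionaryGenerations, not the actual StringDictionaryGenerations API) follows:

#include <cstddef>
#include <unordered_map>

// Hypothetical generation tracker: remembers how many entries each string
// dictionary had when the query was set up, so ids appended to the dictionary
// afterwards can be treated as invisible to this query.
class SimpleDictionaryGenerations {
 public:
  void setGeneration(const int dict_id, const size_t entry_count) {
    generations_[dict_id] = entry_count;
  }
  bool isVisible(const int dict_id, const size_t string_id) const {
    const auto it = generations_.find(dict_id);
    return it != generations_.end() && string_id < it->second;
  }

 private:
  std::unordered_map<int, size_t> generations_;
};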
3290 
3291 TableGenerations Executor::computeTableGenerations(
3292     std::unordered_set<int> phys_table_ids) {
3293  TableGenerations table_generations;
3294  for (const int table_id : phys_table_ids) {
3295  const auto table_info = getTableInfo(table_id);
3296  table_generations.setGeneration(
3297  table_id, TableGeneration{table_info.getPhysicalNumTuples(), 0});
3298  }
3299  return table_generations;
3300 }
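
Table generations apply the same watermark idea at the row level: the tuple count observed during setup bounds what the query may see. A self-contained sketch with hypothetical names:

#include <cstddef>
#include <cstdint>
#include <unordered_map>

// Hypothetical per-table generation: tuple_count is the number of rows visible
// to the query, start_rowid the first rowid of the snapshot (0 above).
struct SimpleTableGeneration {
  size_t tuple_count{0};
  int64_t start_rowid{0};
};

class SimpleTableGenerations {
 public:
  void setGeneration(const int table_id, const SimpleTableGeneration& gen) {
    generations_[table_id] = gen;
  }
  const SimpleTableGeneration& getGeneration(const int table_id) const {
    return generations_.at(table_id);
  }

 private:
  std::unordered_map<int, SimpleTableGeneration> generations_;
};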
3301 
3302 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3303  const std::unordered_set<int>& phys_table_ids) {
3304  CHECK(catalog_);
3305  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
3306  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3307  string_dictionary_generations_ = computeStringDictionaryGenerations(phys_inputs);
3308  table_generations_ = computeTableGenerations(phys_table_ids);
3309 }
3310 
3311 std::map<std::pair<int, ::QueryRenderer::QueryRenderManager*>, std::shared_ptr<Executor>>
3312     Executor::executors_;
3313 std::mutex Executor::execute_mutex_;