OmniSciDB  5ade3759e0
Execute.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Execute.h"
18 
19 #include "AggregateUtils.h"
20 #include "BaselineJoinHashTable.h"
21 #include "CodeGenerator.h"
22 #include "ColumnFetcher.h"
25 #include "DynamicWatchdog.h"
26 #include "EquiJoinCondition.h"
27 #include "ErrorHandling.h"
28 #include "ExpressionRewrite.h"
30 #include "GpuMemUtils.h"
31 #include "InPlaceSort.h"
32 #include "JsonAccessors.h"
34 #include "OverlapsJoinHashTable.h"
35 #include "QueryRewrite.h"
36 #include "QueryTemplateGenerator.h"
37 #include "ResultSetReductionJIT.h"
38 #include "RuntimeFunctions.h"
39 #include "SpeculativeTopN.h"
40 
41 #include "CudaMgr/CudaMgr.h"
43 #include "Parser/ParserNode.h"
45 #include "Shared/MapDParameters.h"
47 #include "Shared/checked_alloc.h"
48 #include "Shared/measure.h"
49 #include "Shared/scope.h"
50 #include "Shared/shard_key.h"
51 
52 #include "AggregatedColRange.h"
54 
55 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
56 #include <boost/filesystem/operations.hpp>
57 #include <boost/filesystem/path.hpp>
58 
59 #ifdef HAVE_CUDA
60 #include <cuda.h>
61 #endif // HAVE_CUDA
62 #include <future>
63 #include <memory>
64 #include <numeric>
65 #include <set>
66 #include <thread>
67 
69 bool g_enable_watchdog{false};
72 bool g_allow_cpu_retry{true};
73 bool g_null_div_by_zero{false};
77 extern bool g_enable_smem_group_by;
78 extern std::unique_ptr<llvm::Module> udf_gpu_module;
79 extern std::unique_ptr<llvm::Module> udf_cpu_module;
86 bool g_cache_string_hash{false};
87 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
90 size_t g_big_group_threshold{20000};
92 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
93 size_t g_min_memory_allocation_size{
94  256}; // minimum memory allocation required for projection query output buffer
95  // without pre-flight count
98 
99 int const Executor::max_gpu_count;
100 
101 Executor::Executor(const int db_id,
102  const size_t block_size_x,
103  const size_t grid_size_x,
104  const std::string& debug_dir,
105  const std::string& debug_file,
106  ::QueryRenderer::QueryRenderManager* render_manager)
107  : cgen_state_(new CgenState({}, false))
109  , interrupted_(false)
112  , render_manager_(render_manager)
113  , block_size_x_(block_size_x)
114  , grid_size_x_(grid_size_x)
115  , debug_dir_(debug_dir)
116  , debug_file_(debug_file)
117  , db_id_(db_id)
118  , catalog_(nullptr)
119  , temporary_tables_(nullptr)
120  , input_table_info_cache_(this) {}
121 
122 std::shared_ptr<Executor> Executor::getExecutor(
123  const int db_id,
124  const std::string& debug_dir,
125  const std::string& debug_file,
126  const MapDParameters mapd_parameters,
127  ::QueryRenderer::QueryRenderManager* render_manager) {
129  const auto executor_key = std::make_pair(db_id, render_manager);
130  {
131  mapd_shared_lock<mapd_shared_mutex> read_lock(executors_cache_mutex_);
132  auto it = executors_.find(executor_key);
133  if (it != executors_.end()) {
134  return it->second;
135  }
136  }
137  {
138  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
139  auto it = executors_.find(executor_key);
140  if (it != executors_.end()) {
141  return it->second;
142  }
143  auto executor = std::make_shared<Executor>(db_id,
144  mapd_parameters.cuda_block_size,
145  mapd_parameters.cuda_grid_size,
146  debug_dir,
147  debug_file,
148  render_manager);
149  auto it_ok = executors_.insert(std::make_pair(executor_key, executor));
150  CHECK(it_ok.second);
151  return executor;
152  }
153 }
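// Illustrative sketch (not part of Execute.cpp): getExecutor() above uses the
// check / re-check idiom -- probe the cache under a shared (read) lock, and only if
// the key is missing, re-probe and insert under an exclusive (write) lock so two
// racing callers cannot both construct an Executor. A minimal standalone version of
// the same idiom, assuming std::shared_mutex instead of the mapd_shared_mutex
// wrappers used above; get_or_create is a hypothetical helper name.
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>

template <typename K, typename V, typename Factory>
std::shared_ptr<V> get_or_create(std::map<K, std::shared_ptr<V>>& cache,
                                 std::shared_mutex& mutex,
                                 const K& key,
                                 Factory make_value) {
  {
    std::shared_lock<std::shared_mutex> read_lock(mutex);
    const auto it = cache.find(key);
    if (it != cache.end()) {
      return it->second;  // fast path: already cached
    }
  }
  std::unique_lock<std::shared_mutex> write_lock(mutex);
  const auto it = cache.find(key);  // re-check: another thread may have won the race
  if (it != cache.end()) {
    return it->second;
  }
  auto value = make_value();
  cache.emplace(key, value);
  return value;
}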
154 
155 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
156  switch (memory_level) {
157  case Data_Namespace::MemoryLevel::CPU_LEVEL:
158  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
159  std::lock_guard<std::mutex> flush_lock(
160  execute_mutex_); // Don't flush memory while queries are running
161 
163  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
164  // The hash table cache uses CPU memory not managed by the buffer manager. In the
165  // future, we should manage these allocations with the buffer manager directly.
166  // For now, assume the user wants to purge the hash table cache when they clear
167  // CPU memory (currently used in ExecuteTest to lower memory pressure)
169  }
170  break;
171  }
172  default: {
173  throw std::runtime_error(
174  "Clearing memory levels other than the CPU level or GPU level is not "
175  "supported.");
176  }
177  }
178 }
179 
180 StringDictionaryProxy* Executor::getStringDictionaryProxy(
181  const int dict_id_in,
182  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
183  const bool with_generation) const {
184  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
185  CHECK(catalog_);
186  const auto dd = catalog_->getMetadataForDict(dict_id);
187  std::lock_guard<std::mutex> lock(str_dict_mutex_);
188  if (dd) {
189  CHECK(dd->stringDict);
190  CHECK_LE(dd->dictNBits, 32);
191  CHECK(row_set_mem_owner);
192  const auto generation = with_generation
193  ? string_dictionary_generations_.getGeneration(dict_id)
194  : ssize_t(-1);
195  return row_set_mem_owner->addStringDict(dd->stringDict, dict_id, generation);
196  }
197  CHECK_EQ(0, dict_id);
198  if (!lit_str_dict_proxy_) {
199  std::shared_ptr<StringDictionary> tsd =
200  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
201  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
202  }
203  return lit_str_dict_proxy_.get();
204 }
205 
206 bool Executor::isCPUOnly() const {
207  CHECK(catalog_);
208  return !catalog_->getDataMgr().getCudaMgr();
209 }
210 
211 const ColumnDescriptor* Executor::getColumnDescriptor(
212  const Analyzer::ColumnVar* col_var) const {
213  return get_column_descriptor_maybe(
214  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
215 }
216 
217 const ColumnDescriptor* Executor::getPhysicalColumnDescriptor(
218  const Analyzer::ColumnVar* col_var,
219  int n) const {
220  const auto cd = getColumnDescriptor(col_var);
221  if (!cd || n > cd->columnType.get_physical_cols()) {
222  return nullptr;
223  }
224  return get_column_descriptor_maybe(
225  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
226 }
227 
228 const Catalog_Namespace::Catalog* Executor::getCatalog() const {
229  return catalog_;
230 }
231 
232 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
233  return row_set_mem_owner_;
234 }
235 
236 const TemporaryTables* Executor::getTemporaryTables() const {
237  return temporary_tables_;
238 }
239 
240 Fragmenter_Namespace::TableInfo Executor::getTableInfo(const int table_id) const {
241  return input_table_info_cache_.getTableInfo(table_id);
242 }
243 
244 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
245  return table_generations_.getGeneration(table_id);
246 }
247 
248 ExpressionRange Executor::getColRange(const PhysicalInput& phys_input) const {
249  return agg_col_range_cache_.getColRange(phys_input);
250 }
251 
252 size_t Executor::getNumBytesForFetchedRow() const {
253  size_t num_bytes = 0;
254  if (!plan_state_) {
255  return 0;
256  }
257  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
258  if (fetched_col_pair.first < 0) {
259  num_bytes += 8;
260  } else {
261  const auto cd =
262  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
263  const auto& ti = cd->columnType;
264  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
265  ? 4
266  : ti.get_size();
267  if (sz < 0) {
268  // for varlen types, only account for the pointer/size for each row, for now
269  num_bytes += 16;
270  } else {
271  num_bytes += sz;
272  }
273  }
274  }
275  return num_bytes;
276 }
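// Illustrative sketch (not part of Execute.cpp): the per-row byte estimate above charges
// 8 bytes for virtual columns (negative column ids), 4 bytes for dictionary-encoded TEXT,
// 16 bytes (pointer + size) per row for other variable-length types, and the declared
// size for everything else. For example, a row with an INT, a dictionary-encoded TEXT
// and a NONE-encoded TEXT column is estimated at 4 + 4 + 16 = 24 bytes; the constant
// names below are hypothetical and only restate that arithmetic.
#include <cstddef>

constexpr size_t kIntBytes = 4;       // fixed-width INT
constexpr size_t kDictTextBytes = 4;  // dictionary-encoded TEXT is a 4-byte id
constexpr size_t kVarlenBytes = 16;   // pointer + size per row for varlen payloads

static_assert(kIntBytes + kDictTextBytes + kVarlenBytes == 24,
              "worked example of the per-row estimate described above");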
277 
278 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
279  const std::vector<Analyzer::Expr*>& target_exprs) const {
281  CHECK(catalog_);
282  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
283  for (const auto target_expr : target_exprs) {
284  if (!plan_state_->isLazyFetchColumn(target_expr)) {
285  col_lazy_fetch_info.emplace_back(
286  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
287  } else {
288  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
289  CHECK(col_var);
290  auto col_id = col_var->get_column_id();
291  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
292  auto cd = (col_var->get_table_id() > 0)
293  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
294  : nullptr;
295  if (cd && IS_GEO(cd->columnType.get_type())) {
296  // Geo coords cols will be processed in sequence. So we only need to track the
297  // first coords col in lazy fetch info.
298  {
299  auto cd0 =
300  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
301  auto col0_ti = cd0->columnType;
302  CHECK(!cd0->isVirtualCol);
303  auto col0_var = makeExpr<Analyzer::ColumnVar>(
304  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
305  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
306  col_lazy_fetch_info.emplace_back(
307  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
308  }
309  } else {
310  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
311  const auto& col_ti = col_var->get_type_info();
312  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
313  }
314  }
315  }
316  return col_lazy_fetch_info;
317 }
318 
324 }
325 
326 std::vector<int8_t> Executor::serializeLiterals(
327  const std::unordered_map<int, CgenState::LiteralValues>& literals,
328  const int device_id) {
329  if (literals.empty()) {
330  return {};
331  }
332  const auto dev_literals_it = literals.find(device_id);
333  CHECK(dev_literals_it != literals.end());
334  const auto& dev_literals = dev_literals_it->second;
335  size_t lit_buf_size{0};
336  std::vector<std::string> real_strings;
337  std::vector<std::vector<double>> double_array_literals;
338  std::vector<std::vector<int8_t>> align64_int8_array_literals;
339  std::vector<std::vector<int32_t>> int32_array_literals;
340  std::vector<std::vector<int8_t>> align32_int8_array_literals;
341  std::vector<std::vector<int8_t>> int8_array_literals;
342  for (const auto& lit : dev_literals) {
343  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
344  if (lit.which() == 7) {
345  const auto p = boost::get<std::string>(&lit);
346  CHECK(p);
347  real_strings.push_back(*p);
348  } else if (lit.which() == 8) {
349  const auto p = boost::get<std::vector<double>>(&lit);
350  CHECK(p);
351  double_array_literals.push_back(*p);
352  } else if (lit.which() == 9) {
353  const auto p = boost::get<std::vector<int32_t>>(&lit);
354  CHECK(p);
355  int32_array_literals.push_back(*p);
356  } else if (lit.which() == 10) {
357  const auto p = boost::get<std::vector<int8_t>>(&lit);
358  CHECK(p);
359  int8_array_literals.push_back(*p);
360  } else if (lit.which() == 11) {
361  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
362  CHECK(p);
363  if (p->second == 64) {
364  align64_int8_array_literals.push_back(p->first);
365  } else if (p->second == 32) {
366  align32_int8_array_literals.push_back(p->first);
367  } else {
368  CHECK(false);
369  }
370  }
371  }
372  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
373  throw TooManyLiterals();
374  }
375  int16_t crt_real_str_off = lit_buf_size;
376  for (const auto& real_str : real_strings) {
377  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
378  lit_buf_size += real_str.size();
379  }
380  if (double_array_literals.size() > 0) {
381  lit_buf_size = align(lit_buf_size, sizeof(double));
382  }
383  int16_t crt_double_arr_lit_off = lit_buf_size;
384  for (const auto& double_array_literal : double_array_literals) {
385  CHECK_LE(double_array_literal.size(),
386  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
387  lit_buf_size += double_array_literal.size() * sizeof(double);
388  }
389  if (align64_int8_array_literals.size() > 0) {
390  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
391  }
392  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
393  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
394  CHECK_LE(align64_int8_array_literals.size(),
395  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
396  lit_buf_size += align64_int8_array_literal.size();
397  }
398  if (int32_array_literals.size() > 0) {
399  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
400  }
401  int16_t crt_int32_arr_lit_off = lit_buf_size;
402  for (const auto& int32_array_literal : int32_array_literals) {
403  CHECK_LE(int32_array_literal.size(),
404  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
405  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
406  }
407  if (align32_int8_array_literals.size() > 0) {
408  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
409  }
410  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
411  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
412  CHECK_LE(align32_int8_array_literals.size(),
413  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
414  lit_buf_size += align32_int8_array_literal.size();
415  }
416  int16_t crt_int8_arr_lit_off = lit_buf_size;
417  for (const auto& int8_array_literal : int8_array_literals) {
418  CHECK_LE(int8_array_literal.size(),
419  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
420  lit_buf_size += int8_array_literal.size();
421  }
422  unsigned crt_real_str_idx = 0;
423  unsigned crt_double_arr_lit_idx = 0;
424  unsigned crt_align64_int8_arr_lit_idx = 0;
425  unsigned crt_int32_arr_lit_idx = 0;
426  unsigned crt_align32_int8_arr_lit_idx = 0;
427  unsigned crt_int8_arr_lit_idx = 0;
428  std::vector<int8_t> serialized(lit_buf_size);
429  size_t off{0};
430  for (const auto& lit : dev_literals) {
431  const auto lit_bytes = CgenState::literalBytes(lit);
432  off = CgenState::addAligned(off, lit_bytes);
433  switch (lit.which()) {
434  case 0: {
435  const auto p = boost::get<int8_t>(&lit);
436  CHECK(p);
437  serialized[off - lit_bytes] = *p;
438  break;
439  }
440  case 1: {
441  const auto p = boost::get<int16_t>(&lit);
442  CHECK(p);
443  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
444  break;
445  }
446  case 2: {
447  const auto p = boost::get<int32_t>(&lit);
448  CHECK(p);
449  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
450  break;
451  }
452  case 3: {
453  const auto p = boost::get<int64_t>(&lit);
454  CHECK(p);
455  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
456  break;
457  }
458  case 4: {
459  const auto p = boost::get<float>(&lit);
460  CHECK(p);
461  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
462  break;
463  }
464  case 5: {
465  const auto p = boost::get<double>(&lit);
466  CHECK(p);
467  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
468  break;
469  }
470  case 6: {
471  const auto p = boost::get<std::pair<std::string, int>>(&lit);
472  CHECK(p);
473  const auto str_id = getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
474  ->getIdOfString(p->first);
475  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
476  break;
477  }
478  case 7: {
479  const auto p = boost::get<std::string>(&lit);
480  CHECK(p);
481  int32_t off_and_len = crt_real_str_off << 16;
482  const auto& crt_real_str = real_strings[crt_real_str_idx];
483  off_and_len |= static_cast<int16_t>(crt_real_str.size());
484  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
485  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
486  ++crt_real_str_idx;
487  crt_real_str_off += crt_real_str.size();
488  break;
489  }
490  case 8: {
491  const auto p = boost::get<std::vector<double>>(&lit);
492  CHECK(p);
493  int32_t off_and_len = crt_double_arr_lit_off << 16;
494  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
495  int32_t len = crt_double_arr_lit.size();
496  CHECK_EQ((len >> 16), 0);
497  off_and_len |= static_cast<int16_t>(len);
498  int32_t double_array_bytesize = len * sizeof(double);
499  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
500  memcpy(&serialized[crt_double_arr_lit_off],
501  crt_double_arr_lit.data(),
502  double_array_bytesize);
503  ++crt_double_arr_lit_idx;
504  crt_double_arr_lit_off += double_array_bytesize;
505  break;
506  }
507  case 9: {
508  const auto p = boost::get<std::vector<int32_t>>(&lit);
509  CHECK(p);
510  int32_t off_and_len = crt_int32_arr_lit_off << 16;
511  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
512  int32_t len = crt_int32_arr_lit.size();
513  CHECK_EQ((len >> 16), 0);
514  off_and_len |= static_cast<int16_t>(len);
515  int32_t int32_array_bytesize = len * sizeof(int32_t);
516  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
517  memcpy(&serialized[crt_int32_arr_lit_off],
518  crt_int32_arr_lit.data(),
519  int32_array_bytesize);
520  ++crt_int32_arr_lit_idx;
521  crt_int32_arr_lit_off += int32_array_bytesize;
522  break;
523  }
524  case 10: {
525  const auto p = boost::get<std::vector<int8_t>>(&lit);
526  CHECK(p);
527  int32_t off_and_len = crt_int8_arr_lit_off << 16;
528  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
529  int32_t len = crt_int8_arr_lit.size();
530  CHECK_EQ((len >> 16), 0);
531  off_and_len |= static_cast<int16_t>(len);
532  int32_t int8_array_bytesize = len;
533  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
534  memcpy(&serialized[crt_int8_arr_lit_off],
535  crt_int8_arr_lit.data(),
536  int8_array_bytesize);
537  ++crt_int8_arr_lit_idx;
538  crt_int8_arr_lit_off += int8_array_bytesize;
539  break;
540  }
541  case 11: {
542  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
543  CHECK(p);
544  if (p->second == 64) {
545  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
546  const auto& crt_align64_int8_arr_lit =
547  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
548  int32_t len = crt_align64_int8_arr_lit.size();
549  CHECK_EQ((len >> 16), 0);
550  off_and_len |= static_cast<int16_t>(len);
551  int32_t align64_int8_array_bytesize = len;
552  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
553  memcpy(&serialized[crt_align64_int8_arr_lit_off],
554  crt_align64_int8_arr_lit.data(),
555  align64_int8_array_bytesize);
556  ++crt_align64_int8_arr_lit_idx;
557  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
558  } else if (p->second == 32) {
559  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
560  const auto& crt_align32_int8_arr_lit =
561  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
562  int32_t len = crt_align32_int8_arr_lit.size();
563  CHECK_EQ((len >> 16), 0);
564  off_and_len |= static_cast<int16_t>(len);
565  int32_t align32_int8_array_bytesize = len;
566  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
567  memcpy(&serialized[crt_align32_int8_arr_lit_off],
568  crt_align32_int8_arr_lit.data(),
569  align32_int8_array_bytesize);
570  ++crt_align32_int8_arr_lit_idx;
571  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
572  } else {
573  CHECK(false);
574  }
575  break;
576  }
577  default:
578  CHECK(false);
579  }
580  }
581  return serialized;
582 }
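// Illustrative sketch (not part of Execute.cpp): every variable-length literal serialized
// above is referenced from the fixed-size part of the buffer by a packed 32-bit
// descriptor -- the payload's byte offset in the high 16 bits and its element count in
// the low 16 bits, which is why offsets and lengths are checked against
// std::numeric_limits<int16_t>::max(). The helper names below are hypothetical and only
// demonstrate that the packing round-trips.
#include <cassert>
#include <cstdint>
#include <utility>

inline int32_t pack_off_and_len(const int16_t off, const int16_t len) {
  return (static_cast<int32_t>(off) << 16) | static_cast<uint16_t>(len);
}

inline std::pair<int16_t, int16_t> unpack_off_and_len(const int32_t off_and_len) {
  return std::make_pair(static_cast<int16_t>(off_and_len >> 16),
                        static_cast<int16_t>(off_and_len & 0xFFFF));
}

inline void off_and_len_round_trip_check() {
  const int32_t packed = pack_off_and_len(/*off=*/128, /*len=*/12);
  const auto off_len = unpack_off_and_len(packed);
  assert(off_len.first == 128 && off_len.second == 12);
}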
583 
584 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
585  if (device_type == ExecutorDeviceType::GPU) {
586  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
587  CHECK(cuda_mgr);
588  return cuda_mgr->getDeviceCount();
589  } else {
590  return 1;
591  }
592 }
593 
594 int Executor::deviceCountForMemoryLevel(
595  const Data_Namespace::MemoryLevel memory_level) const {
596  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
597  : deviceCount(ExecutorDeviceType::CPU);
598 }
599 
600 // TODO(alex): remove or split
601 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
602  const SQLTypeInfo& ti,
603  const int64_t agg_init_val,
604  const int8_t out_byte_width,
605  const int64_t* out_vec,
606  const size_t out_vec_sz,
607  const bool is_group_by,
608  const bool float_argument_input) {
609  switch (agg) {
610  case kAVG:
611  case kSUM:
612  if (0 != agg_init_val) {
613  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
614  int64_t agg_result = agg_init_val;
615  for (size_t i = 0; i < out_vec_sz; ++i) {
616  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
617  }
618  return {agg_result, 0};
619  } else {
620  CHECK(ti.is_fp());
621  switch (out_byte_width) {
622  case 4: {
623  int agg_result = static_cast<int32_t>(agg_init_val);
624  for (size_t i = 0; i < out_vec_sz; ++i) {
625  agg_sum_float_skip_val(
626  &agg_result,
627  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
628  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
629  }
630  const int64_t converted_bin =
631  float_argument_input
632  ? static_cast<int64_t>(agg_result)
633  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
634  return {converted_bin, 0};
635  break;
636  }
637  case 8: {
638  int64_t agg_result = agg_init_val;
639  for (size_t i = 0; i < out_vec_sz; ++i) {
640  agg_sum_double_skip_val(
641  &agg_result,
642  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
643  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
644  }
645  return {agg_result, 0};
646  break;
647  }
648  default:
649  CHECK(false);
650  }
651  }
652  }
653  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
654  int64_t agg_result = 0;
655  for (size_t i = 0; i < out_vec_sz; ++i) {
656  agg_result += out_vec[i];
657  }
658  return {agg_result, 0};
659  } else {
660  CHECK(ti.is_fp());
661  switch (out_byte_width) {
662  case 4: {
663  float r = 0.;
664  for (size_t i = 0; i < out_vec_sz; ++i) {
665  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
666  }
667  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
668  const int64_t converted_bin =
669  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
670  return {converted_bin, 0};
671  }
672  case 8: {
673  double r = 0.;
674  for (size_t i = 0; i < out_vec_sz; ++i) {
675  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
676  }
677  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
678  }
679  default:
680  CHECK(false);
681  }
682  }
683  break;
684  case kCOUNT: {
685  uint64_t agg_result = 0;
686  for (size_t i = 0; i < out_vec_sz; ++i) {
687  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
688  agg_result += out;
689  }
690  return {static_cast<int64_t>(agg_result), 0};
691  }
692  case kMIN: {
693  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
694  int64_t agg_result = agg_init_val;
695  for (size_t i = 0; i < out_vec_sz; ++i) {
696  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
697  }
698  return {agg_result, 0};
699  } else {
700  switch (out_byte_width) {
701  case 4: {
702  int32_t agg_result = static_cast<int32_t>(agg_init_val);
703  for (size_t i = 0; i < out_vec_sz; ++i) {
704  agg_min_float_skip_val(
705  &agg_result,
706  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
707  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
708  }
709  const int64_t converted_bin =
710  float_argument_input
711  ? static_cast<int64_t>(agg_result)
712  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
713  return {converted_bin, 0};
714  }
715  case 8: {
716  int64_t agg_result = agg_init_val;
717  for (size_t i = 0; i < out_vec_sz; ++i) {
718  agg_min_double_skip_val(
719  &agg_result,
720  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
721  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
722  }
723  return {agg_result, 0};
724  }
725  default:
726  CHECK(false);
727  }
728  }
729  }
730  case kMAX:
731  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
732  int64_t agg_result = agg_init_val;
733  for (size_t i = 0; i < out_vec_sz; ++i) {
734  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
735  }
736  return {agg_result, 0};
737  } else {
738  switch (out_byte_width) {
739  case 4: {
740  int32_t agg_result = static_cast<int32_t>(agg_init_val);
741  for (size_t i = 0; i < out_vec_sz; ++i) {
742  agg_max_float_skip_val(
743  &agg_result,
744  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
745  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
746  }
747  const int64_t converted_bin =
748  float_argument_input ? static_cast<int64_t>(agg_result)
749  : float_to_double_bin(agg_result, !ti.get_notnull());
750  return {converted_bin, 0};
751  }
752  case 8: {
753  int64_t agg_result = agg_init_val;
754  for (size_t i = 0; i < out_vec_sz; ++i) {
755  agg_max_double_skip_val(
756  &agg_result,
757  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
758  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
759  }
760  return {agg_result, 0};
761  }
762  default:
763  CHECK(false);
764  }
765  }
766  case kSAMPLE: {
767  int64_t agg_result = agg_init_val;
768  for (size_t i = 0; i < out_vec_sz; ++i) {
769  if (out_vec[i] != agg_init_val) {
770  agg_result = out_vec[i];
771  break;
772  }
773  }
774  return {agg_result, 0};
775  }
776  default:
777  CHECK(false);
778  }
779  abort();
780 }
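// Illustrative sketch (not part of Execute.cpp): for a non-grouped aggregate, each
// device/thread leaves one partial value in out_vec and reduceResults() above folds
// them into a single scalar. COUNT partials are simply summed, while SUM/MIN/MAX skip
// partials that still hold the initialization (null sentinel) value. A minimal
// standalone version of the SUM-with-skip-value fold, under those assumptions:
#include <cstdint>
#include <vector>

inline int64_t reduce_sum_skip_val(const std::vector<int64_t>& partials,
                                   const int64_t skip_val) {
  int64_t acc = skip_val;
  for (const auto partial : partials) {
    if (partial == skip_val) {
      continue;  // this device never saw a non-null input row
    }
    acc = (acc == skip_val) ? partial : acc + partial;
  }
  return acc;  // remains skip_val if every partial was null
}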
781 
782 namespace {
783 
784 ResultSetPtr get_merged_result(
785  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
786  auto& first = results_per_device.front().first;
787  CHECK(first);
788  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
789  const auto& next = results_per_device[dev_idx].first;
790  CHECK(next);
791  first->append(*next);
792  }
793  return std::move(first);
794 }
795 
796 } // namespace
797 
798 ResultSetPtr Executor::resultsUnion(ExecutionDispatch& execution_dispatch) {
799  auto& results_per_device = execution_dispatch.getFragmentResults();
800  if (results_per_device.empty()) {
801  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
802  std::vector<TargetInfo> targets;
803  for (const auto target_expr : ra_exe_unit.target_exprs) {
804  targets.push_back(get_target_info(target_expr, g_bigint_count));
805  }
806  return std::make_shared<ResultSet>(
807  targets, ExecutorDeviceType::CPU, QueryMemoryDescriptor(), nullptr, nullptr);
808  }
809  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
810  std::sort(results_per_device.begin(),
811  results_per_device.end(),
812  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
813  CHECK_GE(lhs.second.size(), size_t(1));
814  CHECK_GE(rhs.second.size(), size_t(1));
815  return lhs.second.front() < rhs.second.front();
816  });
817 
818  return get_merged_result(results_per_device);
819 }
820 
821 ResultSetPtr Executor::reduceMultiDeviceResults(
822  const RelAlgExecutionUnit& ra_exe_unit,
823  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
824  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
825  const QueryMemoryDescriptor& query_mem_desc) const {
826  if (ra_exe_unit.estimator) {
827  return reduce_estimator_results(ra_exe_unit, results_per_device);
828  }
829 
830  if (results_per_device.empty()) {
831  std::vector<TargetInfo> targets;
832  for (const auto target_expr : ra_exe_unit.target_exprs) {
833  targets.push_back(get_target_info(target_expr, g_bigint_count));
834  }
835  return std::make_shared<ResultSet>(
836  targets, ExecutorDeviceType::CPU, QueryMemoryDescriptor(), nullptr, this);
837  }
838 
839  return reduceMultiDeviceResultSets(
840  results_per_device,
841  row_set_mem_owner,
842  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
843 }
844 
845 ResultSetPtr Executor::reduceMultiDeviceResultSets(
846  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
847  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
848  const QueryMemoryDescriptor& query_mem_desc) const {
849  std::shared_ptr<ResultSet> reduced_results;
850 
851  const auto& first = results_per_device.front().first;
852 
853  if (query_mem_desc.getQueryDescriptionType() ==
854  QueryDescriptionType::GroupByBaselineHash &&
855  results_per_device.size() > 1) {
856  const auto total_entry_count = std::accumulate(
857  results_per_device.begin(),
858  results_per_device.end(),
859  size_t(0),
860  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
861  const auto& r = rs.first;
862  return init + r->getQueryMemDesc().getEntryCount();
863  });
864  CHECK(total_entry_count);
865  auto query_mem_desc = first->getQueryMemDesc();
866  query_mem_desc.setEntryCount(total_entry_count);
867  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
869  query_mem_desc,
870  row_set_mem_owner,
871  this);
872  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
873  reduced_results->initializeStorage();
874  switch (query_mem_desc.getEffectiveKeyWidth()) {
875  case 4:
876  first->getStorage()->moveEntriesToBuffer<int32_t>(
877  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
878  break;
879  case 8:
880  first->getStorage()->moveEntriesToBuffer<int64_t>(
881  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
882  break;
883  default:
884  CHECK(false);
885  }
886  } else {
887  reduced_results = first;
888  }
889 
890 #ifdef WITH_REDUCTION_JIT
891  const auto& this_result_set = results_per_device[0].first;
892  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
893  this_result_set->getTargetInfos(),
894  this_result_set->getTargetInitVals());
895  const auto reduction_code = reduction_jit.codegen();
896 #else
897  ReductionCode reduction_code{};
898 #endif // WITH_REDUCTION_JIT
899  for (size_t i = 1; i < results_per_device.size(); ++i) {
900  reduced_results->getStorage()->reduce(
901  *(results_per_device[i].first->getStorage()), {}, reduction_code);
902  }
903 
904  return reduced_results;
905 }
906 
907 ResultSetPtr Executor::reduceSpeculativeTopN(
908  const RelAlgExecutionUnit& ra_exe_unit,
909  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
910  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
911  const QueryMemoryDescriptor& query_mem_desc) const {
912  if (results_per_device.size() == 1) {
913  return std::move(results_per_device.front().first);
914  }
915  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
916  SpeculativeTopNMap m;
917  for (const auto& result : results_per_device) {
918  auto rows = result.first;
919  CHECK(rows);
920  if (!rows) {
921  continue;
922  }
923  SpeculativeTopNMap that(
924  *rows,
925  ra_exe_unit.target_exprs,
926  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
927  m.reduce(that);
928  }
929  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
930  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
931  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
932 }
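// Illustrative sketch (not part of Execute.cpp): each per-device result above is folded
// into a SpeculativeTopNMap whose capacity grows with the natural log of the requested
// top_n (limit + offset), never below top_n itself. For example, top_n = 100 gives
// 10000 * static_cast<int>(log(100)) = 10000 * 4 = 40000 candidate slots. The sizing
// rule in isolation (speculative_topn_map_size is a hypothetical helper name):
#include <algorithm>
#include <cmath>
#include <cstddef>

inline size_t speculative_topn_map_size(const size_t top_n) {
  return std::max(size_t(10000 * std::max(1, static_cast<int>(std::log(top_n)))), top_n);
}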
933 
934 std::unordered_set<int> get_available_gpus(const Catalog_Namespace::Catalog& cat) {
935  std::unordered_set<int> available_gpus;
936  if (cat.getDataMgr().gpusPresent()) {
937  const int gpu_count = cat.getDataMgr().getCudaMgr()->getDeviceCount();
938  CHECK_GT(gpu_count, 0);
939  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
940  available_gpus.insert(gpu_id);
941  }
942  }
943  return available_gpus;
944 }
945 
946 size_t get_context_count(const ExecutorDeviceType device_type,
947  const size_t cpu_count,
948  const size_t gpu_count) {
949  return device_type == ExecutorDeviceType::GPU ? gpu_count
950  : static_cast<size_t>(cpu_count);
951 }
952 
953 namespace {
954 
955 // Compute a very conservative entry count for the output buffer entry count using no
956 // other information than the number of tuples in each table and multiplying them
957 // together.
958 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
959  using Fragmenter_Namespace::FragmentInfo;
960  // Check for overflows since we're multiplying potentially big table sizes.
961  using checked_size_t = boost::multiprecision::number<
962  boost::multiprecision::cpp_int_backend<64,
963  64,
964  boost::multiprecision::unsigned_magnitude,
965  boost::multiprecision::checked,
966  void>>;
967  checked_size_t max_groups_buffer_entry_guess = 1;
968  for (const auto& query_info : query_infos) {
969  CHECK(!query_info.info.fragments.empty());
970  auto it = std::max_element(query_info.info.fragments.begin(),
971  query_info.info.fragments.end(),
972  [](const FragmentInfo& f1, const FragmentInfo& f2) {
973  return f1.getNumTuples() < f2.getNumTuples();
974  });
975  max_groups_buffer_entry_guess *= it->getNumTuples();
976  }
977  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
978  // baseline group layout with that many entries anyway.
979  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
980  try {
981  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
982  max_groups_buffer_entry_guess_cap);
983  } catch (...) {
984  return max_groups_buffer_entry_guess_cap;
985  }
986 }
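// Illustrative sketch (not part of Execute.cpp): the guess above is the product of the
// largest fragment of every input table, clamped to 100M entries; the checked
// boost::multiprecision integer throws on overflow and the catch clause turns that into
// the cap. The same idea using a compiler intrinsic instead (assumes GCC/Clang
// __builtin_mul_overflow; capped_entry_guess is a hypothetical name):
#include <algorithm>
#include <cstddef>
#include <vector>

inline size_t capped_entry_guess(const std::vector<size_t>& max_fragment_tuples,
                                 const size_t cap = 100000000) {
  size_t guess = 1;
  for (const auto tuples : max_fragment_tuples) {
    size_t next = 0;
    if (__builtin_mul_overflow(guess, tuples, &next)) {
      return cap;  // product no longer fits in size_t -- fall back to the cap
    }
    guess = next;
  }
  return std::min(guess, cap);
}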
987 
988 std::string get_table_name(const InputDescriptor& input_desc,
989  const Catalog_Namespace::Catalog& cat) {
990  const auto source_type = input_desc.getSourceType();
991  if (source_type == InputSourceType::TABLE) {
992  const auto td = cat.getMetadataForTable(input_desc.getTableId());
993  CHECK(td);
994  return td->tableName;
995  } else {
996  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
997  }
998 }
999 
1000 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1001  const int device_count) {
1002  if (device_type == ExecutorDeviceType::GPU) {
1003  return device_count * Executor::high_scan_limit;
1004  }
1005  return Executor::high_scan_limit;
1006 }
1007 
1008 void checkWorkUnitWatchdog(const RelAlgExecutionUnit& ra_exe_unit,
1009  const std::vector<InputTableInfo>& table_infos,
1010  const Catalog_Namespace::Catalog& cat,
1011  const ExecutorDeviceType device_type,
1012  const int device_count) {
1013  for (const auto target_expr : ra_exe_unit.target_exprs) {
1014  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1015  return;
1016  }
1017  }
1018  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1019  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1020  // Allow a query with no scan limit to run on small tables
1021  return;
1022  }
1023  if (ra_exe_unit.use_bump_allocator) {
1024  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1025  // relative to the size of the input), so we bypass this check for now
1026  return;
1027  }
1028  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1029  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1030  (!ra_exe_unit.scan_limit ||
1031  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1032  std::vector<std::string> table_names;
1033  const auto& input_descs = ra_exe_unit.input_descs;
1034  for (const auto& input_desc : input_descs) {
1035  table_names.push_back(get_table_name(input_desc, cat));
1036  }
1037  if (!ra_exe_unit.scan_limit) {
1038  throw WatchdogException(
1039  "Projection query would require a scan without a limit on table(s): " +
1040  boost::algorithm::join(table_names, ", "));
1041  } else {
1042  throw WatchdogException(
1043  "Projection query output result set on table(s): " +
1044  boost::algorithm::join(table_names, ", ") + " would contain " +
1045  std::to_string(ra_exe_unit.scan_limit) +
1046  " rows, which is more than the current system limit of " +
1047  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1048  }
1049  }
1050 }
1051 
1052 } // namespace
1053 
1054 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1055  const RelAlgExecutionUnit& ra_exe_unit) {
1056  if (ra_exe_unit.input_descs.size() < 2) {
1057  return false;
1058  }
1059 
1060  // We only support loop join at the end of folded joins
1061  // where ra_exe_unit.input_descs.size() > 2 for now.
1062  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1063 
1064  ssize_t inner_table_idx = -1;
1065  for (size_t i = 0; i < query_infos.size(); ++i) {
1066  if (query_infos[i].table_id == inner_table_id) {
1067  inner_table_idx = i;
1068  break;
1069  }
1070  }
1071  CHECK_NE(ssize_t(-1), inner_table_idx);
1072  return query_infos[inner_table_idx].info.getNumTuples() <=
1073  static_cast<size_t>(g_trivial_loop_join_threshold);
1074 }
1075 
1076 namespace {
1077 
1078 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
1079  const size_t new_scan_limit) {
1080  return {ra_exe_unit_in.input_descs,
1081  ra_exe_unit_in.input_col_descs,
1082  ra_exe_unit_in.simple_quals,
1083  ra_exe_unit_in.quals,
1084  ra_exe_unit_in.join_quals,
1085  ra_exe_unit_in.groupby_exprs,
1086  ra_exe_unit_in.target_exprs,
1087  ra_exe_unit_in.estimator,
1088  ra_exe_unit_in.sort_info,
1089  new_scan_limit,
1090  ra_exe_unit_in.query_features,
1091  ra_exe_unit_in.use_bump_allocator};
1092 }
1093 
1094 } // namespace
1095 
1096 ResultSetPtr Executor::executeWorkUnit(
1097  size_t& max_groups_buffer_entry_guess,
1098  const bool is_agg,
1099  const std::vector<InputTableInfo>& query_infos,
1100  const RelAlgExecutionUnit& ra_exe_unit_in,
1101  const CompilationOptions& co,
1102  const ExecutionOptions& eo,
1103  const Catalog_Namespace::Catalog& cat,
1104  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1105  RenderInfo* render_info,
1106  const bool has_cardinality_estimation,
1107  ColumnCacheMap& column_cache) {
1108  try {
1109  return executeWorkUnitImpl(max_groups_buffer_entry_guess,
1110  is_agg,
1111  true,
1112  query_infos,
1113  ra_exe_unit_in,
1114  co,
1115  eo,
1116  cat,
1117  row_set_mem_owner,
1118  render_info,
1119  has_cardinality_estimation,
1120  column_cache);
1121  } catch (const CompilationRetryNewScanLimit& e) {
1122  return executeWorkUnitImpl(max_groups_buffer_entry_guess,
1123  is_agg,
1124  false,
1125  query_infos,
1126  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1127  co,
1128  eo,
1129  cat,
1130  row_set_mem_owner,
1131  render_info,
1132  has_cardinality_estimation,
1133  column_cache);
1134  }
1135 }
1136 
1137 ResultSetPtr Executor::executeWorkUnitImpl(
1138  size_t& max_groups_buffer_entry_guess,
1139  const bool is_agg,
1140  const bool allow_single_frag_table_opt,
1141  const std::vector<InputTableInfo>& query_infos,
1142  const RelAlgExecutionUnit& ra_exe_unit_in,
1143  const CompilationOptions& co,
1144  const ExecutionOptions& eo,
1145  const Catalog_Namespace::Catalog& cat,
1146  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1147  RenderInfo* render_info,
1148  const bool has_cardinality_estimation,
1149  ColumnCacheMap& column_cache) {
1150  INJECT_TIMER(Exec_executeWorkUnit);
1151  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in);
1152  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type_);
1153  CHECK(!query_infos.empty());
1154  if (!max_groups_buffer_entry_guess) {
1155  // The query has failed the first execution attempt because of running out
1156  // of group by slots. Make the conservative choice: allocate fragment size
1157  // slots and run on the CPU.
1158  CHECK(device_type == ExecutorDeviceType::CPU);
1159  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1160  }
1161 
1162  int8_t crt_min_byte_width{get_min_byte_width()};
1163  do {
1164  ExecutionDispatch execution_dispatch(
1165  this, ra_exe_unit, query_infos, cat, row_set_mem_owner, render_info);
1166  ColumnFetcher column_fetcher(this, column_cache);
1167  std::unique_ptr<QueryCompilationDescriptor> query_comp_desc_owned;
1168  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1169  try {
1170  INJECT_TIMER(execution_dispatch_comp);
1171  std::tie(query_comp_desc_owned, query_mem_desc_owned) =
1172  execution_dispatch.compile(max_groups_buffer_entry_guess,
1173  crt_min_byte_width,
1174  {device_type,
1175  co.hoist_literals_,
1176  co.opt_level_,
1177  co.with_dynamic_watchdog_,
1178  co.explain_type_,
1179  co.register_intel_jit_listener_},
1180  eo,
1181  column_fetcher,
1182  has_cardinality_estimation);
1183  CHECK(query_comp_desc_owned);
1184  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1185  } catch (CompilationRetryNoCompaction&) {
1186  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1187  continue;
1188  }
1189  if (eo.just_explain) {
1190  return executeExplain(*query_comp_desc_owned);
1191  }
1192 
1193  for (const auto target_expr : ra_exe_unit.target_exprs) {
1194  plan_state_->target_exprs_.push_back(target_expr);
1195  }
1196 
1197  auto dispatch = [&execution_dispatch, &column_fetcher, &eo](
1198  const ExecutorDeviceType chosen_device_type,
1199  int chosen_device_id,
1200  const QueryCompilationDescriptor& query_comp_desc,
1201  const QueryMemoryDescriptor& query_mem_desc,
1202  const FragmentsList& frag_list,
1203  const ExecutorDispatchMode kernel_dispatch_mode,
1204  const int64_t rowid_lookup_key) {
1205  INJECT_TIMER(execution_dispatch_run);
1206  execution_dispatch.run(chosen_device_type,
1207  chosen_device_id,
1208  eo,
1209  column_fetcher,
1210  query_comp_desc,
1211  query_mem_desc,
1212  frag_list,
1213  kernel_dispatch_mode,
1214  rowid_lookup_key);
1215  };
1216 
1217  QueryFragmentDescriptor fragment_descriptor(
1218  ra_exe_unit,
1219  query_infos,
1220  query_comp_desc_owned->getDeviceType() == ExecutorDeviceType::GPU
1222  : std::vector<Data_Namespace::MemoryInfo>{},
1224 
1225  if (!eo.just_validate) {
1226  int available_cpus = cpu_threads();
1227  auto available_gpus = get_available_gpus(cat);
1228 
1229  const auto context_count =
1230  get_context_count(device_type, available_cpus, available_gpus.size());
1231  try {
1232  dispatchFragments(dispatch,
1233  execution_dispatch,
1234  query_infos,
1235  eo,
1236  is_agg,
1237  allow_single_frag_table_opt,
1238  context_count,
1239  *query_comp_desc_owned,
1240  *query_mem_desc_owned,
1241  fragment_descriptor,
1242  available_gpus,
1243  available_cpus);
1244  } catch (QueryExecutionError& e) {
1245  if (eo.with_dynamic_watchdog && interrupted_ &&
1246  e.getErrorCode() == ERR_OUT_OF_TIME) {
1248  }
1249  cat.getDataMgr().freeAllBuffers();
1250  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1251  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1252  crt_min_byte_width <<= 1;
1253  continue;
1254  }
1255  throw;
1256  }
1257  }
1258  cat.getDataMgr().freeAllBuffers();
1259  if (is_agg) {
1260  try {
1261  return collectAllDeviceResults(execution_dispatch,
1262  ra_exe_unit.target_exprs,
1263  *query_mem_desc_owned,
1264  query_comp_desc_owned->getDeviceType(),
1265  row_set_mem_owner);
1266  } catch (ReductionRanOutOfSlots&) {
1267  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1268  } catch (OverflowOrUnderflow&) {
1269  crt_min_byte_width <<= 1;
1270  continue;
1271  }
1272  }
1273  return resultsUnion(execution_dispatch);
1274 
1275  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1276 
1277  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1278  ExecutorDeviceType::CPU,
1279  QueryMemoryDescriptor(),
1280  nullptr,
1281  this);
1282 }
1283 
1284 void Executor::executeWorkUnitPerFragment(const RelAlgExecutionUnit& ra_exe_unit_in,
1285  const InputTableInfo& table_info,
1286  const CompilationOptions& co,
1287  const ExecutionOptions& eo,
1288  const Catalog_Namespace::Catalog& cat,
1289  PerFragmentCB& cb) {
1290  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in);
1291  ColumnCacheMap column_cache;
1292 
1293  std::vector<InputTableInfo> table_infos{table_info};
1294  // TODO(adb): ensure this is under a try / catch
1295  ExecutionDispatch execution_dispatch(
1296  this, ra_exe_unit, table_infos, cat, row_set_mem_owner_, nullptr);
1297  ColumnFetcher column_fetcher(this, column_cache);
1298  std::unique_ptr<QueryCompilationDescriptor> query_comp_desc_owned;
1299  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1300  std::tie(query_comp_desc_owned, query_mem_desc_owned) =
1301  execution_dispatch.compile(0, 8, co, eo, column_fetcher, false);
1302  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1303  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1304  const auto& outer_fragments = table_info.info.fragments;
1305  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1306  ++fragment_index) {
1307  // We may want to consider in the future allowing this to execute on devices other
1308  // than CPU
1309  execution_dispatch.run(co.device_type_,
1310  0,
1311  eo,
1312  column_fetcher,
1313  *query_comp_desc_owned,
1314  *query_mem_desc_owned,
1315  {{table_id, {fragment_index}}},
1316  ExecutorDispatchMode::KernelPerFragment,
1317  -1);
1318  }
1319 
1320  const auto& all_fragment_results = execution_dispatch.getFragmentResults();
1321 
1322  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1323  ++fragment_index) {
1324  const auto fragment_results = all_fragment_results[fragment_index];
1325  cb(fragment_results.first, outer_fragments[fragment_index]);
1326  }
1327 }
1328 
1329 ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
1330  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1331 }
1332 
1333 // Looks at the targets and returns a feasible device type. We only punt
1334 // to CPU for count distinct and we should probably fix it and remove this.
1335 ExecutorDeviceType Executor::getDeviceTypeForTargets(
1336  const RelAlgExecutionUnit& ra_exe_unit,
1337  const ExecutorDeviceType requested_device_type) {
1338  for (const auto target_expr : ra_exe_unit.target_exprs) {
1339  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1340  if (!ra_exe_unit.groupby_exprs.empty() &&
1341  !isArchPascalOrLater(requested_device_type)) {
1342  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1343  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1344  return ExecutorDeviceType::CPU;
1345  }
1346  }
1347  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1348  return ExecutorDeviceType::CPU;
1349  }
1350  }
1351  return requested_device_type;
1352 }
1353 
1354 namespace {
1355 
1356 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1357  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1358  if (ti.is_fp()) {
1359  if (float_argument_input && ti.get_type() == kFLOAT) {
1360  int64_t float_null_val = 0;
1361  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1362  static_cast<float>(inline_fp_null_val(ti));
1363  return float_null_val;
1364  }
1365  const auto double_null_val = inline_fp_null_val(ti);
1366  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1367  }
1368  return inline_int_null_val(ti);
1369 }
1370 
1371 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1372  std::vector<int64_t>& entry,
1373  const std::vector<Analyzer::Expr*>& target_exprs,
1374  const QueryMemoryDescriptor& query_mem_desc) {
1375  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1376  const auto target_expr = target_exprs[target_idx];
1377  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1378  CHECK(agg_info.is_agg);
1379  target_infos.push_back(agg_info);
1380  if (g_cluster) {
1381  const auto executor = query_mem_desc.getExecutor();
1382  CHECK(executor);
1383  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1384  CHECK(row_set_mem_owner);
1385  const auto& count_distinct_desc =
1386  query_mem_desc.getCountDistinctDescriptor(target_idx);
1387  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1388  auto count_distinct_buffer = static_cast<int8_t*>(
1389  checked_calloc(count_distinct_desc.bitmapPaddedSizeBytes(), 1));
1390  CHECK(row_set_mem_owner);
1391  row_set_mem_owner->addCountDistinctBuffer(
1392  count_distinct_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), true);
1393  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1394  continue;
1395  }
1396  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1397  auto count_distinct_set = new std::set<int64_t>();
1398  CHECK(row_set_mem_owner);
1399  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1400  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1401  continue;
1402  }
1403  }
1404  const bool float_argument_input = takes_float_argument(agg_info);
1405  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1406  entry.push_back(0);
1407  } else if (agg_info.agg_kind == kAVG) {
1408  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1409  entry.push_back(0);
1410  } else if (agg_info.agg_kind == kSAMPLE) {
1411  if (agg_info.sql_type.is_geometry()) {
1412  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1413  entry.push_back(0);
1414  }
1415  } else if (agg_info.sql_type.is_varlen()) {
1416  entry.push_back(0);
1417  entry.push_back(0);
1418  } else {
1419  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1420  }
1421  } else {
1422  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1423  }
1424  }
1425 }
1426 
1427 ResultSetPtr build_row_for_empty_input(
1428  const std::vector<Analyzer::Expr*>& target_exprs_in,
1429  const QueryMemoryDescriptor& query_mem_desc,
1430  const ExecutorDeviceType device_type) {
1431  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1432  std::vector<Analyzer::Expr*> target_exprs;
1433  for (const auto target_expr : target_exprs_in) {
1434  const auto target_expr_copy =
1435  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1436  CHECK(target_expr_copy);
1437  auto ti = target_expr->get_type_info();
1438  ti.set_notnull(false);
1439  target_expr_copy->set_type_info(ti);
1440  if (target_expr_copy->get_arg()) {
1441  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1442  arg_ti.set_notnull(false);
1443  target_expr_copy->get_arg()->set_type_info(arg_ti);
1444  }
1445  target_exprs_owned_copies.push_back(target_expr_copy);
1446  target_exprs.push_back(target_expr_copy.get());
1447  }
1448  std::vector<TargetInfo> target_infos;
1449  std::vector<int64_t> entry;
1450  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1451  const auto executor = query_mem_desc.getExecutor();
1452  CHECK(executor);
1453  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1454  CHECK(row_set_mem_owner);
1455  auto rs = std::make_shared<ResultSet>(
1456  target_infos, device_type, query_mem_desc, row_set_mem_owner, executor);
1457  rs->allocateStorage();
1458  rs->fillOneEntry(entry);
1459  return rs;
1460 }
1461 
1462 } // namespace
1463 
1464 ResultSetPtr Executor::collectAllDeviceResults(
1465  ExecutionDispatch& execution_dispatch,
1466  const std::vector<Analyzer::Expr*>& target_exprs,
1467  const QueryMemoryDescriptor& query_mem_desc,
1468  const ExecutorDeviceType device_type,
1469  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1470  auto& result_per_device = execution_dispatch.getFragmentResults();
1471  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1472  QueryDescriptionType::NonGroupedAggregate) {
1473  return build_row_for_empty_input(target_exprs, query_mem_desc, device_type);
1474  }
1475  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1476  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1477  return reduceSpeculativeTopN(
1478  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1479  }
1480  const auto shard_count =
1481  device_type == ExecutorDeviceType::GPU
1482  ? shard_count_for_top_groups(ra_exe_unit, *catalog_)
1483  : 0;
1484 
1485  if (shard_count && !result_per_device.empty()) {
1486  return collectAllDeviceShardedTopResults(execution_dispatch);
1487  }
1488  return reduceMultiDeviceResults(
1489  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1490 }
1491 
1492 // Collect top results from each device, stitch them together and sort. Partial
1493 // results from each device are guaranteed to be disjunct because we only go on
1494 // this path when one of the columns involved is a shard key.
1495 ResultSetPtr Executor::collectAllDeviceShardedTopResults(
1496  ExecutionDispatch& execution_dispatch) const {
1497  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1498  auto& result_per_device = execution_dispatch.getFragmentResults();
1499  const auto first_result_set = result_per_device.front().first;
1500  CHECK(first_result_set);
1501  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1502  CHECK(!top_query_mem_desc.didOutputColumnar());
1503  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1504  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1505  top_query_mem_desc.setEntryCount(0);
1506  for (auto& result : result_per_device) {
1507  const auto result_set = result.first;
1508  CHECK(result_set);
1509  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n);
1510  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1511  top_query_mem_desc.setEntryCount(new_entry_cnt);
1512  }
1513  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1514  first_result_set->getDeviceType(),
1515  top_query_mem_desc,
1516  first_result_set->getRowSetMemOwner(),
1517  this);
1518  auto top_storage = top_result_set->allocateStorage();
1519  const auto top_result_set_buffer = top_storage->getUnderlyingBuffer();
1520  size_t top_output_row_idx{0};
1521  for (auto& result : result_per_device) {
1522  const auto result_set = result.first;
1523  CHECK(result_set);
1524  const auto& top_permutation = result_set->getPermutationBuffer();
1525  CHECK_LE(top_permutation.size(), top_n);
1526  const auto result_set_buffer = result_set->getStorage()->getUnderlyingBuffer();
1527  for (const auto sorted_idx : top_permutation) {
1528  const auto row_ptr =
1529  result_set_buffer + sorted_idx * top_query_mem_desc.getRowSize();
1530  memcpy(top_result_set_buffer + top_output_row_idx * top_query_mem_desc.getRowSize(),
1531  row_ptr,
1532  top_query_mem_desc.getRowSize());
1533  ++top_output_row_idx;
1534  }
1535  }
1536  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
1537  return top_result_set;
1538 }
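// Illustrative sketch (not part of Execute.cpp): because the group-by key contains the
// shard key, every device above holds a disjoint set of groups, so each per-device
// result can be sorted to its own top-n and the rows simply concatenated into one
// buffer; the final ordering is applied later. A minimal version of that stitch step
// over plain vectors, assuming each per-device vector is already reduced:
#include <algorithm>
#include <cstdint>
#include <vector>

inline std::vector<int64_t> stitch_sharded_top_results(
    std::vector<std::vector<int64_t>> per_device_rows,
    const size_t top_n) {
  std::vector<int64_t> stitched;
  for (auto& rows : per_device_rows) {
    // keep only each device's local top-n (ascending here for simplicity)
    std::sort(rows.begin(), rows.end());
    if (rows.size() > top_n) {
      rows.resize(top_n);
    }
    stitched.insert(stitched.end(), rows.begin(), rows.end());
  }
  return stitched;  // disjoint by construction; the caller applies the final sort/limit
}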
1539 
1540 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
1541  const {
1542  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
1543  const auto& join_info = plan_state_->join_info_;
1544  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
1545  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
1546  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
1547  id_to_cond.insert(
1548  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
1549  }
1550  return id_to_cond;
1551 }
1552 
1553 namespace {
1554 
1555 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
1556  for (const auto& col : fetched_cols) {
1557  if (col.is_lazily_fetched) {
1558  return true;
1559  }
1560  }
1561  return false;
1562 }
1563 
1564 } // namespace
1565 
1566 void Executor::dispatchFragments(
1567  const std::function<void(const ExecutorDeviceType chosen_device_type,
1568  int chosen_device_id,
1569  const QueryCompilationDescriptor& query_comp_desc,
1570  const QueryMemoryDescriptor& query_mem_desc,
1571  const FragmentsList& frag_list,
1572  const ExecutorDispatchMode kernel_dispatch_mode,
1573  const int64_t rowid_lookup_key)> dispatch,
1574  const ExecutionDispatch& execution_dispatch,
1575  const std::vector<InputTableInfo>& table_infos,
1576  const ExecutionOptions& eo,
1577  const bool is_agg,
1578  const bool allow_single_frag_table_opt,
1579  const size_t context_count,
1580  const QueryCompilationDescriptor& query_comp_desc,
1581  const QueryMemoryDescriptor& query_mem_desc,
1582  QueryFragmentDescriptor& fragment_descriptor,
1583  std::unordered_set<int>& available_gpus,
1584  int& available_cpus) {
1585  std::vector<std::future<void>> query_threads;
1586  const auto& ra_exe_unit = execution_dispatch.getExecutionUnit();
1587  CHECK(!ra_exe_unit.input_descs.empty());
1588 
1589  const auto device_type = query_comp_desc.getDeviceType();
1590  const bool uses_lazy_fetch =
1591  plan_state_->allow_lazy_fetch_ &&
1592  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
1593  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
1594  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
1595 
1596  const auto device_count = deviceCount(device_type);
1597  CHECK_GT(device_count, 0);
1598 
1599  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
1600  execution_dispatch.getFragOffsets(),
1601  device_count,
1602  device_type,
1603  use_multifrag_kernel,
1604  g_inner_join_fragment_skipping,
1605  this);
1606  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
1607  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
1608  }
1609 
1610  if (use_multifrag_kernel) {
1611  VLOG(1) << "Dispatching multifrag kernels";
1612  VLOG(1) << query_mem_desc.toString();
1613 
1614  // NB: We should never be on this path when the query is retried because of running
1615  // out of group by slots; also, for scan only queries on CPU we want the
1616  // high-granularity, fragment by fragment execution instead. For scan only queries on
1617  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
1618  // buffer per fragment.
1619  auto multifrag_kernel_dispatch =
1620  [&query_threads, &dispatch, query_comp_desc, query_mem_desc](
1621  const int device_id,
1622  const FragmentsList& frag_list,
1623  const int64_t rowid_lookup_key) {
1624  query_threads.push_back(std::async(std::launch::async,
1625  dispatch,
1626  ExecutorDeviceType::GPU,
1627  device_id,
1628  query_comp_desc,
1629  query_mem_desc,
1630  frag_list,
1631  ExecutorDispatchMode::MultifragmentKernel,
1632  rowid_lookup_key));
1633  };
1634  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
1635  } else {
1636  VLOG(1) << "Dispatching kernel per fragment";
1637  VLOG(1) << query_mem_desc.toString();
1638 
1639  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
1641  table_infos.size() == 1 && table_infos.front().table_id > 0) {
1642  const auto max_frag_size =
1643  table_infos.front().info.getFragmentNumTuplesUpperBound();
1644  if (max_frag_size < query_mem_desc.getEntryCount()) {
1645  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
1646  << " to match max fragment size " << max_frag_size
1647  << " for kernel per fragment execution path.";
1648  throw CompilationRetryNewScanLimit(max_frag_size);
1649  }
1650  }
1651 
1652  size_t frag_list_idx{0};
1653  auto fragment_per_kernel_dispatch = [&query_threads,
1654  &dispatch,
1655  &frag_list_idx,
1656  &device_type,
1657  query_comp_desc,
1658  query_mem_desc](const int device_id,
1659  const FragmentsList& frag_list,
1660  const int64_t rowid_lookup_key) {
1661  if (!frag_list.size()) {
1662  return;
1663  }
1664  CHECK_GE(device_id, 0);
1665 
1666  query_threads.push_back(std::async(std::launch::async,
1667  dispatch,
1668  device_type,
1669  device_id,
1670  query_comp_desc,
1671  query_mem_desc,
1672  frag_list,
1673  ExecutorDispatchMode::KernelPerFragment,
1674  rowid_lookup_key));
1675 
1676  ++frag_list_idx;
1677  };
1678 
1679  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
1680  ra_exe_unit);
1681  }
1682  for (auto& child : query_threads) {
1683  child.wait();
1684  }
1685  for (auto& child : query_threads) {
1686  child.get();
1687  }
1688 }
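
// Illustrative sketch (not from the original source; names simply mirror the
// std::function parameters above): a caller-supplied `dispatch` callable is expected
// to look like
//
//   auto dispatch = [&](const ExecutorDeviceType chosen_device_type,
//                       int chosen_device_id,
//                       const QueryCompilationDescriptor& query_comp_desc,
//                       const QueryMemoryDescriptor& query_mem_desc,
//                       const FragmentsList& frag_list,
//                       const ExecutorDispatchMode kernel_dispatch_mode,
//                       const int64_t rowid_lookup_key) {
//     // launch the compiled kernel for frag_list on the chosen device
//   };
//
// dispatchFragments() fans such calls out on std::async threads, one per fragment
// (or per multi-fragment group on GPU), then waits on and drains all of them.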
1689 
1690 std::vector<size_t> Executor::getTableFragmentIndices(
1691  const RelAlgExecutionUnit& ra_exe_unit,
1692  const ExecutorDeviceType device_type,
1693  const size_t table_idx,
1694  const size_t outer_frag_idx,
1695  std::map<int, const TableFragments*>& selected_tables_fragments,
1696  const std::unordered_map<int, const Analyzer::BinOper*>&
1697  inner_table_id_to_join_condition) {
1698  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
1699  auto table_frags_it = selected_tables_fragments.find(table_id);
1700  CHECK(table_frags_it != selected_tables_fragments.end());
1701  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
1702  const auto outer_table_fragments_it =
1703  selected_tables_fragments.find(outer_input_desc.getTableId());
1704  const auto outer_table_fragments = outer_table_fragments_it->second;
1705  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
1706  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
1707  if (!table_idx) {
1708  return {outer_frag_idx};
1709  }
1710  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
1711  auto& inner_frags = table_frags_it->second;
1712  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
1713  std::vector<size_t> all_frag_ids;
1714  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
1715  ++inner_frag_idx) {
1716  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
1717  if (skipFragmentPair(outer_fragment_info,
1718  inner_frag_info,
1719  table_idx,
1720  inner_table_id_to_join_condition,
1721  ra_exe_unit,
1722  device_type)) {
1723  continue;
1724  }
1725  all_frag_ids.push_back(inner_frag_idx);
1726  }
1727  return all_frag_ids;
1728 }
1729 
1730 // Returns true iff the join between two fragments cannot yield any results, based on
1731 // the tables' shard information. Such a pair can be skipped to avoid a full broadcast.
1732 bool Executor::skipFragmentPair(
1733  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
1734  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
1735  const int table_idx,
1736  const std::unordered_map<int, const Analyzer::BinOper*>&
1737  inner_table_id_to_join_condition,
1738  const RelAlgExecutionUnit& ra_exe_unit,
1739  const ExecutorDeviceType device_type) {
1740  if (device_type != ExecutorDeviceType::GPU) {
1741  return false;
1742  }
1743  CHECK(table_idx >= 0 &&
1744  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
1745  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
1746  // Both tables need to be sharded the same way.
1747  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
1748  outer_fragment_info.shard == inner_fragment_info.shard) {
1749  return false;
1750  }
1751  const Analyzer::BinOper* join_condition{nullptr};
1752  if (ra_exe_unit.join_quals.empty()) {
1753  CHECK(!inner_table_id_to_join_condition.empty());
1754  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
1755  CHECK(condition_it != inner_table_id_to_join_condition.end());
1756  join_condition = condition_it->second;
1757  CHECK(join_condition);
1758  } else {
1759  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
1760  plan_state_->join_info_.join_hash_tables_.size());
1761  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
1762  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
1763  table_idx) {
1764  CHECK(!join_condition);
1765  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
1766  }
1767  }
1768  }
1769  if (!join_condition) {
1770  return false;
1771  }
1772  // TODO(adb): support fragment skipping based on the overlaps operator
1773  if (join_condition->is_overlaps_oper()) {
1774  return false;
1775  }
1776  size_t shard_count{0};
1777  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
1778  join_condition->get_left_operand())) {
1779  auto inner_outer_pairs =
1780  normalize_column_pairs(join_condition, *getCatalog(), getTemporaryTables());
1781  shard_count = BaselineJoinHashTable::getShardCountForCondition(
1782  join_condition, this, inner_outer_pairs);
1783  } else {
1784  shard_count = get_shard_count(join_condition, this);
1785  }
1786  if (shard_count && !ra_exe_unit.join_quals.empty()) {
1787  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
1788  }
1789  return shard_count;
1790 }
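
// Worked example (illustrative): for a shard-key equijoin between two tables that are
// both sharded four ways, an outer fragment of shard 1 can only match inner fragments
// of shard 1. Pairing it with an inner fragment of shard 3 is skipped above because
// the shard ids differ and the join condition has a non-zero shard count.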
1791 
1792 namespace {
1793 
1794 const ColumnDescriptor* try_get_column_descriptor(const InputColDescriptor* col_desc,
1795  const Catalog_Namespace::Catalog& cat) {
1796  const int table_id = col_desc->getScanDesc().getTableId();
1797  const int col_id = col_desc->getColId();
1798  return get_column_descriptor_maybe(col_id, table_id, cat);
1799 }
1800 
1801 } // namespace
1802 
1803 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
1804  const std::vector<InputDescriptor>& input_descs,
1805  const std::map<int, const TableFragments*>& all_tables_fragments) {
1806  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
1807  for (auto& desc : input_descs) {
1808  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
1809  CHECK(fragments_it != all_tables_fragments.end());
1810  const auto& fragments = *fragments_it->second;
1811  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
1812  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
1813  frag_offsets[i] = off;
1814  off += fragments[i].getNumTuples();
1815  }
1816  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
1817  }
1818  return tab_id_to_frag_offsets;
1819 }
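
// Worked example (illustrative): for a table whose fragments hold 100, 250 and 50 rows,
// the loop above yields the running offsets {0, 100, 350}; a global row position (e.g.
// for the virtual rowid column) is then frag_offset + local_row_index.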
1820 
1821 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
1822 Executor::getRowCountAndOffsetForAllFrags(
1823  const RelAlgExecutionUnit& ra_exe_unit,
1824  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
1825  const std::vector<InputDescriptor>& input_descs,
1826  const std::map<int, const TableFragments*>& all_tables_fragments) {
1827  std::vector<std::vector<int64_t>> all_num_rows;
1828  std::vector<std::vector<uint64_t>> all_frag_offsets;
1829  const auto tab_id_to_frag_offsets =
1830  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
1831  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
1832  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
1833  std::vector<int64_t> num_rows;
1834  std::vector<uint64_t> frag_offsets;
1835  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
1836  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
1837  const auto frag_id = selected_frag_ids[tab_idx];
1838  const auto fragments_it =
1839  all_tables_fragments.find(input_descs[tab_idx].getTableId());
1840  CHECK(fragments_it != all_tables_fragments.end());
1841  const auto& fragments = *fragments_it->second;
1842  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
1843  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
1844  const auto& fragment = fragments[frag_id];
1845  num_rows.push_back(fragment.getNumTuples());
1846  } else {
1847  size_t total_row_count{0};
1848  for (const auto& fragment : fragments) {
1849  total_row_count += fragment.getNumTuples();
1850  }
1851  num_rows.push_back(total_row_count);
1852  }
1853  const auto frag_offsets_it =
1854  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
1855  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
1856  const auto& offsets = frag_offsets_it->second;
1857  CHECK_LT(frag_id, offsets.size());
1858  frag_offsets.push_back(offsets[frag_id]);
1859  }
1860  all_num_rows.push_back(num_rows);
1861  // Fragment offsets of outer table should be ONLY used by rowid for now.
1862  all_frag_offsets.push_back(frag_offsets);
1863  }
1864  return {all_num_rows, all_frag_offsets};
1865 }
1866 
1867 // Determines whether a column of a hash-joined inner fact table must be fetched from
1868 // all of the table's fragments; columns whose fetch is deferred (lazy) are excluded.
1869 bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
1870  const RelAlgExecutionUnit& ra_exe_unit,
1871  const FragmentsList& selected_fragments) const {
1872  const auto& input_descs = ra_exe_unit.input_descs;
1873  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
1874  if (nest_level < 1 ||
1875  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
1876  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
1877  (ra_exe_unit.join_quals.empty() &&
1878  plan_state_->isLazyFetchColumn(inner_col_desc))) {
1879  return false;
1880  }
1881  const int table_id = inner_col_desc.getScanDesc().getTableId();
1882  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
1883  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
1884  const auto& fragments = selected_fragments[nest_level].fragment_ids;
1885  return fragments.size() > 1;
1886 }
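
// Illustrative note: if the inner table at nest level 1 contributes fragment ids {3, 7}
// for a hash join, the function above returns true and fetchChunks() below materializes
// that inner column across all fragments instead of one fragment at a time.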
1887 
1888 Executor::FetchResult Executor::fetchChunks(
1889  const ColumnFetcher& column_fetcher,
1890  const RelAlgExecutionUnit& ra_exe_unit,
1891  const int device_id,
1892  const Data_Namespace::MemoryLevel memory_level,
1893  const std::map<int, const TableFragments*>& all_tables_fragments,
1894  const FragmentsList& selected_fragments,
1895  const Catalog_Namespace::Catalog& cat,
1896  std::list<ChunkIter>& chunk_iterators,
1897  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
1899  const auto& col_global_ids = ra_exe_unit.input_col_descs;
1900  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
1901  std::vector<size_t> local_col_to_frag_pos;
1902  buildSelectedFragsMapping(selected_fragments_crossjoin,
1903  local_col_to_frag_pos,
1904  col_global_ids,
1905  selected_fragments,
1906  ra_exe_unit);
1907 
1908  const CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
1909  selected_fragments_crossjoin);
1910 
1911  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
1912  std::vector<std::vector<int64_t>> all_num_rows;
1913  std::vector<std::vector<uint64_t>> all_frag_offsets;
1914 
1915  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
1916  std::vector<const int8_t*> frag_col_buffers(
1917  plan_state_->global_to_local_col_ids_.size());
1918  for (const auto& col_id : col_global_ids) {
1919  CHECK(col_id);
1920  const int table_id = col_id->getScanDesc().getTableId();
1921  const auto cd = try_get_column_descriptor(col_id.get(), cat);
1922  if (cd && cd->isVirtualCol) {
1923  CHECK_EQ("rowid", cd->columnName);
1924  continue;
1925  }
1926  const auto fragments_it = all_tables_fragments.find(table_id);
1927  CHECK(fragments_it != all_tables_fragments.end());
1928  const auto fragments = fragments_it->second;
1929  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
1930  CHECK(it != plan_state_->global_to_local_col_ids_.end());
1931  CHECK_LT(static_cast<size_t>(it->second),
1932  plan_state_->global_to_local_col_ids_.size());
1933  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
1934  if (!fragments->size()) {
1935  return {};
1936  }
1937  CHECK_LT(frag_id, fragments->size());
1938  auto memory_level_for_column = memory_level;
1939  if (plan_state_->columns_to_fetch_.find(
1940  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
1941  plan_state_->columns_to_fetch_.end()) {
1942  memory_level_for_column = Data_Namespace::CPU_LEVEL;
1943  }
1944  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
1945  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
1946  col_id.get(), memory_level_for_column, device_id);
1947  } else {
1948  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
1949  frag_col_buffers[it->second] =
1950  column_fetcher.getAllTableColumnFragments(table_id,
1951  col_id->getColId(),
1952  all_tables_fragments,
1953  memory_level_for_column,
1954  device_id);
1955  } else {
1956  frag_col_buffers[it->second] =
1957  column_fetcher.getOneTableColumnFragment(table_id,
1958  frag_id,
1959  col_id->getColId(),
1960  all_tables_fragments,
1961  chunks,
1962  chunk_iterators,
1963  memory_level_for_column,
1964  device_id);
1965  }
1966  }
1967  }
1968  all_frag_col_buffers.push_back(frag_col_buffers);
1969  }
1970  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
1971  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
1972  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
1973 }
1974 
1975 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
1976  const size_t scan_idx,
1977  const RelAlgExecutionUnit& ra_exe_unit) {
1978  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
1979  scan_idx > 0 &&
1980  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
1981  !selected_fragments[scan_idx].fragment_ids.empty()) {
1982  // Fetch all fragments
1983  return {size_t(0)};
1984  }
1985 
1986  return selected_fragments[scan_idx].fragment_ids;
1987 }
1988 
1989 void Executor::buildSelectedFragsMapping(
1990  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
1991  std::vector<size_t>& local_col_to_frag_pos,
1992  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
1993  const FragmentsList& selected_fragments,
1994  const RelAlgExecutionUnit& ra_exe_unit) {
1995  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
1996  size_t frag_pos{0};
1997  const auto& input_descs = ra_exe_unit.input_descs;
1998  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
1999  const int table_id = input_descs[scan_idx].getTableId();
2000  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2001  selected_fragments_crossjoin.push_back(
2002  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2003  for (const auto& col_id : col_global_ids) {
2004  CHECK(col_id);
2005  const auto& input_desc = col_id->getScanDesc();
2006  if (input_desc.getTableId() != table_id ||
2007  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2008  continue;
2009  }
2010  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2011  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2012  CHECK_LT(static_cast<size_t>(it->second),
2013  plan_state_->global_to_local_col_ids_.size());
2014  local_col_to_frag_pos[it->second] = frag_pos;
2015  }
2016  ++frag_pos;
2017  }
2018 }
2019 
2020 namespace {
2021 
2022 class OutVecOwner {
2023  public:
2024  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2025  ~OutVecOwner() {
2026  for (auto out : out_vec_) {
2027  delete[] out;
2028  }
2029  }
2030 
2031  private:
2032  std::vector<int64_t*> out_vec_;
2033 };
2034 } // namespace
2035 
2036 template <typename META_TYPE_CLASS>
2037 class AggregateReductionEgress {
2038  public:
2039  using ReturnType = void;
2040 
2041  // TODO: Avoid parameter struct indirection and forward directly
2042  ReturnType operator()(int const entry_count,
2043  int& error_code,
2044  TargetInfo const& agg_info,
2045  size_t& out_vec_idx,
2046  std::vector<int64_t*>& out_vec,
2047  std::vector<int64_t>& reduced_outs,
2048  QueryExecutionContext* query_exe_context) {
2049  int64_t val1;
2050  const bool float_argument_input = takes_float_argument(agg_info);
2051  if (is_distinct_target(agg_info)) {
2052  CHECK(agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT);
2053  val1 = out_vec[out_vec_idx][0];
2054  error_code = 0;
2055  } else {
2056  const auto chosen_bytes = static_cast<size_t>(
2057  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2058  std::tie(val1, error_code) =
2059  Executor::reduceResults(agg_info.agg_kind,
2060  agg_info.sql_type,
2061  query_exe_context->getAggInitValForIndex(out_vec_idx),
2062  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2063  out_vec[out_vec_idx],
2064  entry_count,
2065  false,
2066  float_argument_input);
2067  }
2068  if (error_code) {
2069  return;
2070  }
2071  reduced_outs.push_back(val1);
2072  if (agg_info.agg_kind == kAVG ||
2073  (agg_info.agg_kind == kSAMPLE &&
2074  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
2075  const auto chosen_bytes = static_cast<size_t>(
2076  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx + 1));
2077  int64_t val2;
2078  std::tie(val2, error_code) = Executor::reduceResults(
2079  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
2080  agg_info.sql_type,
2081  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
2082  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2083  out_vec[out_vec_idx + 1],
2084  entry_count,
2085  false,
2086  false);
2087  if (error_code) {
2088  return;
2089  }
2090  reduced_outs.push_back(val2);
2091  ++out_vec_idx;
2092  }
2093  ++out_vec_idx;
2094  }
2095 };
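
// Worked example (illustrative): for AVG(x) the generated code emits two output slots
// per target, a running sum and a count. The functor above reduces the first slot and
// then, because agg_kind == kAVG, reduces the adjacent slot with kCOUNT, pushing both
// values into reduced_outs so the final average can be formed from the single result
// row.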
2096 
2097 // Handles reduction for geo-types
2098 template <>
2099 class AggregateReductionEgress<Experimental::MetaTypeClass<Experimental::Geometry>> {
2100  public:
2101  using ReturnType = void;
2102 
2103  ReturnType operator()(int const entry_count,
2104  int& error_code,
2105  TargetInfo const& agg_info,
2106  size_t& out_vec_idx,
2107  std::vector<int64_t*>& out_vec,
2108  std::vector<int64_t>& reduced_outs,
2109  QueryExecutionContext* query_exe_context) {
2110  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2111  int64_t val1;
2112  const auto chosen_bytes = static_cast<size_t>(
2113  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2114  std::tie(val1, error_code) =
2115  Executor::reduceResults(agg_info.agg_kind,
2116  agg_info.sql_type,
2117  query_exe_context->getAggInitValForIndex(out_vec_idx),
2118  chosen_bytes,
2119  out_vec[out_vec_idx],
2120  entry_count,
2121  false,
2122  false);
2123  if (error_code) {
2124  return;
2125  }
2126  reduced_outs.push_back(val1);
2127  out_vec_idx++;
2128  }
2129  }
2130 };
2131 
2132 int32_t Executor::executePlanWithoutGroupBy(
2133  const RelAlgExecutionUnit& ra_exe_unit,
2134  const CompilationResult& compilation_result,
2135  const bool hoist_literals,
2136  ResultSetPtr& results,
2137  const std::vector<Analyzer::Expr*>& target_exprs,
2138  const ExecutorDeviceType device_type,
2139  std::vector<std::vector<const int8_t*>>& col_buffers,
2140  QueryExecutionContext* query_exe_context,
2141  const std::vector<std::vector<int64_t>>& num_rows,
2142  const std::vector<std::vector<uint64_t>>& frag_offsets,
2143  Data_Namespace::DataMgr* data_mgr,
2144  const int device_id,
2145  const uint32_t start_rowid,
2146  const uint32_t num_tables,
2147  RenderInfo* render_info) {
2149  CHECK(!results);
2150  if (col_buffers.empty()) {
2151  return 0;
2152  }
2153 
2154  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2155  if (render_info) {
2156  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2157  // here, we are in non-insitu mode.
2158  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2159  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2160  "currently unsupported.";
2161  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2162  }
2163 
2164  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2165  std::vector<int64_t*> out_vec;
2166  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2167  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2168  std::unique_ptr<OutVecOwner> output_memory_scope;
2169  if (g_enable_dynamic_watchdog && interrupted_) {
2170  return ERR_INTERRUPTED;
2171  }
2172  if (device_type == ExecutorDeviceType::CPU) {
2173  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2174  compilation_result.native_functions,
2175  hoist_literals,
2176  hoist_buf,
2177  col_buffers,
2178  num_rows,
2179  frag_offsets,
2180  0,
2181  &error_code,
2182  num_tables,
2183  join_hash_table_ptrs);
2184  output_memory_scope.reset(new OutVecOwner(out_vec));
2185  } else {
2186  try {
2187  out_vec = query_exe_context->launchGpuCode(ra_exe_unit,
2188  compilation_result.native_functions,
2189  hoist_literals,
2190  hoist_buf,
2191  col_buffers,
2192  num_rows,
2193  frag_offsets,
2194  0,
2195  data_mgr,
2196  blockSize(),
2197  gridSize(),
2198  device_id,
2199  &error_code,
2200  num_tables,
2201  join_hash_table_ptrs,
2202  render_allocator_map_ptr);
2203  output_memory_scope.reset(new OutVecOwner(out_vec));
2204  } catch (const OutOfMemory&) {
2205  return ERR_OUT_OF_GPU_MEM;
2206  } catch (const std::exception& e) {
2207  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2208  }
2209  }
2210  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2211  error_code == Executor::ERR_DIV_BY_ZERO ||
2212  error_code == Executor::ERR_OUT_OF_TIME ||
2213  error_code == Executor::ERR_INTERRUPTED) {
2214  return error_code;
2215  }
2216  if (ra_exe_unit.estimator) {
2217  CHECK(!error_code);
2218  results =
2219  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2220  return 0;
2221  }
2222  std::vector<int64_t> reduced_outs;
2223  const auto num_frags = col_buffers.size();
2224  const size_t entry_count = device_type == ExecutorDeviceType::GPU
2225  ? num_frags * blockSize() * gridSize()
2226  : num_frags;
2227  if (size_t(1) == entry_count) {
2228  for (auto out : out_vec) {
2229  CHECK(out);
2230  reduced_outs.push_back(*out);
2231  }
2232  } else {
2233  size_t out_vec_idx = 0;
2234 
2235  for (const auto target_expr : target_exprs) {
2236  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2237  CHECK(agg_info.is_agg);
2238 
2239  auto meta_class(
2241  auto agg_reduction_impl =
2243  agg_reduction_impl(meta_class,
2244  entry_count,
2245  error_code,
2246  agg_info,
2247  out_vec_idx,
2248  out_vec,
2249  reduced_outs,
2250  query_exe_context);
2251  if (error_code) {
2252  break;
2253  }
2254  }
2255  }
2256 
2257  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
2258  auto rows_ptr = std::shared_ptr<ResultSet>(
2259  query_exe_context->query_buffers_->result_sets_[0].release());
2260  rows_ptr->fillOneEntry(reduced_outs);
2261  results = std::move(rows_ptr);
2262  return error_code;
2263 }
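
// Illustrative note: in executePlanWithoutGroupBy above, the number of partial results
// reduced per target is num_frags on CPU but num_frags * blockSize() * gridSize() on
// GPU, since every GPU thread writes its own partial aggregate slot.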
2264 
2265 namespace {
2266 
2267 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
2268  CHECK(scan_limit);
2269  return results && results->rowCount() < scan_limit;
2270 }
2271 
2272 } // namespace
2273 
2274 int32_t Executor::executePlanWithGroupBy(
2275  const RelAlgExecutionUnit& ra_exe_unit,
2276  const CompilationResult& compilation_result,
2277  const bool hoist_literals,
2278  ResultSetPtr& results,
2279  const ExecutorDeviceType device_type,
2280  std::vector<std::vector<const int8_t*>>& col_buffers,
2281  const std::vector<size_t> outer_tab_frag_ids,
2282  QueryExecutionContext* query_exe_context,
2283  const std::vector<std::vector<int64_t>>& num_rows,
2284  const std::vector<std::vector<uint64_t>>& frag_offsets,
2285  Data_Namespace::DataMgr* data_mgr,
2286  const int device_id,
2287  const int64_t scan_limit,
2288  const uint32_t start_rowid,
2289  const uint32_t num_tables,
2290  RenderInfo* render_info) {
2292  CHECK(!results);
2293  if (col_buffers.empty()) {
2294  return 0;
2295  }
2296  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
2297  // TODO(alex):
2298  // 1. Optimize size (make keys more compact).
2299  // 2. Resize on overflow.
2300  // 3. Optimize runtime.
2301  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2302  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2303  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2304  if (g_enable_dynamic_watchdog && interrupted_) {
2305  return ERR_INTERRUPTED;
2306  }
2307 
2308  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2309  if (render_info && render_info->useCudaBuffers()) {
2310  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2311  }
2312 
2313  if (device_type == ExecutorDeviceType::CPU) {
2314  query_exe_context->launchCpuCode(ra_exe_unit,
2315  compilation_result.native_functions,
2316  hoist_literals,
2317  hoist_buf,
2318  col_buffers,
2319  num_rows,
2320  frag_offsets,
2321  scan_limit,
2322  &error_code,
2323  num_tables,
2324  join_hash_table_ptrs);
2325  } else {
2326  try {
2327  query_exe_context->launchGpuCode(ra_exe_unit,
2328  compilation_result.native_functions,
2329  hoist_literals,
2330  hoist_buf,
2331  col_buffers,
2332  num_rows,
2333  frag_offsets,
2334  scan_limit,
2335  data_mgr,
2336  blockSize(),
2337  gridSize(),
2338  device_id,
2339  &error_code,
2340  num_tables,
2341  join_hash_table_ptrs,
2342  render_allocator_map_ptr);
2343  } catch (const OutOfMemory&) {
2344  return ERR_OUT_OF_GPU_MEM;
2345  } catch (const OutOfRenderMemory&) {
2346  return ERR_OUT_OF_RENDER_MEM;
2347  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
2348  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
2349  } catch (const std::bad_alloc&) {
2350  return ERR_SPECULATIVE_TOP_OOM;
2351  } catch (const std::exception& e) {
2352  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2353  }
2354  }
2355 
2356  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2357  error_code == Executor::ERR_DIV_BY_ZERO ||
2358  error_code == Executor::ERR_OUT_OF_TIME ||
2359  error_code == Executor::ERR_INTERRUPTED) {
2360  return error_code;
2361  }
2362 
2363  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
2364  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
2365  results =
2366  query_exe_context->getRowSet(ra_exe_unit, query_exe_context->query_mem_desc_);
2367  CHECK(results);
2368  results->holdLiterals(hoist_buf);
2369  }
2370  if (error_code < 0 && render_allocator_map_ptr) {
2371  // More rows passed the filter than available slots. We don't have a count to check,
2372  // so assume we met the limit if a scan limit is set
2373  if (scan_limit != 0) {
2374  return 0;
2375  } else {
2376  return error_code;
2377  }
2378  }
2379  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
2380  return error_code; // unlucky, not enough results and we ran out of slots
2381  }
2382 
2383  return 0;
2384 }
2385 
2386 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
2387  const int device_id) {
2388  std::vector<int64_t> table_ptrs;
2389  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
2390  for (auto hash_table : join_hash_tables) {
2391  if (!hash_table) {
2392  CHECK(table_ptrs.empty());
2393  return {};
2394  }
2395  table_ptrs.push_back(hash_table->getJoinHashBuffer(
2396  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
2397  }
2398  return table_ptrs;
2399 }
2400 
2401 namespace {
2402 
2403 template <class T>
2404 int64_t insert_one_dict_str(T* col_data,
2405  const std::string& columnName,
2406  const SQLTypeInfo& columnType,
2407  const Analyzer::Constant* col_cv,
2408  const Catalog_Namespace::Catalog& catalog) {
2409  if (col_cv->get_is_null()) {
2410  *col_data = inline_fixed_encoding_null_val(columnType);
2411  } else {
2412  const int dict_id = columnType.get_comp_param();
2413  const auto col_datum = col_cv->get_constval();
2414  const auto& str = *col_datum.stringval;
2415  const auto dd = catalog.getMetadataForDict(dict_id);
2416  CHECK(dd && dd->stringDict);
2417  int32_t str_id = dd->stringDict->getOrAdd(str);
2418  const bool checkpoint_ok = dd->stringDict->checkpoint();
2419  if (!checkpoint_ok) {
2420  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2421  columnName);
2422  }
2423  const bool invalid = str_id > max_valid_int_value<T>();
2424  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2425  if (invalid) {
2426  LOG(ERROR) << "Could not encode string: " << str
2427  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2428  << " bits. Will store NULL instead.";
2429  }
2430  str_id = inline_fixed_encoding_null_val(columnType);
2431  }
2432  *col_data = str_id;
2433  }
2434  return *col_data;
2435 }
2436 
2437 template <class T>
2438 int64_t insert_one_dict_str(T* col_data,
2439  const ColumnDescriptor* cd,
2440  const Analyzer::Constant* col_cv,
2441  const Catalog_Namespace::Catalog& catalog) {
2442  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2443 }
2444 
2445 } // namespace
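
// Illustrative example: inserting the literal 'foo' into a dictionary-encoded TEXT
// column with a 1-byte encoding first adds (or looks up) 'foo' in the string
// dictionary; if the returned 32-bit id does not fit in 8 bits, insert_one_dict_str
// logs an error and stores the column's NULL sentinel instead of a truncated id.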
2446 
2447 namespace Importer_NS {
2448 
2449 int8_t* appendDatum(int8_t* buf, Datum d, const SQLTypeInfo& ti);
2450 
2451 } // namespace Importer_NS
2452 
2453 void Executor::executeSimpleInsert(const Planner::RootPlan* root_plan) {
2454  const auto plan = root_plan->get_plan();
2455  CHECK(plan);
2456  const auto values_plan = dynamic_cast<const Planner::ValuesScan*>(plan);
2457  if (!values_plan) {
2458  throw std::runtime_error(
2459  "Only simple INSERT of immediate tuples is currently supported");
2460  }
2461  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
2462  const auto& targets = values_plan->get_targetlist();
2463  const int table_id = root_plan->get_result_table_id();
2464  const auto& col_id_list = root_plan->get_result_col_list();
2465  std::vector<const ColumnDescriptor*> col_descriptors;
2466  std::vector<int> col_ids;
2467  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2468  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2469  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2470  auto& cat = root_plan->getCatalog();
2471  const auto table_descriptor = cat.getMetadataForTable(table_id);
2472  const auto shard_tables = cat.getPhysicalTablesDescriptors(table_descriptor);
2473  const TableDescriptor* shard{nullptr};
2474  for (const int col_id : col_id_list) {
2475  const auto cd = get_column_descriptor(col_id, table_id, cat);
2476  const auto col_enc = cd->columnType.get_compression();
2477  if (cd->columnType.is_string()) {
2478  switch (col_enc) {
2479  case kENCODING_NONE: {
2480  auto it_ok =
2481  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2482  CHECK(it_ok.second);
2483  break;
2484  }
2485  case kENCODING_DICT: {
2486  const auto dd = cat.getMetadataForDict(cd->columnType.get_comp_param());
2487  CHECK(dd);
2488  const auto it_ok = col_buffers.emplace(
2489  col_id, std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_size()]));
2490  CHECK(it_ok.second);
2491  break;
2492  }
2493  default:
2494  CHECK(false);
2495  }
2496  } else if (cd->columnType.is_geometry()) {
2497  auto it_ok =
2498  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2499  CHECK(it_ok.second);
2500  } else if (cd->columnType.is_array()) {
2501  auto it_ok =
2502  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2503  CHECK(it_ok.second);
2504  } else {
2505  const auto it_ok = col_buffers.emplace(
2506  col_id,
2507  std::unique_ptr<uint8_t[]>(
2508  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2509  // the buffer
2510  CHECK(it_ok.second);
2511  }
2512  col_descriptors.push_back(cd);
2513  col_ids.push_back(col_id);
2514  }
2515  size_t col_idx = 0;
2516  Fragmenter_Namespace::InsertData insert_data;
2517  insert_data.databaseId = cat.getCurrentDB().dbId;
2518  insert_data.tableId = table_id;
2519  int64_t int_col_val{0};
2520  for (auto target_entry : targets) {
2521  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2522  if (!col_cv) {
2523  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2524  CHECK(col_cast);
2525  CHECK_EQ(kCAST, col_cast->get_optype());
2526  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2527  }
2528  CHECK(col_cv);
2529  const auto cd = col_descriptors[col_idx];
2530  auto col_datum = col_cv->get_constval();
2531  auto col_type = cd->columnType.get_type();
2532  uint8_t* col_data_bytes{nullptr};
2533  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2534  (!cd->columnType.is_string() ||
2535  cd->columnType.get_compression() == kENCODING_DICT)) {
2536  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2537  CHECK(col_data_bytes_it != col_buffers.end());
2538  col_data_bytes = col_data_bytes_it->second.get();
2539  }
2540  switch (col_type) {
2541  case kBOOLEAN: {
2542  auto col_data = col_data_bytes;
2543  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2544  : (col_datum.boolval ? 1 : 0);
2545  break;
2546  }
2547  case kTINYINT: {
2548  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2549  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2550  : col_datum.tinyintval;
2551  int_col_val = col_datum.tinyintval;
2552  break;
2553  }
2554  case kSMALLINT: {
2555  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2556  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2557  : col_datum.smallintval;
2558  int_col_val = col_datum.smallintval;
2559  break;
2560  }
2561  case kINT: {
2562  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2563  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2564  : col_datum.intval;
2565  int_col_val = col_datum.intval;
2566  break;
2567  }
2568  case kBIGINT:
2569  case kDECIMAL:
2570  case kNUMERIC: {
2571  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2572  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2573  : col_datum.bigintval;
2574  int_col_val = col_datum.bigintval;
2575  break;
2576  }
2577  case kFLOAT: {
2578  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2579  *col_data = col_datum.floatval;
2580  break;
2581  }
2582  case kDOUBLE: {
2583  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2584  *col_data = col_datum.doubleval;
2585  break;
2586  }
2587  case kTEXT:
2588  case kVARCHAR:
2589  case kCHAR: {
2590  switch (cd->columnType.get_compression()) {
2591  case kENCODING_NONE:
2592  str_col_buffers[col_ids[col_idx]].push_back(
2593  col_datum.stringval ? *col_datum.stringval : "");
2594  break;
2595  case kENCODING_DICT: {
2596  switch (cd->columnType.get_size()) {
2597  case 1:
2598  int_col_val = insert_one_dict_str(
2599  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat);
2600  break;
2601  case 2:
2602  int_col_val = insert_one_dict_str(
2603  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat);
2604  break;
2605  case 4:
2606  int_col_val = insert_one_dict_str(
2607  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat);
2608  break;
2609  default:
2610  CHECK(false);
2611  }
2612  break;
2613  }
2614  default:
2615  CHECK(false);
2616  }
2617  break;
2618  }
2619  case kTIME:
2620  case kTIMESTAMP:
2621  case kDATE: {
2622  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2623  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2624  : col_datum.bigintval;
2625  break;
2626  }
2627  case kARRAY: {
2628  const auto is_null = col_cv->get_is_null();
2629  const auto size = cd->columnType.get_size();
2630  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2631  if (is_null) {
2632  if (size > 0) {
2633  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2634  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2635  throw std::runtime_error("Column " + cd->columnName +
2636  " doesn't accept NULL values");
2637  }
2638  int8_t* buf = (int8_t*)checked_malloc(size);
2639  put_null_array(static_cast<void*>(buf), elem_ti, "");
2640  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2641  p += elem_ti.get_size()) {
2642  put_null(static_cast<void*>(p), elem_ti, "");
2643  }
2644  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2645  } else {
2646  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2647  }
2648  break;
2649  }
2650  const auto l = col_cv->get_value_list();
2651  size_t len = l.size() * elem_ti.get_size();
2652  if (size > 0 && static_cast<size_t>(size) != len) {
2653  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2654  std::to_string(size / elem_ti.get_size()) +
2655  " values, " + "received " + std::to_string(l.size()));
2656  }
2657  if (elem_ti.is_string()) {
2658  CHECK(kENCODING_DICT == elem_ti.get_compression());
2659  CHECK(4 == elem_ti.get_size());
2660 
2661  int8_t* buf = (int8_t*)checked_malloc(len);
2662  int32_t* p = reinterpret_cast<int32_t*>(buf);
2663 
2664  int elemIndex = 0;
2665  for (auto& e : l) {
2666  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2667  CHECK(c);
2668 
2669  int_col_val =
2670  insert_one_dict_str(&p[elemIndex], cd->columnName, elem_ti, c.get(), cat);
2671 
2672  elemIndex++;
2673  }
2674  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2675 
2676  } else {
2677  int8_t* buf = (int8_t*)checked_malloc(len);
2678  int8_t* p = buf;
2679  for (auto& e : l) {
2680  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2681  CHECK(c);
2682  p = Importer_NS::appendDatum(p, c->get_constval(), elem_ti);
2683  }
2684  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2685  }
2686  break;
2687  }
2688  case kPOINT:
2689  case kLINESTRING:
2690  case kPOLYGON:
2691  case kMULTIPOLYGON:
2692  str_col_buffers[col_ids[col_idx]].push_back(
2693  col_datum.stringval ? *col_datum.stringval : "");
2694  break;
2695  default:
2696  CHECK(false);
2697  }
2698  ++col_idx;
2699  if (col_idx == static_cast<size_t>(table_descriptor->shardedColumnId)) {
2700  const auto shard_count = shard_tables.size();
2701  const size_t shard_idx = SHARD_FOR_KEY(int_col_val, shard_count);
2702  shard = shard_tables[shard_idx];
2703  }
2704  }
2705  for (const auto& kv : col_buffers) {
2706  insert_data.columnIds.push_back(kv.first);
2707  DataBlockPtr p;
2708  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2709  insert_data.data.push_back(p);
2710  }
2711  for (auto& kv : str_col_buffers) {
2712  insert_data.columnIds.push_back(kv.first);
2713  DataBlockPtr p;
2714  p.stringsPtr = &kv.second;
2715  insert_data.data.push_back(p);
2716  }
2717  for (auto& kv : arr_col_buffers) {
2718  insert_data.columnIds.push_back(kv.first);
2719  DataBlockPtr p;
2720  p.arraysPtr = &kv.second;
2721  insert_data.data.push_back(p);
2722  }
2723  insert_data.numRows = 1;
2724  if (shard) {
2725  shard->fragmenter->insertData(insert_data);
2726  } else {
2727  table_descriptor->fragmenter->insertData(insert_data);
2728  }
2729 }
2730 
2731 void Executor::nukeOldState(const bool allow_lazy_fetch,
2732  const std::vector<InputTableInfo>& query_infos,
2733  const RelAlgExecutionUnit& ra_exe_unit) {
2734  const bool contains_left_deep_outer_join =
2735  std::find_if(ra_exe_unit.join_quals.begin(),
2736  ra_exe_unit.join_quals.end(),
2737  [](const JoinCondition& join_condition) {
2738  return join_condition.type == JoinType::LEFT;
2739  }) != ra_exe_unit.join_quals.end();
2740  cgen_state_.reset(new CgenState(query_infos, contains_left_deep_outer_join));
2741  plan_state_.reset(
2742  new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join, this));
2743 }
2744 
2745 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
2746  const std::vector<InputTableInfo>& query_infos) {
2747  const auto ld_count = input_descs.size();
2748  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
2749  for (size_t i = 0; i < ld_count; ++i) {
2750  CHECK_LT(i, query_infos.size());
2751  const auto frag_count = query_infos[i].info.fragments.size();
2752  if (i > 0) {
2753  cgen_state_->frag_offsets_.push_back(nullptr);
2754  } else {
2755  if (frag_count > 1) {
2756  cgen_state_->frag_offsets_.push_back(
2757  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
2758  } else {
2759  cgen_state_->frag_offsets_.push_back(nullptr);
2760  }
2761  }
2762  }
2763 }
2764 
2765 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
2766  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
2767  const std::vector<InputTableInfo>& query_infos,
2768  const MemoryLevel memory_level,
2769  const JoinHashTableInterface::HashType preferred_hash_type,
2770  ColumnCacheMap& column_cache) {
2771  std::shared_ptr<JoinHashTableInterface> join_hash_table;
2772  const int device_count = deviceCountForMemoryLevel(memory_level);
2773  CHECK_GT(device_count, 0);
2774  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
2775  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
2776  }
2777  try {
2778  if (qual_bin_oper->is_overlaps_oper()) {
2779  join_hash_table = OverlapsJoinHashTable::getInstance(
2780  qual_bin_oper, query_infos, memory_level, device_count, column_cache, this);
2781  } else if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2782  qual_bin_oper->get_left_operand())) {
2783  join_hash_table = BaselineJoinHashTable::getInstance(qual_bin_oper,
2784  query_infos,
2785  memory_level,
2786  preferred_hash_type,
2787  device_count,
2788  column_cache,
2789  this);
2790  } else {
2791  try {
2792  join_hash_table = JoinHashTable::getInstance(qual_bin_oper,
2793  query_infos,
2794  memory_level,
2795  preferred_hash_type,
2796  device_count,
2797  column_cache,
2798  this);
2799  } catch (TooManyHashEntries&) {
2800  const auto join_quals = coalesce_singleton_equi_join(qual_bin_oper);
2801  CHECK_EQ(join_quals.size(), size_t(1));
2802  const auto join_qual =
2803  std::dynamic_pointer_cast<Analyzer::BinOper>(join_quals.front());
2804  join_hash_table = BaselineJoinHashTable::getInstance(join_qual,
2805  query_infos,
2806  memory_level,
2807  preferred_hash_type,
2808  device_count,
2809  column_cache,
2810  this);
2811  }
2812  }
2813  CHECK(join_hash_table);
2814  return {join_hash_table, ""};
2815  } catch (const HashJoinFail& e) {
2816  return {nullptr, e.what()};
2817  }
2818  CHECK(false);
2819  return {nullptr, ""};
2820 }
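
// Illustrative summary of the selection above: overlaps qualifiers go to
// OverlapsJoinHashTable, composite (expression-tuple) keys go to BaselineJoinHashTable,
// and single-column equijoins try the perfect-hash JoinHashTable first, falling back to
// BaselineJoinHashTable when the key range is too large (TooManyHashEntries).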
2821 
2822 int8_t Executor::warpSize() const {
2823  CHECK(catalog_);
2824  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2825  CHECK(cuda_mgr);
2826  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2827  CHECK(!dev_props.empty());
2828  return dev_props.front().warpSize;
2829 }
2830 
2831 unsigned Executor::gridSize() const {
2832  CHECK(catalog_);
2833  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2834  CHECK(cuda_mgr);
2835  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2836  return grid_size_x_ ? grid_size_x_ : 2 * dev_props.front().numMPs;
2837 }
2838 
2839 unsigned Executor::blockSize() const {
2840  CHECK(catalog_);
2841  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2842  CHECK(cuda_mgr);
2843  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2844  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
2845 }
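
// Illustrative note: when no explicit sizes are configured, the launch dimensions fall
// back to the device properties reported by CudaMgr, i.e. a grid of 2 * numMPs blocks
// (gridSize) and maxThreadsPerBlock threads per block (blockSize).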
2846 
2847 int64_t Executor::deviceCycles(int milliseconds) const {
2848  CHECK(catalog_);
2849  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2850  CHECK(cuda_mgr);
2851  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
2852  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
2853 }
2854 
2855 llvm::Value* Executor::castToFP(llvm::Value* val) {
2856  if (!val->getType()->isIntegerTy()) {
2857  return val;
2858  }
2859 
2860  auto val_width = static_cast<llvm::IntegerType*>(val->getType())->getBitWidth();
2861  llvm::Type* dest_ty{nullptr};
2862  switch (val_width) {
2863  case 32:
2864  dest_ty = llvm::Type::getFloatTy(cgen_state_->context_);
2865  break;
2866  case 64:
2867  dest_ty = llvm::Type::getDoubleTy(cgen_state_->context_);
2868  break;
2869  default:
2870  CHECK(false);
2871  }
2872  return cgen_state_->ir_builder_.CreateSIToFP(val, dest_ty);
2873 }
2874 
2875 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
2876  CHECK(val->getType()->isPointerTy());
2877 
2878  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
2879  const auto val_type = val_ptr_type->getElementType();
2880  size_t val_width = 0;
2881  if (val_type->isIntegerTy()) {
2882  val_width = val_type->getIntegerBitWidth();
2883  } else {
2884  if (val_type->isFloatTy()) {
2885  val_width = 32;
2886  } else {
2887  CHECK(val_type->isDoubleTy());
2888  val_width = 64;
2889  }
2890  }
2891  CHECK_LT(size_t(0), val_width);
2892  if (bitWidth == val_width) {
2893  return val;
2894  }
2895  return cgen_state_->ir_builder_.CreateBitCast(
2896  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
2897 }
2898 
2899 #define EXECUTE_INCLUDE
2900 #include "ArrayOps.cpp"
2901 #include "DateAdd.cpp"
2902 #include "StringFunctions.cpp"
2903 #undef EXECUTE_INCLUDE
2904 
2905 RelAlgExecutionUnit Executor::addDeletedColumn(const RelAlgExecutionUnit& ra_exe_unit) {
2906  auto ra_exe_unit_with_deleted = ra_exe_unit;
2907  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
2908  if (input_table.getSourceType() != InputSourceType::TABLE) {
2909  continue;
2910  }
2911  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
2912  CHECK(td);
2913  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
2914  if (!deleted_cd) {
2915  continue;
2916  }
2917  CHECK(deleted_cd->columnType.is_boolean());
2918  // check deleted column is not already present
2919  bool found = false;
2920  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
2921  if (input_col.get()->getColId() == deleted_cd->columnId &&
2922  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
2923  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
2924  found = true;
2925  }
2926  }
2927  if (!found) {
2928  // add deleted column
2929  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
2930  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
2931  }
2932  }
2933  return ra_exe_unit_with_deleted;
2934 }
2935 
2936 namespace {
2937 
2938 int64_t get_hpt_scaled_value(const int64_t& val,
2939  const int32_t& ldim,
2940  const int32_t& rdim) {
2941  CHECK(ldim != rdim);
2942  return ldim > rdim ? val / DateTimeUtils::get_timestamp_precision_scale(ldim - rdim)
2943  : val * DateTimeUtils::get_timestamp_precision_scale(rdim - ldim);
2944 }
2945 
2946 } // namespace
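
// Worked example (illustrative): comparing a TIMESTAMP(9) column against a TIMESTAMP(3)
// constant, the chunk min/max stats are in nanoseconds (ldim = 9) while the constant is
// in milliseconds (rdim = 3), so get_hpt_scaled_value divides the stats by
// 10^(9 - 3) = 1,000,000 before the comparison in skipFragment below.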
2947 
2948 std::pair<bool, int64_t> Executor::skipFragment(
2949  const InputDescriptor& table_desc,
2950  const Fragmenter_Namespace::FragmentInfo& fragment,
2951  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
2952  const std::vector<uint64_t>& frag_offsets,
2953  const size_t frag_idx) {
2954  const int table_id = table_desc.getTableId();
2955  for (const auto simple_qual : simple_quals) {
2956  const auto comp_expr =
2957  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
2958  if (!comp_expr) {
2959  // is this possible?
2960  return {false, -1};
2961  }
2962  const auto lhs = comp_expr->get_left_operand();
2963  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
2964  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
2965  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
2966  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
2967  if (lhs_uexpr) {
2968  CHECK(lhs_uexpr->get_optype() ==
2969  kCAST); // We should have only been passed a cast expression
2970  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
2971  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
2972  continue;
2973  }
2974  } else {
2975  continue;
2976  }
2977  }
2978  const auto rhs = comp_expr->get_right_operand();
2979  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
2980  if (!rhs_const) {
2981  // is this possible?
2982  return {false, -1};
2983  }
2984  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
2985  continue;
2986  }
2987  const int col_id = lhs_col->get_column_id();
2988  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
2989  int64_t chunk_min{0};
2990  int64_t chunk_max{0};
2991  bool is_rowid{false};
2992  size_t start_rowid{0};
2993  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
2994  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
2995  if (cd->isVirtualCol) {
2996  CHECK(cd->columnName == "rowid");
2997  const auto& table_generation = getTableGeneration(table_id);
2998  start_rowid = table_generation.start_rowid;
2999  chunk_min = frag_offsets[frag_idx] + start_rowid;
3000  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3001  is_rowid = true;
3002  }
3003  } else {
3004  const auto& chunk_type = lhs_col->get_type_info();
3005  chunk_min = extract_min_stat(chunk_meta_it->second.chunkStats, chunk_type);
3006  chunk_max = extract_max_stat(chunk_meta_it->second.chunkStats, chunk_type);
3007  }
3008  if (lhs->get_type_info().is_timestamp() &&
3009  (lhs_col->get_type_info().get_dimension() !=
3010  rhs_const->get_type_info().get_dimension()) &&
3011  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3012  rhs_const->get_type_info().is_high_precision_timestamp())) {
3013  // The lhs column's chunk metadata holds values in the column's original
3014  // precision; if it differs from the rhs precision, scale the min/max stats
3015  // to match the rhs precision before comparing.
3016  const auto lhs_dimen = lhs_col->get_type_info().get_dimension();
3017  const auto rhs_dimen = rhs_const->get_type_info().get_dimension();
3018  chunk_min = get_hpt_scaled_value(chunk_min, lhs_dimen, rhs_dimen);
3019  chunk_max = get_hpt_scaled_value(chunk_max, lhs_dimen, rhs_dimen);
3020  }
3021  CodeGenerator code_generator(this);
3022  const auto rhs_val = code_generator.codegenIntConst(rhs_const)->getSExtValue();
3023  switch (comp_expr->get_optype()) {
3024  case kGE:
3025  if (chunk_max < rhs_val) {
3026  return {true, -1};
3027  }
3028  break;
3029  case kGT:
3030  if (chunk_max <= rhs_val) {
3031  return {true, -1};
3032  }
3033  break;
3034  case kLE:
3035  if (chunk_min > rhs_val) {
3036  return {true, -1};
3037  }
3038  break;
3039  case kLT:
3040  if (chunk_min >= rhs_val) {
3041  return {true, -1};
3042  }
3043  break;
3044  case kEQ:
3045  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3046  return {true, -1};
3047  } else if (is_rowid) {
3048  return {false, rhs_val - start_rowid};
3049  }
3050  break;
3051  default:
3052  break;
3053  }
3054  }
3055  return {false, -1};
3056 }
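
// Worked example (illustrative): if a fragment's chunk metadata for column x reports
// min = 10 and max = 20, the predicate x > 25 (kGT) skips the fragment because
// chunk_max <= rhs_val, while x = 15 (kEQ) keeps it because 10 <= 15 <= 20.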
3057 
3058 /*
3059  * skipFragmentInnerJoins processes all quals stored in the execution unit's
3060  * join_quals and gathers the ones that meet the "simple_qual" characteristics
3061  * (logical expressions with AND operations, etc.). It then uses the skipFragment
3062  * function to decide whether the fragment should be skipped. The fragment is skipped
3063  * if at least one of these skipFragment calls returns true in its first value.
3064  * - The code depends on skipFragment's output having a meaningful (anything but -1)
3065  * second value only if its first value is "false".
3066  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3067  * i.e., we only skip if none of the quals trigger the code to update the
3068  * rowid_lookup_key.
3069  * - Only AND operations are valid and considered:
3070  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3071  * the skip
3072  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3073  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3074  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3075  * implemented differently, which causes the first one to skip correctly while the
3076  * second one will not skip.
3077  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3078  * possible
3079  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3080  * projection, no skipping
3081  */
3082 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3083  const InputDescriptor& table_desc,
3084  const RelAlgExecutionUnit& ra_exe_unit,
3085  const Fragmenter_Namespace::FragmentInfo& fragment,
3086  const std::vector<uint64_t>& frag_offsets,
3087  const size_t frag_idx) {
3088  std::pair<bool, int64_t> skip_frag{false, -1};
3089  for (auto& inner_join : ra_exe_unit.join_quals) {
3090  if (inner_join.type != JoinType::INNER) {
3091  continue;
3092  }
3093 
3094  // extracting all the conjunctive simple_quals from the quals stored for the inner
3095  // join
3096  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3097  for (auto& qual : inner_join.quals) {
3098  auto temp_qual = qual_to_conjunctive_form(qual);
3099  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3100  temp_qual.simple_quals.begin(),
3101  temp_qual.simple_quals.end());
3102  }
3103  auto temp_skip_frag = skipFragment(
3104  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3105  if (temp_skip_frag.second != -1) {
3106  skip_frag.second = temp_skip_frag.second;
3107  return skip_frag;
3108  } else {
3109  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3110  }
3111  }
3112  return skip_frag;
3113 }
3114 
3115 std::map<std::pair<int, ::QueryRenderer::QueryRenderManager*>, std::shared_ptr<Executor>>
3116  Executor::executors_;
3117 std::mutex Executor::execute_mutex_;
Definition: Execute.cpp:1566
ExecutorDeviceType
void agg_max_double_skip_val(int64_t *agg, const double val, const double skip_val)
std::string get_table_name(const InputDescriptor &input_desc, const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:988
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:177
const std::vector< uint64_t > & getFragOffsets() const
std::tuple< QueryCompilationDescriptorOwned, QueryMemoryDescriptorOwned > compile(const size_t max_groups_buffer_entry_guess, const int8_t crt_min_byte_width, const CompilationOptions &co, const ExecutionOptions &eo, const ColumnFetcher &column_fetcher, const bool has_cardinality_estimation)
std::list< std::shared_ptr< Analyzer::Expr > > coalesce_singleton_equi_join(const std::shared_ptr< Analyzer::BinOper > &join_qual)
std::unordered_set< int > get_available_gpus(const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:934
static const int max_gpu_count
Definition: Execute.h:962
std::unordered_map< int, CgenState::LiteralValues > literal_values
Definition: Execute.h:505
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:173
int64_t float_to_double_bin(int32_t val, bool nullable=false)
SQLTypeInfo sql_type
Definition: TargetInfo.h:42
static const size_t code_cache_size
Definition: Execute.h:982
#define LOG(tag)
Definition: Logger.h:182
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:330
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:65
int64_t insert_one_dict_str(T *col_data, const ColumnDescriptor *cd, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
Definition: Execute.cpp:2438
bool g_enable_columnar_output
Definition: Execute.cpp:84
void agg_min_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:821
InputSourceType getSourceType() const
static mapd_shared_mutex executors_cache_mutex_
Definition: Execute.h:1002
Definition: sqldefs.h:35
const std::list< Analyzer::OrderEntry > order_entries
void assignFragsToMultiDispatch(DISPATCH_FCN f) const
void c(const std::string &query_string, const ExecutorDeviceType device_type)
const ColumnDescriptor * getDeletedColumnIfRowsDeleted(const TableDescriptor *td) const
Definition: Catalog.cpp:2202
int getTableId() const
unsigned gridSize() const
Definition: Execute.cpp:2831
void clearMemory(const MemoryLevel memLevel)
Definition: DataMgr.cpp:300
Definition: sqldefs.h:36
int32_t executePlanWithoutGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr *> &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t *>> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
Definition: Execute.cpp:2132
const ExecutorOptLevel opt_level_
std::string join(T const &container, std::string const &delim)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:323
bool is_varlen() const
Definition: sqltypes.h:464
const Catalog_Namespace::Catalog * getCatalog() const
Definition: Execute.cpp:228
const SortAlgorithm algorithm
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
OutVecOwner(const std::vector< int64_t *> &out_vec)
Definition: Execute.cpp:2024
RelAlgExecutionUnit addDeletedColumn(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2905
#define CHECK_GE(x, y)
Definition: Logger.h:200
int64_t const int32_t sz
static std::pair< int64_t, int32_t > reduceResults(const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
Definition: Execute.cpp:601
Definition: sqldefs.h:49
Definition: sqldefs.h:30
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id) const
size_t g_filter_push_down_passing_row_ubound
Definition: Execute.cpp:83
std::deque< FragmentInfo > fragments
Definition: Fragmenter.h:167
AggregatedColRange agg_col_range_cache_
Definition: Execute.h:994
ResultSetPtr collectAllDeviceResults(ExecutionDispatch &execution_dispatch, const std::vector< Analyzer::Expr *> &target_exprs, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
Definition: Execute.cpp:1464
std::shared_ptr< ResultSet > ResultSetPtr
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:944
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:70
size_t compute_buffer_entry_guess(const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:958
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
bool takes_float_argument(const TargetInfo &target_info)
Definition: TargetInfo.h:120
unsigned g_trivial_loop_join_threshold
Definition: Execute.cpp:74
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_map, Executor *executor)
uint32_t gpu_active_modules_device_mask_
Definition: Execute.h:966
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner() const
Definition: Execute.cpp:232
bool isArchPascalOrLater(const ExecutorDeviceType dt) const
Definition: Execute.h:510
static void invalidateCaches()
int64_t getAggInitValForIndex(const size_t index) const
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< std::pair< void *, void *>> &fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t *>> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables)
llvm::Value * castToIntPtrTyIn(llvm::Value *val, const size_t bit_width)
Definition: Execute.cpp:2875
size_t cuda_block_size
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
int getDeviceCount() const
Definition: CudaMgr.h:81
const std::vector< InputDescriptor > input_descs
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
#define CHECK_GT(x, y)
Definition: Logger.h:199
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:331
Container for compilation results and assorted options for a single execution unit.
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::vector< FragmentsPerTable > FragmentsList
ReductionCode codegen() const
std::string to_string(char const *&&v)
static size_t literalBytes(const CgenState::LiteralValue &lit)
Definition: CgenState.h:297
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
bool g_enable_overlaps_hashjoin
Definition: Execute.cpp:85
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:155
bool g_enable_debug_timer
Definition: Execute.cpp:68
int8_t get_min_byte_width()
bool g_inner_join_fragment_skipping
Definition: Execute.cpp:76
void executeSimpleInsert(const Planner::RootPlan *root_plan)
Definition: Execute.cpp:2453
const Executor * getExecutor() const
ExecutorDispatchMode
int64_t deviceCycles(int milliseconds) const
Definition: Execute.cpp:2847
bool useCudaBuffers() const
Definition: RenderInfo.cpp:60
static const size_t high_scan_limit
Definition: Execute.h:465
ResultSetPtr get_merged_result(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Definition: Execute.cpp:784
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
CodeCache gpu_code_cache_
Definition: Execute.h:976
Definition: sqldefs.h:71
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
std::unique_ptr< QueryMemoryInitializer > query_buffers_
const InputDescriptor & getScanDesc() const
std::vector< int64_t > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
Definition: Execute.cpp:2386
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:2745
bool g_null_div_by_zero
Definition: Execute.cpp:73
::QueryRenderer::QueryRenderManager * render_manager_
Definition: Execute.h:978
QueryFeatureDescriptor query_features
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:1017
Datum get_constval() const
Definition: Analyzer.h:328
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: Importer.cpp:323
const size_t limit
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:55
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1004
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id)
void run(const ExecutorDeviceType chosen_device_type, int chosen_device_id, const ExecutionOptions &eo, const ColumnFetcher &column_fetcher, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, const FragmentsList &frag_ids, const ExecutorDispatchMode kernel_dispatch_mode, const int64_t rowid_lookup_key)
bool is_integer() const
Definition: sqltypes.h:452
static SysCatalog & instance()
Definition: SysCatalog.h:240
const bool allow_multifrag
bool g_from_table_reordering
Definition: Execute.cpp:75
const bool just_validate
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:114
const TemporaryTables * getTemporaryTables() const
Definition: Execute.cpp:236
Classes representing a parse tree.
size_t get_context_count(const ExecutorDeviceType device_type, const size_t cpu_count, const size_t gpu_count)
Definition: Execute.cpp:946
static std::shared_ptr< JoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:168
const Catalog_Namespace::Catalog & getCatalog() const
Definition: Planner.h:293
const SortInfo sort_info
void init(LogOptions const &log_opts)
Definition: Logger.cpp:260
std::mutex str_dict_mutex_
Definition: Execute.h:971
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:990
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:40
#define INJECT_TIMER(DESC)
Definition: measure.h:91
static size_t addAligned(const size_t off_in, const size_t alignment)
Definition: CgenState.h:328
#define CHECK_NE(x, y)
Definition: Logger.h:196
const bool with_dynamic_watchdog
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1008
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:240
const JoinQualsPerNestingLevel join_quals
int getNestLevel() const
std::shared_timed_mutex mapd_shared_mutex
bool is_decimal() const
Definition: sqltypes.h:453
const std::string debug_file_
Definition: Execute.h:987
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:584
const double gpu_input_mem_limit_percent
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
Definition: Execute.h:960
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:79
void agg_min_double_skip_val(int64_t *agg, const double val, const double skip_val)
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
friend class QueryMemoryDescriptor
Definition: Execute.h:1024
ExecutorDeviceType getDeviceType() const
size_t cuda_grid_size
int64_t inline_null_val(const SQLTypeInfo &ti, const bool float_argument_input)
Definition: Execute.cpp:1356
size_t g_big_group_threshold
Definition: Execute.cpp:90
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1010
float g_filter_push_down_high_frac
Definition: Execute.cpp:82
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:907
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:959
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1012
bool g_bigint_count
void agg_min_float_skip_val(int32_t *agg, const float val, const float skip_val)
Definition: sqldefs.h:71
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:48
size_t g_max_memory_allocation_size
Definition: Execute.cpp:92
const bool register_intel_jit_listener_
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:116
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1008
void agg_max_float_skip_val(int32_t *agg, const float val, const float skip_val)
const unsigned block_size_x_
Definition: Execute.h:984
const unsigned grid_size_x_
Definition: Execute.h:985
specifies the content in-memory of a row in the column metadata table
SQLTypeInfoCore get_elem_type() const
Definition: sqltypes.h:632
size_t g_overlaps_max_table_size_bytes
Definition: Execute.cpp:87
const std::shared_ptr< Analyzer::Estimator > estimator
int64_t get_hpt_scaled_value(const int64_t &val, const int32_t &ldim, const int32_t &rdim)
Definition: Execute.cpp:2938
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
std::vector< MemoryInfo > getMemoryInfo(const MemoryLevel memLevel)
Definition: DataMgr.cpp:176
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1005
const RelAlgExecutionUnit & getExecutionUnit() const
static ResultSetPtr resultsUnion(ExecutionDispatch &execution_dispatch)
Definition: Execute.cpp:798
SQLTypeInfoCore< ArrayContextTypeSizer, ExecutorTypePackaging, DateTimeFacilities > SQLTypeInfo
Definition: sqltypes.h:823
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
Definition: Execute.cpp:1078
SQLAgg agg_kind
Definition: TargetInfo.h:41
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1348
ExpressionRange getColRange(const PhysicalInput &) const
Definition: Execute.cpp:248
ExecutorDeviceType device_type_
std::string * stringval
Definition: sqltypes.h:131
bool isCPUOnly() const
Definition: Execute.cpp:206
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
bool g_enable_window_functions
Definition: Execute.cpp:91
Definition: sqldefs.h:34
std::vector< size_t > getTableFragmentIndices(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< int, const TableFragments *> &selected_tables_fragments, const std::unordered_map< int, const Analyzer::BinOper *> &inner_table_id_to_join_condition)
Definition: Execute.cpp:1690
bool is_boolean() const
Definition: sqltypes.h:457
size_t g_min_memory_allocation_size
Definition: Execute.cpp:93
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:594
bool get_is_null() const
Definition: Analyzer.h:327
const ExecutorExplainType explain_type_
#define CHECK_LT(x, y)
Definition: Logger.h:197
Definition: sqltypes.h:54
Definition: sqltypes.h:55
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2731
#define REGULAR_DICT(TRANSIENTID)
Definition: sqltypes.h:188
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1975
const Plan * get_plan() const
Definition: Planner.h:289
#define CHECK_LE(x, y)
Definition: Logger.h:198
size_t getNumBytesForFetchedRow() const
Definition: Execute.cpp:252
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t *>> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
Definition: Execute.cpp:2274
bool is_null(const T &v, const SQLTypeInfo &t)
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:64
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
Definition: Execute.cpp:326
StringDictionaryGenerations string_dictionary_generations_
Definition: Execute.h:995
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id) const
InputTableInfoCache input_table_info_cache_
Definition: Execute.h:993
std::vector< std::pair< void *, void * > > native_functions
Definition: Execute.h:504
Speculative top N algorithm.
ResultSetPtr executeWorkUnit(size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1096
CodeCache cpu_code_cache_
Definition: Execute.h:975
int get_table_id() const
Definition: Analyzer.h:193
std::pair< bool, int64_t > skipFragment(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:2948
Definition: sqldefs.h:71
bool needFetchAllFragments(const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
Definition: Execute.cpp:1869
int8_t warpSize() const
Definition: Execute.cpp:2822
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:32
const int64_t const uint32_t const uint32_t const uint32_t const bool const bool const int32_t frag_idx
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:116
std::unique_ptr< llvm::Module > udf_cpu_module
Definition: sqltypes.h:43
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
Definition: ResultSet.cpp:452
void executeWorkUnitPerFragment(const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCB &cb)
Compiles and dispatches a work unit per fragment processing results with the per fragment callback...
Definition: Execute.cpp:1284
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
Definition: Execute.cpp:2267
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:332
std::pair< bool, int64_t > skipFragmentInnerJoins(const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3082
void buildSelectedFragsMapping(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1989
TableGenerations table_generations_
Definition: Execute.h:996
llvm::ConstantInt * codegenIntConst(const Analyzer::Constant *constant)
Definition: ConstantIR.cpp:88
int64_t extract_min_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
void assignFragsToKernelDispatch(DISPATCH_FCN f, const RelAlgExecutionUnit &ra_exe_unit) const
bool g_cache_string_hash
Definition: Execute.cpp:86
size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1000
std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> PerFragmentCB
Definition: Execute.h:324
bool g_enable_filter_push_down
Definition: Execute.cpp:80
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool is_number() const
Definition: sqltypes.h:455
bool g_enable_bump_allocator
Definition: Execute.cpp:96
static const int32_t ERR_SPECULATIVE_TOP_OOM
Definition: Execute.h:1011
Executor(const int db_id, const size_t block_size_x, const size_t grid_size_x, const std::string &debug_dir, const std::string &debug_file, ::QueryRenderer::QueryRenderManager *render_manager)
Definition: Execute.cpp:101
#define CHECK(condition)
Definition: Logger.h:187
void agg_sum_double_skip_val(int64_t *agg, const double val, const double skip_val)
MetaTypeClassHandler< SPECIALIZED_HANDLER, Geometry > GeoVsNonGeoClassHandler
const int8_t * literals
constexpr int64_t get_timestamp_precision_scale(const int32_t dimen)
Definition: DateTimeUtils.h:48
static const int32_t ERR_OUT_OF_SLOTS
Definition: Execute.h:1006
ReturnType operator()(int const entry_count, int &error_code, TargetInfo const &agg_info, size_t &out_vec_idx, std::vector< int64_t *> &out_vec, std::vector< int64_t > &reduced_outs, QueryExecutionContext *query_exe_context)
Definition: Execute.cpp:2103
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id) const
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
bool is_geometry() const
Definition: sqltypes.h:462
bool g_cluster
bool g_allow_cpu_retry
Definition: Execute.cpp:72
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
const std::list< int > & get_result_col_list() const
Definition: Planner.h:292
bool g_enable_watchdog
Definition: Execute.cpp:69
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
Definition: sqldefs.h:33
constexpr int8_t MAX_BYTE_WIDTH_SUPPORTED
ResultSetPtr collectAllDeviceShardedTopResults(ExecutionDispatch &execution_dispatch) const
Definition: Execute.cpp:1495
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1137
ExecutorDeviceType getDeviceTypeForTargets(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
Definition: Execute.cpp:1335
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags(const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments *> &all_tables_fragments)
Definition: Execute.cpp:1822
ResultSetPtr reduce_estimator_results(const RelAlgExecutionUnit &ra_exe_unit, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Definition: sqltypes.h:47
const std::map< int, ChunkMetadata > & getChunkMetadataMap() const
SQLTypeInfo columnType
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
StringDictionaryProxy * getStringDictionaryProxy(const int dictId, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool with_generation) const
Definition: Execute.cpp:180
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
specifies the content in-memory of a row in the table metadata table
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
const TableGeneration & getTableGeneration(const int table_id) const
Definition: Execute.cpp:244
const bool with_dynamic_watchdog_
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t ** out
std::unique_ptr< ResultSet > estimator_result_set_
static size_t align(const size_t off_in, const size_t alignment)
Definition: Execute.h:936
const ColumnDescriptor * getColumnDescriptor(const Analyzer::ColumnVar *) const
Definition: Execute.cpp:211
static std::map< std::pair< int, ::QueryRenderer::QueryRenderManager * >, std::shared_ptr< Executor > > executors_
Definition: Execute.h:1000
int8_t * numbersPtr
Definition: sqltypes.h:137
const size_t offset
bool skipFragmentPair(const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper *> &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
Definition: Execute.cpp:1732
static std::shared_ptr< BaselineJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_map, Executor *executor)
unsigned g_dynamic_watchdog_time_limit
Definition: Execute.cpp:71
QueryDescriptionType getQueryDescriptionType() const
Definition: sqldefs.h:71
void buildFragmentKernelMap(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< uint64_t > &frag_offsets, const int device_count, const ExecutorDeviceType &device_type, const bool enable_multifrag_kernels, const bool enable_inner_join_fragment_skipping, Executor *executor)
int cpu_threads()
Definition: thread_count.h:23
static std::mutex execute_mutex_
Definition: Execute.h:1001
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
void fill_entries_for_empty_input(std::vector< TargetInfo > &target_infos, std::vector< int64_t > &entry, const std::vector< Analyzer::Expr *> &target_exprs, const QueryMemoryDescriptor &query_mem_desc)
Definition: Execute.cpp:1371
std::string columnName
Descriptor for the fragments required for a query.
Definition: sqldefs.h:71
void agg_sum_float_skip_val(int32_t *agg, const float val, const float skip_val)
bool is_string() const
Definition: sqltypes.h:450
ExpressionRange getColRange(const PhysicalInput &) const
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:1555
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const JoinHashTableInterface::HashType preferred_hash_type, ColumnCacheMap &column_cache)
Definition: Execute.cpp:2765
#define IS_GEO(T)
Definition: sqltypes.h:164
const QueryMemoryDescriptor query_mem_desc_
#define VLOG(n)
Definition: Logger.h:277
Functions to support array operations used by the executor.
bool interrupted_
Definition: Execute.h:968
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
int64_t extract_max_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
const int db_id_
Definition: Execute.h:989
const bool with_watchdog
std::conditional_t< isCudaCC(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:119
void clearMetaInfoCache()
Definition: Execute.cpp:319
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:139
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
const TableGeneration & getGeneration(const uint32_t id) const
const TemporaryTables * temporary_tables_
Definition: Execute.h:991
unsigned blockSize() const
Definition: Execute.cpp:2839
ResultSetPtr executeExplain(const QueryCompilationDescriptor &)
Definition: Execute.cpp:1329
const Expr * get_left_operand() const
Definition: Analyzer.h:435