OmniSciDB  0264ff685a
Execute.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Execute.h"
18 
19 #include "AggregateUtils.h"
20 #include "CodeGenerator.h"
21 #include "ColumnFetcher.h"
24 #include "DynamicWatchdog.h"
25 #include "EquiJoinCondition.h"
26 #include "ErrorHandling.h"
27 #include "ExpressionRewrite.h"
29 #include "GpuMemUtils.h"
30 #include "InPlaceSort.h"
33 #include "JsonAccessors.h"
36 #include "QueryRewrite.h"
37 #include "QueryTemplateGenerator.h"
38 #include "ResultSetReductionJIT.h"
39 #include "RuntimeFunctions.h"
40 #include "SpeculativeTopN.h"
41 
44 
45 #include "CudaMgr/CudaMgr.h"
47 #include "Parser/ParserNode.h"
50 #include "Shared/checked_alloc.h"
51 #include "Shared/measure.h"
52 #include "Shared/misc.h"
53 #include "Shared/scope.h"
54 #include "Shared/shard_key.h"
55 #include "Shared/threadpool.h"
56 
57 #include "AggregatedColRange.h"
59 
60 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
61 #include <boost/filesystem/operations.hpp>
62 #include <boost/filesystem/path.hpp>
63 
64 #ifdef HAVE_CUDA
65 #include <cuda.h>
66 #endif // HAVE_CUDA
67 #include <chrono>
68 #include <ctime>
69 #include <future>
70 #include <iostream>
71 #include <memory>
72 #include <numeric>
73 #include <set>
74 #include <thread>
75 
76 bool g_enable_watchdog{false};
78 bool g_use_tbb_pool{false};
81 bool g_allow_cpu_retry{true};
82 bool g_null_div_by_zero{false};
86 extern bool g_enable_smem_group_by;
87 extern std::unique_ptr<llvm::Module> udf_gpu_module;
88 extern std::unique_ptr<llvm::Module> udf_cpu_module;
96 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
99 size_t g_big_group_threshold{20000};
102 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
104  256}; // minimum memory allocation required for projection query output buffer
105  // without pre-flight count
115  4096}; // GPU shared memory threshold (in bytes), if larger
116  // buffer sizes are required we do not use GPU shared
117  // memory optimizations. Setting this to 0 means unlimited
118  // (subject to other dynamically calculated caps)
120  true}; // enable use of shared memory when performing group-by with select non-count
121  // aggregates
123  true}; // enable optimizations for using GPU shared memory in implementation of
124  // non-grouped aggregates
125 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
126  // limits the allocation for the output buffer arena
127 
130 
131 extern bool g_cache_string_hash;
132 
133 int const Executor::max_gpu_count;
134 
136 
137 Executor::Executor(const ExecutorId executor_id,
138  const size_t block_size_x,
139  const size_t grid_size_x,
140  const size_t max_gpu_slab_size,
141  const std::string& debug_dir,
142  const std::string& debug_file)
143  : cgen_state_(new CgenState({}, false))
146  , block_size_x_(block_size_x)
147  , grid_size_x_(grid_size_x)
148  , max_gpu_slab_size_(max_gpu_slab_size)
149  , debug_dir_(debug_dir)
150  , debug_file_(debug_file)
151  , executor_id_(executor_id)
152  , catalog_(nullptr)
153  , temporary_tables_(nullptr)
154  , input_table_info_cache_(this) {}
155 
156 std::shared_ptr<Executor> Executor::getExecutor(
157  const ExecutorId executor_id,
158  const std::string& debug_dir,
159  const std::string& debug_file,
160  const SystemParameters system_parameters) {
162 
163  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
164  auto it = executors_.find(executor_id);
165  if (it != executors_.end()) {
166  return it->second;
167  }
168  auto executor = std::make_shared<Executor>(executor_id,
169  system_parameters.cuda_block_size,
170  system_parameters.cuda_grid_size,
171  system_parameters.max_gpu_slab_size,
172  debug_dir,
173  debug_file);
174  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
175  return executor;
176 }
177 
179  switch (memory_level) {
182  mapd_unique_lock<mapd_shared_mutex> flush_lock(
183  execute_mutex_); // Don't flush memory while queries are running
184 
186  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
187  // The hash table cache uses CPU memory not managed by the buffer manager. In the
188  // future, we should manage these allocations with the buffer manager directly.
189  // For now, assume the user wants to purge the hash table cache when they clear
190  // CPU memory (currently used in ExecuteTest to lower memory pressure)
192  }
193  break;
194  }
195  default: {
196  throw std::runtime_error(
197  "Clearing memory levels other than the CPU level or GPU level is not "
198  "supported.");
199  }
200  }
201 }
202 
204  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
205 }
206 
208  const int dict_id_in,
209  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
210  const bool with_generation) const {
211  CHECK(row_set_mem_owner);
212  std::lock_guard<std::mutex> lock(
213  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
214  return row_set_mem_owner->getOrAddStringDictProxy(
215  dict_id_in, with_generation, catalog_);
216 }
217 
219  const int dict_id_in,
220  const bool with_generation,
221  const Catalog_Namespace::Catalog* catalog) {
222  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
223  CHECK(catalog);
224  const auto dd = catalog->getMetadataForDict(dict_id);
225  if (dd) {
226  CHECK(dd->stringDict);
227  CHECK_LE(dd->dictNBits, 32);
228  const int64_t generation =
229  with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
230  return addStringDict(dd->stringDict, dict_id, generation);
231  }
232  CHECK_EQ(0, dict_id);
233  if (!lit_str_dict_proxy_) {
234  std::shared_ptr<StringDictionary> tsd =
235  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
236  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
237  }
238  return lit_str_dict_proxy_.get();
239 }
240 
242  std::lock_guard<std::mutex> lock(state_mutex_);
243  auto& td = t_digests_.emplace_back(std::make_unique<quantile::TDigest>());
244  // Separating TDigests from their buffers allows for flexible memory management.
245  // The incoming Buffer is used to collect and sort incoming data,
246  // and used as scratch space during its merging into the main Centroids buffer.
247  td->setBuffer(t_digest_buffers_.emplace_back(g_approx_quantile_buffer));
248  td->setCentroids(t_digest_buffers_.emplace_back(g_approx_quantile_centroids));
249  return td.get();
250 }
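
// A minimal standalone sketch of the buffer/centroid split described above: raw
// values are gathered in a scratch buffer and periodically sorted and folded into
// the main summary. The real quantile::TDigest merge is more involved; the names
// and fold policy here are illustrative only.
struct BufferedSummarySketch {
  std::vector<double> incoming;   // scratch space: collects and sorts incoming data
  std::vector<double> centroids;  // main buffer holding the merged summary

  void add(const double v) {
    incoming.push_back(v);
    if (incoming.size() >= 1024) {  // illustrative capacity
      std::sort(incoming.begin(), incoming.end());
      centroids.insert(centroids.end(), incoming.begin(), incoming.end());
      std::sort(centroids.begin(), centroids.end());
      incoming.clear();
    }
  }
};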
251 
252 bool Executor::isCPUOnly() const {
253  CHECK(catalog_);
254  return !catalog_->getDataMgr().getCudaMgr();
255 }
256 
258  const Analyzer::ColumnVar* col_var) const {
260  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
261 }
262 
264  const Analyzer::ColumnVar* col_var,
265  int n) const {
266  const auto cd = getColumnDescriptor(col_var);
267  if (!cd || n > cd->columnType.get_physical_cols()) {
268  return nullptr;
269  }
271  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
272 }
273 
275  return catalog_;
276 }
277 
279  catalog_ = catalog;
280 }
281 
282 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
283  return row_set_mem_owner_;
284 }
285 
287  return temporary_tables_;
288 }
289 
291  return input_table_info_cache_.getTableInfo(table_id);
292 }
293 
294 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
295  return table_generations_.getGeneration(table_id);
296 }
297 
299  return agg_col_range_cache_.getColRange(phys_input);
300 }
301 
302 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
303  size_t num_bytes = 0;
304  if (!plan_state_) {
305  return 0;
306  }
307  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
308  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
309  continue;
310  }
311 
312  if (fetched_col_pair.first < 0) {
313  num_bytes += 8;
314  } else {
315  const auto cd =
316  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
317  const auto& ti = cd->columnType;
318  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
319  ? 4
320  : ti.get_size();
321  if (sz < 0) {
322  // for varlen types, only account for the pointer/size for each row, for now
323  num_bytes += 16;
324  } else {
325  num_bytes += sz;
326  }
327  }
328  }
329  return num_bytes;
330 }
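
// A standalone sketch of the per-row byte accounting above, with the column metadata
// reduced to plain parameters (the real code reads it from the catalog's column
// descriptor). Names are illustrative only.
inline size_t bytes_per_row_for_column(const bool is_intermediate_result,  // table_id < 0
                                       const bool is_dict_encoded_text,
                                       const int declared_byte_size) {
  if (is_intermediate_result) {
    return 8;  // columns produced by prior execution steps are fetched as 64-bit values
  }
  if (is_dict_encoded_text) {
    return 4;  // dictionary-encoded strings are fetched as 32-bit string ids
  }
  if (declared_byte_size < 0) {
    return 16;  // varlen columns: count only the pointer/size pair per row, for now
  }
  return static_cast<size_t>(declared_byte_size);
}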
331 
332 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
333  const std::vector<Analyzer::Expr*>& target_exprs) const {
335  CHECK(catalog_);
336  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
337  for (const auto target_expr : target_exprs) {
338  if (!plan_state_->isLazyFetchColumn(target_expr)) {
339  col_lazy_fetch_info.emplace_back(
340  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
341  } else {
342  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
343  CHECK(col_var);
344  auto col_id = col_var->get_column_id();
345  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
346  auto cd = (col_var->get_table_id() > 0)
347  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
348  : nullptr;
349  if (cd && IS_GEO(cd->columnType.get_type())) {
350  // Geo coords cols will be processed in sequence. So we only need to track the
351  // first coords col in lazy fetch info.
352  {
353  auto cd0 =
354  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
355  auto col0_ti = cd0->columnType;
356  CHECK(!cd0->isVirtualCol);
357  auto col0_var = makeExpr<Analyzer::ColumnVar>(
358  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
359  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
360  col_lazy_fetch_info.emplace_back(
361  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
362  }
363  } else {
364  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
365  const auto& col_ti = col_var->get_type_info();
366  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
367  }
368  }
369  }
370  return col_lazy_fetch_info;
371 }
372 
377 }
378 
379 std::vector<int8_t> Executor::serializeLiterals(
380  const std::unordered_map<int, CgenState::LiteralValues>& literals,
381  const int device_id) {
382  if (literals.empty()) {
383  return {};
384  }
385  const auto dev_literals_it = literals.find(device_id);
386  CHECK(dev_literals_it != literals.end());
387  const auto& dev_literals = dev_literals_it->second;
388  size_t lit_buf_size{0};
389  std::vector<std::string> real_strings;
390  std::vector<std::vector<double>> double_array_literals;
391  std::vector<std::vector<int8_t>> align64_int8_array_literals;
392  std::vector<std::vector<int32_t>> int32_array_literals;
393  std::vector<std::vector<int8_t>> align32_int8_array_literals;
394  std::vector<std::vector<int8_t>> int8_array_literals;
395  for (const auto& lit : dev_literals) {
396  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
397  if (lit.which() == 7) {
398  const auto p = boost::get<std::string>(&lit);
399  CHECK(p);
400  real_strings.push_back(*p);
401  } else if (lit.which() == 8) {
402  const auto p = boost::get<std::vector<double>>(&lit);
403  CHECK(p);
404  double_array_literals.push_back(*p);
405  } else if (lit.which() == 9) {
406  const auto p = boost::get<std::vector<int32_t>>(&lit);
407  CHECK(p);
408  int32_array_literals.push_back(*p);
409  } else if (lit.which() == 10) {
410  const auto p = boost::get<std::vector<int8_t>>(&lit);
411  CHECK(p);
412  int8_array_literals.push_back(*p);
413  } else if (lit.which() == 11) {
414  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
415  CHECK(p);
416  if (p->second == 64) {
417  align64_int8_array_literals.push_back(p->first);
418  } else if (p->second == 32) {
419  align32_int8_array_literals.push_back(p->first);
420  } else {
421  CHECK(false);
422  }
423  }
424  }
425  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
426  throw TooManyLiterals();
427  }
428  int16_t crt_real_str_off = lit_buf_size;
429  for (const auto& real_str : real_strings) {
430  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
431  lit_buf_size += real_str.size();
432  }
433  if (double_array_literals.size() > 0) {
434  lit_buf_size = align(lit_buf_size, sizeof(double));
435  }
436  int16_t crt_double_arr_lit_off = lit_buf_size;
437  for (const auto& double_array_literal : double_array_literals) {
438  CHECK_LE(double_array_literal.size(),
439  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
440  lit_buf_size += double_array_literal.size() * sizeof(double);
441  }
442  if (align64_int8_array_literals.size() > 0) {
443  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
444  }
445  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
446  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
447  CHECK_LE(align64_int8_array_literal.size(),
448  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
449  lit_buf_size += align64_int8_array_literal.size();
450  }
451  if (int32_array_literals.size() > 0) {
452  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
453  }
454  int16_t crt_int32_arr_lit_off = lit_buf_size;
455  for (const auto& int32_array_literal : int32_array_literals) {
456  CHECK_LE(int32_array_literal.size(),
457  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
458  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
459  }
460  if (align32_int8_array_literals.size() > 0) {
461  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
462  }
463  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
464  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
465  CHECK_LE(align32_int8_array_literal.size(),
466  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
467  lit_buf_size += align32_int8_array_literal.size();
468  }
469  int16_t crt_int8_arr_lit_off = lit_buf_size;
470  for (const auto& int8_array_literal : int8_array_literals) {
471  CHECK_LE(int8_array_literal.size(),
472  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
473  lit_buf_size += int8_array_literal.size();
474  }
475  unsigned crt_real_str_idx = 0;
476  unsigned crt_double_arr_lit_idx = 0;
477  unsigned crt_align64_int8_arr_lit_idx = 0;
478  unsigned crt_int32_arr_lit_idx = 0;
479  unsigned crt_align32_int8_arr_lit_idx = 0;
480  unsigned crt_int8_arr_lit_idx = 0;
481  std::vector<int8_t> serialized(lit_buf_size);
482  size_t off{0};
483  for (const auto& lit : dev_literals) {
484  const auto lit_bytes = CgenState::literalBytes(lit);
485  off = CgenState::addAligned(off, lit_bytes);
486  switch (lit.which()) {
487  case 0: {
488  const auto p = boost::get<int8_t>(&lit);
489  CHECK(p);
490  serialized[off - lit_bytes] = *p;
491  break;
492  }
493  case 1: {
494  const auto p = boost::get<int16_t>(&lit);
495  CHECK(p);
496  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
497  break;
498  }
499  case 2: {
500  const auto p = boost::get<int32_t>(&lit);
501  CHECK(p);
502  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
503  break;
504  }
505  case 3: {
506  const auto p = boost::get<int64_t>(&lit);
507  CHECK(p);
508  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
509  break;
510  }
511  case 4: {
512  const auto p = boost::get<float>(&lit);
513  CHECK(p);
514  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
515  break;
516  }
517  case 5: {
518  const auto p = boost::get<double>(&lit);
519  CHECK(p);
520  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
521  break;
522  }
523  case 6: {
524  const auto p = boost::get<std::pair<std::string, int>>(&lit);
525  CHECK(p);
526  const auto str_id =
528  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
529  ->getOrAddTransient(p->first)
530  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
531  ->getIdOfString(p->first);
532  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
533  break;
534  }
535  case 7: {
536  const auto p = boost::get<std::string>(&lit);
537  CHECK(p);
538  int32_t off_and_len = crt_real_str_off << 16;
539  const auto& crt_real_str = real_strings[crt_real_str_idx];
540  off_and_len |= static_cast<int16_t>(crt_real_str.size());
541  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
542  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
543  ++crt_real_str_idx;
544  crt_real_str_off += crt_real_str.size();
545  break;
546  }
547  case 8: {
548  const auto p = boost::get<std::vector<double>>(&lit);
549  CHECK(p);
550  int32_t off_and_len = crt_double_arr_lit_off << 16;
551  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
552  int32_t len = crt_double_arr_lit.size();
553  CHECK_EQ((len >> 16), 0);
554  off_and_len |= static_cast<int16_t>(len);
555  int32_t double_array_bytesize = len * sizeof(double);
556  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
557  memcpy(&serialized[crt_double_arr_lit_off],
558  crt_double_arr_lit.data(),
559  double_array_bytesize);
560  ++crt_double_arr_lit_idx;
561  crt_double_arr_lit_off += double_array_bytesize;
562  break;
563  }
564  case 9: {
565  const auto p = boost::get<std::vector<int32_t>>(&lit);
566  CHECK(p);
567  int32_t off_and_len = crt_int32_arr_lit_off << 16;
568  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
569  int32_t len = crt_int32_arr_lit.size();
570  CHECK_EQ((len >> 16), 0);
571  off_and_len |= static_cast<int16_t>(len);
572  int32_t int32_array_bytesize = len * sizeof(int32_t);
573  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
574  memcpy(&serialized[crt_int32_arr_lit_off],
575  crt_int32_arr_lit.data(),
576  int32_array_bytesize);
577  ++crt_int32_arr_lit_idx;
578  crt_int32_arr_lit_off += int32_array_bytesize;
579  break;
580  }
581  case 10: {
582  const auto p = boost::get<std::vector<int8_t>>(&lit);
583  CHECK(p);
584  int32_t off_and_len = crt_int8_arr_lit_off << 16;
585  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
586  int32_t len = crt_int8_arr_lit.size();
587  CHECK_EQ((len >> 16), 0);
588  off_and_len |= static_cast<int16_t>(len);
589  int32_t int8_array_bytesize = len;
590  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
591  memcpy(&serialized[crt_int8_arr_lit_off],
592  crt_int8_arr_lit.data(),
593  int8_array_bytesize);
594  ++crt_int8_arr_lit_idx;
595  crt_int8_arr_lit_off += int8_array_bytesize;
596  break;
597  }
598  case 11: {
599  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
600  CHECK(p);
601  if (p->second == 64) {
602  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
603  const auto& crt_align64_int8_arr_lit =
604  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
605  int32_t len = crt_align64_int8_arr_lit.size();
606  CHECK_EQ((len >> 16), 0);
607  off_and_len |= static_cast<int16_t>(len);
608  int32_t align64_int8_array_bytesize = len;
609  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
610  memcpy(&serialized[crt_align64_int8_arr_lit_off],
611  crt_align64_int8_arr_lit.data(),
612  align64_int8_array_bytesize);
613  ++crt_align64_int8_arr_lit_idx;
614  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
615  } else if (p->second == 32) {
616  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
617  const auto& crt_align32_int8_arr_lit =
618  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
619  int32_t len = crt_align32_int8_arr_lit.size();
620  CHECK_EQ((len >> 16), 0);
621  off_and_len |= static_cast<int16_t>(len);
622  int32_t align32_int8_array_bytesize = len;
623  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
624  memcpy(&serialized[crt_align32_int8_arr_lit_off],
625  crt_align32_int8_arr_lit.data(),
626  align32_int8_array_bytesize);
627  ++crt_align32_int8_arr_lit_idx;
628  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
629  } else {
630  CHECK(false);
631  }
632  break;
633  }
634  default:
635  CHECK(false);
636  }
637  }
638  return serialized;
639 }
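
// Variable-length literals above are addressed through a packed 32-bit
// "offset and length" word: the 16-bit offset into the literal buffer sits in the
// high half, the 16-bit payload length in the low half. A minimal sketch of the
// packing and unpacking (helper names are illustrative; the masking avoids the
// sign-extension a plain int16_t OR would introduce):
inline int32_t pack_off_and_len(const int16_t off, const int16_t len) {
  return (static_cast<int32_t>(off) << 16) | static_cast<uint16_t>(len);
}

inline std::pair<int16_t, int16_t> unpack_off_and_len(const int32_t off_and_len) {
  return {static_cast<int16_t>(off_and_len >> 16),
          static_cast<int16_t>(off_and_len & 0xFFFF)};
}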
640 
641 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
642  if (device_type == ExecutorDeviceType::GPU) {
643  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
644  CHECK(cuda_mgr);
645  return cuda_mgr->getDeviceCount();
646  } else {
647  return 1;
648  }
649 }
650 
652  const Data_Namespace::MemoryLevel memory_level) const {
653  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
655 }
656 
657 // TODO(alex): remove or split
658 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
659  const SQLTypeInfo& ti,
660  const int64_t agg_init_val,
661  const int8_t out_byte_width,
662  const int64_t* out_vec,
663  const size_t out_vec_sz,
664  const bool is_group_by,
665  const bool float_argument_input) {
666  switch (agg) {
667  case kAVG:
668  case kSUM:
669  if (0 != agg_init_val) {
670  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
671  int64_t agg_result = agg_init_val;
672  for (size_t i = 0; i < out_vec_sz; ++i) {
673  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
674  }
675  return {agg_result, 0};
676  } else {
677  CHECK(ti.is_fp());
678  switch (out_byte_width) {
679  case 4: {
680  int agg_result = static_cast<int32_t>(agg_init_val);
681  for (size_t i = 0; i < out_vec_sz; ++i) {
683  &agg_result,
684  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
685  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
686  }
687  const int64_t converted_bin =
688  float_argument_input
689  ? static_cast<int64_t>(agg_result)
690  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
691  return {converted_bin, 0};
692  break;
693  }
694  case 8: {
695  int64_t agg_result = agg_init_val;
696  for (size_t i = 0; i < out_vec_sz; ++i) {
698  &agg_result,
699  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
700  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
701  }
702  return {agg_result, 0};
703  break;
704  }
705  default:
706  CHECK(false);
707  }
708  }
709  }
710  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
711  int64_t agg_result = 0;
712  for (size_t i = 0; i < out_vec_sz; ++i) {
713  agg_result += out_vec[i];
714  }
715  return {agg_result, 0};
716  } else {
717  CHECK(ti.is_fp());
718  switch (out_byte_width) {
719  case 4: {
720  float r = 0.;
721  for (size_t i = 0; i < out_vec_sz; ++i) {
722  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
723  }
724  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
725  const int64_t converted_bin =
726  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
727  return {converted_bin, 0};
728  }
729  case 8: {
730  double r = 0.;
731  for (size_t i = 0; i < out_vec_sz; ++i) {
732  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
733  }
734  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
735  }
736  default:
737  CHECK(false);
738  }
739  }
740  break;
741  case kCOUNT: {
742  uint64_t agg_result = 0;
743  for (size_t i = 0; i < out_vec_sz; ++i) {
744  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
745  agg_result += out;
746  }
747  return {static_cast<int64_t>(agg_result), 0};
748  }
749  case kMIN: {
750  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
751  int64_t agg_result = agg_init_val;
752  for (size_t i = 0; i < out_vec_sz; ++i) {
753  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
754  }
755  return {agg_result, 0};
756  } else {
757  switch (out_byte_width) {
758  case 4: {
759  int32_t agg_result = static_cast<int32_t>(agg_init_val);
760  for (size_t i = 0; i < out_vec_sz; ++i) {
762  &agg_result,
763  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
764  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
765  }
766  const int64_t converted_bin =
767  float_argument_input
768  ? static_cast<int64_t>(agg_result)
769  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
770  return {converted_bin, 0};
771  }
772  case 8: {
773  int64_t agg_result = agg_init_val;
774  for (size_t i = 0; i < out_vec_sz; ++i) {
776  &agg_result,
777  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
778  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
779  }
780  return {agg_result, 0};
781  }
782  default:
783  CHECK(false);
784  }
785  }
786  }
787  case kMAX:
788  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
789  int64_t agg_result = agg_init_val;
790  for (size_t i = 0; i < out_vec_sz; ++i) {
791  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
792  }
793  return {agg_result, 0};
794  } else {
795  switch (out_byte_width) {
796  case 4: {
797  int32_t agg_result = static_cast<int32_t>(agg_init_val);
798  for (size_t i = 0; i < out_vec_sz; ++i) {
800  &agg_result,
801  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
802  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
803  }
804  const int64_t converted_bin =
805  float_argument_input ? static_cast<int64_t>(agg_result)
806  : float_to_double_bin(agg_result, !ti.get_notnull());
807  return {converted_bin, 0};
808  }
809  case 8: {
810  int64_t agg_result = agg_init_val;
811  for (size_t i = 0; i < out_vec_sz; ++i) {
813  &agg_result,
814  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
815  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
816  }
817  return {agg_result, 0};
818  }
819  default:
820  CHECK(false);
821  }
822  }
823  case kSINGLE_VALUE: {
824  int64_t agg_result = agg_init_val;
825  for (size_t i = 0; i < out_vec_sz; ++i) {
826  if (out_vec[i] != agg_init_val) {
827  if (agg_result == agg_init_val) {
828  agg_result = out_vec[i];
829  } else if (out_vec[i] != agg_result) {
831  }
832  }
833  }
834  return {agg_result, 0};
835  }
836  case kSAMPLE: {
837  int64_t agg_result = agg_init_val;
838  for (size_t i = 0; i < out_vec_sz; ++i) {
839  if (out_vec[i] != agg_init_val) {
840  agg_result = out_vec[i];
841  break;
842  }
843  }
844  return {agg_result, 0};
845  }
846  default:
847  CHECK(false);
848  }
849  abort();
850 }
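
// Partial device results arrive as 64-bit slots even for float/double aggregates;
// the reduction above reads them back by reinterpreting the slot bytes. A minimal
// sketch of that reinterpretation using memcpy, equivalent in effect to the
// may_alias_ptr casts above (helper names are illustrative):
inline float float_from_agg_slot(const int64_t slot) {
  float f;
  memcpy(&f, &slot, sizeof(float));  // first four bytes of the slot hold the float bits
  return f;
}

inline double double_from_agg_slot(const int64_t slot) {
  double d;
  memcpy(&d, &slot, sizeof(double));  // all eight bytes hold the double bits
  return d;
}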
851 
852 namespace {
853 
855  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
856  auto& first = results_per_device.front().first;
857  CHECK(first);
858  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
859  const auto& next = results_per_device[dev_idx].first;
860  CHECK(next);
861  first->append(*next);
862  }
863  return std::move(first);
864 }
865 
866 } // namespace
867 
869  const RelAlgExecutionUnit& ra_exe_unit) {
870  auto& results_per_device = shared_context.getFragmentResults();
871  if (results_per_device.empty()) {
872  std::vector<TargetInfo> targets;
873  for (const auto target_expr : ra_exe_unit.target_exprs) {
874  targets.push_back(get_target_info(target_expr, g_bigint_count));
875  }
876  return std::make_shared<ResultSet>(targets,
880  catalog_,
881  blockSize(),
882  gridSize());
883  }
884  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
885  std::sort(results_per_device.begin(),
886  results_per_device.end(),
887  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
888  CHECK_GE(lhs.second.size(), size_t(1));
889  CHECK_GE(rhs.second.size(), size_t(1));
890  return lhs.second.front() < rhs.second.front();
891  });
892 
893  return get_merged_result(results_per_device);
894 }
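
// Before being appended into a single ResultSet, per-device results are ordered by
// the first fragment id they were computed from, so the union preserves fragment
// order. A standalone sketch of that ordering over plain data (the int is a
// stand-in for the per-device result handle):
inline void order_results_by_first_fragment(
    std::vector<std::pair<int, std::vector<size_t>>>& results_per_device) {
  std::sort(results_per_device.begin(),
            results_per_device.end(),
            [](const auto& lhs, const auto& rhs) {
              return lhs.second.front() < rhs.second.front();
            });
}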
895 
897  const RelAlgExecutionUnit& ra_exe_unit,
898  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
899  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
900  const QueryMemoryDescriptor& query_mem_desc) const {
901  auto timer = DEBUG_TIMER(__func__);
902  if (ra_exe_unit.estimator) {
903  return reduce_estimator_results(ra_exe_unit, results_per_device);
904  }
905 
906  if (results_per_device.empty()) {
907  std::vector<TargetInfo> targets;
908  for (const auto target_expr : ra_exe_unit.target_exprs) {
909  targets.push_back(get_target_info(target_expr, g_bigint_count));
910  }
911  return std::make_shared<ResultSet>(targets,
914  nullptr,
915  catalog_,
916  blockSize(),
917  gridSize());
918  }
919 
921  results_per_device,
922  row_set_mem_owner,
923  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
924 }
925 
926 namespace {
927 
929  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
930  int64_t* compilation_queue_time) {
931  auto clock_begin = timer_start();
932  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
933  *compilation_queue_time = timer_stop(clock_begin);
934  const auto& this_result_set = results_per_device[0].first;
935  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
936  this_result_set->getTargetInfos(),
937  this_result_set->getTargetInitVals());
938  return reduction_jit.codegen();
939 }
940 
941 } // namespace
942 
944  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
945  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
946  const QueryMemoryDescriptor& query_mem_desc) const {
947  auto timer = DEBUG_TIMER(__func__);
948  std::shared_ptr<ResultSet> reduced_results;
949 
950  const auto& first = results_per_device.front().first;
951 
952  if (query_mem_desc.getQueryDescriptionType() ==
954  results_per_device.size() > 1) {
955  const auto total_entry_count = std::accumulate(
956  results_per_device.begin(),
957  results_per_device.end(),
958  size_t(0),
959  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
960  const auto& r = rs.first;
961  return init + r->getQueryMemDesc().getEntryCount();
962  });
963  CHECK(total_entry_count);
964  auto query_mem_desc = first->getQueryMemDesc();
965  query_mem_desc.setEntryCount(total_entry_count);
966  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
968  query_mem_desc,
969  row_set_mem_owner,
970  catalog_,
971  blockSize(),
972  gridSize());
973  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
974  reduced_results->initializeStorage();
975  switch (query_mem_desc.getEffectiveKeyWidth()) {
976  case 4:
977  first->getStorage()->moveEntriesToBuffer<int32_t>(
978  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
979  break;
980  case 8:
981  first->getStorage()->moveEntriesToBuffer<int64_t>(
982  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
983  break;
984  default:
985  CHECK(false);
986  }
987  } else {
988  reduced_results = first;
989  }
990 
991  int64_t compilation_queue_time = 0;
992  const auto reduction_code =
993  get_reduction_code(results_per_device, &compilation_queue_time);
994 
995  for (size_t i = 1; i < results_per_device.size(); ++i) {
996  reduced_results->getStorage()->reduce(
997  *(results_per_device[i].first->getStorage()), {}, reduction_code);
998  }
999  reduced_results->addCompilationQueueTime(compilation_queue_time);
1000  return reduced_results;
1001 }
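
// A standalone sketch of the sizing step above: the reduced buffer is allocated with
// the sum of all per-device entry counts before the per-device results are folded in
// (the actual reduction then merges entries with matching group keys).
inline size_t total_reduced_entry_count(const std::vector<size_t>& per_device_entry_counts) {
  return std::accumulate(
      per_device_entry_counts.begin(), per_device_entry_counts.end(), size_t(0));
}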
1002 
1004  const RelAlgExecutionUnit& ra_exe_unit,
1005  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1006  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1007  const QueryMemoryDescriptor& query_mem_desc) const {
1008  if (results_per_device.size() == 1) {
1009  return std::move(results_per_device.front().first);
1010  }
1011  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1013  for (const auto& result : results_per_device) {
1014  auto rows = result.first;
1015  CHECK(rows);
1016  if (!rows) {
1017  continue;
1018  }
1019  SpeculativeTopNMap that(
1020  *rows,
1021  ra_exe_unit.target_exprs,
1022  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1023  m.reduce(that);
1024  }
1025  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1026  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1027  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1028 }
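
// A standalone sketch of the per-device map sizing used above: the speculative map
// keeps roughly 10000 * ln(top_n) entries and never fewer than top_n itself, where
// top_n = limit + offset (assumed > 0). For example, LIMIT 500 gives
// max(10000 * 6, 500) = 60000 slots.
inline size_t speculative_top_n_map_size(const size_t top_n) {
  return std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n);
}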
1029 
1030 std::unordered_set<int> get_available_gpus(const Catalog_Namespace::Catalog& cat) {
1031  std::unordered_set<int> available_gpus;
1032  if (cat.getDataMgr().gpusPresent()) {
1033  int gpu_count = cat.getDataMgr().getCudaMgr()->getDeviceCount();
1034  CHECK_GT(gpu_count, 0);
1035  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1036  available_gpus.insert(gpu_id);
1037  }
1038  }
1039  return available_gpus;
1040 }
1041 
1042 size_t get_context_count(const ExecutorDeviceType device_type,
1043  const size_t cpu_count,
1044  const size_t gpu_count) {
1045  return device_type == ExecutorDeviceType::GPU ? gpu_count
1046  : static_cast<size_t>(cpu_count);
1047 }
1048 
1049 namespace {
1050 
1051 // Compute a very conservative guess for the output buffer entry count, using no
1052 // information other than the number of tuples in each table and multiplying them
1053 // together.
1054 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1056  // Check for overflows since we're multiplying potentially big table sizes.
1057  using checked_size_t = boost::multiprecision::number<
1058  boost::multiprecision::cpp_int_backend<64,
1059  64,
1060  boost::multiprecision::unsigned_magnitude,
1061  boost::multiprecision::checked,
1062  void>>;
1063  checked_size_t max_groups_buffer_entry_guess = 1;
1064  for (const auto& query_info : query_infos) {
1065  CHECK(!query_info.info.fragments.empty());
1066  auto it = std::max_element(query_info.info.fragments.begin(),
1067  query_info.info.fragments.end(),
1068  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1069  return f1.getNumTuples() < f2.getNumTuples();
1070  });
1071  max_groups_buffer_entry_guess *= it->getNumTuples();
1072  }
1073  // Cap the rough approximation at 100M entries; it's unlikely we can do a great job for
1074  // baseline group layout with that many entries anyway.
1075  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1076  try {
1077  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1078  max_groups_buffer_entry_guess_cap);
1079  } catch (...) {
1080  return max_groups_buffer_entry_guess_cap;
1081  }
1082 }
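
// A standalone sketch of the guess above with the overflow handling made explicit:
// multiply the largest-fragment row counts of all inputs, saturating at the 100M cap
// whenever the product would exceed it (the real code reaches the same result with a
// checked boost::multiprecision integer).
inline size_t capped_entry_guess(const std::vector<size_t>& max_fragment_rows_per_table) {
  constexpr size_t cap = 100000000;
  size_t guess = 1;
  for (const size_t rows : max_fragment_rows_per_table) {
    if (rows != 0 && guess > cap / rows) {
      return cap;  // the product would exceed the cap (or overflow); saturate
    }
    guess *= rows;
  }
  return std::min(guess, cap);
}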
1083 
1084 std::string get_table_name(const InputDescriptor& input_desc,
1086  const auto source_type = input_desc.getSourceType();
1087  if (source_type == InputSourceType::TABLE) {
1088  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1089  CHECK(td);
1090  return td->tableName;
1091  } else {
1092  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1093  }
1094 }
1095 
1096 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1097  const int device_count) {
1098  if (device_type == ExecutorDeviceType::GPU) {
1099  return device_count * Executor::high_scan_limit;
1100  }
1102 }
1103 
1105  const std::vector<InputTableInfo>& table_infos,
1107  const ExecutorDeviceType device_type,
1108  const int device_count) {
1109  for (const auto target_expr : ra_exe_unit.target_exprs) {
1110  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1111  return;
1112  }
1113  }
1114  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1115  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1116  // Allow a query with no scan limit to run on small tables
1117  return;
1118  }
1119  if (ra_exe_unit.use_bump_allocator) {
1120  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1121  // relative to the size of the input), so we bypass this check for now
1122  return;
1123  }
1124  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1125  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1126  (!ra_exe_unit.scan_limit ||
1127  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1128  std::vector<std::string> table_names;
1129  const auto& input_descs = ra_exe_unit.input_descs;
1130  for (const auto& input_desc : input_descs) {
1131  table_names.push_back(get_table_name(input_desc, cat));
1132  }
1133  if (!ra_exe_unit.scan_limit) {
1134  throw WatchdogException(
1135  "Projection query would require a scan without a limit on table(s): " +
1136  boost::algorithm::join(table_names, ", "));
1137  } else {
1138  throw WatchdogException(
1139  "Projection query output result set on table(s): " +
1140  boost::algorithm::join(table_names, ", ") + " would contain " +
1141  std::to_string(ra_exe_unit.scan_limit) +
1142  " rows, which is more than the current system limit of " +
1143  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1144  }
1145  }
1146 }
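
// A standalone sketch of the scan-limit arithmetic feeding the watchdog errors above:
// on GPU the allowed projection size scales with the device count, on CPU it stays at
// the single-device limit. The per-device limit is a parameter here (the engine uses
// Executor::high_scan_limit); names are illustrative.
inline bool projection_exceeds_scan_limit(const size_t scan_limit,
                                          const size_t per_device_limit,
                                          const bool is_gpu,
                                          const size_t device_count) {
  const size_t allowed = is_gpu ? device_count * per_device_limit : per_device_limit;
  return scan_limit == 0 || scan_limit > allowed;  // 0 means no limit was requested
}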
1147 
1148 } // namespace
1149 
1150 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1151  const RelAlgExecutionUnit& ra_exe_unit) {
1152  if (ra_exe_unit.input_descs.size() < 2) {
1153  return false;
1154  }
1155 
1156  // We only support loop join at the end of folded joins
1157  // where ra_exe_unit.input_descs.size() > 2 for now.
1158  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1159 
1160  std::optional<size_t> inner_table_idx;
1161  for (size_t i = 0; i < query_infos.size(); ++i) {
1162  if (query_infos[i].table_id == inner_table_id) {
1163  inner_table_idx = i;
1164  break;
1165  }
1166  }
1167  CHECK(inner_table_idx);
1168  return query_infos[*inner_table_idx].info.getNumTuples() <=
1170 }
1171 
1172 namespace {
1173 
1174 template <typename T>
1175 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1176  std::vector<std::string> expr_strs;
1177  for (const auto& expr : expr_container) {
1178  if (!expr) {
1179  expr_strs.emplace_back("NULL");
1180  } else {
1181  expr_strs.emplace_back(expr->toString());
1182  }
1183  }
1184  return expr_strs;
1185 }
1186 
1187 template <>
1188 std::vector<std::string> expr_container_to_string(
1189  const std::list<Analyzer::OrderEntry>& expr_container) {
1190  std::vector<std::string> expr_strs;
1191  for (const auto& expr : expr_container) {
1192  expr_strs.emplace_back(expr.toString());
1193  }
1194  return expr_strs;
1195 }
1196 
1197 std::string join_type_to_string(const JoinType type) {
1198  switch (type) {
1199  case JoinType::INNER:
1200  return "INNER";
1201  case JoinType::LEFT:
1202  return "LEFT";
1203  case JoinType::INVALID:
1204  return "INVALID";
1205  }
1206  UNREACHABLE();
1207  return "";
1208 }
1209 
1210 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1211  switch (algorithm) {
1213  return "ResultSet";
1215  return "Speculative Top N";
1217  return "Streaming Top N";
1218  }
1219  UNREACHABLE();
1220  return "";
1221 }
1222 
1223 } // namespace
1224 
1225 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
1226  // todo(yoonmin): replace the cache key with a DAG representation of the query plan
1227  // instead of the ra_exec_unit description, if possible
1228  std::ostringstream os;
1229  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1230  const auto& scan_desc = input_col_desc->getScanDesc();
1231  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1232  << scan_desc.getNestLevel();
1233  }
1234  if (!ra_exe_unit.simple_quals.empty()) {
1235  for (const auto& qual : ra_exe_unit.simple_quals) {
1236  if (qual) {
1237  os << qual->toString() << ",";
1238  }
1239  }
1240  }
1241  if (!ra_exe_unit.quals.empty()) {
1242  for (const auto& qual : ra_exe_unit.quals) {
1243  if (qual) {
1244  os << qual->toString() << ",";
1245  }
1246  }
1247  }
1248  if (!ra_exe_unit.join_quals.empty()) {
1249  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1250  const auto& join_condition = ra_exe_unit.join_quals[i];
1251  os << std::to_string(i) << join_type_to_string(join_condition.type);
1252  for (const auto& qual : join_condition.quals) {
1253  if (qual) {
1254  os << qual->toString() << ",";
1255  }
1256  }
1257  }
1258  }
1259  if (!ra_exe_unit.groupby_exprs.empty()) {
1260  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1261  if (qual) {
1262  os << qual->toString() << ",";
1263  }
1264  }
1265  }
1266  for (const auto& expr : ra_exe_unit.target_exprs) {
1267  if (expr) {
1268  os << expr->toString() << ",";
1269  }
1270  }
1271  os << ::toString(ra_exe_unit.estimator == nullptr);
1272  os << std::to_string(ra_exe_unit.scan_limit);
1273  return os.str();
1274 }
1275 
1276 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1277  os << "\n\tTable/Col/Levels: ";
1278  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1279  const auto& scan_desc = input_col_desc->getScanDesc();
1280  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1281  << scan_desc.getNestLevel() << ") ";
1282  }
1283  if (!ra_exe_unit.simple_quals.empty()) {
1284  os << "\n\tSimple Quals: "
1286  ", ");
1287  }
1288  if (!ra_exe_unit.quals.empty()) {
1289  os << "\n\tQuals: "
1290  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1291  }
1292  if (!ra_exe_unit.join_quals.empty()) {
1293  os << "\n\tJoin Quals: ";
1294  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1295  const auto& join_condition = ra_exe_unit.join_quals[i];
1296  os << "\t\t" << std::to_string(i) << " "
1297  << join_type_to_string(join_condition.type);
1298  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1299  }
1300  }
1301  if (!ra_exe_unit.groupby_exprs.empty()) {
1302  os << "\n\tGroup By: "
1304  ", ");
1305  }
1306  os << "\n\tProjected targets: "
1308  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator != nullptr);
1309  os << "\n\tSort Info: ";
1310  const auto& sort_info = ra_exe_unit.sort_info;
1311  os << "\n\t Order Entries: "
1312  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1313  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1314  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1315  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1316  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1317  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
1318  if (ra_exe_unit.union_all) {
1319  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1320  }
1321  return os;
1322 }
1323 
1324 namespace {
1325 
1327  const size_t new_scan_limit) {
1328  return {ra_exe_unit_in.input_descs,
1329  ra_exe_unit_in.input_col_descs,
1330  ra_exe_unit_in.simple_quals,
1331  ra_exe_unit_in.quals,
1332  ra_exe_unit_in.join_quals,
1333  ra_exe_unit_in.groupby_exprs,
1334  ra_exe_unit_in.target_exprs,
1335  ra_exe_unit_in.estimator,
1336  ra_exe_unit_in.sort_info,
1337  new_scan_limit,
1338  ra_exe_unit_in.query_hint,
1339  ra_exe_unit_in.use_bump_allocator,
1340  ra_exe_unit_in.union_all,
1341  ra_exe_unit_in.query_state};
1342 }
1343 
1344 } // namespace
1345 
1346 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1347  const bool is_agg,
1348  const std::vector<InputTableInfo>& query_infos,
1349  const RelAlgExecutionUnit& ra_exe_unit_in,
1350  const CompilationOptions& co,
1351  const ExecutionOptions& eo,
1353  RenderInfo* render_info,
1354  const bool has_cardinality_estimation,
1355  ColumnCacheMap& column_cache) {
1356  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1357 
1358  ScopeGuard cleanup_post_execution = [this] {
1359  // cleanup/unpin GPU buffer allocations
1360  // TODO: separate out this state into a single object
1361  plan_state_.reset(nullptr);
1362  if (cgen_state_) {
1363  cgen_state_->in_values_bitmaps_.clear();
1364  }
1365  };
1366 
1367  try {
1368  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1369  is_agg,
1370  true,
1371  query_infos,
1372  ra_exe_unit_in,
1373  co,
1374  eo,
1375  cat,
1377  render_info,
1378  has_cardinality_estimation,
1379  column_cache);
1380  if (result) {
1381  result->setKernelQueueTime(kernel_queue_time_ms_);
1382  result->addCompilationQueueTime(compilation_queue_time_ms_);
1383  if (eo.just_validate) {
1384  result->setValidationOnlyRes();
1385  }
1386  }
1387  return result;
1388  } catch (const CompilationRetryNewScanLimit& e) {
1389  auto result =
1390  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1391  is_agg,
1392  false,
1393  query_infos,
1394  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1395  co,
1396  eo,
1397  cat,
1399  render_info,
1400  has_cardinality_estimation,
1401  column_cache);
1402  if (result) {
1403  result->setKernelQueueTime(kernel_queue_time_ms_);
1404  result->addCompilationQueueTime(compilation_queue_time_ms_);
1405  if (eo.just_validate) {
1406  result->setValidationOnlyRes();
1407  }
1408  }
1409  return result;
1410  }
1411 }
1412 
1414  size_t& max_groups_buffer_entry_guess,
1415  const bool is_agg,
1416  const bool allow_single_frag_table_opt,
1417  const std::vector<InputTableInfo>& query_infos,
1418  const RelAlgExecutionUnit& ra_exe_unit_in,
1419  const CompilationOptions& co,
1420  const ExecutionOptions& eo,
1422  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1423  RenderInfo* render_info,
1424  const bool has_cardinality_estimation,
1425  ColumnCacheMap& column_cache) {
1426  INJECT_TIMER(Exec_executeWorkUnit);
1427  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1428  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1429  CHECK(!query_infos.empty());
1430  if (!max_groups_buffer_entry_guess) {
1431  // The query has failed the first execution attempt because of running out
1432  // of group by slots. Make the conservative choice: allocate fragment size
1433  // slots and run on the CPU.
1434  CHECK(device_type == ExecutorDeviceType::CPU);
1435  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1436  }
1437 
1438  int8_t crt_min_byte_width{get_min_byte_width()};
1439  do {
1440  SharedKernelContext shared_context(query_infos);
1441  ColumnFetcher column_fetcher(this, column_cache);
1442  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1443  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1444  if (eo.executor_type == ExecutorType::Native) {
1445  try {
1446  INJECT_TIMER(query_step_compilation);
1447  auto clock_begin = timer_start();
1448  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1449  compilation_queue_time_ms_ += timer_stop(clock_begin);
1450 
1451  query_mem_desc_owned =
1452  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1453  crt_min_byte_width,
1454  has_cardinality_estimation,
1455  ra_exe_unit,
1456  query_infos,
1457  deleted_cols_map,
1458  column_fetcher,
1459  {device_type,
1460  co.hoist_literals,
1461  co.opt_level,
1463  co.allow_lazy_fetch,
1465  co.explain_type,
1467  eo,
1468  render_info,
1469  this);
1470  CHECK(query_mem_desc_owned);
1471  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1472  } catch (CompilationRetryNoCompaction&) {
1473  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1474  continue;
1475  }
1476  } else {
1477  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1478  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1479  CHECK(!query_mem_desc_owned);
1480  query_mem_desc_owned.reset(
1482  }
1483  if (eo.just_explain) {
1484  return executeExplain(*query_comp_desc_owned);
1485  }
1486 
1487  for (const auto target_expr : ra_exe_unit.target_exprs) {
1488  plan_state_->target_exprs_.push_back(target_expr);
1489  }
1490 
1491  if (!eo.just_validate) {
1492  int available_cpus = cpu_threads();
1493  auto available_gpus = get_available_gpus(cat);
1494 
1495  const auto context_count =
1496  get_context_count(device_type, available_cpus, available_gpus.size());
1497  try {
1498  auto kernels = createKernels(shared_context,
1499  ra_exe_unit,
1500  column_fetcher,
1501  query_infos,
1502  eo,
1503  is_agg,
1504  allow_single_frag_table_opt,
1505  context_count,
1506  *query_comp_desc_owned,
1507  *query_mem_desc_owned,
1508  render_info,
1509  available_gpus,
1510  available_cpus);
1511  if (g_use_tbb_pool) {
1512 #ifdef HAVE_TBB
1513  VLOG(1) << "Using TBB thread pool for kernel dispatch.";
1514  launchKernels<threadpool::TbbThreadPool<void>>(shared_context,
1515  std::move(kernels));
1516 #else
1517  throw std::runtime_error(
1518  "This build is not TBB enabled. Restart the server with "
1519  "\"enable-modern-thread-pool\" disabled.");
1520 #endif
1521  } else {
1522  launchKernels<threadpool::FuturesThreadPool<void>>(shared_context,
1523  std::move(kernels));
1524  }
1525  } catch (QueryExecutionError& e) {
1526  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1527  e.getErrorCode() == ERR_OUT_OF_TIME) {
1528  resetInterrupt();
1530  }
1531  if (eo.allow_runtime_query_interrupt && interrupted_.load()) {
1532  resetInterrupt();
1534  }
1536  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1537  crt_min_byte_width <<= 1;
1538  continue;
1539  }
1540  throw;
1541  }
1542  }
1543  if (is_agg) {
1544  try {
1545  return collectAllDeviceResults(shared_context,
1546  ra_exe_unit,
1547  *query_mem_desc_owned,
1548  query_comp_desc_owned->getDeviceType(),
1549  row_set_mem_owner);
1550  } catch (ReductionRanOutOfSlots&) {
1552  } catch (OverflowOrUnderflow&) {
1553  crt_min_byte_width <<= 1;
1554  continue;
1555  }
1556  }
1557  return resultsUnion(shared_context, ra_exe_unit);
1558 
1559  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1560 
1561  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1564  nullptr,
1565  catalog_,
1566  blockSize(),
1567  gridSize());
1568 }
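
// A standalone sketch of the retry policy in the loop above: when a kernel reports
// overflow/underflow (or compaction cannot be applied), the minimum aggregate byte
// width is doubled and the work unit is recompiled, stopping once 8-byte slots have
// been tried. Shown over a plain callable; the helper name is illustrative.
template <typename TryRun>
inline bool run_with_widening_retries(TryRun&& try_run, int8_t crt_min_byte_width) {
  do {
    if (try_run(crt_min_byte_width)) {
      return true;  // succeeded at this byte width
    }
    crt_min_byte_width <<= 1;  // retry with wider aggregate slots
  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
  return false;
}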
1569 
1571  const InputTableInfo& table_info,
1572  const CompilationOptions& co,
1573  const ExecutionOptions& eo,
1575  PerFragmentCallBack& cb) {
1576  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1577  ColumnCacheMap column_cache;
1578 
1579  std::vector<InputTableInfo> table_infos{table_info};
1580  SharedKernelContext kernel_context(table_infos);
1581 
1582  ColumnFetcher column_fetcher(this, column_cache);
1583  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1584  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1585  {
1586  auto clock_begin = timer_start();
1587  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1588  compilation_queue_time_ms_ += timer_stop(clock_begin);
1589  query_mem_desc_owned =
1590  query_comp_desc_owned->compile(0,
1591  8,
1592  /*has_cardinality_estimation=*/false,
1593  ra_exe_unit,
1594  table_infos,
1595  deleted_cols_map,
1596  column_fetcher,
1597  co,
1598  eo,
1599  nullptr,
1600  this);
1601  }
1602  CHECK(query_mem_desc_owned);
1603  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1604  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1605  const auto& outer_fragments = table_info.info.fragments;
1606 
1607  {
1608  auto clock_begin = timer_start();
1609  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
1610  kernel_queue_time_ms_ += timer_stop(clock_begin);
1611 
1612  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1613  ++fragment_index) {
1614  // In the future, we may want to allow this to execute on devices other
1615  // than the CPU.
1616  FragmentsList fragments_list{{table_id, {fragment_index}}};
1617  ExecutionKernel kernel(ra_exe_unit,
1618  co.device_type,
1619  /*device_id=*/0,
1620  eo,
1621  column_fetcher,
1622  *query_comp_desc_owned,
1623  *query_mem_desc_owned,
1624  fragments_list,
1626  /*render_info=*/nullptr,
1627  /*rowid_lookup_key=*/-1);
1628  kernel.run(this, kernel_context);
1629  }
1630  }
1631 
1632  const auto& all_fragment_results = kernel_context.getFragmentResults();
1633 
1634  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1635  ++fragment_index) {
1636  const auto fragment_results = all_fragment_results[fragment_index];
1637  cb(fragment_results.first, outer_fragments[fragment_index]);
1638  }
1639 }
1640 
1642  const TableFunctionExecutionUnit exe_unit,
1643  const std::vector<InputTableInfo>& table_infos,
1644  const CompilationOptions& co,
1645  const ExecutionOptions& eo,
1647  INJECT_TIMER(Exec_executeTableFunction);
1648  nukeOldState(false, table_infos, PlanState::DeletedColumnsMap{}, nullptr);
1649 
1650  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1651  // framework, we may want to move this up a level
1652 
1653  ColumnFetcher column_fetcher(this, column_cache);
1654  TableFunctionCompilationContext compilation_context;
1655  compilation_context.compile(exe_unit, co, this);
1656 
1658  return exe_context.execute(
1659  exe_unit, table_infos, &compilation_context, column_fetcher, co.device_type, this);
1660 }
1661 
1663  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1664 }
1665 
1667  const RelAlgExecutionUnit& ra_exe_unit,
1668  const ExecutorDeviceType requested_device_type) {
1669  for (const auto target_expr : ra_exe_unit.target_exprs) {
1670  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1671  if (!ra_exe_unit.groupby_exprs.empty() &&
1672  !isArchPascalOrLater(requested_device_type)) {
1673  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1674  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1675  return ExecutorDeviceType::CPU;
1676  }
1677  }
1678  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1679  return ExecutorDeviceType::CPU;
1680  }
1681  }
1682  return requested_device_type;
1683 }
1684 
1685 namespace {
1686 
1687 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1688  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1689  if (ti.is_fp()) {
1690  if (float_argument_input && ti.get_type() == kFLOAT) {
1691  int64_t float_null_val = 0;
1692  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1693  static_cast<float>(inline_fp_null_val(ti));
1694  return float_null_val;
1695  }
1696  const auto double_null_val = inline_fp_null_val(ti);
1697  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1698  }
1699  return inline_int_null_val(ti);
1700 }
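
// A minimal sketch of the float branch above: the 32-bit NULL sentinel's bit pattern
// is stored in the low bytes of the 64-bit aggregate slot (memcpy plays the role of
// the may_alias_ptr cast). The sentinel value is a parameter here; the engine takes
// it from inline_fp_null_val().
inline int64_t float_null_slot(const float null_sentinel) {
  int64_t slot = 0;
  memcpy(&slot, &null_sentinel, sizeof(float));
  return slot;
}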
1701 
1702 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1703  std::vector<int64_t>& entry,
1704  const std::vector<Analyzer::Expr*>& target_exprs,
1705  const QueryMemoryDescriptor& query_mem_desc) {
1706  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1707  const auto target_expr = target_exprs[target_idx];
1708  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1709  CHECK(agg_info.is_agg);
1710  target_infos.push_back(agg_info);
1711  if (g_cluster) {
1712  const auto executor = query_mem_desc.getExecutor();
1713  CHECK(executor);
1714  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1715  CHECK(row_set_mem_owner);
1716  const auto& count_distinct_desc =
1717  query_mem_desc.getCountDistinctDescriptor(target_idx);
1718  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1719  CHECK(row_set_mem_owner);
1720  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
1721  count_distinct_desc.bitmapPaddedSizeBytes());
1722  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1723  continue;
1724  }
1725  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1726  auto count_distinct_set = new std::set<int64_t>();
1727  CHECK(row_set_mem_owner);
1728  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1729  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1730  continue;
1731  }
1732  }
1733  const bool float_argument_input = takes_float_argument(agg_info);
1734  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1735  entry.push_back(0);
1736  } else if (agg_info.agg_kind == kAVG) {
1737  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1738  entry.push_back(0);
1739  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
1740  if (agg_info.sql_type.is_geometry()) {
1741  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1742  entry.push_back(0);
1743  }
1744  } else if (agg_info.sql_type.is_varlen()) {
1745  entry.push_back(0);
1746  entry.push_back(0);
1747  } else {
1748  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1749  }
1750  } else {
1751  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1752  }
1753  }
1754 }
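
// Illustrative sketch, not part of Execute.cpp: the per-aggregate slot layout produced by
// fill_entries_for_empty_input() above for an empty input. The enum and the null sentinel
// are placeholders for the real SQLAgg kinds and inline_null_val(); the point is only that
// COUNT-like targets get 0, AVG gets a {null, 0} pair, and other aggregates get a null.
#include <cstdint>
#include <vector>

enum class SketchAgg { kCount, kAvg, kOther };

inline std::vector<int64_t> sketch_empty_input_row(const std::vector<SketchAgg>& targets) {
  constexpr int64_t kNullSentinel = -9223372036854775807LL;  // placeholder null value
  std::vector<int64_t> entry;
  for (const auto agg : targets) {
    if (agg == SketchAgg::kCount) {
      entry.push_back(0);
    } else if (agg == SketchAgg::kAvg) {
      entry.push_back(kNullSentinel);  // numerator slot
      entry.push_back(0);              // denominator (count) slot
    } else {
      entry.push_back(kNullSentinel);
    }
  }
  return entry;
}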
1755 
1757  const std::vector<Analyzer::Expr*>& target_exprs_in,
1758  const QueryMemoryDescriptor& query_mem_desc,
1759  const ExecutorDeviceType device_type) {
1760  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1761  std::vector<Analyzer::Expr*> target_exprs;
1762  for (const auto target_expr : target_exprs_in) {
1763  const auto target_expr_copy =
1764  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1765  CHECK(target_expr_copy);
1766  auto ti = target_expr->get_type_info();
1767  ti.set_notnull(false);
1768  target_expr_copy->set_type_info(ti);
1769  if (target_expr_copy->get_arg()) {
1770  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1771  arg_ti.set_notnull(false);
1772  target_expr_copy->get_arg()->set_type_info(arg_ti);
1773  }
1774  target_exprs_owned_copies.push_back(target_expr_copy);
1775  target_exprs.push_back(target_expr_copy.get());
1776  }
1777  std::vector<TargetInfo> target_infos;
1778  std::vector<int64_t> entry;
1779  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1780  const auto executor = query_mem_desc.getExecutor();
1781  CHECK(executor);
1782  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1783  CHECK(row_set_mem_owner);
1784  auto rs = std::make_shared<ResultSet>(target_infos,
1785  device_type,
1786  query_mem_desc,
1787  row_set_mem_owner,
1788  executor->getCatalog(),
1789  executor->blockSize(),
1790  executor->gridSize());
1791  rs->allocateStorage();
1792  rs->fillOneEntry(entry);
1793  return rs;
1794 }
1795 
1796 } // namespace
1797 
1799  SharedKernelContext& shared_context,
1800  const RelAlgExecutionUnit& ra_exe_unit,
1801  const QueryMemoryDescriptor& query_mem_desc,
1802  const ExecutorDeviceType device_type,
1803  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1804  auto timer = DEBUG_TIMER(__func__);
1805  auto& result_per_device = shared_context.getFragmentResults();
1806  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1809  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1810  }
1811  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1812  try {
1813  return reduceSpeculativeTopN(
1814  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1815  } catch (const std::bad_alloc&) {
1816  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1817  }
1818  }
1819  const auto shard_count =
1820  device_type == ExecutorDeviceType::GPU
1822  : 0;
1823 
1824  if (shard_count && !result_per_device.empty()) {
1825  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1826  }
1827  return reduceMultiDeviceResults(
1828  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1829 }
1830 
1831 namespace {
1842 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1843  const QueryMemoryDescriptor& input_query_mem_desc,
1844  const ResultSetStorage* output_storage,
1845  size_t output_row_index,
1846  const QueryMemoryDescriptor& output_query_mem_desc,
1847  const std::vector<uint32_t>& top_permutation) {
1848  const auto output_buffer = output_storage->getUnderlyingBuffer();
1849  const auto input_buffer = input_storage->getUnderlyingBuffer();
1850  for (const auto sorted_idx : top_permutation) {
1851  // permuting all group-columns in this result set into the final buffer:
1852  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1853  group_idx++) {
1854  const auto input_column_ptr =
1855  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1856  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1857  const auto output_column_ptr =
1858  output_buffer +
1859  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1860  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1861  memcpy(output_column_ptr,
1862  input_column_ptr,
1863  output_query_mem_desc.groupColWidth(group_idx));
1864  }
1865  // permuting all agg-columns in this result set into the final buffer:
1866  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1867  slot_idx++) {
1868  const auto input_column_ptr =
1869  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1870  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1871  const auto output_column_ptr =
1872  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1873  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1874  memcpy(output_column_ptr,
1875  input_column_ptr,
1876  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1877  }
1878  ++output_row_index;
1879  }
1880  return output_row_index;
1881 }
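
// Illustrative sketch, not part of Execute.cpp: the address arithmetic behind
// permute_storage_columnar() above, reduced to one fixed-width column in plain byte
// buffers. The two column offsets and the width are assumptions of this sketch; the real
// code derives them from the input and output QueryMemoryDescriptors, which generally
// differ because the output buffer is sized for the stitched top-N result.
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

inline size_t permute_one_column(const int8_t* input_buffer,
                                 const size_t input_col_off,
                                 int8_t* output_buffer,
                                 const size_t output_col_off,
                                 const size_t width,
                                 size_t output_row_index,
                                 const std::vector<uint32_t>& top_permutation) {
  for (const auto sorted_idx : top_permutation) {
    std::memcpy(output_buffer + output_col_off + output_row_index * width,
                input_buffer + input_col_off + sorted_idx * width,
                width);
    ++output_row_index;
  }
  return output_row_index;
}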
1882 
1892 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
1893  const ResultSetStorage* output_storage,
1894  size_t output_row_index,
1895  const QueryMemoryDescriptor& output_query_mem_desc,
1896  const std::vector<uint32_t>& top_permutation) {
1897  const auto output_buffer = output_storage->getUnderlyingBuffer();
1898  const auto input_buffer = input_storage->getUnderlyingBuffer();
1899  for (const auto sorted_idx : top_permutation) {
1900  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
1901  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
1902  row_ptr,
1903  output_query_mem_desc.getRowSize());
1904  ++output_row_index;
1905  }
1906  return output_row_index;
1907 }
1908 } // namespace
1909 
1910 // Collect top results from each device, stitch them together and sort. Partial
1911 // results from each device are guaranteed to be disjoint because we only take
1912 // this path when one of the columns involved is a shard key.
1914  SharedKernelContext& shared_context,
1915  const RelAlgExecutionUnit& ra_exe_unit) const {
1916  auto& result_per_device = shared_context.getFragmentResults();
1917  const auto first_result_set = result_per_device.front().first;
1918  CHECK(first_result_set);
1919  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1920  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1921  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1922  top_query_mem_desc.setEntryCount(0);
1923  for (auto& result : result_per_device) {
1924  const auto result_set = result.first;
1925  CHECK(result_set);
1926  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
1927  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1928  top_query_mem_desc.setEntryCount(new_entry_cnt);
1929  }
1930  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1931  first_result_set->getDeviceType(),
1932  top_query_mem_desc,
1933  first_result_set->getRowSetMemOwner(),
1934  catalog_,
1935  blockSize(),
1936  gridSize());
1937  auto top_storage = top_result_set->allocateStorage();
1938  size_t top_output_row_idx{0};
1939  for (auto& result : result_per_device) {
1940  const auto result_set = result.first;
1941  CHECK(result_set);
1942  const auto& top_permutation = result_set->getPermutationBuffer();
1943  CHECK_LE(top_permutation.size(), top_n);
1944  if (top_query_mem_desc.didOutputColumnar()) {
1945  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
1946  result_set->getQueryMemDesc(),
1947  top_storage,
1948  top_output_row_idx,
1949  top_query_mem_desc,
1950  top_permutation);
1951  } else {
1952  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
1953  top_storage,
1954  top_output_row_idx,
1955  top_query_mem_desc,
1956  top_permutation);
1957  }
1958  }
1959  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
1960  return top_result_set;
1961 }
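
// Illustrative sketch, not part of Execute.cpp: the sizing rule used above when stitching
// per-device sharded top-N results. Each device contributes at most limit + offset sorted
// rows, and the combined result set is sized as the sum of the per-device row counts.
#include <cstddef>
#include <vector>

inline size_t sketch_stitched_entry_count(const std::vector<size_t>& per_device_row_counts) {
  size_t entry_count = 0;
  for (const auto row_count : per_device_row_counts) {
    entry_count += row_count;
  }
  return entry_count;
}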
1962 
1963 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
1964  const {
1965  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
1966  const auto& join_info = plan_state_->join_info_;
1967  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
1968  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
1969  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
1970  id_to_cond.insert(
1971  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
1972  }
1973  return id_to_cond;
1974 }
1975 
1976 namespace {
1977 
1978 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
1979  for (const auto& col : fetched_cols) {
1980  if (col.is_lazily_fetched) {
1981  return true;
1982  }
1983  }
1984  return false;
1985 }
1986 
1987 } // namespace
1988 
1989 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
1990  SharedKernelContext& shared_context,
1991  const RelAlgExecutionUnit& ra_exe_unit,
1992  ColumnFetcher& column_fetcher,
1993  const std::vector<InputTableInfo>& table_infos,
1994  const ExecutionOptions& eo,
1995  const bool is_agg,
1996  const bool allow_single_frag_table_opt,
1997  const size_t context_count,
1998  const QueryCompilationDescriptor& query_comp_desc,
1999  const QueryMemoryDescriptor& query_mem_desc,
2000  RenderInfo* render_info,
2001  std::unordered_set<int>& available_gpus,
2002  int& available_cpus) {
2003  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2004 
2005  QueryFragmentDescriptor fragment_descriptor(
2006  ra_exe_unit,
2007  table_infos,
2008  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2010  : std::vector<Data_Namespace::MemoryInfo>{},
2013  CHECK(!ra_exe_unit.input_descs.empty());
2014 
2015  const auto device_type = query_comp_desc.getDeviceType();
2016  const bool uses_lazy_fetch =
2017  plan_state_->allow_lazy_fetch_ &&
2019  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2020  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2021  const auto device_count = deviceCount(device_type);
2022  CHECK_GT(device_count, 0);
2023 
2024  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2025  shared_context.getFragOffsets(),
2026  device_count,
2027  device_type,
2028  use_multifrag_kernel,
2030  this);
2031  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2032  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2033  }
2034 
2035  if (use_multifrag_kernel) {
2036  VLOG(1) << "Creating multifrag execution kernels";
2037  VLOG(1) << query_mem_desc.toString();
2038 
2039  // NB: We should never be on this path when the query is retried because of running
2040  // out of group by slots; also, for scan only queries on CPU we want the
2041  // high-granularity, fragment by fragment execution instead. For scan only queries on
2042  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2043  // buffer per fragment.
2044  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2045  &execution_kernels,
2046  &column_fetcher,
2047  &eo,
2048  &query_comp_desc,
2049  &query_mem_desc,
2050  render_info](const int device_id,
2051  const FragmentsList& frag_list,
2052  const int64_t rowid_lookup_key) {
2053  execution_kernels.emplace_back(
2054  std::make_unique<ExecutionKernel>(ra_exe_unit,
2056  device_id,
2057  eo,
2058  column_fetcher,
2059  query_comp_desc,
2060  query_mem_desc,
2061  frag_list,
2063  render_info,
2064  rowid_lookup_key));
2065  };
2066  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2067  } else {
2068  VLOG(1) << "Creating one execution kernel per fragment";
2069  VLOG(1) << query_mem_desc.toString();
2070 
2071  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2073  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2074  const auto max_frag_size =
2075  table_infos.front().info.getFragmentNumTuplesUpperBound();
2076  if (max_frag_size < query_mem_desc.getEntryCount()) {
2077  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2078  << " to match max fragment size " << max_frag_size
2079  << " for kernel per fragment execution path.";
2080  throw CompilationRetryNewScanLimit(max_frag_size);
2081  }
2082  }
2083 
2084  size_t frag_list_idx{0};
2085  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2086  &execution_kernels,
2087  &column_fetcher,
2088  &eo,
2089  &frag_list_idx,
2090  &device_type,
2091  &query_comp_desc,
2092  &query_mem_desc,
2093  render_info](const int device_id,
2094  const FragmentsList& frag_list,
2095  const int64_t rowid_lookup_key) {
2096  if (!frag_list.size()) {
2097  return;
2098  }
2099  CHECK_GE(device_id, 0);
2100 
2101  execution_kernels.emplace_back(
2102  std::make_unique<ExecutionKernel>(ra_exe_unit,
2103  device_type,
2104  device_id,
2105  eo,
2106  column_fetcher,
2107  query_comp_desc,
2108  query_mem_desc,
2109  frag_list,
2111  render_info,
2112  rowid_lookup_key));
2113  ++frag_list_idx;
2114  };
2115 
2116  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2117  ra_exe_unit);
2118  }
2119 
2120  return execution_kernels;
2121 }
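
// Illustrative sketch, not part of Execute.cpp: the predicate createKernels() uses above to
// choose between one multi-fragment kernel per device and one kernel per fragment. The
// parameter names are local to this sketch.
inline bool sketch_use_multifrag_kernel(const bool is_gpu,
                                        const bool allow_multifrag,
                                        const bool uses_lazy_fetch,
                                        const bool is_agg) {
  // Lazy fetch needs per-fragment output buffers unless the query aggregates, so the
  // multifrag path is only taken on GPU when multifrag execution is allowed and lazy
  // fetch is either off or made irrelevant by aggregation.
  return is_gpu && allow_multifrag && (!uses_lazy_fetch || is_agg);
}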
2122 
2123 template <typename THREAD_POOL>
2125  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels) {
2126  auto clock_begin = timer_start();
2127  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2128  kernel_queue_time_ms_ += timer_stop(clock_begin);
2129 
2130  THREAD_POOL thread_pool;
2131  VLOG(1) << "Launching " << kernels.size() << " kernels for query.";
2132  for (auto& kernel : kernels) {
2133  thread_pool.spawn(
2134  [this, &shared_context, parent_thread_id = logger::thread_id()](
2135  ExecutionKernel* kernel) {
2136  CHECK(kernel);
2137  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
2138  kernel->run(this, shared_context);
2139  },
2140  kernel.get());
2141  }
2142  thread_pool.join();
2143 }
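
// Illustrative sketch, not part of Execute.cpp: the minimal THREAD_POOL shape the
// launchKernels() template above relies on -- only spawn() and join() are needed. This
// std::thread-based version is an assumption of the sketch; the real pools come from
// Shared/threadpool.h (including the optional TBB-backed pool).
#include <thread>
#include <utility>
#include <vector>

class SketchThreadPool {
 public:
  template <typename F, typename... Args>
  void spawn(F&& f, Args&&... args) {
    workers_.emplace_back(std::forward<F>(f), std::forward<Args>(args)...);
  }

  void join() {
    for (auto& worker : workers_) {
      worker.join();
    }
    workers_.clear();
  }

 private:
  std::vector<std::thread> workers_;
};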
2144 
2146  const RelAlgExecutionUnit& ra_exe_unit,
2147  const ExecutorDeviceType device_type,
2148  const size_t table_idx,
2149  const size_t outer_frag_idx,
2150  std::map<int, const TableFragments*>& selected_tables_fragments,
2151  const std::unordered_map<int, const Analyzer::BinOper*>&
2152  inner_table_id_to_join_condition) {
2153  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2154  auto table_frags_it = selected_tables_fragments.find(table_id);
2155  CHECK(table_frags_it != selected_tables_fragments.end());
2156  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2157  const auto outer_table_fragments_it =
2158  selected_tables_fragments.find(outer_input_desc.getTableId());
 2159  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
 2160  const auto outer_table_fragments = outer_table_fragments_it->second;
2161  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2162  if (!table_idx) {
2163  return {outer_frag_idx};
2164  }
2165  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2166  auto& inner_frags = table_frags_it->second;
2167  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2168  std::vector<size_t> all_frag_ids;
2169  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2170  ++inner_frag_idx) {
2171  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2172  if (skipFragmentPair(outer_fragment_info,
2173  inner_frag_info,
2174  table_idx,
2175  inner_table_id_to_join_condition,
2176  ra_exe_unit,
2177  device_type)) {
2178  continue;
2179  }
2180  all_frag_ids.push_back(inner_frag_idx);
2181  }
2182  return all_frag_ids;
2183 }
2184 
2185 // Returns true iff the join between two fragments cannot yield any results, per
2186 // shard information. The pair can be skipped to avoid full broadcast.
2188  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2189  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2190  const int table_idx,
2191  const std::unordered_map<int, const Analyzer::BinOper*>&
2192  inner_table_id_to_join_condition,
2193  const RelAlgExecutionUnit& ra_exe_unit,
2194  const ExecutorDeviceType device_type) {
2195  if (device_type != ExecutorDeviceType::GPU) {
2196  return false;
2197  }
2198  CHECK(table_idx >= 0 &&
2199  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2200  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2201  // Both tables need to be sharded the same way.
2202  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2203  outer_fragment_info.shard == inner_fragment_info.shard) {
2204  return false;
2205  }
2206  const Analyzer::BinOper* join_condition{nullptr};
2207  if (ra_exe_unit.join_quals.empty()) {
2208  CHECK(!inner_table_id_to_join_condition.empty());
2209  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2210  CHECK(condition_it != inner_table_id_to_join_condition.end());
2211  join_condition = condition_it->second;
2212  CHECK(join_condition);
2213  } else {
2214  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2215  plan_state_->join_info_.join_hash_tables_.size());
2216  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2217  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2218  table_idx) {
2219  CHECK(!join_condition);
2220  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2221  }
2222  }
2223  }
2224  if (!join_condition) {
2225  return false;
2226  }
2227  // TODO(adb): support fragment skipping based on the overlaps operator
2228  if (join_condition->is_overlaps_oper()) {
2229  return false;
2230  }
2231  size_t shard_count{0};
2232  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2233  join_condition->get_left_operand())) {
2234  auto inner_outer_pairs =
2235  normalize_column_pairs(join_condition, *getCatalog(), getTemporaryTables());
2237  join_condition, this, inner_outer_pairs);
2238  } else {
2239  shard_count = get_shard_count(join_condition, this);
2240  }
2241  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2242  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2243  }
2244  return shard_count;
2245 }
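
// Illustrative sketch, not part of Execute.cpp: the core shard test applied by
// skipFragmentPair() above. A fragment pair can only be skipped when both sides carry a
// shard id and the ids differ; -1 marks an unsharded fragment. The join-condition and
// shard-count checks of the real function are omitted here.
inline bool sketch_shards_cannot_match(const int outer_shard, const int inner_shard) {
  if (outer_shard == -1 || inner_shard == -1) {
    return false;  // at least one side is unsharded; the pair may still produce matches
  }
  return outer_shard != inner_shard;  // rows on different shards never join on the shard key
}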
2246 
2247 namespace {
2248 
2251  const int table_id = col_desc->getScanDesc().getTableId();
2252  const int col_id = col_desc->getColId();
2253  return get_column_descriptor_maybe(col_id, table_id, cat);
2254 }
2255 
2256 } // namespace
2257 
2258 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2259  const std::vector<InputDescriptor>& input_descs,
2260  const std::map<int, const TableFragments*>& all_tables_fragments) {
2261  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2262  for (auto& desc : input_descs) {
2263  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2264  CHECK(fragments_it != all_tables_fragments.end());
2265  const auto& fragments = *fragments_it->second;
2266  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2267  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2268  frag_offsets[i] = off;
2269  off += fragments[i].getNumTuples();
2270  }
2271  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2272  }
2273  return tab_id_to_frag_offsets;
2274 }
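
// Illustrative sketch, not part of Execute.cpp: the per-table offsets built above are an
// exclusive prefix sum of fragment row counts, e.g. counts {100, 250, 80} yield offsets
// {0, 100, 350}. The plain vector below stands in for FragmentInfo::getNumTuples().
#include <cstddef>
#include <cstdint>
#include <vector>

inline std::vector<uint64_t> sketch_frag_offsets(const std::vector<uint64_t>& frag_row_counts) {
  std::vector<uint64_t> offsets(frag_row_counts.size(), 0);
  uint64_t off = 0;
  for (size_t i = 0; i < frag_row_counts.size(); ++i) {
    offsets[i] = off;
    off += frag_row_counts[i];
  }
  return offsets;
}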
2275 
2276 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2278  const RelAlgExecutionUnit& ra_exe_unit,
2279  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2280  const std::vector<InputDescriptor>& input_descs,
2281  const std::map<int, const TableFragments*>& all_tables_fragments) {
2282  std::vector<std::vector<int64_t>> all_num_rows;
2283  std::vector<std::vector<uint64_t>> all_frag_offsets;
2284  const auto tab_id_to_frag_offsets =
2285  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2286  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2287  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2288  std::vector<int64_t> num_rows;
2289  std::vector<uint64_t> frag_offsets;
2290  if (!ra_exe_unit.union_all) {
2291  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2292  }
2293  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2294  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2295  const auto fragments_it =
2296  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2297  CHECK(fragments_it != all_tables_fragments.end());
2298  const auto& fragments = *fragments_it->second;
2299  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2300  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2301  const auto& fragment = fragments[frag_id];
2302  num_rows.push_back(fragment.getNumTuples());
2303  } else {
2304  size_t total_row_count{0};
2305  for (const auto& fragment : fragments) {
2306  total_row_count += fragment.getNumTuples();
2307  }
2308  num_rows.push_back(total_row_count);
2309  }
2310  const auto frag_offsets_it =
2311  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2312  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2313  const auto& offsets = frag_offsets_it->second;
2314  CHECK_LT(frag_id, offsets.size());
2315  frag_offsets.push_back(offsets[frag_id]);
2316  }
2317  all_num_rows.push_back(num_rows);
 2318  // Fragment offsets of the outer table should ONLY be used by rowid for now.
2319  all_frag_offsets.push_back(frag_offsets);
2320  }
2321  return {all_num_rows, all_frag_offsets};
2322 }
2323 
2324 // Only fetch columns of a hash-joined inner fact table whose fetch is not deferred,
2325 // and fetch them from all the table fragments.
2327  const RelAlgExecutionUnit& ra_exe_unit,
2328  const FragmentsList& selected_fragments) const {
2329  const auto& input_descs = ra_exe_unit.input_descs;
2330  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2331  if (nest_level < 1 ||
2332  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2333  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2334  (ra_exe_unit.join_quals.empty() &&
2335  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2336  return false;
2337  }
2338  const int table_id = inner_col_desc.getScanDesc().getTableId();
2339  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2340  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2341  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2342  return fragments.size() > 1;
2343 }
2344 
2345 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2346  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2347  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2348  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2349 }
2350 
2352  const ColumnFetcher& column_fetcher,
2353  const RelAlgExecutionUnit& ra_exe_unit,
2354  const int device_id,
2355  const Data_Namespace::MemoryLevel memory_level,
2356  const std::map<int, const TableFragments*>& all_tables_fragments,
2357  const FragmentsList& selected_fragments,
2359  std::list<ChunkIter>& chunk_iterators,
2360  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2361  DeviceAllocator* device_allocator) {
2362  auto timer = DEBUG_TIMER(__func__);
2364  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2365  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2366  std::vector<size_t> local_col_to_frag_pos;
2367  buildSelectedFragsMapping(selected_fragments_crossjoin,
2368  local_col_to_frag_pos,
2369  col_global_ids,
2370  selected_fragments,
2371  ra_exe_unit);
2372 
2374  selected_fragments_crossjoin);
2375 
2376  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2377  std::vector<std::vector<int64_t>> all_num_rows;
2378  std::vector<std::vector<uint64_t>> all_frag_offsets;
2379 
2380  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2381  std::vector<const int8_t*> frag_col_buffers(
2382  plan_state_->global_to_local_col_ids_.size());
2383  for (const auto& col_id : col_global_ids) {
 2385  // check whether the interrupt flag is set (non kernel-time query interrupt)
2386  interrupted_.load()) {
2387  resetInterrupt();
2389  }
2390  CHECK(col_id);
2391  const int table_id = col_id->getScanDesc().getTableId();
2392  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2393  if (cd && cd->isVirtualCol) {
2394  CHECK_EQ("rowid", cd->columnName);
2395  continue;
2396  }
2397  const auto fragments_it = all_tables_fragments.find(table_id);
2398  CHECK(fragments_it != all_tables_fragments.end());
2399  const auto fragments = fragments_it->second;
2400  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2401  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2402  CHECK_LT(static_cast<size_t>(it->second),
2403  plan_state_->global_to_local_col_ids_.size());
2404  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2405  if (!fragments->size()) {
2406  return {};
2407  }
2408  CHECK_LT(frag_id, fragments->size());
2409  auto memory_level_for_column = memory_level;
2410  if (plan_state_->columns_to_fetch_.find(
2411  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2412  plan_state_->columns_to_fetch_.end()) {
2413  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2414  }
2415  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2416  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2417  col_id.get(), memory_level_for_column, device_id, device_allocator);
2418  } else {
2419  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2420  frag_col_buffers[it->second] =
2421  column_fetcher.getAllTableColumnFragments(table_id,
2422  col_id->getColId(),
2423  all_tables_fragments,
2424  memory_level_for_column,
2425  device_id,
2426  device_allocator);
2427  } else {
2428  frag_col_buffers[it->second] =
2429  column_fetcher.getOneTableColumnFragment(table_id,
2430  frag_id,
2431  col_id->getColId(),
2432  all_tables_fragments,
2433  chunks,
2434  chunk_iterators,
2435  memory_level_for_column,
2436  device_id,
2437  device_allocator);
2438  }
2439  }
2440  }
2441  all_frag_col_buffers.push_back(frag_col_buffers);
2442  }
2443  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2444  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2445  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2446 }
2447 
2448 // fetchChunks() is written under the assumption that multiple inputs imply a JOIN.
2449 // The function below assumes instead that multiple inputs imply a UNION ALL.
2451  const ColumnFetcher& column_fetcher,
2452  const RelAlgExecutionUnit& ra_exe_unit,
2453  const int device_id,
2454  const Data_Namespace::MemoryLevel memory_level,
2455  const std::map<int, const TableFragments*>& all_tables_fragments,
2456  const FragmentsList& selected_fragments,
2458  std::list<ChunkIter>& chunk_iterators,
2459  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2460  DeviceAllocator* device_allocator) {
2461  auto timer = DEBUG_TIMER(__func__);
2463 
2464  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2465  std::vector<std::vector<int64_t>> all_num_rows;
2466  std::vector<std::vector<uint64_t>> all_frag_offsets;
2467 
2468  CHECK(!selected_fragments.empty());
2469  CHECK_LE(2u, ra_exe_unit.input_descs.size());
2470  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
2471  using TableId = int;
2472  TableId const selected_table_id = selected_fragments.front().table_id;
2473  bool const input_descs_index =
2474  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2475  if (!input_descs_index) {
2476  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
2477  }
2478  bool const input_col_descs_index =
2479  selected_table_id ==
2480  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2481  if (!input_col_descs_index) {
2482  CHECK_EQ(selected_table_id,
2483  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
2484  }
2485  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2486  << " selected_table_id=" << selected_table_id
2487  << " input_descs_index=" << int(input_descs_index)
2488  << " input_col_descs_index=" << int(input_col_descs_index)
2489  << " ra_exe_unit.input_descs="
2490  << shared::printContainer(ra_exe_unit.input_descs)
2491  << " ra_exe_unit.input_col_descs="
2492  << shared::printContainer(ra_exe_unit.input_col_descs);
2493 
2494  // Partition col_global_ids by table_id
2495  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2496  table_id_to_input_col_descs;
2497  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
2498  TableId const table_id = input_col_desc->getScanDesc().getTableId();
2499  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2500  }
2501  for (auto const& pair : table_id_to_input_col_descs) {
2502  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2503  std::vector<size_t> local_col_to_frag_pos;
2504 
2505  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
2506  local_col_to_frag_pos,
2507  pair.second,
2508  selected_fragments,
2509  ra_exe_unit);
2510 
2512  selected_fragments_crossjoin);
2513 
2514  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2515  std::vector<const int8_t*> frag_col_buffers(
2516  plan_state_->global_to_local_col_ids_.size());
2517  for (const auto& col_id : pair.second) {
2518  CHECK(col_id);
2519  const int table_id = col_id->getScanDesc().getTableId();
2520  CHECK_EQ(table_id, pair.first);
2521  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2522  if (cd && cd->isVirtualCol) {
2523  CHECK_EQ("rowid", cd->columnName);
2524  continue;
2525  }
2526  const auto fragments_it = all_tables_fragments.find(table_id);
2527  CHECK(fragments_it != all_tables_fragments.end());
2528  const auto fragments = fragments_it->second;
2529  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2530  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2531  CHECK_LT(static_cast<size_t>(it->second),
2532  plan_state_->global_to_local_col_ids_.size());
2533  const size_t frag_id = ra_exe_unit.union_all
2534  ? 0
2535  : selected_frag_ids[local_col_to_frag_pos[it->second]];
2536  if (!fragments->size()) {
2537  return {};
2538  }
2539  CHECK_LT(frag_id, fragments->size());
2540  auto memory_level_for_column = memory_level;
2541  if (plan_state_->columns_to_fetch_.find(
2542  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2543  plan_state_->columns_to_fetch_.end()) {
2544  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2545  }
2546  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2547  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2548  col_id.get(), memory_level_for_column, device_id, device_allocator);
2549  } else {
2550  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2551  frag_col_buffers[it->second] =
2552  column_fetcher.getAllTableColumnFragments(table_id,
2553  col_id->getColId(),
2554  all_tables_fragments,
2555  memory_level_for_column,
2556  device_id,
2557  device_allocator);
2558  } else {
2559  frag_col_buffers[it->second] =
2560  column_fetcher.getOneTableColumnFragment(table_id,
2561  frag_id,
2562  col_id->getColId(),
2563  all_tables_fragments,
2564  chunks,
2565  chunk_iterators,
2566  memory_level_for_column,
2567  device_id,
2568  device_allocator);
2569  }
2570  }
2571  }
2572  all_frag_col_buffers.push_back(frag_col_buffers);
2573  }
2574  std::vector<std::vector<int64_t>> num_rows;
2575  std::vector<std::vector<uint64_t>> frag_offsets;
2576  std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags(
2577  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2578  all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
2579  all_frag_offsets.insert(
2580  all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
2581  }
2582  // UNION ALL hacks.
2583  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers);
2584  for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
2585  all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
2586  }
2587  if (input_descs_index == input_col_descs_index) {
2588  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2589  }
2590 
2591  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers)
2592  << " all_num_rows=" << shared::printContainer(all_num_rows)
2593  << " all_frag_offsets=" << shared::printContainer(all_frag_offsets)
2594  << " input_col_descs_index=" << input_col_descs_index;
2595  return {{all_frag_col_buffers[input_descs_index]},
2596  {{all_num_rows[0][input_descs_index]}},
2597  {{all_frag_offsets[0][input_descs_index]}}};
2598 }
2599 
2600 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2601  const size_t scan_idx,
2602  const RelAlgExecutionUnit& ra_exe_unit) {
2603  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2604  scan_idx > 0 &&
2605  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2606  !selected_fragments[scan_idx].fragment_ids.empty()) {
2607  // Fetch all fragments
2608  return {size_t(0)};
2609  }
2610 
2611  return selected_fragments[scan_idx].fragment_ids;
2612 }
2613 
2615  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2616  std::vector<size_t>& local_col_to_frag_pos,
2617  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2618  const FragmentsList& selected_fragments,
2619  const RelAlgExecutionUnit& ra_exe_unit) {
2620  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2621  size_t frag_pos{0};
2622  const auto& input_descs = ra_exe_unit.input_descs;
2623  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2624  const int table_id = input_descs[scan_idx].getTableId();
2625  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2626  selected_fragments_crossjoin.push_back(
2627  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2628  for (const auto& col_id : col_global_ids) {
2629  CHECK(col_id);
2630  const auto& input_desc = col_id->getScanDesc();
2631  if (input_desc.getTableId() != table_id ||
2632  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2633  continue;
2634  }
2635  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2636  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2637  CHECK_LT(static_cast<size_t>(it->second),
2638  plan_state_->global_to_local_col_ids_.size());
2639  local_col_to_frag_pos[it->second] = frag_pos;
2640  }
2641  ++frag_pos;
2642  }
2643 }
2644 
2646  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2647  std::vector<size_t>& local_col_to_frag_pos,
2648  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2649  const FragmentsList& selected_fragments,
2650  const RelAlgExecutionUnit& ra_exe_unit) {
2651  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2652  size_t frag_pos{0};
2653  const auto& input_descs = ra_exe_unit.input_descs;
2654  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2655  const int table_id = input_descs[scan_idx].getTableId();
2656  // selected_fragments here is from assignFragsToKernelDispatch
2657  // execution_kernel.fragments
2658  if (selected_fragments[0].table_id != table_id) { // TODO 0
2659  continue;
2660  }
2661  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2662  selected_fragments_crossjoin.push_back(
2663  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2664  {size_t(1)}); // TODO
2665  for (const auto& col_id : col_global_ids) {
2666  CHECK(col_id);
2667  const auto& input_desc = col_id->getScanDesc();
2668  if (input_desc.getTableId() != table_id ||
2669  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2670  continue;
2671  }
2672  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2673  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2674  CHECK_LT(static_cast<size_t>(it->second),
2675  plan_state_->global_to_local_col_ids_.size());
2676  local_col_to_frag_pos[it->second] = frag_pos;
2677  }
2678  ++frag_pos;
2679  }
2680 }
2681 
2682 namespace {
2683 
2685  public:
2686  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
 2687  ~OutVecOwner() {
 2688  for (auto out : out_vec_) {
2689  delete[] out;
2690  }
2691  }
2692 
2693  private:
2694  std::vector<int64_t*> out_vec_;
2695 };
2696 } // namespace
2697 
2699  const RelAlgExecutionUnit& ra_exe_unit,
2700  const CompilationResult& compilation_result,
2701  const bool hoist_literals,
2702  ResultSetPtr& results,
2703  const std::vector<Analyzer::Expr*>& target_exprs,
2704  const ExecutorDeviceType device_type,
2705  std::vector<std::vector<const int8_t*>>& col_buffers,
2706  QueryExecutionContext* query_exe_context,
2707  const std::vector<std::vector<int64_t>>& num_rows,
2708  const std::vector<std::vector<uint64_t>>& frag_offsets,
2709  Data_Namespace::DataMgr* data_mgr,
2710  const int device_id,
2711  const uint32_t start_rowid,
2712  const uint32_t num_tables,
2713  RenderInfo* render_info) {
2715  auto timer = DEBUG_TIMER(__func__);
2716  CHECK(!results);
2717  if (col_buffers.empty()) {
2718  return 0;
2719  }
2720 
2721  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2722  if (render_info) {
2723  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2724  // here, we are in non-insitu mode.
2725  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2726  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2727  "currently unsupported.";
2728  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2729  }
2730 
2731  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2732  std::vector<int64_t*> out_vec;
2733  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2734  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2735  std::unique_ptr<OutVecOwner> output_memory_scope;
2737  interrupted_.load()) {
2738  resetInterrupt();
2740  }
2741  if (device_type == ExecutorDeviceType::CPU) {
2742  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2743  compilation_result.generated_code);
2744  CHECK(cpu_generated_code);
2745  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2746  cpu_generated_code.get(),
2747  hoist_literals,
2748  hoist_buf,
2749  col_buffers,
2750  num_rows,
2751  frag_offsets,
2752  0,
2753  &error_code,
2754  num_tables,
2755  join_hash_table_ptrs);
2756  output_memory_scope.reset(new OutVecOwner(out_vec));
2757  } else {
2758  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2759  compilation_result.generated_code);
2760  CHECK(gpu_generated_code);
2761  try {
2762  out_vec = query_exe_context->launchGpuCode(
2763  ra_exe_unit,
2764  gpu_generated_code.get(),
2765  hoist_literals,
2766  hoist_buf,
2767  col_buffers,
2768  num_rows,
2769  frag_offsets,
2770  0,
2771  data_mgr,
2772  blockSize(),
2773  gridSize(),
2774  device_id,
2775  compilation_result.gpu_smem_context.getSharedMemorySize(),
2776  &error_code,
2777  num_tables,
2778  join_hash_table_ptrs,
2779  render_allocator_map_ptr);
2780  output_memory_scope.reset(new OutVecOwner(out_vec));
2781  } catch (const OutOfMemory&) {
2782  return ERR_OUT_OF_GPU_MEM;
2783  } catch (const std::exception& e) {
2784  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2785  }
2786  }
2787  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2788  error_code == Executor::ERR_DIV_BY_ZERO ||
2789  error_code == Executor::ERR_OUT_OF_TIME ||
2790  error_code == Executor::ERR_INTERRUPTED ||
2792  error_code == Executor::ERR_GEOS) {
2793  return error_code;
2794  }
2795  if (ra_exe_unit.estimator) {
2796  CHECK(!error_code);
2797  results =
2798  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2799  return 0;
2800  }
2801  std::vector<int64_t> reduced_outs;
2802  const auto num_frags = col_buffers.size();
2803  const size_t entry_count =
2804  device_type == ExecutorDeviceType::GPU
2805  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
2806  ? 1
2807  : blockSize() * gridSize() * num_frags)
2808  : num_frags;
2809  if (size_t(1) == entry_count) {
2810  for (auto out : out_vec) {
2811  CHECK(out);
2812  reduced_outs.push_back(*out);
2813  }
2814  } else {
2815  size_t out_vec_idx = 0;
2816 
2817  for (const auto target_expr : target_exprs) {
2818  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2819  CHECK(agg_info.is_agg);
2820 
2821  const int num_iterations = agg_info.sql_type.is_geometry()
2822  ? agg_info.sql_type.get_physical_coord_cols()
2823  : 1;
2824 
2825  for (int i = 0; i < num_iterations; i++) {
2826  int64_t val1;
2827  const bool float_argument_input = takes_float_argument(agg_info);
2828  if (is_distinct_target(agg_info)) {
2829  CHECK(agg_info.agg_kind == kCOUNT ||
2830  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT);
2831  val1 = out_vec[out_vec_idx][0];
2832  error_code = 0;
2833  } else {
2834  const auto chosen_bytes = static_cast<size_t>(
2835  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2836  std::tie(val1, error_code) = Executor::reduceResults(
2837  agg_info.agg_kind,
2838  agg_info.sql_type,
2839  query_exe_context->getAggInitValForIndex(out_vec_idx),
2840  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2841  out_vec[out_vec_idx],
2842  entry_count,
2843  false,
2844  float_argument_input);
2845  }
2846  if (error_code) {
2847  break;
2848  }
2849  reduced_outs.push_back(val1);
2850  if (agg_info.agg_kind == kAVG ||
2851  (agg_info.agg_kind == kSAMPLE &&
2852  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
2853  const auto chosen_bytes = static_cast<size_t>(
2854  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
2855  1));
2856  int64_t val2;
2857  std::tie(val2, error_code) = Executor::reduceResults(
2858  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
2859  agg_info.sql_type,
2860  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
2861  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2862  out_vec[out_vec_idx + 1],
2863  entry_count,
2864  false,
2865  false);
2866  if (error_code) {
2867  break;
2868  }
2869  reduced_outs.push_back(val2);
2870  ++out_vec_idx;
2871  }
2872  ++out_vec_idx;
2873  }
2874  }
2875  }
2876 
2877  if (error_code) {
2878  return error_code;
2879  }
2880 
2881  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
2882  auto rows_ptr = std::shared_ptr<ResultSet>(
2883  query_exe_context->query_buffers_->result_sets_[0].release());
2884  rows_ptr->fillOneEntry(reduced_outs);
2885  results = std::move(rows_ptr);
2886  return error_code;
2887 }
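
// Illustrative sketch, not part of Execute.cpp: for a group-by-free aggregate such as
// COUNT or SUM, each of the entry_count slots handled above holds one partial value (one
// per GPU thread, or one per fragment on CPU), and Executor::reduceResults() folds them
// into a single value. This plain summation stands in for that reduction and ignores the
// NULL-sentinel and overflow handling of the real code.
#include <cstddef>
#include <cstdint>

inline int64_t sketch_reduce_sum_partials(const int64_t* partials, const size_t entry_count) {
  int64_t total = 0;
  for (size_t i = 0; i < entry_count; ++i) {
    total += partials[i];
  }
  return total;
}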
2888 
2889 namespace {
2890 
2891 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
2892  CHECK(scan_limit);
2893  return results && results->rowCount() < scan_limit;
2894 }
2895 
2896 } // namespace
2897 
2899  const RelAlgExecutionUnit& ra_exe_unit,
2900  const CompilationResult& compilation_result,
2901  const bool hoist_literals,
2902  ResultSetPtr& results,
2903  const ExecutorDeviceType device_type,
2904  std::vector<std::vector<const int8_t*>>& col_buffers,
2905  const std::vector<size_t> outer_tab_frag_ids,
2906  QueryExecutionContext* query_exe_context,
2907  const std::vector<std::vector<int64_t>>& num_rows,
2908  const std::vector<std::vector<uint64_t>>& frag_offsets,
2909  Data_Namespace::DataMgr* data_mgr,
2910  const int device_id,
2911  const int outer_table_id,
2912  const int64_t scan_limit,
2913  const uint32_t start_rowid,
2914  const uint32_t num_tables,
2915  RenderInfo* render_info) {
2916  auto timer = DEBUG_TIMER(__func__);
2918  CHECK(!results);
2919  if (col_buffers.empty()) {
2920  return 0;
2921  }
2922  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
2923  // TODO(alex):
2924  // 1. Optimize size (make keys more compact).
2925  // 2. Resize on overflow.
2926  // 3. Optimize runtime.
2927  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2928  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2929  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2931  interrupted_.load()) {
2932  return ERR_INTERRUPTED;
2933  }
2934 
2935  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2936  if (render_info && render_info->useCudaBuffers()) {
2937  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2938  }
2939 
2940  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
2941  << " ra_exe_unit.input_descs="
2942  << shared::printContainer(ra_exe_unit.input_descs)
2943  << " ra_exe_unit.input_col_descs="
2944  << shared::printContainer(ra_exe_unit.input_col_descs)
2945  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
2946  << " num_rows=" << shared::printContainer(num_rows)
2947  << " frag_offsets=" << shared::printContainer(frag_offsets)
2948  << " query_exe_context->query_buffers_->num_rows_="
2949  << query_exe_context->query_buffers_->num_rows_
2950  << " query_exe_context->query_mem_desc_.getEntryCount()="
2951  << query_exe_context->query_mem_desc_.getEntryCount()
2952  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
2953  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
2954  << " num_tables=" << num_tables;
2955 
2956  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
2957  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
2958  // with outer_table_id.
2959  if (ra_exe_unit_copy.union_all) {
2960  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
2961  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
2962  ra_exe_unit_copy.input_descs.end(),
2963  [outer_table_id](auto const& a, auto const& b) {
2964  return a.getTableId() == outer_table_id &&
2965  b.getTableId() != outer_table_id;
2966  });
2967  while (!ra_exe_unit_copy.input_descs.empty() &&
2968  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
2969  ra_exe_unit_copy.input_descs.pop_back();
2970  }
2971  // Filter ra_exe_unit_copy.input_col_descs.
2972  ra_exe_unit_copy.input_col_descs.remove_if(
2973  [outer_table_id](auto const& input_col_desc) {
2974  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
2975  });
2976  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
2977  }
2978 
2979  if (device_type == ExecutorDeviceType::CPU) {
2980  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2981  compilation_result.generated_code);
2982  CHECK(cpu_generated_code);
2983  query_exe_context->launchCpuCode(
2984  ra_exe_unit_copy,
2985  cpu_generated_code.get(),
2986  hoist_literals,
2987  hoist_buf,
2988  col_buffers,
2989  num_rows,
2990  frag_offsets,
2991  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
2992  &error_code,
2993  num_tables,
2994  join_hash_table_ptrs);
2995  } else {
2996  try {
2997  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2998  compilation_result.generated_code);
2999  CHECK(gpu_generated_code);
3000  query_exe_context->launchGpuCode(
3001  ra_exe_unit_copy,
3002  gpu_generated_code.get(),
3003  hoist_literals,
3004  hoist_buf,
3005  col_buffers,
3006  num_rows,
3007  frag_offsets,
3008  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3009  data_mgr,
3010  blockSize(),
3011  gridSize(),
3012  device_id,
3013  compilation_result.gpu_smem_context.getSharedMemorySize(),
3014  &error_code,
3015  num_tables,
3016  join_hash_table_ptrs,
3017  render_allocator_map_ptr);
3018  } catch (const OutOfMemory&) {
3019  return ERR_OUT_OF_GPU_MEM;
3020  } catch (const OutOfRenderMemory&) {
3021  return ERR_OUT_OF_RENDER_MEM;
3022  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3024  } catch (const std::exception& e) {
3025  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3026  }
3027  }
3028 
3029  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3030  error_code == Executor::ERR_DIV_BY_ZERO ||
3031  error_code == Executor::ERR_OUT_OF_TIME ||
3032  error_code == Executor::ERR_INTERRUPTED ||
3034  error_code == Executor::ERR_GEOS) {
3035  return error_code;
3036  }
3037 
3038  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3039  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3040  results = query_exe_context->getRowSet(ra_exe_unit_copy,
3041  query_exe_context->query_mem_desc_);
3042  CHECK(results);
3043  VLOG(2) << "results->rowCount()=" << results->rowCount();
3044  results->holdLiterals(hoist_buf);
3045  }
3046  if (error_code < 0 && render_allocator_map_ptr) {
3047  auto const adjusted_scan_limit =
3048  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
 3049  // More rows passed the filter than there are available slots. We don't have a
 3050  // count to check, so assume we met the limit if a scan limit is set.
3051  if (adjusted_scan_limit != 0) {
3052  return 0;
3053  } else {
3054  return error_code;
3055  }
3056  }
3057  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
3058  return error_code; // unlucky, not enough results and we ran out of slots
3059  }
3060 
3061  return 0;
3062 }
3063 
3064 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3065  const int device_id) {
3066  std::vector<int64_t> table_ptrs;
3067  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3068  for (auto hash_table : join_hash_tables) {
3069  if (!hash_table) {
3070  CHECK(table_ptrs.empty());
3071  return {};
3072  }
3073  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3074  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3075  }
3076  return table_ptrs;
3077 }
3078 
3079 void Executor::nukeOldState(const bool allow_lazy_fetch,
3080  const std::vector<InputTableInfo>& query_infos,
3081  const PlanState::DeletedColumnsMap& deleted_cols_map,
3082  const RelAlgExecutionUnit* ra_exe_unit) {
3085  const bool contains_left_deep_outer_join =
3086  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3087  ra_exe_unit->join_quals.end(),
3088  [](const JoinCondition& join_condition) {
3089  return join_condition.type == JoinType::LEFT;
3090  }) != ra_exe_unit->join_quals.end();
3091  cgen_state_.reset(new CgenState(query_infos.size(), contains_left_deep_outer_join));
3092  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3093  query_infos,
3094  deleted_cols_map,
3095  this));
3096 }
3097 
3098 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3099  const std::vector<InputTableInfo>& query_infos) {
3101  const auto ld_count = input_descs.size();
3102  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3103  for (size_t i = 0; i < ld_count; ++i) {
3104  CHECK_LT(i, query_infos.size());
3105  const auto frag_count = query_infos[i].info.fragments.size();
3106  if (i > 0) {
3107  cgen_state_->frag_offsets_.push_back(nullptr);
3108  } else {
3109  if (frag_count > 1) {
3110  cgen_state_->frag_offsets_.push_back(
3111  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
3112  } else {
3113  cgen_state_->frag_offsets_.push_back(nullptr);
3114  }
3115  }
3116  }
3117 }
3118 
3120  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3121  const std::vector<InputTableInfo>& query_infos,
3122  const MemoryLevel memory_level,
3123  const HashType preferred_hash_type,
3124  ColumnCacheMap& column_cache,
3125  const QueryHint& query_hint) {
3126  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3127  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3128  }
 3129  // check whether the interrupt flag is set (non kernel-time query interrupt)
3131  interrupted_.load()) {
3132  resetInterrupt();
3134  }
3135  try {
3136  auto tbl = HashJoin::getInstance(qual_bin_oper,
3137  query_infos,
3138  memory_level,
3139  preferred_hash_type,
3140  deviceCountForMemoryLevel(memory_level),
3141  column_cache,
3142  this,
3143  query_hint);
3144  return {tbl, ""};
3145  } catch (const HashJoinFail& e) {
3146  return {nullptr, e.what()};
3147  }
3148 }
3149 
3150 int8_t Executor::warpSize() const {
3151  CHECK(catalog_);
3152  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3153  CHECK(cuda_mgr);
3154  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3155  CHECK(!dev_props.empty());
3156  return dev_props.front().warpSize;
3157 }
3158 
3159 unsigned Executor::gridSize() const {
3160  CHECK(catalog_);
3161  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3162  if (!cuda_mgr) {
3163  return 0;
3164  }
3165  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3166 }
3167 
3168 unsigned Executor::numBlocksPerMP() const {
3169  CHECK(catalog_);
3170  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3171  CHECK(cuda_mgr);
3172  return grid_size_x_ ? std::ceil(grid_size_x_ / cuda_mgr->getMinNumMPsForAllDevices())
3173  : 2;
3174 }
3175 
3176 unsigned Executor::blockSize() const {
3177  CHECK(catalog_);
3178  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3179  if (!cuda_mgr) {
3180  return 0;
3181  }
3182  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3183  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3184 }
3185 
3187  return max_gpu_slab_size_;
3188 }
3189 
3190 int64_t Executor::deviceCycles(int milliseconds) const {
3191  CHECK(catalog_);
3192  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3193  CHECK(cuda_mgr);
3194  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3195  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3196 }
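
// Illustrative sketch, not part of Execute.cpp: deviceCycles() above converts a
// millisecond budget into clock cycles as clockKhz * milliseconds, since kHz is cycles
// per millisecond. For example, a hypothetical 1,485,000 kHz (1.485 GHz) device over a
// 100 ms budget yields 148,500,000 cycles.
#include <cstdint>

inline int64_t sketch_device_cycles(const int64_t clock_khz, const int milliseconds) {
  return clock_khz * milliseconds;
}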
3197 
3198 llvm::Value* Executor::castToFP(llvm::Value* value,
3199  SQLTypeInfo const& from_ti,
3200  SQLTypeInfo const& to_ti) {
3202  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3203  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3204  llvm::Type* fp_type{nullptr};
3205  switch (to_ti.get_size()) {
3206  case 4:
3207  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3208  break;
3209  case 8:
3210  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3211  break;
3212  default:
3213  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3214  }
3215  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3216  if (from_ti.get_scale()) {
3217  value = cgen_state_->ir_builder_.CreateFDiv(
3218  value,
3219  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3220  }
3221  }
3222  return value;
3223 }
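
// Illustrative sketch, not part of Execute.cpp: the scale handling of castToFP() above at
// the value level. A DECIMAL(10,2) column stores 123.45 as the integer 12345; converting
// it to floating point divides by 10^scale, which is what the CreateSIToFP followed by
// CreateFDiv(exp_to_scale(scale)) sequence emits.
#include <cmath>
#include <cstdint>

inline double sketch_decimal_to_fp(const int64_t stored_value, const int scale) {
  return static_cast<double>(stored_value) / std::pow(10.0, scale);  // 12345, 2 -> 123.45
}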
3224 
3225 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3227  CHECK(val->getType()->isPointerTy());
3228 
3229  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3230  const auto val_type = val_ptr_type->getElementType();
3231  size_t val_width = 0;
3232  if (val_type->isIntegerTy()) {
3233  val_width = val_type->getIntegerBitWidth();
3234  } else {
3235  if (val_type->isFloatTy()) {
3236  val_width = 32;
3237  } else {
3238  CHECK(val_type->isDoubleTy());
3239  val_width = 64;
3240  }
3241  }
3242  CHECK_LT(size_t(0), val_width);
3243  if (bitWidth == val_width) {
3244  return val;
3245  }
3246  return cgen_state_->ir_builder_.CreateBitCast(
3247  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3248 }
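// castToIntPtrTyIn above reinterprets a pointer to an integer, float, or double slot
// as a pointer to an integer of the requested bit width. For example, asking for 64
// bits on a double* slot returns the pointer unchanged (the widths already match),
// while asking for 32 bits emits a bitcast to i32*.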
3249 
3250 #define EXECUTE_INCLUDE
3251 #include "ArrayOps.cpp"
3252 #include "DateAdd.cpp"
3253 #include "StringFunctions.cpp"
3254 #undef EXECUTE_INCLUDE
3255 
3256 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3257  const RelAlgExecutionUnit& ra_exe_unit,
3258  const CompilationOptions& co) {
3259  if (!co.filter_on_deleted_column) {
3260  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3261  }
3262  auto ra_exe_unit_with_deleted = ra_exe_unit;
3263  PlanState::DeletedColumnsMap deleted_cols_map;
3264  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3265  if (input_table.getSourceType() != InputSourceType::TABLE) {
3266  continue;
3267  }
3268  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3269  CHECK(td);
3270  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3271  if (!deleted_cd) {
3272  continue;
3273  }
3274  CHECK(deleted_cd->columnType.is_boolean());
3275  // check that the deleted column is not already present
3276  bool found = false;
3277  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3278  if (input_col.get()->getColId() == deleted_cd->columnId &&
3279  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3280  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3281  found = true;
3282  }
3283  }
3284  if (!found) {
3285  // add deleted column
3286  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3287  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3288  auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3289  if (deleted_cols_it == deleted_cols_map.end()) {
3290  CHECK(deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd))
3291  .second);
3292  } else {
3293  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3294  }
3295  }
3296  }
3297  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3298 }
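// addDeletedColumn above returns a copy of the execution unit that additionally scans
// each input table's boolean "deleted" column (when the table has soft-deleted rows),
// together with a map from table id to that column's descriptor; this is presumably
// what lets the generated filter exclude deleted rows when
// CompilationOptions::filter_on_deleted_column is set.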
3299 
3300 namespace {
3301 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` returns `true` as the
3302 // first element of the tuple for safely scaled epoch values and `false` when the
3303 // scaling would overflow or underflow.
3304 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3305  const int64_t chunk_min,
3306  const int64_t chunk_max,
3307  const SQLTypeInfo& lhs_type,
3308  const SQLTypeInfo& rhs_type) {
3309  const int32_t ldim = lhs_type.get_dimension();
3310  const int32_t rdim = rhs_type.get_dimension();
3311  CHECK(ldim != rdim);
3312  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
3313  if (ldim > rdim) {
3314  // LHS type precision is higher than the RHS col type's. No chance of overflow/underflow.
3315  return {true, chunk_min / scale, chunk_max / scale};
3316  }
3317 
3318  using checked_int64_t = boost::multiprecision::number<
3319  boost::multiprecision::cpp_int_backend<64,
3320  64,
3321  boost::multiprecision::signed_magnitude,
3322  boost::multiprecision::checked,
3323  void>>;
3324 
3325  try {
3326  auto ret =
3327  std::make_tuple(true,
3328  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
3329  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
3330  return ret;
3331  } catch (const std::overflow_error& e) {
3332  // noop
3333  }
3334  return std::make_tuple(false, chunk_min, chunk_max);
3335 }
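// Illustrative example: comparing chunk statistics of a TIMESTAMP(0) column against a
// TIMESTAMP(9) constant gives ldim = 0 and rdim = 9, so scale = 10^9. Since
// ldim < rdim, chunk_min and chunk_max are multiplied by the scale using checked
// 64-bit arithmetic; if either product overflows, {false, chunk_min, chunk_max} is
// returned and the caller declines to prune the fragment. When ldim > rdim the
// statistics are divided instead, which cannot overflow.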
3336 
3337 } // namespace
3338 
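// skipFragment below inspects each simple qualifier of the form <column> <cmp>
// <constant> and compares the constant against the fragment's chunk-level min/max
// metadata (or against the fragment's rowid bounds for the virtual "rowid" column).
// As a sketch of the kGE case: for a filter like
//   WHERE ts >= TIMESTAMP '2021-01-01 00:00:00'
// a fragment whose maximum stored ts is below the literal can be skipped without
// being fetched. For an equality filter on rowid, the second member of the returned
// pair carries the rowid lookup key instead.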
3339 std::pair<bool, int64_t> Executor::skipFragment(
3340  const InputDescriptor& table_desc,
3341  const Fragmenter_Namespace::FragmentInfo& fragment,
3342  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3343  const std::vector<uint64_t>& frag_offsets,
3344  const size_t frag_idx) {
3345  const int table_id = table_desc.getTableId();
3346  for (const auto& simple_qual : simple_quals) {
3347  const auto comp_expr =
3348  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3349  if (!comp_expr) {
3350  // is this possible?
3351  return {false, -1};
3352  }
3353  const auto lhs = comp_expr->get_left_operand();
3354  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3355  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3356  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3357  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3358  if (lhs_uexpr) {
3359  CHECK(lhs_uexpr->get_optype() ==
3360  kCAST); // We should have only been passed a cast expression
3361  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3362  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3363  continue;
3364  }
3365  } else {
3366  continue;
3367  }
3368  }
3369  const auto rhs = comp_expr->get_right_operand();
3370  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3371  if (!rhs_const) {
3372  // is this possible?
3373  return {false, -1};
3374  }
3375  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3376  continue;
3377  }
3378  const int col_id = lhs_col->get_column_id();
3379  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3380  int64_t chunk_min{0};
3381  int64_t chunk_max{0};
3382  bool is_rowid{false};
3383  size_t start_rowid{0};
3384  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3385  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3386  if (cd->isVirtualCol) {
3387  CHECK(cd->columnName == "rowid");
3388  const auto& table_generation = getTableGeneration(table_id);
3389  start_rowid = table_generation.start_rowid;
3390  chunk_min = frag_offsets[frag_idx] + start_rowid;
3391  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3392  is_rowid = true;
3393  }
3394  } else {
3395  const auto& chunk_type = lhs_col->get_type_info();
3396  chunk_min = extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3397  chunk_max = extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3398  }
3399  if (chunk_min > chunk_max) {
3400  // invalid metadata range, do not skip fragment
3401  return {false, -1};
3402  }
3403  if (lhs->get_type_info().is_timestamp() &&
3404  (lhs_col->get_type_info().get_dimension() !=
3405  rhs_const->get_type_info().get_dimension()) &&
3406  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3407  rhs_const->get_type_info().is_high_precision_timestamp())) {
3408  // If the original timestamp lhs col has a different precision,
3409  // the column metadata holds the value in the original precision;
3410  // therefore adjust the rhs value to match the lhs precision
3411 
3412  // Note(Wamsi): We adjust rhs const value instead of lhs value to not
3413  // artificially limit the lhs column range. RHS overflow/underflow has already
3414  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
3415  bool is_valid;
3416  std::tie(is_valid, chunk_min, chunk_max) =
3418  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
3419  if (!is_valid) {
3420  VLOG(4) << "Overflow/Underflow detected in fragment skipping logic.\nChunk min "
3421  "value: "
3422  << std::to_string(chunk_min)
3423  << "\nChunk max value: " << std::to_string(chunk_max)
3424  << "\nLHS col precision is: "
3425  << std::to_string(lhs_col->get_type_info().get_dimension())
3426  << "\nRHS precision is: "
3427  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
3428  return {false, -1};
3429  }
3430  }
3431  llvm::LLVMContext local_context;
3432  CgenState local_cgen_state(local_context);
3433  CodeGenerator code_generator(&local_cgen_state, nullptr);
3434 
3435  const auto rhs_val =
3436  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
3437 
3438  switch (comp_expr->get_optype()) {
3439  case kGE:
3440  if (chunk_max < rhs_val) {
3441  return {true, -1};
3442  }
3443  break;
3444  case kGT:
3445  if (chunk_max <= rhs_val) {
3446  return {true, -1};
3447  }
3448  break;
3449  case kLE:
3450  if (chunk_min > rhs_val) {
3451  return {true, -1};
3452  }
3453  break;
3454  case kLT:
3455  if (chunk_min >= rhs_val) {
3456  return {true, -1};
3457  }
3458  break;
3459  case kEQ:
3460  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3461  return {true, -1};
3462  } else if (is_rowid) {
3463  return {false, rhs_val - start_rowid};
3464  }
3465  break;
3466  default:
3467  break;
3468  }
3469  }
3470  return {false, -1};
3471 }
3472 
3473 /*
3474  * The skipFragmentInnerJoins function processes all quals stored in the execution unit's
3475  * join_quals and gather all the ones that meet the "simple_qual" characteristics
3476  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
3477  * to decide whether the fragment should be skipped or not. The fragment will be skipped
3478  * if at least one of these skipFragment calls returns true in its first value.
3479  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
3480  * second value only if its first value is "false".
3481  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3482  * i.e., we only skip if none of the quals trigger the code to update the
3483  * rowid_lookup_key
3484  * - Only AND operations are valid and considered:
3485  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3486  * the skip
3487  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3488  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3489  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3490  * implemented differently, which causes the first one to skip correctly, but the second
3491  * one will not skip.
3492  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3493  * possible
3494  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3495  * projection, no skipping
3496  */
3497 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3498  const InputDescriptor& table_desc,
3499  const RelAlgExecutionUnit& ra_exe_unit,
3500  const Fragmenter_Namespace::FragmentInfo& fragment,
3501  const std::vector<uint64_t>& frag_offsets,
3502  const size_t frag_idx) {
3503  std::pair<bool, int64_t> skip_frag{false, -1};
3504  for (auto& inner_join : ra_exe_unit.join_quals) {
3505  if (inner_join.type != JoinType::INNER) {
3506  continue;
3507  }
3508 
3509  // extracting all the conjunctive simple_quals from the quals stored for the inner
3510  // join
3511  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3512  for (auto& qual : inner_join.quals) {
3513  auto temp_qual = qual_to_conjunctive_form(qual);
3514  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3515  temp_qual.simple_quals.begin(),
3516  temp_qual.simple_quals.end());
3517  }
3518  auto temp_skip_frag = skipFragment(
3519  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3520  if (temp_skip_frag.second != -1) {
3521  skip_frag.second = temp_skip_frag.second;
3522  return skip_frag;
3523  } else {
3524  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3525  }
3526  }
3527  return skip_frag;
3528 }
3529 
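// computeColRangesCache below walks every physical input column whose type supports
// expression ranges and records the column's leaf range (computed from the table
// metadata via getLeafColumnRange) in an AggregatedColRange cache keyed by
// PhysicalInput.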
3530 AggregatedColRange Executor::computeColRangesCache(
3531  const std::unordered_set<PhysicalInput>& phys_inputs) {
3532  AggregatedColRange agg_col_range_cache;
3533  CHECK(catalog_);
3534  std::unordered_set<int> phys_table_ids;
3535  for (const auto& phys_input : phys_inputs) {
3536  phys_table_ids.insert(phys_input.table_id);
3537  }
3538  std::vector<InputTableInfo> query_infos;
3539  for (const int table_id : phys_table_ids) {
3540  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3541  }
3542  for (const auto& phys_input : phys_inputs) {
3543  const auto cd =
3544  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3545  CHECK(cd);
3546  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3547  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3548  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3549  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3550  agg_col_range_cache.setColRange(phys_input, col_range);
3551  }
3552  }
3553  return agg_col_range_cache;
3554 }
3555 
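// computeStringDictionaryGenerations below records, for every dictionary-encoded
// string column (including array element types), the dictionary's current
// storageEntryCount as its generation; presumably this snapshot lets later consumers
// distinguish entries that existed when the query started from ones added afterwards.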
3556 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3557  const std::unordered_set<PhysicalInput>& phys_inputs) {
3558  StringDictionaryGenerations string_dictionary_generations;
3559  CHECK(catalog_);
3560  for (const auto& phys_input : phys_inputs) {
3561  const auto cd =
3562  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3563  CHECK(cd);
3564  const auto& col_ti =
3565  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3566  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3567  const int dict_id = col_ti.get_comp_param();
3568  const auto dd = catalog_->getMetadataForDict(dict_id);
3569  CHECK(dd && dd->stringDict);
3570  string_dictionary_generations.setGeneration(dict_id,
3571  dd->stringDict->storageEntryCount());
3572  }
3573  }
3574  return string_dictionary_generations;
3575 }
3576 
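// computeTableGenerations below snapshots each physical table's current tuple count
// (with a start rowid of 0) into a TableGenerations object keyed by table id.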
3577 TableGenerations Executor::computeTableGenerations(
3578  std::unordered_set<int> phys_table_ids) {
3579  TableGenerations table_generations;
3580  for (const int table_id : phys_table_ids) {
3581  const auto table_info = getTableInfo(table_id);
3582  table_generations.setGeneration(
3583  table_id,
3584  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
3585  }
3586  return table_generations;
3587 }
3588 
3589 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3590  const std::unordered_set<int>& phys_table_ids) {
3591  CHECK(catalog_);
3592  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize());
3593  row_set_mem_owner_->setDictionaryGenerations(
3594  computeStringDictionaryGenerations(phys_inputs));
3595  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3596  table_generations_ = computeTableGenerations(phys_table_ids);
3597 }
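// The methods that follow implement per-session query bookkeeping: sessions are
// enrolled into queries_session_map_ (keyed by session id and submission time), their
// status and executor assignment are updated as a query moves from PENDING_EXECUTOR
// to RUNNING, and a per-session entry in queries_interrupt_flag_ allows a session's
// queries to be flagged for interruption. Access is synchronized through
// executor_session_mutex_ via the mapd_shared_lock / mapd_unique_lock arguments.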
3598 
3599 mapd_shared_mutex& Executor::getSessionLock() {
3600  return executor_session_mutex_;
3601 }
3602 
3603 QuerySessionId& Executor::getCurrentQuerySession(
3604  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3605  return current_query_session_;
3606 }
3607 
3608 size_t Executor::getRunningExecutorId(mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3610 }
3611 
3612 bool Executor::checkCurrentQuerySession(const QuerySessionId& candidate_query_session,
3613  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3614  // a candidate query session matches the current query session only when it is
3615  // non-empty and equal to current_query_session_
3616  return !candidate_query_session.empty() &&
3617  (current_query_session_ == candidate_query_session);
3618 }
3619 
3620 void Executor::invalidateRunningQuerySession(
3621  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3624 }
3625 
3626 CurrentQueryStatus Executor::attachExecutorToQuerySession(
3627  std::shared_ptr<const query_state::QueryState>& query_state) {
3628  QuerySessionId query_session_id = "";
3629  std::string query_str = "N/A";
3630  {
3631  mapd_shared_lock<mapd_shared_mutex> read_lock(executor_session_mutex_);
3632  // gather the necessary query info
3633  if (query_state != nullptr && query_state->getConstSessionInfo() != nullptr) {
3634  query_session_id = query_state->getConstSessionInfo()->get_session_id();
3635  query_str = query_state->getQueryStr();
3636  } else if (!getCurrentQuerySession(read_lock).empty()) {
3637  query_session_id = getCurrentQuerySession(read_lock);
3638  }
3639  }
3640  if (!query_session_id.empty()) {
3641  // if the session is valid, update 1) the executor id assigned to it and 2) its query status
3642  mapd_unique_lock<mapd_shared_mutex> write_lock(executor_session_mutex_);
3643  updateQuerySessionExecutorAssignment(
3644  query_session_id, query_state->getQuerySubmittedTime(), executor_id_, write_lock);
3645  updateQuerySessionStatusWithLock(query_session_id,
3646  query_state->getQuerySubmittedTime(),
3647  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
3648  write_lock);
3649  }
3650  return {query_session_id, query_str};
3651 }
3652 
3653 void Executor::checkPendingQueryStatus(const QuerySessionId& query_session) {
3654  // check whether we are okay to execute the "pending" query
3655  // i.e., before running the query check if this query session is "ALREADY" interrupted
3656  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3657  if (query_session.empty()) {
3658  return;
3659  }
3660  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
3661  // something went wrong, since we assume this is the caller's responsibility
3662  // (call this function only for an enrolled query session)
3663  if (!queries_session_map_.count(query_session)) {
3664  VLOG(1) << "Interrupting pending query is not available since the query session is "
3665  "not enrolled";
3666  } else {
3667  // here the query session is enrolled but the interrupt flag is not registered
3668  VLOG(1)
3669  << "Interrupting pending query is not available since its interrupt flag is "
3670  "not registered";
3671  }
3672  return;
3673  }
3674  if (queries_interrupt_flag_[query_session]) {
3676  }
3677 }
3678 
3679 void Executor::clearQuerySessionStatus(
3680  const QuerySessionId& query_session,
3681  const std::chrono::time_point<std::chrono::system_clock> submitted,
3682  bool acquire_spin_lock) {
3683  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3684  // clear the interrupt-related info for a finished query
3685  if (query_session.empty()) {
3686  return;
3687  }
3688  removeFromQuerySessionList(query_session, submitted, session_write_lock);
3689  if (query_session.compare(current_query_session_) == 0 &&
3691  invalidateRunningQuerySession(session_write_lock);
3692  if (acquire_spin_lock) {
3693  // for single-node execution; dist mode uses an external spin_lock
3694  // instead of the executor's internal spin_lock
3695  execute_spin_lock_.clear(std::memory_order_release);
3696  resetInterrupt();
3697  }
3698  }
3699 }
3700 
3701 void Executor::updateQuerySessionStatus(
3702  std::shared_ptr<const query_state::QueryState>& query_state,
3703  const QuerySessionStatus::QueryStatus new_query_status) {
3704  // update the running query session's current status
3705  if (query_state) {
3706  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3707  auto query_session = query_state->getConstSessionInfo()->get_session_id();
3708  if (query_session.empty()) {
3709  return;
3710  }
3711  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING) {
3712  current_query_session_ = query_session;
3714  }
3715  updateQuerySessionStatusWithLock(query_session,
3716  query_state->getQuerySubmittedTime(),
3717  new_query_status,
3718  session_write_lock);
3719  }
3720 }
3721 
3722 void Executor::updateQuerySessionStatus(
3723  const QuerySessionId& query_session,
3724  const std::chrono::time_point<std::chrono::system_clock> submitted,
3725  const QuerySessionStatus::QueryStatus new_query_status) {
3726  // update the running query session's current status
3727  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3728  if (query_session.empty()) {
3729  return;
3730  }
3731  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING) {
3732  current_query_session_ = query_session;
3734  }
3735  updateQuerySessionStatusWithLock(
3736  query_session, submitted, new_query_status, session_write_lock);
3737 }
3738 
3739 void Executor::enrollQuerySession(
3740  const QuerySessionId& query_session,
3741  const std::string& query_str,
3742  const std::chrono::time_point<std::chrono::system_clock> submitted,
3743  const size_t executor_id,
3744  const QuerySessionStatus::QueryStatus query_session_status) {
3745  // enroll the query session into the Executor's session map
3746  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3747  if (query_session.empty()) {
3748  return;
3749  }
3750  addToQuerySessionList(query_session,
3751  query_str,
3752  submitted,
3753  executor_id,
3754  query_session_status,
3755  session_write_lock);
3756 }
3757 
3758 bool Executor::addToQuerySessionList(
3759  const QuerySessionId& query_session,
3760  const std::string& query_str,
3761  const std::chrono::time_point<std::chrono::system_clock> submitted,
3762  const size_t executor_id,
3763  const QuerySessionStatus::QueryStatus query_status,
3764  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3765  // an internal API that enrolls the query session into the Executor's session map
3766  auto t_str = ::toString(submitted);
3767  if (queries_session_map_.count(query_session)) {
3768  if (queries_session_map_.at(query_session).count(t_str)) {
3769  queries_session_map_.at(query_session).erase(t_str);
3770  queries_session_map_.at(query_session)
3771  .emplace(t_str,
3772  QuerySessionStatus(
3773  query_session, executor_id, query_str, submitted, query_status));
3774  } else {
3775  queries_session_map_.at(query_session)
3776  .emplace(t_str,
3777  QuerySessionStatus(
3778  query_session, executor_id, query_str, submitted, query_status));
3779  }
3780  } else {
3781  std::map<std::string, QuerySessionStatus> executor_per_query_map;
3782  executor_per_query_map.emplace(
3783  t_str,
3784  QuerySessionStatus(
3785  query_session, executor_id, query_str, submitted, query_status));
3786  queries_session_map_.emplace(query_session, executor_per_query_map);
3787  }
3788  return queries_interrupt_flag_.emplace(query_session, false).second;
3789 }
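// addToQuerySessionList above keeps one QuerySessionStatus per (session id,
// submitted-time string) pair, replacing any previous entry for the same submission
// time, and returns whether a fresh interrupt flag was created for the session.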
3790 
3791 bool Executor::updateQuerySessionStatusWithLock(
3792  const QuerySessionId& query_session,
3793  const std::chrono::time_point<std::chrono::system_clock> submitted,
3794  const QuerySessionStatus::QueryStatus updated_query_status,
3795  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3796  // an internal API that updates query session status
3797  if (query_session.empty()) {
3798  return false;
3799  }
3800  auto t_str = ::toString(submitted);
3801  if (queries_session_map_.count(query_session)) {
3802  for (auto& query_status : queries_session_map_.at(query_session)) {
3803  auto target_submitted_t_str =
3804  ::toString(query_status.second.getQuerySubmittedTime());
3805  // no time difference --> found the target query status
3806  if (t_str.compare(target_submitted_t_str) == 0) {
3807  auto prev_status = query_status.second.getQueryStatus();
3808  if (prev_status == updated_query_status) {
3809  return false;
3810  }
3811  query_status.second.setQueryStatus(updated_query_status);
3812  return true;
3813  }
3814  }
3815  }
3816  return false;
3817 }
3818 
3819 bool Executor::updateQuerySessionExecutorAssignment(
3820  const QuerySessionId& query_session,
3821  const std::chrono::time_point<std::chrono::system_clock> submitted,
3822  const size_t executor_id,
3823  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3824  // update the executor id of the query session
3825  if (query_session.empty()) {
3826  return false;
3827  }
3828  auto t_str = ::toString(submitted);
3829  if (queries_session_map_.count(query_session)) {
3830  auto storage = queries_session_map_.at(query_session);
3831  for (auto it = storage.begin(); it != storage.end(); it++) {
3832  auto target_submitted_t_str = ::toString(it->second.getQuerySubmittedTime());
3833  // no time difference --> found the target query status
3834  if (t_str.compare(target_submitted_t_str) == 0) {
3835  queries_session_map_.at(query_session).at(t_str).setExecutorId(executor_id);
3836  return true;
3837  }
3838  }
3839  }
3840  return false;
3841 }
3842 
3843 bool Executor::removeFromQuerySessionList(
3844  const QuerySessionId& query_session,
3845  const std::chrono::time_point<std::chrono::system_clock> submitted,
3846  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3847  if (query_session.empty()) {
3848  return false;
3849  }
3850  auto t_str = ::toString(submitted);
3851  if (queries_session_map_.count(query_session)) {
3852  auto& storage = queries_session_map_.at(query_session);
3853  if (storage.size() > 1) {
3854  // in this case we only remove query executor info
3855  for (auto it = storage.begin(); it != storage.end(); it++) {
3856  auto target_submitted_t_str = ::toString(it->second.getQuerySubmittedTime());
3857  // no time difference && same executor id --> found the target query
3858  if (it->second.getExecutorId() == executor_id_ &&
3859  t_str.compare(target_submitted_t_str) == 0) {
3860  storage.erase(it);
3861  return true;
3862  }
3863  }
3864  } else if (storage.size() == 1) {
3865  // here this session only has a single query executor
3866  // so we clear both executor info and its interrupt flag
3867  queries_session_map_.erase(query_session);
3868  queries_interrupt_flag_.erase(query_session);
3869  if (interrupted_.load()) {
3870  interrupted_.store(false);
3871  VLOG(1) << "RESET Executor " << this << " that had previously been interrupted";
3872  }
3873  return true;
3874  }
3875  }
3876  return false;
3877 }
3878 
3879 void Executor::setQuerySessionAsInterrupted(
3880  const QuerySessionId& query_session,
3881  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3882  if (query_session.empty()) {
3883  return;
3884  }
3885  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
3886  queries_interrupt_flag_[query_session] = true;
3887  }
3888 }
3889 
3890 bool Executor::checkIsQuerySessionInterrupted(
3891  const QuerySessionId& query_session,
3892  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3893  auto flag_it = queries_interrupt_flag_.find(query_session);
3894  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
3895  flag_it->second;
3896 }
3897 
3898 bool Executor::checkIsQuerySessionEnrolled(
3899  const QuerySessionId& query_session,
3900  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3901  return !query_session.empty() && queries_session_map_.count(query_session);
3902 }
3903 
3904 void Executor::enableRuntimeQueryInterrupt(
3905  const double runtime_query_check_freq,
3906  const unsigned pending_query_check_freq) const {
3907  // The only scenario in which we intentionally call this function is
3908  // to allow runtime query interrupt in QueryRunner for test cases.
3909  // Because a test machine's default setting does not allow runtime query interrupt,
3910  // we have to turn it on within test code when necessary.
3912  g_pending_query_interrupt_freq = pending_query_check_freq;
3913  g_running_query_interrupt_freq = runtime_query_check_freq;
3916  }
3917 }
3918 
3919 void Executor::addToCardinalityCache(const std::string& cache_key,
3920  const size_t cache_value) {
3922  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
3923  cardinality_cache_[cache_key] = cache_value;
3924  VLOG(1) << "Put estimated cardinality to the cache";
3925  }
3926 }
3927 
3928 Executor::CachedCardinality Executor::getCachedCardinality(const std::string& cache_key) {
3929  mapd_shared_lock<mapd_shared_mutex> lock(recycler_mutex_);
3931  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
3932  VLOG(1) << "Reuse cached cardinality";
3933  return {true, cardinality_cache_[cache_key]};
3934  }
3935  return {false, -1};
3936 }
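// A minimal usage sketch of the cardinality cache pair above; compute_cardinality()
// is a hypothetical placeholder, and the cache key construction is up to the caller
// (e.g. via ra_exec_unit_desc_for_caching):
//
//   auto cached = executor->getCachedCardinality(cache_key);
//   size_t cardinality = cached.first ? cached.second : compute_cardinality();
//   if (!cached.first) {
//     executor->addToCardinalityCache(cache_key, cardinality);
//   }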
3937 
3938 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
3939  const QuerySessionId& query_session,
3940  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3941  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
3942  auto& query_infos = queries_session_map_.at(query_session);
3943  std::vector<QuerySessionStatus> ret;
3944  for (auto& info : query_infos) {
3945  ret.push_back(QuerySessionStatus(query_session,
3946  info.second.getExecutorId(),
3947  info.second.getQueryStr(),
3948  info.second.getQuerySubmittedTime(),
3949  info.second.getQueryStatus()));
3950  }
3951  return ret;
3952  }
3953  return {};
3954 }
3955 
3956 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
3957 
3958 std::atomic_flag Executor::execute_spin_lock_ = ATOMIC_FLAG_INIT;
3959 // current running query's session ID
3960 std::string Executor::current_query_session_{""};
3961 // running executor's id
3963 // contain the interrupt flag's status per query session
3965 // contain a list of queries per query session
3967 // session lock
3969 
3972 
3976 std::atomic<bool> Executor::interrupted_{false};
3977 
3978 std::mutex Executor::compilation_mutex_;
3979 std::mutex Executor::kernel_mutex_;
3980 
3982 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr *> &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
Definition: Execute.cpp:1756
int32_t getIdOfString(const std::string &str) const
const std::string debug_dir_
Definition: Execute.h:1028
bool is_agg(const Analyzer::Expr *expr)
ALWAYS_INLINE int64_t agg_sum_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
std::vector< Analyzer::Expr * > target_exprs
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr *> &target_exprs) const
Definition: Execute.cpp:332
std::map< size_t, std::vector< uint64_t > > get_table_id_to_frag_offsets(const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments *> &all_tables_fragments)
Definition: Execute.cpp:2258
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1047
AggregatedColRange computeColRangesCache(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:3530
constexpr size_t kArenaBlockOverhead
SQLAgg
Definition: sqldefs.h:71
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::unique_ptr< ExecutionKernel > > createKernels(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
Definition: Execute.cpp:1989
double g_running_query_interrupt_freq
Definition: Execute.cpp:113
bool g_enable_smem_group_by
bool updateQuerySessionExecutorAssignment(const QuerySessionId &query_session, const std::chrono::time_point< std::chrono::system_clock > submitted, const size_t executor_id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3819
void reduce(SpeculativeTopNMap &that)
float g_filter_push_down_low_frac
Definition: Execute.cpp:90
std::unordered_map< int, const Analyzer::BinOper * > getInnerTabIdToJoinCond() const
Definition: Execute.cpp:1963
const ColumnDescriptor * try_get_column_descriptor(const InputColDescriptor *col_desc, const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:2249
static QuerySessionMap queries_session_map_
Definition: Execute.h:1055
static mapd_shared_mutex execute_mutex_
Definition: Execute.h:1062
bool is_time() const
Definition: sqltypes.h:484
void agg_max_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
CurrentQueryStatus attachExecutorToQuerySession(std::shared_ptr< const query_state::QueryState > &query_state)
Definition: Execute.cpp:3626
int64_t kernel_queue_time_ms_
Definition: Execute.h:1035
JoinType
Definition: sqldefs.h:108
ResultSetPtr reduceMultiDeviceResultSets(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:943
size_t g_constrained_by_in_threshold
Definition: Execute.cpp:98
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const GpuCompilationContext *cu_functions, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t *>> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t *>> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int outer_table_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
Definition: Execute.cpp:2898
int64_t compilation_queue_time_ms_
Definition: Execute.h:1036
const ColumnDescriptor * getPhysicalColumnDescriptor(const Analyzer::ColumnVar *, int) const
Definition: Execute.cpp:263
double g_bump_allocator_step_reduction
Definition: Execute.cpp:107
bool is_string() const
Definition: sqltypes.h:478
bool is_boolean() const
Definition: sqltypes.h:485
std::string toString(const ExtArgumentType &sig_type)
void invalidateRunningQuerySession(mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3620
DEVICE void sort(ARGS &&... args)
Definition: gpu_enabled.h:105
int get_column_id() const
Definition: Analyzer.h:195
const ChunkMetadataMap & getChunkMetadataMap() const
void checkPendingQueryStatus(const QuerySessionId &query_session)
Definition: Execute.cpp:3653
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1225
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1079
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::chrono::time_point< std::chrono::system_clock > submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3758
void setEntryCount(const size_t val)
bool is_trivial_loop_join(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1150
bool g_strip_join_covered_quals
Definition: Execute.cpp:97
std::unique_ptr< llvm::Module > udf_gpu_module
const std::vector< uint64_t > & getFragOffsets()
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const QueryHint &query_hint)
Definition: Execute.cpp:3119
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_integer() const
Definition: sqltypes.h:480
bool g_enable_direct_columnarization
Definition: Execute.cpp:108
std::tuple< bool, int64_t, int64_t > get_hpt_overflow_underflow_safe_scaled_values(const int64_t chunk_min, const int64_t chunk_max, const SQLTypeInfo &lhs_type, const SQLTypeInfo &rhs_type)
Definition: Execute.cpp:3304
ExecutorDeviceType
void agg_max_double_skip_val(int64_t *agg, const double val, const double skip_val)
std::string get_table_name(const InputDescriptor &input_desc, const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:1084
void clearQuerySessionStatus(const QuerySessionId &query_session, const std::chrono::time_point< std::chrono::system_clock > submitted, bool acquire_spin_lock)
Definition: Execute.cpp:3679
ResultSetPtr executeTableFunction(const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat)
Compiles and dispatches a table function; that is, a function that takes as input one or more columns...
Definition: Execute.cpp:1641
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t *>> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
std::unordered_set< int > get_available_gpus(const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:1030
static const int max_gpu_count
Definition: Execute.h:1006
GpuSharedMemoryContext gpu_smem_context
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
const std::optional< bool > union_all
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:192
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:112
int64_t float_to_double_bin(int32_t val, bool nullable=false)
static const size_t code_cache_size
Definition: Execute.h:1023
#define LOG(tag)
Definition: Logger.h:188
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:79
static std::atomic_flag execute_spin_lock_
Definition: Execute.h:1058
bool g_enable_columnar_output
Definition: Execute.cpp:93
void agg_min_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:896
InputSourceType getSourceType() const
std::pair< QuerySessionId, std::string > CurrentQueryStatus
Definition: Execute.h:79
static mapd_shared_mutex executors_cache_mutex_
Definition: Execute.h:1063
Definition: sqldefs.h:35
const std::list< Analyzer::OrderEntry > order_entries
const ColumnDescriptor * getDeletedColumnIfRowsDeleted(const TableDescriptor *td) const
Definition: Catalog.cpp:3054
int getTableId() const
unsigned gridSize() const
Definition: Execute.cpp:3159
void clearMemory(const MemoryLevel memLevel)
Definition: DataMgr.cpp:385
Definition: sqldefs.h:36
int32_t executePlanWithoutGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr *> &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t *>> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
Definition: Execute.cpp:2698
std::string join(T const &container, std::string const &delim)
std::string join_type_to_string(const JoinType type)
Definition: Execute.cpp:1197
std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > addDeletedColumn(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
Definition: Execute.cpp:3256
TableGenerations computeTableGenerations(std::unordered_set< int > phys_table_ids)
Definition: Execute.cpp:3577
std::vector< InputDescriptor > input_descs
#define UNREACHABLE()
Definition: Logger.h:241
const Catalog_Namespace::Catalog * getCatalog() const
Definition: Execute.cpp:274
const SortAlgorithm algorithm
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
OutVecOwner(const std::vector< int64_t *> &out_vec)
Definition: Execute.cpp:2686
#define CHECK_GE(x, y)
Definition: Logger.h:210
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
static std::pair< int64_t, int32_t > reduceResults(const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
Definition: Execute.cpp:658
Definition: sqldefs.h:49
Definition: sqldefs.h:30
HOST DEVICE int get_size() const
Definition: sqltypes.h:321
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:318
bool checkCurrentQuerySession(const std::string &candidate_query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3612
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
size_t g_filter_push_down_passing_row_ubound
Definition: Execute.cpp:92
static const int32_t ERR_GEOS
Definition: Execute.h:1085
std::ostream & operator<<(std::ostream &os, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1276
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters system_parameters=SystemParameters())
Definition: Execute.cpp:156
AggregatedColRange agg_col_range_cache_
Definition: Execute.h:1045
std::shared_ptr< ResultSet > ResultSetPtr
static void * gpu_active_modules_[max_gpu_count]
Definition: Execute.h:1011
std::vector< FragmentInfo > fragments
Definition: Fragmenter.h:161
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:988
size_t maxGpuSlabSize() const
Definition: Execute.cpp:3186
ExecutorOptLevel opt_level
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
bool g_enable_experimental_string_functions
size_t compute_buffer_entry_guess(const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:1054
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
bool takes_float_argument(const TargetInfo &target_info)
Definition: TargetInfo.h:134
unsigned g_trivial_loop_join_threshold
Definition: Execute.cpp:83
static uint32_t gpu_active_modules_device_mask_
Definition: Execute.h:1010
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner() const
Definition: Execute.cpp:282
bool isArchPascalOrLater(const ExecutorDeviceType dt) const
Definition: Execute.h:525
static void invalidateCaches()
int64_t getAggInitValForIndex(const size_t index) const
void buildSelectedFragsMappingForUnion(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2645
bool g_is_test_env
Definition: Execute.cpp:125
llvm::Value * castToIntPtrTyIn(llvm::Value *val, const size_t bit_width)
Definition: Execute.cpp:3225
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
int getDeviceCount() const
Definition: CudaMgr.h:86
static std::mutex kernel_mutex_
Definition: Execute.h:1088
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
#define CHECK_GT(x, y)
Definition: Logger.h:209
Container for compilation results and assorted options for a single execution unit.
bool is_decimal() const
Definition: sqltypes.h:481
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::vector< QuerySessionStatus > getQuerySessionInfo(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3938
HOST DEVICE int get_scale() const
Definition: sqltypes.h:316
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1892
std::vector< FragmentsPerTable > FragmentsList
virtual ReductionCode codegen() const
std::string to_string(char const *&&v)
static size_t literalBytes(const CgenState::LiteralValue &lit)
Definition: CgenState.h:361
mapd_shared_mutex & getSessionLock()
Definition: Execute.cpp:3599
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
bool g_enable_overlaps_hashjoin
Definition: Execute.cpp:94
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:178
int8_t get_min_byte_width()
size_t getNumBytesForFetchedRow(const std::set< int > &table_ids_to_fetch) const
Definition: Execute.cpp:302
bool g_inner_join_fragment_skipping
Definition: Execute.cpp:85
const Executor * getExecutor() const
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
Definition: HashJoin.cpp:218
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const TableFunctionCompilationContext *compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor)
int64_t deviceCycles(int milliseconds) const
Definition: Execute.cpp:3190
bool useCudaBuffers() const
Definition: RenderInfo.cpp:69
static const size_t high_scan_limit
Definition: Execute.h:461
bool g_enable_smem_non_grouped_agg
Definition: Execute.cpp:122
ResultSetPtr get_merged_result(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Definition: Execute.cpp:854
void run(Executor *executor, SharedKernelContext &shared_context)
CodeCache gpu_code_cache_
Definition: Execute.h:1019
Definition: sqldefs.h:73
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
std::unique_ptr< QueryMemoryInitializer > query_buffers_
const InputDescriptor & getScanDesc() const
std::vector< int64_t > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
Definition: Execute.cpp:3064
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:3098
QuerySessionId & getCurrentQuerySession(mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3603
bool g_null_div_by_zero
Definition: Execute.cpp:82
bool checkIsQuerySessionEnrolled(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3898
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:318
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:1083
const ExecutorId executor_id_
Definition: Execute.h:1031
std::map< QuerySessionId, bool > InterruptFlagMap
Definition: Execute.h:80
const size_t limit
const size_t max_gpu_slab_size_
Definition: Execute.h:1027
ResultSetPtr collectAllDeviceResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
Definition: Execute.cpp:1798
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1071
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id)
static SysCatalog & instance()
Definition: SysCatalog.h:291
const bool allow_multifrag
std::string cat(Ts &&... args)
bool g_from_table_reordering
Definition: Execute.cpp:84
const bool just_validate
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:162
Classes representing a parse tree.
void setGeneration(const uint32_t id, const uint64_t generation)
bool g_enable_hashjoin_many_to_many
Definition: Execute.cpp:95
void enableRuntimeQueryInterrupt(const double runtime_query_check_freq, const unsigned pending_query_check_freq) const
Definition: Execute.cpp:3904
FetchResult fetchUnionChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments *> &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator)
Definition: Execute.cpp:2450
size_t get_context_count(const ExecutorDeviceType device_type, const size_t cpu_count, const size_t gpu_count)
Definition: Execute.cpp:1042
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:216
FetchResult fetchChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments *> &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator)
Definition: Execute.cpp:2351
const SortInfo sort_info
ExecutorType executor_type
void init(LogOptions const &log_opts)
Definition: Logger.cpp:280
std::mutex str_dict_mutex_
Definition: Execute.h:1014
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1032
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static size_t addAligned(const size_t off_in, const size_t alignment)
Definition: CgenState.h:392
#define CHECK_NE(x, y)
Definition: Logger.h:206
const bool with_dynamic_watchdog
void setQuerySessionAsInterrupted(const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3879
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:218
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1075
std::unordered_map< TableId, const ColumnDescriptor * > DeletedColumnsMap
Definition: PlanState.h:44
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:290
const JoinQualsPerNestingLevel join_quals
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
int getNestLevel() const
std::shared_timed_mutex mapd_shared_mutex
const std::string debug_file_
Definition: Execute.h:1029
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:641
SortAlgorithm
const double gpu_input_mem_limit_percent
CachedCardinality getCachedCardinality(const std::string &cache_key)
Definition: Execute.cpp:3928
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
Definition: Execute.h:1004
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:78
void agg_min_double_skip_val(int64_t *agg, const double val, const double skip_val)
void setCatalog(const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:278
friend class QueryMemoryDescriptor
Definition: Execute.h:1098
ExecutorDeviceType getDeviceType() const
ExecutorExplainType explain_type
int64_t inline_null_val(const SQLTypeInfo &ti, const bool float_argument_input)
Definition: Execute.cpp:1687
size_t g_big_group_threshold
Definition: Execute.cpp:99
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1077
static std::unordered_map< std::string, size_t > cardinality_cache_
Definition: Execute.h:1068
float g_filter_push_down_high_frac
Definition: Execute.cpp:91
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1053
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1003
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1003
bool updateQuerySessionStatusWithLock(const QuerySessionId &query_session, const std::chrono::time_point< std::chrono::system_clock > submitted, const QuerySessionStatus::QueryStatus updated_query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3791
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1078
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::chrono::time_point< std::chrono::system_clock > submitted, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3843
bool g_bigint_count
void agg_min_float_skip_val(int32_t *agg, const float val, const float skip_val)
Definition: sqldefs.h:75
llvm::Value * castToFP(llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
Definition: Execute.cpp:3198
std::pair< bool, size_t > CachedCardinality
Definition: Execute.h:968
size_t g_max_memory_allocation_size
Definition: Execute.cpp:102
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:130
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:684
size_t g_approx_quantile_buffer
Definition: Execute.cpp:128
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1104
void agg_max_float_skip_val(int32_t *agg, const float val, const float skip_val)
const unsigned block_size_x_
Definition: Execute.h:1025
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
const unsigned grid_size_x_
Definition: Execute.h:1026
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
quantile::TDigest * newTDigest()
Definition: Execute.cpp:241
specifies the content in-memory of a row in the column metadata table
size_t g_overlaps_max_table_size_bytes
Definition: Execute.cpp:96
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1084
const std::shared_ptr< Analyzer::Estimator > estimator
static std::map< int, std::shared_ptr< Executor > > executors_
Definition: Execute.h:1057
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
#define AUTOMATIC_IR_METADATA(CGENSTATE)
std::vector< MemoryInfo > getMemoryInfo(const MemoryLevel memLevel)
Definition: DataMgr.cpp:304
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1072
const TemporaryTables * getTemporaryTables()
Definition: Execute.h:404
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
Definition: Execute.cpp:1326
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1439
ExpressionRange getColRange(const PhysicalInput &) const
Definition: Execute.cpp:298
bool isCPUOnly() const
Definition: Execute.cpp:252
std::shared_ptr< CompilationContext > generated_code
QueryMemoryDescriptor query_mem_desc_
ExecutorDeviceType device_type
bool g_enable_window_functions
Definition: Execute.cpp:100
void enrollQuerySession(const QuerySessionId &query_session, const std::string &query_str, const std::chrono::time_point< std::chrono::system_clock > submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_session_status)
Definition: Execute.cpp:3739
int8_t groupColWidth(const size_t key_idx) const
Definition: sqldefs.h:34
std::string sort_algorithm_to_string(const SortAlgorithm algorithm)
Definition: Execute.cpp:1210
std::vector< size_t > getTableFragmentIndices(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< int, const TableFragments *> &selected_tables_fragments, const std::unordered_map< int, const Analyzer::BinOper *> &inner_table_id_to_join_condition)
Definition: Execute.cpp:2145
const std::vector< size_t > outer_fragment_indices
size_t g_min_memory_allocation_size
Definition: Execute.cpp:103
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:651
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqltypes.h:51
ResultSetPtr resultsUnion(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:868
#define REGULAR_DICT(TRANSIENTID)
Definition: sqltypes.h:252
static QuerySessionId current_query_session_
Definition: Execute.h:1049
static llvm::ConstantInt * codegenIntConst(const Analyzer::Constant *constant, CgenState *cgen_state)
Definition: ConstantIR.cpp:85
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2600
size_t ExecutorId
Definition: Execute.h:374
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
Definition: Execute.cpp:379
InputTableInfoCache input_table_info_cache_
Definition: Execute.h:1044
Speculative top N algorithm.
void setGeneration(const uint32_t id, const TableGeneration &generation)
void launchKernels(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
bool g_enable_smem_grouped_non_count_agg
Definition: Execute.cpp:119
CodeCache cpu_code_cache_
Definition: Execute.h:1018
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:313
bool checkIsQuerySessionInterrupted(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3890
int get_table_id() const
Definition: Analyzer.h:194
std::pair< bool, int64_t > skipFragment(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3339
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
Definition: sqldefs.h:76
bool needFetchAllFragments(const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
Definition: Execute.cpp:2326
int8_t warpSize() const
Definition: Execute.cpp:3150
std::unordered_map< int, CgenState::LiteralValues > literal_values
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:32
std::vector< std::string > expr_container_to_string(const std::list< Analyzer::OrderEntry > &expr_container)
Definition: Execute.cpp:1188
std::map< const QuerySessionId, std::map< std::string, QuerySessionStatus > > QuerySessionMap
Definition: Execute.h:145
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1842
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:206
StringDictionaryGenerations computeStringDictionaryGenerations(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:3556
std::unique_ptr< llvm::Module > udf_cpu_module
bool g_enable_filter_function
Definition: Execute.cpp:79
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
Definition: ResultSet.cpp:482
void updateQuerySessionStatus(std::shared_ptr< const query_state::QueryState > &query_state, const QuerySessionStatus::QueryStatus new_query_status)
Definition: Execute.cpp:3701
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
Definition: Execute.cpp:1913
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:3079
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
Definition: Execute.cpp:2891
std::vector< std::vector< const int8_t * > > col_buffers
Definition: ColumnFetcher.h:42
unsigned numBlocksPerMP() const
Definition: Execute.cpp:3168
std::pair< bool, int64_t > skipFragmentInnerJoins(const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3497
void buildSelectedFragsMapping(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2614
TableGenerations table_generations_
Definition: Execute.h:1046
int64_t extract_min_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> PerFragmentCallBack
Definition: Execute.h:540
ThreadId thread_id()
Definition: Logger.cpp:732
bool g_cache_string_hash
void resetInterrupt()
mapd_shared_lock< mapd_shared_mutex > read_lock
size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1096
bool g_enable_filter_push_down
Definition: Execute.cpp:89
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool g_use_estimator_result_cache
Definition: Execute.cpp:111
bool g_enable_bump_allocator
Definition: Execute.cpp:106
size_t getRunningExecutorId(mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3608
std::string QuerySessionId
Definition: Execute.h:78
void addToCardinalityCache(const std::string &cache_key, const size_t cache_value)
Definition: Execute.cpp:3919
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
static std::atomic< bool > interrupted_
Definition: Execute.h:1012
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
void agg_sum_double_skip_val(int64_t *agg, const double val, const double skip_val)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:311
int32_t getOrAddTransient(const std::string &str)
constexpr int64_t get_timestamp_precision_scale(const int32_t dimen)
Definition: DateTimeUtils.h:51
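If, as the name and its use with sub-second timestamp dimensions suggest, this helper returns the power-of-ten scale for a given dimension (an assumption, not confirmed by this page), the expected mapping would look like the hypothetical check below:
// Hypothetical check only; assumes scale == 10^dimen for the supported precisions.
static_assert(get_timestamp_precision_scale(3) == 1'000, "millisecond scale");
static_assert(get_timestamp_precision_scale(6) == 1'000'000, "microsecond scale");
static_assert(get_timestamp_precision_scale(9) == 1'000'000'000, "nanosecond scale");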
static const int32_t ERR_OUT_OF_SLOTS
Definition: Execute.h:1073
uint64_t exp_to_scale(const unsigned exp)
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
bool g_cluster
static std::mutex compilation_mutex_
Definition: Execute.h:1087
bool g_allow_cpu_retry
Definition: Execute.cpp:81
std::vector< std::vector< int64_t > > num_rows
Definition: ColumnFetcher.h:43
bool g_enable_watchdog
Definition: Execute.cpp:76
Definition: sqldefs.h:33
size_t g_approx_quantile_centroids
Definition: Execute.cpp:129
constexpr int8_t MAX_BYTE_WIDTH_SUPPORTED
void setColRange(const PhysicalInput &, const ExpressionRange &)
static size_t running_query_executor_id_
Definition: Execute.h:1051
DEVICE void swap(ARGS &&... args)
Definition: gpu_enabled.h:114
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1413
static bool typeSupportsRange(const SQLTypeInfo &ti)
size_t getColOffInBytes(const size_t col_idx) const
ExecutorDeviceType getDeviceTypeForTargets(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
Definition: Execute.cpp:1666
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags(const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments *> &all_tables_fragments)
Definition: Execute.cpp:2277
std::shared_ptr< const query_state::QueryState > query_state
ReductionCode get_reduction_code(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, int64_t *compilation_queue_time)
Definition: Execute.cpp:928
ResultSetPtr reduce_estimator_results(const RelAlgExecutionUnit &ra_exe_unit, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Executor(const ExecutorId id, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:137
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 64, 64, boost::multiprecision::signed_magnitude, boost::multiprecision::checked, void > > checked_int64_t
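A minimal, self-contained sketch of what this checked alias provides: arithmetic that overflows the fixed 64-bit magnitude raises an exception instead of silently wrapping. The alias below simply mirrors the definition above; the exact exception type follows Boost.Multiprecision's checked-backend behavior.
#include <boost/multiprecision/cpp_int.hpp>
#include <exception>
#include <iostream>
#include <limits>

using checked_int64_t = boost::multiprecision::number<boost::multiprecision::cpp_int_backend<
    64, 64, boost::multiprecision::signed_magnitude, boost::multiprecision::checked, void>>;

int main() {
  checked_int64_t v = std::numeric_limits<int64_t>::max();
  try {
    v *= 4;  // magnitude no longer fits in 64 bits, so the checked backend throws
  } catch (const std::exception& e) {
    std::cout << "overflow detected: " << e.what() << '\n';
  }
  return 0;
}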
int8_t * getUnderlyingBuffer() const
StringDictionaryProxy * getStringDictionaryProxy(const int dict_id, const bool with_generation) const
Definition: Execute.h:409
mapd_unique_lock< mapd_shared_mutex > write_lock
SQLTypeInfo columnType
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
void executeWorkUnitPerFragment(const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb)
Compiles and dispatches a work unit per fragment, processing results with the per-fragment callback (usage sketch below)...
Definition: Execute.cpp:1570
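A minimal sketch of wiring up a per-fragment callback, based only on the signatures shown on this page (executeWorkUnitPerFragment and the PerFragmentCallBack alias). The PerFragmentCallBack scoping to Executor, the rowCount()/fragmentId accessors, and the already-constructed execution unit, table info, options, and catalog objects are assumptions here, not taken from this file.
#include "Execute.h"

void run_per_fragment(Executor& executor,
                      const RelAlgExecutionUnit& ra_exe_unit,
                      const InputTableInfo& table_info,
                      const CompilationOptions& co,
                      const ExecutionOptions& eo,
                      const Catalog_Namespace::Catalog& cat) {
  // Callback invoked once per fragment with that fragment's partial result.
  Executor::PerFragmentCallBack cb =
      [](ResultSetPtr results, const Fragmenter_Namespace::FragmentInfo& frag_info) {
        if (results) {
          VLOG(1) << "fragment " << frag_info.fragmentId << " produced "
                  << results->rowCount() << " rows";
        }
      };
  executor.executeWorkUnitPerFragment(ra_exe_unit, table_info, co, eo, cat, cb);
}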
std::vector< std::vector< uint64_t > > frag_offsets
Definition: ColumnFetcher.h:44
const TableGeneration & getTableGeneration(const int table_id) const
Definition: Execute.cpp:294
const bool allow_runtime_query_interrupt
const std::vector< DeviceProperties > & getAllDeviceProperties() const
Definition: CudaMgr.h:120
std::unique_ptr< ResultSet > estimator_result_set_
static size_t align(const size_t off_in, const size_t alignment)
Definition: Execute.h:980
const ColumnDescriptor * getColumnDescriptor(const Analyzer::ColumnVar *) const
Definition: Execute.cpp:257
const size_t offset
bool skipFragmentPair(const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper *> &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
Definition: Execute.cpp:2187
unsigned g_dynamic_watchdog_time_limit
Definition: Execute.cpp:80
QueryDescriptionType getQueryDescriptionType() const
Definition: sqldefs.h:74
int cpu_threads()
Definition: thread_count.h:24
DEVICE auto accumulate(ARGS &&... args)
Definition: gpu_enabled.h:42
bool g_use_tbb_pool
Definition: Execute.cpp:78
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:533
void fill_entries_for_empty_input(std::vector< TargetInfo > &target_infos, std::vector< int64_t > &entry, const std::vector< Analyzer::Expr *> &target_exprs, const QueryMemoryDescriptor &query_mem_desc)
Definition: Execute.cpp:1702
Descriptor for the fragments required for an execution kernel.
Definition: sqldefs.h:72
void agg_sum_float_skip_val(int32_t *agg, const float val, const float skip_val)
static size_t getArenaBlockSize()
Definition: Execute.cpp:203
ResultSetPtr executeWorkUnit(size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1346
ExpressionRange getColRange(const PhysicalInput &) const
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:1978
HashType
Definition: HashTable.h:19
#define IS_GEO(T)
Definition: sqltypes.h:242
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:110
static mapd_shared_mutex recycler_mutex_
Definition: Execute.h:1067
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:42
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const
Functions to support array operations used by the executor.
bool is_number() const
Definition: sqltypes.h:483
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
int64_t extract_max_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
const bool with_watchdog
static std::mutex gpu_active_modules_mutex_
Definition: Execute.h:1009
void setupCaching(const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< int > &phys_table_ids)
Definition: Execute.cpp:3589
void clearMetaInfoCache()
Definition: Execute.cpp:373
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:187
bool g_enable_table_functions
Definition: Execute.cpp:101
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
bool is_fp() const
Definition: sqltypes.h:482
const TableGeneration & getGeneration(const uint32_t id) const
const TemporaryTables * temporary_tables_
Definition: Execute.h:1033
unsigned blockSize() const
Definition: Execute.cpp:3176
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:375
size_t g_gpu_smem_threshold
Definition: Execute.cpp:114
ResultSetPtr executeExplain(const QueryCompilationDescriptor &)
Definition: Execute.cpp:1662
const Expr * get_left_operand() const
Definition: Analyzer.h:442
void compile(const TableFunctionExecutionUnit &exe_unit, const CompilationOptions &co, Executor *executor)