OmniSciDB  8fa3bf436f
Execute.cpp
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Execute.h"
18 
19 #include "AggregateUtils.h"
20 #include "CodeGenerator.h"
21 #include "ColumnFetcher.h"
24 #include "DynamicWatchdog.h"
25 #include "EquiJoinCondition.h"
26 #include "ErrorHandling.h"
27 #include "ExpressionRewrite.h"
29 #include "GpuMemUtils.h"
30 #include "InPlaceSort.h"
33 #include "JsonAccessors.h"
36 #include "QueryRewrite.h"
37 #include "QueryTemplateGenerator.h"
38 #include "ResultSetReductionJIT.h"
39 #include "RuntimeFunctions.h"
40 #include "SpeculativeTopN.h"
41 
44 
45 #include "CudaMgr/CudaMgr.h"
47 #include "Parser/ParserNode.h"
50 #include "Shared/checked_alloc.h"
51 #include "Shared/measure.h"
52 #include "Shared/misc.h"
53 #include "Shared/scope.h"
54 #include "Shared/shard_key.h"
55 #include "Shared/threadpool.h"
56 
57 #include "AggregatedColRange.h"
59 
60 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
61 #include <boost/filesystem/operations.hpp>
62 #include <boost/filesystem/path.hpp>
63 
64 #ifdef HAVE_CUDA
65 #include <cuda.h>
66 #endif // HAVE_CUDA
67 #include <chrono>
68 #include <ctime>
69 #include <future>
70 #include <iostream>
71 #include <memory>
72 #include <numeric>
73 #include <set>
74 #include <thread>
75 
76 bool g_enable_watchdog{false};
78 bool g_use_tbb_pool{false};
81 bool g_allow_cpu_retry{true};
82 bool g_null_div_by_zero{false};
86 extern bool g_enable_smem_group_by;
87 extern std::unique_ptr<llvm::Module> udf_gpu_module;
88 extern std::unique_ptr<llvm::Module> udf_cpu_module;
98 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
102 size_t g_big_group_threshold{20000};
105 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
106 size_t g_min_memory_allocation_size{
107  256}; // minimum memory allocation required for projection query output buffer
108  // without pre-flight count
119 size_t g_gpu_smem_threshold{
120  4096}; // GPU shared memory threshold (in bytes), if larger
121  // buffer sizes are required we do not use GPU shared
122  // memory optimizations. Setting this to 0 means unlimited
123  // (subject to other dynamically calculated caps)
124 bool g_enable_smem_grouped_non_count_agg{
125  true}; // enable use of shared memory when performing group-by with select non-count
126  // aggregates
127 bool g_enable_smem_non_grouped_agg{
128  true}; // enable optimizations for using GPU shared memory in implementation of
129  // non-grouped aggregates
130 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
131  // limits the allocation for the output buffer arena
132 
135 
137 
138 extern bool g_cache_string_hash;
139 
140 int const Executor::max_gpu_count;
141 
143 
144 Executor::Executor(const ExecutorId executor_id,
145  const size_t block_size_x,
146  const size_t grid_size_x,
147  const size_t max_gpu_slab_size,
148  const std::string& debug_dir,
149  const std::string& debug_file)
150  : cgen_state_(new CgenState({}, false))
151  , cpu_code_cache_(code_cache_size)
152  , gpu_code_cache_(code_cache_size)
153  , block_size_x_(block_size_x)
154  , grid_size_x_(grid_size_x)
155  , max_gpu_slab_size_(max_gpu_slab_size)
156  , debug_dir_(debug_dir)
157  , debug_file_(debug_file)
158  , executor_id_(executor_id)
159  , catalog_(nullptr)
160  , temporary_tables_(nullptr)
161  , input_table_info_cache_(this) {}
162 
163 std::shared_ptr<Executor> Executor::getExecutor(
164  const ExecutorId executor_id,
165  const std::string& debug_dir,
166  const std::string& debug_file,
167  const SystemParameters& system_parameters) {
168  INJECT_TIMER(getExecutor);
169 
170  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
171  auto it = executors_.find(executor_id);
172  if (it != executors_.end()) {
173  return it->second;
174  }
175  auto executor = std::make_shared<Executor>(executor_id,
176  system_parameters.cuda_block_size,
177  system_parameters.cuda_grid_size,
178  system_parameters.max_gpu_slab_size,
179  debug_dir,
180  debug_file);
181  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
182  return executor;
183 }
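// Editor's note: getExecutor() returns the cached Executor for a given id, creating it on
// first use with the CUDA block/grid and slab sizes taken from SystemParameters. A minimal
// usage sketch (the id value 0 and the default-constructed SystemParameters are
// illustrative assumptions, not taken from this file):
//
//   SystemParameters sys_params;
//   auto executor = Executor::getExecutor(/*executor_id=*/0,
//                                         /*debug_dir=*/"",
//                                         /*debug_file=*/"",
//                                         sys_params);
//   CHECK(executor);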
184 
185 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
186  switch (memory_level) {
187  case Data_Namespace::MemoryLevel::CPU_LEVEL:
188  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
189  mapd_unique_lock<mapd_shared_mutex> flush_lock(
190  execute_mutex_); // Don't flush memory while queries are running
191 
193  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
194  // The hash table cache uses CPU memory not managed by the buffer manager. In the
195  // future, we should manage these allocations with the buffer manager directly.
196  // For now, assume the user wants to purge the hash table cache when they clear
197  // CPU memory (currently used in ExecuteTest to lower memory pressure)
199  }
200  break;
201  }
202  default: {
203  throw std::runtime_error(
204  "Clearing memory levels other than the CPU level or GPU level is not "
205  "supported.");
206  }
207  }
208 }
209 
210 size_t Executor::getArenaBlockSize() {
211  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
212 }
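// Editor's note: the arena block size above is 100,000,000 bytes (~95 MiB) when running
// under a unit-test environment (g_is_test_env) to limit memory pressure, and otherwise
// (1UL << 32) + kArenaBlockOverhead, i.e. 4 GiB plus the per-block bookkeeping overhead.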
213 
215  const int dict_id_in,
216  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
217  const bool with_generation) const {
218  CHECK(row_set_mem_owner);
219  std::lock_guard<std::mutex> lock(
220  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
221  return row_set_mem_owner->getOrAddStringDictProxy(
222  dict_id_in, with_generation, catalog_);
223 }
224 
226  const int dict_id_in,
227  const bool with_generation,
228  const Catalog_Namespace::Catalog* catalog) {
229  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
230  CHECK(catalog);
231  const auto dd = catalog->getMetadataForDict(dict_id);
232  if (dd) {
233  CHECK(dd->stringDict);
234  CHECK_LE(dd->dictNBits, 32);
235  const int64_t generation =
236  with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
237  return addStringDict(dd->stringDict, dict_id, generation);
238  }
239  CHECK_EQ(0, dict_id);
240  if (!lit_str_dict_proxy_) {
241  std::shared_ptr<StringDictionary> tsd =
242  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
243  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
244  }
245  return lit_str_dict_proxy_.get();
246 }
247 
249  std::lock_guard<std::mutex> lock(state_mutex_);
250  return t_digests_
251  .emplace_back(std::make_unique<quantile::TDigest>(
253  .get();
254 }
255 
256 bool Executor::isCPUOnly() const {
257  CHECK(catalog_);
258  return !catalog_->getDataMgr().getCudaMgr();
259 }
260 
262  const Analyzer::ColumnVar* col_var) const {
264  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
265 }
266 
268  const Analyzer::ColumnVar* col_var,
269  int n) const {
270  const auto cd = getColumnDescriptor(col_var);
271  if (!cd || n > cd->columnType.get_physical_cols()) {
272  return nullptr;
273  }
275  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
276 }
277 
278 const Catalog_Namespace::Catalog* Executor::getCatalog() const {
279  return catalog_;
280 }
281 
282 void Executor::setCatalog(const Catalog_Namespace::Catalog* catalog) {
283  catalog_ = catalog;
284 }
285 
286 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
287  return row_set_mem_owner_;
288 }
289 
291  return temporary_tables_;
292 }
293 
295  return input_table_info_cache_.getTableInfo(table_id);
296 }
297 
298 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
299  return table_generations_.getGeneration(table_id);
300 }
301 
303  return agg_col_range_cache_.getColRange(phys_input);
304 }
305 
306 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
307  size_t num_bytes = 0;
308  if (!plan_state_) {
309  return 0;
310  }
311  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
312  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
313  continue;
314  }
315 
316  if (fetched_col_pair.first < 0) {
317  num_bytes += 8;
318  } else {
319  const auto cd =
320  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
321  const auto& ti = cd->columnType;
322  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
323  ? 4
324  : ti.get_size();
325  if (sz < 0) {
326  // for varlen types, only account for the pointer/size for each row, for now
327  num_bytes += 16;
328  } else {
329  num_bytes += sz;
330  }
331  }
332  }
333  return num_bytes;
334 }
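// Editor's note: a worked example of the per-row accounting in getNumBytesForFetchedRow(),
// with assumed column types: a dictionary-encoded TEXT column counts as 4 bytes, a BIGINT
// column as its size (8 bytes), and any varlen column as 16 bytes (pointer plus size), so
// a row fetching those three columns is estimated at 4 + 8 + 16 = 28 bytes.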
335 
336 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
337  const std::vector<Analyzer::Expr*>& target_exprs) const {
338  CHECK(plan_state_);
339  CHECK(catalog_);
340  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
341  for (const auto target_expr : target_exprs) {
342  if (!plan_state_->isLazyFetchColumn(target_expr)) {
343  col_lazy_fetch_info.emplace_back(
344  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
345  } else {
346  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
347  CHECK(col_var);
348  auto col_id = col_var->get_column_id();
349  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
350  auto cd = (col_var->get_table_id() > 0)
351  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
352  : nullptr;
353  if (cd && IS_GEO(cd->columnType.get_type())) {
354  // Geo coords cols will be processed in sequence. So we only need to track the
355  // first coords col in lazy fetch info.
356  {
357  auto cd0 =
358  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
359  auto col0_ti = cd0->columnType;
360  CHECK(!cd0->isVirtualCol);
361  auto col0_var = makeExpr<Analyzer::ColumnVar>(
362  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
363  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
364  col_lazy_fetch_info.emplace_back(
365  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
366  }
367  } else {
368  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
369  const auto& col_ti = col_var->get_type_info();
370  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
371  }
372  }
373  }
374  return col_lazy_fetch_info;
375 }
376 
377 void Executor::clearMetaInfoCache() {
378  input_table_info_cache_.clear();
379  agg_col_range_cache_.clear();
380  table_generations_.clear();
381 }
382 
383 std::vector<int8_t> Executor::serializeLiterals(
384  const std::unordered_map<int, CgenState::LiteralValues>& literals,
385  const int device_id) {
386  if (literals.empty()) {
387  return {};
388  }
389  const auto dev_literals_it = literals.find(device_id);
390  CHECK(dev_literals_it != literals.end());
391  const auto& dev_literals = dev_literals_it->second;
392  size_t lit_buf_size{0};
393  std::vector<std::string> real_strings;
394  std::vector<std::vector<double>> double_array_literals;
395  std::vector<std::vector<int8_t>> align64_int8_array_literals;
396  std::vector<std::vector<int32_t>> int32_array_literals;
397  std::vector<std::vector<int8_t>> align32_int8_array_literals;
398  std::vector<std::vector<int8_t>> int8_array_literals;
399  for (const auto& lit : dev_literals) {
400  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
401  if (lit.which() == 7) {
402  const auto p = boost::get<std::string>(&lit);
403  CHECK(p);
404  real_strings.push_back(*p);
405  } else if (lit.which() == 8) {
406  const auto p = boost::get<std::vector<double>>(&lit);
407  CHECK(p);
408  double_array_literals.push_back(*p);
409  } else if (lit.which() == 9) {
410  const auto p = boost::get<std::vector<int32_t>>(&lit);
411  CHECK(p);
412  int32_array_literals.push_back(*p);
413  } else if (lit.which() == 10) {
414  const auto p = boost::get<std::vector<int8_t>>(&lit);
415  CHECK(p);
416  int8_array_literals.push_back(*p);
417  } else if (lit.which() == 11) {
418  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
419  CHECK(p);
420  if (p->second == 64) {
421  align64_int8_array_literals.push_back(p->first);
422  } else if (p->second == 32) {
423  align32_int8_array_literals.push_back(p->first);
424  } else {
425  CHECK(false);
426  }
427  }
428  }
429  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
430  throw TooManyLiterals();
431  }
432  int16_t crt_real_str_off = lit_buf_size;
433  for (const auto& real_str : real_strings) {
434  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
435  lit_buf_size += real_str.size();
436  }
437  if (double_array_literals.size() > 0) {
438  lit_buf_size = align(lit_buf_size, sizeof(double));
439  }
440  int16_t crt_double_arr_lit_off = lit_buf_size;
441  for (const auto& double_array_literal : double_array_literals) {
442  CHECK_LE(double_array_literal.size(),
443  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
444  lit_buf_size += double_array_literal.size() * sizeof(double);
445  }
446  if (align64_int8_array_literals.size() > 0) {
447  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
448  }
449  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
450  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
451  CHECK_LE(align64_int8_array_literal.size(),
452  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
453  lit_buf_size += align64_int8_array_literal.size();
454  }
455  if (int32_array_literals.size() > 0) {
456  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
457  }
458  int16_t crt_int32_arr_lit_off = lit_buf_size;
459  for (const auto& int32_array_literal : int32_array_literals) {
460  CHECK_LE(int32_array_literal.size(),
461  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
462  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
463  }
464  if (align32_int8_array_literals.size() > 0) {
465  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
466  }
467  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
468  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
469  CHECK_LE(align32_int8_array_literal.size(),
470  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
471  lit_buf_size += align32_int8_array_literal.size();
472  }
473  int16_t crt_int8_arr_lit_off = lit_buf_size;
474  for (const auto& int8_array_literal : int8_array_literals) {
475  CHECK_LE(int8_array_literal.size(),
476  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
477  lit_buf_size += int8_array_literal.size();
478  }
479  unsigned crt_real_str_idx = 0;
480  unsigned crt_double_arr_lit_idx = 0;
481  unsigned crt_align64_int8_arr_lit_idx = 0;
482  unsigned crt_int32_arr_lit_idx = 0;
483  unsigned crt_align32_int8_arr_lit_idx = 0;
484  unsigned crt_int8_arr_lit_idx = 0;
485  std::vector<int8_t> serialized(lit_buf_size);
486  size_t off{0};
487  for (const auto& lit : dev_literals) {
488  const auto lit_bytes = CgenState::literalBytes(lit);
489  off = CgenState::addAligned(off, lit_bytes);
490  switch (lit.which()) {
491  case 0: {
492  const auto p = boost::get<int8_t>(&lit);
493  CHECK(p);
494  serialized[off - lit_bytes] = *p;
495  break;
496  }
497  case 1: {
498  const auto p = boost::get<int16_t>(&lit);
499  CHECK(p);
500  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
501  break;
502  }
503  case 2: {
504  const auto p = boost::get<int32_t>(&lit);
505  CHECK(p);
506  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
507  break;
508  }
509  case 3: {
510  const auto p = boost::get<int64_t>(&lit);
511  CHECK(p);
512  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
513  break;
514  }
515  case 4: {
516  const auto p = boost::get<float>(&lit);
517  CHECK(p);
518  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
519  break;
520  }
521  case 5: {
522  const auto p = boost::get<double>(&lit);
523  CHECK(p);
524  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
525  break;
526  }
527  case 6: {
528  const auto p = boost::get<std::pair<std::string, int>>(&lit);
529  CHECK(p);
530  const auto str_id =
532  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
533  ->getOrAddTransient(p->first)
534  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
535  ->getIdOfString(p->first);
536  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
537  break;
538  }
539  case 7: {
540  const auto p = boost::get<std::string>(&lit);
541  CHECK(p);
542  int32_t off_and_len = crt_real_str_off << 16;
543  const auto& crt_real_str = real_strings[crt_real_str_idx];
544  off_and_len |= static_cast<int16_t>(crt_real_str.size());
545  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
546  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
547  ++crt_real_str_idx;
548  crt_real_str_off += crt_real_str.size();
549  break;
550  }
551  case 8: {
552  const auto p = boost::get<std::vector<double>>(&lit);
553  CHECK(p);
554  int32_t off_and_len = crt_double_arr_lit_off << 16;
555  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
556  int32_t len = crt_double_arr_lit.size();
557  CHECK_EQ((len >> 16), 0);
558  off_and_len |= static_cast<int16_t>(len);
559  int32_t double_array_bytesize = len * sizeof(double);
560  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
561  memcpy(&serialized[crt_double_arr_lit_off],
562  crt_double_arr_lit.data(),
563  double_array_bytesize);
564  ++crt_double_arr_lit_idx;
565  crt_double_arr_lit_off += double_array_bytesize;
566  break;
567  }
568  case 9: {
569  const auto p = boost::get<std::vector<int32_t>>(&lit);
570  CHECK(p);
571  int32_t off_and_len = crt_int32_arr_lit_off << 16;
572  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
573  int32_t len = crt_int32_arr_lit.size();
574  CHECK_EQ((len >> 16), 0);
575  off_and_len |= static_cast<int16_t>(len);
576  int32_t int32_array_bytesize = len * sizeof(int32_t);
577  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
578  memcpy(&serialized[crt_int32_arr_lit_off],
579  crt_int32_arr_lit.data(),
580  int32_array_bytesize);
581  ++crt_int32_arr_lit_idx;
582  crt_int32_arr_lit_off += int32_array_bytesize;
583  break;
584  }
585  case 10: {
586  const auto p = boost::get<std::vector<int8_t>>(&lit);
587  CHECK(p);
588  int32_t off_and_len = crt_int8_arr_lit_off << 16;
589  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
590  int32_t len = crt_int8_arr_lit.size();
591  CHECK_EQ((len >> 16), 0);
592  off_and_len |= static_cast<int16_t>(len);
593  int32_t int8_array_bytesize = len;
594  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
595  memcpy(&serialized[crt_int8_arr_lit_off],
596  crt_int8_arr_lit.data(),
597  int8_array_bytesize);
598  ++crt_int8_arr_lit_idx;
599  crt_int8_arr_lit_off += int8_array_bytesize;
600  break;
601  }
602  case 11: {
603  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
604  CHECK(p);
605  if (p->second == 64) {
606  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
607  const auto& crt_align64_int8_arr_lit =
608  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
609  int32_t len = crt_align64_int8_arr_lit.size();
610  CHECK_EQ((len >> 16), 0);
611  off_and_len |= static_cast<int16_t>(len);
612  int32_t align64_int8_array_bytesize = len;
613  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
614  memcpy(&serialized[crt_align64_int8_arr_lit_off],
615  crt_align64_int8_arr_lit.data(),
616  align64_int8_array_bytesize);
617  ++crt_align64_int8_arr_lit_idx;
618  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
619  } else if (p->second == 32) {
620  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
621  const auto& crt_align32_int8_arr_lit =
622  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
623  int32_t len = crt_align32_int8_arr_lit.size();
624  CHECK_EQ((len >> 16), 0);
625  off_and_len |= static_cast<int16_t>(len);
626  int32_t align32_int8_array_bytesize = len;
627  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
628  memcpy(&serialized[crt_align32_int8_arr_lit_off],
629  crt_align32_int8_arr_lit.data(),
630  align32_int8_array_bytesize);
631  ++crt_align32_int8_arr_lit_idx;
632  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
633  } else {
634  CHECK(false);
635  }
636  break;
637  }
638  default:
639  CHECK(false);
640  }
641  }
642  return serialized;
643 }
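// Editor's note: for string and array literals, serializeLiterals() writes a 32-bit value
// that packs the payload offset into the upper 16 bits and the payload length into the
// lower 16 bits, which is why both the total buffer size and every payload length are
// checked against std::numeric_limits<int16_t>::max() above. A small illustrative sketch
// of the packing (the offset/length values are assumptions):
//
//   int32_t off_and_len = 64 << 16;              // payload starts at offset 64
//   off_and_len |= static_cast<int16_t>(11);     // payload is 11 bytes long
//   const int32_t decoded_off = off_and_len >> 16;      // 64
//   const int32_t decoded_len = off_and_len & 0xFFFF;   // 11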
644 
645 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
646  if (device_type == ExecutorDeviceType::GPU) {
647  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
648  CHECK(cuda_mgr);
649  return cuda_mgr->getDeviceCount();
650  } else {
651  return 1;
652  }
653 }
654 
656  const Data_Namespace::MemoryLevel memory_level) const {
657  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
658  : deviceCount(ExecutorDeviceType::CPU);
659 }
660 
661 // TODO(alex): remove or split
662 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
663  const SQLTypeInfo& ti,
664  const int64_t agg_init_val,
665  const int8_t out_byte_width,
666  const int64_t* out_vec,
667  const size_t out_vec_sz,
668  const bool is_group_by,
669  const bool float_argument_input) {
670  switch (agg) {
671  case kAVG:
672  case kSUM:
673  if (0 != agg_init_val) {
674  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
675  int64_t agg_result = agg_init_val;
676  for (size_t i = 0; i < out_vec_sz; ++i) {
677  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
678  }
679  return {agg_result, 0};
680  } else {
681  CHECK(ti.is_fp());
682  switch (out_byte_width) {
683  case 4: {
684  int agg_result = static_cast<int32_t>(agg_init_val);
685  for (size_t i = 0; i < out_vec_sz; ++i) {
687  &agg_result,
688  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
689  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
690  }
691  const int64_t converted_bin =
692  float_argument_input
693  ? static_cast<int64_t>(agg_result)
694  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
695  return {converted_bin, 0};
696  break;
697  }
698  case 8: {
699  int64_t agg_result = agg_init_val;
700  for (size_t i = 0; i < out_vec_sz; ++i) {
702  &agg_result,
703  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
704  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
705  }
706  return {agg_result, 0};
707  break;
708  }
709  default:
710  CHECK(false);
711  }
712  }
713  }
714  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
715  int64_t agg_result = 0;
716  for (size_t i = 0; i < out_vec_sz; ++i) {
717  agg_result += out_vec[i];
718  }
719  return {agg_result, 0};
720  } else {
721  CHECK(ti.is_fp());
722  switch (out_byte_width) {
723  case 4: {
724  float r = 0.;
725  for (size_t i = 0; i < out_vec_sz; ++i) {
726  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
727  }
728  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
729  const int64_t converted_bin =
730  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
731  return {converted_bin, 0};
732  }
733  case 8: {
734  double r = 0.;
735  for (size_t i = 0; i < out_vec_sz; ++i) {
736  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
737  }
738  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
739  }
740  default:
741  CHECK(false);
742  }
743  }
744  break;
745  case kCOUNT: {
746  uint64_t agg_result = 0;
747  for (size_t i = 0; i < out_vec_sz; ++i) {
748  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
749  agg_result += out;
750  }
751  return {static_cast<int64_t>(agg_result), 0};
752  }
753  case kMIN: {
754  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
755  int64_t agg_result = agg_init_val;
756  for (size_t i = 0; i < out_vec_sz; ++i) {
757  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
758  }
759  return {agg_result, 0};
760  } else {
761  switch (out_byte_width) {
762  case 4: {
763  int32_t agg_result = static_cast<int32_t>(agg_init_val);
764  for (size_t i = 0; i < out_vec_sz; ++i) {
766  &agg_result,
767  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
768  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
769  }
770  const int64_t converted_bin =
771  float_argument_input
772  ? static_cast<int64_t>(agg_result)
773  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
774  return {converted_bin, 0};
775  }
776  case 8: {
777  int64_t agg_result = agg_init_val;
778  for (size_t i = 0; i < out_vec_sz; ++i) {
780  &agg_result,
781  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
782  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
783  }
784  return {agg_result, 0};
785  }
786  default:
787  CHECK(false);
788  }
789  }
790  }
791  case kMAX:
792  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
793  int64_t agg_result = agg_init_val;
794  for (size_t i = 0; i < out_vec_sz; ++i) {
795  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
796  }
797  return {agg_result, 0};
798  } else {
799  switch (out_byte_width) {
800  case 4: {
801  int32_t agg_result = static_cast<int32_t>(agg_init_val);
802  for (size_t i = 0; i < out_vec_sz; ++i) {
804  &agg_result,
805  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
806  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
807  }
808  const int64_t converted_bin =
809  float_argument_input ? static_cast<int64_t>(agg_result)
810  : float_to_double_bin(agg_result, !ti.get_notnull());
811  return {converted_bin, 0};
812  }
813  case 8: {
814  int64_t agg_result = agg_init_val;
815  for (size_t i = 0; i < out_vec_sz; ++i) {
817  &agg_result,
818  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
819  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
820  }
821  return {agg_result, 0};
822  }
823  default:
824  CHECK(false);
825  }
826  }
827  case kSINGLE_VALUE: {
828  int64_t agg_result = agg_init_val;
829  for (size_t i = 0; i < out_vec_sz; ++i) {
830  if (out_vec[i] != agg_init_val) {
831  if (agg_result == agg_init_val) {
832  agg_result = out_vec[i];
833  } else if (out_vec[i] != agg_result) {
835  }
836  }
837  }
838  return {agg_result, 0};
839  }
840  case kSAMPLE: {
841  int64_t agg_result = agg_init_val;
842  for (size_t i = 0; i < out_vec_sz; ++i) {
843  if (out_vec[i] != agg_init_val) {
844  agg_result = out_vec[i];
845  break;
846  }
847  }
848  return {agg_result, 0};
849  }
850  default:
851  CHECK(false);
852  }
853  abort();
854 }
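// Editor's note: reduceResults() folds per-fragment partial aggregates produced by the
// generated kernels into one host-side value. For instance, a COUNT whose per-fragment
// partials in out_vec are {10, 0, 7} reduces to {17, 0}; for floating-point SUM/MIN/MAX
// the partials are reinterpreted from their bit patterns (see the 4- and 8-byte branches
// above) before being combined with the *_skip_val runtime helpers.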
855 
856 namespace {
857 
859  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
860  auto& first = results_per_device.front().first;
861  CHECK(first);
862  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
863  const auto& next = results_per_device[dev_idx].first;
864  CHECK(next);
865  first->append(*next);
866  }
867  return std::move(first);
868 }
869 
870 } // namespace
871 
873  const RelAlgExecutionUnit& ra_exe_unit) {
874  auto& results_per_device = shared_context.getFragmentResults();
875  if (results_per_device.empty()) {
876  std::vector<TargetInfo> targets;
877  for (const auto target_expr : ra_exe_unit.target_exprs) {
878  targets.push_back(get_target_info(target_expr, g_bigint_count));
879  }
880  return std::make_shared<ResultSet>(targets,
883  row_set_mem_owner_,
884  catalog_,
885  blockSize(),
886  gridSize());
887  }
888  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
889  std::sort(results_per_device.begin(),
890  results_per_device.end(),
891  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
892  CHECK_GE(lhs.second.size(), size_t(1));
893  CHECK_GE(rhs.second.size(), size_t(1));
894  return lhs.second.front() < rhs.second.front();
895  });
896 
897  return get_merged_result(results_per_device);
898 }
899 
901  const RelAlgExecutionUnit& ra_exe_unit,
902  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
903  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
904  const QueryMemoryDescriptor& query_mem_desc) const {
905  auto timer = DEBUG_TIMER(__func__);
906  if (ra_exe_unit.estimator) {
907  return reduce_estimator_results(ra_exe_unit, results_per_device);
908  }
909 
910  if (results_per_device.empty()) {
911  std::vector<TargetInfo> targets;
912  for (const auto target_expr : ra_exe_unit.target_exprs) {
913  targets.push_back(get_target_info(target_expr, g_bigint_count));
914  }
915  return std::make_shared<ResultSet>(targets,
918  nullptr,
919  catalog_,
920  blockSize(),
921  gridSize());
922  }
923 
924  return reduceMultiDeviceResultSets(
925  results_per_device,
926  row_set_mem_owner,
927  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
928 }
929 
930 namespace {
931 
933  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
934  int64_t* compilation_queue_time) {
935  auto clock_begin = timer_start();
936  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
937  *compilation_queue_time = timer_stop(clock_begin);
938  const auto& this_result_set = results_per_device[0].first;
939  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
940  this_result_set->getTargetInfos(),
941  this_result_set->getTargetInitVals());
942  return reduction_jit.codegen();
943 };
944 
945 } // namespace
946 
948  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
949  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
950  const QueryMemoryDescriptor& query_mem_desc) const {
951  auto timer = DEBUG_TIMER(__func__);
952  std::shared_ptr<ResultSet> reduced_results;
953 
954  const auto& first = results_per_device.front().first;
955 
956  if (query_mem_desc.getQueryDescriptionType() ==
958  results_per_device.size() > 1) {
959  const auto total_entry_count = std::accumulate(
960  results_per_device.begin(),
961  results_per_device.end(),
962  size_t(0),
963  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
964  const auto& r = rs.first;
965  return init + r->getQueryMemDesc().getEntryCount();
966  });
967  CHECK(total_entry_count);
968  auto query_mem_desc = first->getQueryMemDesc();
969  query_mem_desc.setEntryCount(total_entry_count);
970  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
973  row_set_mem_owner,
974  catalog_,
975  blockSize(),
976  gridSize());
977  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
978  reduced_results->initializeStorage();
979  switch (query_mem_desc.getEffectiveKeyWidth()) {
980  case 4:
981  first->getStorage()->moveEntriesToBuffer<int32_t>(
982  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
983  break;
984  case 8:
985  first->getStorage()->moveEntriesToBuffer<int64_t>(
986  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
987  break;
988  default:
989  CHECK(false);
990  }
991  } else {
992  reduced_results = first;
993  }
994 
995  int64_t compilation_queue_time = 0;
996  const auto reduction_code =
997  get_reduction_code(results_per_device, &compilation_queue_time);
998 
999  for (size_t i = 1; i < results_per_device.size(); ++i) {
1000  reduced_results->getStorage()->reduce(
1001  *(results_per_device[i].first->getStorage()), {}, reduction_code);
1002  }
1003  reduced_results->addCompilationQueueTime(compilation_queue_time);
1004  return reduced_results;
1005 }
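// Editor's note: for multi-device group-by results, the code above first concatenates all
// per-device entries into a single buffer whose entry count is the sum of the per-device
// entry counts (e.g. three devices holding 1M entries each yield a 3M-entry result set),
// then reduces the remaining devices' storage into it using the JITed reduction code
// returned by get_reduction_code().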
1006 
1008  const RelAlgExecutionUnit& ra_exe_unit,
1009  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1010  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1011  const QueryMemoryDescriptor& query_mem_desc) const {
1012  if (results_per_device.size() == 1) {
1013  return std::move(results_per_device.front().first);
1014  }
1015  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1017  for (const auto& result : results_per_device) {
1018  auto rows = result.first;
1019  CHECK(rows);
1020  if (!rows) {
1021  continue;
1022  }
1023  SpeculativeTopNMap that(
1024  *rows,
1025  ra_exe_unit.target_exprs,
1026  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1027  m.reduce(that);
1028  }
1029  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1030  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1031  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1032 }
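// Editor's note: each per-device SpeculativeTopNMap above is sized with
// std::max(size_t(10000 * std::max(1, (int)log(top_n))), top_n). For an assumed LIMIT of
// 100 (top_n = 100, log(100) truncates to 4) this keeps roughly 40,000 candidate entries
// per device before the final asRows() materialization.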
1033 
1034 std::unordered_set<int> get_available_gpus(const Catalog_Namespace::Catalog& cat) {
1035  std::unordered_set<int> available_gpus;
1036  if (cat.getDataMgr().gpusPresent()) {
1037  int gpu_count = cat.getDataMgr().getCudaMgr()->getDeviceCount();
1038  CHECK_GT(gpu_count, 0);
1039  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1040  available_gpus.insert(gpu_id);
1041  }
1042  }
1043  return available_gpus;
1044 }
1045 
1046 size_t get_context_count(const ExecutorDeviceType device_type,
1047  const size_t cpu_count,
1048  const size_t gpu_count) {
1049  return device_type == ExecutorDeviceType::GPU ? gpu_count
1050  : static_cast<size_t>(cpu_count);
1051 }
1052 
1053 namespace {
1054 
1055 // Compute a very conservative guess for the output buffer entry count, using no
1056 // information other than the number of tuples in each table, multiplied
1057 // together.
1058 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1060  // Check for overflows since we're multiplying potentially big table sizes.
1061  using checked_size_t = boost::multiprecision::number<
1062  boost::multiprecision::cpp_int_backend<64,
1063  64,
1064  boost::multiprecision::unsigned_magnitude,
1065  boost::multiprecision::checked,
1066  void>>;
1067  checked_size_t max_groups_buffer_entry_guess = 1;
1068  for (const auto& query_info : query_infos) {
1069  CHECK(!query_info.info.fragments.empty());
1070  auto it = std::max_element(query_info.info.fragments.begin(),
1071  query_info.info.fragments.end(),
1072  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1073  return f1.getNumTuples() < f2.getNumTuples();
1074  });
1075  max_groups_buffer_entry_guess *= it->getNumTuples();
1076  }
1077  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1078  // baseline group layout with that many entries anyway.
1079  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1080  try {
1081  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1082  max_groups_buffer_entry_guess_cap);
1083  } catch (...) {
1084  return max_groups_buffer_entry_guess_cap;
1085  }
1086 }
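// Editor's note: a worked example of the guess above with assumed fragment sizes: joining
// two tables whose largest fragments hold 2,000,000 and 3,000 tuples gives a raw guess of
// 2,000,000 * 3,000 = 6,000,000,000 entries, which std::min() then caps at
// max_groups_buffer_entry_guess_cap (100,000,000); the catch block only triggers if the
// checked multiplication itself overflows.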
1087 
1088 std::string get_table_name(const InputDescriptor& input_desc,
1090  const auto source_type = input_desc.getSourceType();
1091  if (source_type == InputSourceType::TABLE) {
1092  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1093  CHECK(td);
1094  return td->tableName;
1095  } else {
1096  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1097  }
1098 }
1099 
1100 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1101  const int device_count) {
1102  if (device_type == ExecutorDeviceType::GPU) {
1103  return device_count * Executor::high_scan_limit;
1104  }
1106 }
1107 
1109  const std::vector<InputTableInfo>& table_infos,
1111  const ExecutorDeviceType device_type,
1112  const int device_count) {
1113  for (const auto target_expr : ra_exe_unit.target_exprs) {
1114  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1115  return;
1116  }
1117  }
1118  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1119  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1120  // Allow a query with no scan limit to run on small tables
1121  return;
1122  }
1123  if (ra_exe_unit.use_bump_allocator) {
1124  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1125  // relative to the size of the input), so we bypass this check for now
1126  return;
1127  }
1128  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1129  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1130  (!ra_exe_unit.scan_limit ||
1131  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1132  std::vector<std::string> table_names;
1133  const auto& input_descs = ra_exe_unit.input_descs;
1134  for (const auto& input_desc : input_descs) {
1135  table_names.push_back(get_table_name(input_desc, cat));
1136  }
1137  if (!ra_exe_unit.scan_limit) {
1138  throw WatchdogException(
1139  "Projection query would require a scan without a limit on table(s): " +
1140  boost::algorithm::join(table_names, ", "));
1141  } else {
1142  throw WatchdogException(
1143  "Projection query output result set on table(s): " +
1144  boost::algorithm::join(table_names, ", ") + " would contain " +
1145  std::to_string(ra_exe_unit.scan_limit) +
1146  " rows, which is more than the current system limit of " +
1147  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1148  }
1149  }
1150 }
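// Editor's note: getDeviceBasedScanLimit() scales the projection watchdog limit by the
// GPU count, so if Executor::high_scan_limit were, say, 32,000,000 rows (the actual value
// is defined in Execute.h and not shown in this listing), a 4-GPU run would tolerate up
// to 128,000,000 projected rows before the WatchdogException above is thrown.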
1151 
1152 } // namespace
1153 
1154 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1155  const RelAlgExecutionUnit& ra_exe_unit) {
1156  if (ra_exe_unit.input_descs.size() < 2) {
1157  return false;
1158  }
1159 
1160  // We only support loop join at the end of folded joins
1161  // where ra_exe_unit.input_descs.size() > 2 for now.
1162  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1163 
1164  std::optional<size_t> inner_table_idx;
1165  for (size_t i = 0; i < query_infos.size(); ++i) {
1166  if (query_infos[i].table_id == inner_table_id) {
1167  inner_table_idx = i;
1168  break;
1169  }
1170  }
1171  CHECK(inner_table_idx);
1172  return query_infos[*inner_table_idx].info.getNumTuples() <=
1174 }
1175 
1176 namespace {
1177 
1178 template <typename T>
1179 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1180  std::vector<std::string> expr_strs;
1181  for (const auto& expr : expr_container) {
1182  if (!expr) {
1183  expr_strs.emplace_back("NULL");
1184  } else {
1185  expr_strs.emplace_back(expr->toString());
1186  }
1187  }
1188  return expr_strs;
1189 }
1190 
1191 template <>
1192 std::vector<std::string> expr_container_to_string(
1193  const std::list<Analyzer::OrderEntry>& expr_container) {
1194  std::vector<std::string> expr_strs;
1195  for (const auto& expr : expr_container) {
1196  expr_strs.emplace_back(expr.toString());
1197  }
1198  return expr_strs;
1199 }
1200 
1201 std::string join_type_to_string(const JoinType type) {
1202  switch (type) {
1203  case JoinType::INNER:
1204  return "INNER";
1205  case JoinType::LEFT:
1206  return "LEFT";
1207  case JoinType::INVALID:
1208  return "INVALID";
1209  }
1210  UNREACHABLE();
1211  return "";
1212 }
1213 
1214 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1215  switch (algorithm) {
1216  case SortAlgorithm::Default:
1217  return "ResultSet";
1218  case SortAlgorithm::SpeculativeTopN:
1219  return "Speculative Top N";
1220  case SortAlgorithm::StreamingTopN:
1221  return "Streaming Top N";
1222  }
1223  UNREACHABLE();
1224  return "";
1225 }
1226 
1227 } // namespace
1228 
1229 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
1230  // todo(yoonmin): replace the cache key with a DAG representation of the query plan
1231  // instead of this ra_exec_unit description, if possible
1232  std::ostringstream os;
1233  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1234  const auto& scan_desc = input_col_desc->getScanDesc();
1235  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1236  << scan_desc.getNestLevel();
1237  }
1238  if (!ra_exe_unit.simple_quals.empty()) {
1239  for (const auto& qual : ra_exe_unit.simple_quals) {
1240  if (qual) {
1241  os << qual->toString() << ",";
1242  }
1243  }
1244  }
1245  if (!ra_exe_unit.quals.empty()) {
1246  for (const auto& qual : ra_exe_unit.quals) {
1247  if (qual) {
1248  os << qual->toString() << ",";
1249  }
1250  }
1251  }
1252  if (!ra_exe_unit.join_quals.empty()) {
1253  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1254  const auto& join_condition = ra_exe_unit.join_quals[i];
1255  os << std::to_string(i) << join_type_to_string(join_condition.type);
1256  for (const auto& qual : join_condition.quals) {
1257  if (qual) {
1258  os << qual->toString() << ",";
1259  }
1260  }
1261  }
1262  }
1263  if (!ra_exe_unit.groupby_exprs.empty()) {
1264  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1265  if (qual) {
1266  os << qual->toString() << ",";
1267  }
1268  }
1269  }
1270  for (const auto& expr : ra_exe_unit.target_exprs) {
1271  if (expr) {
1272  os << expr->toString() << ",";
1273  }
1274  }
1275  os << ::toString(ra_exe_unit.estimator == nullptr);
1276  os << std::to_string(ra_exe_unit.scan_limit);
1277  return os.str();
1278 }
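// Editor's note: the cache key built above is a flat concatenation of the unit's input
// columns, simple quals, quals, join conditions, group-by and target expressions, plus the
// estimator flag and scan limit. Two execution units that differ only in fragment
// assignment therefore share a key; as the TODO notes, a DAG representation of the query
// plan would be a more robust key.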
1279 
1280 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1281  os << "\n\tTable/Col/Levels: ";
1282  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1283  const auto& scan_desc = input_col_desc->getScanDesc();
1284  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1285  << scan_desc.getNestLevel() << ") ";
1286  }
1287  if (!ra_exe_unit.simple_quals.empty()) {
1288  os << "\n\tSimple Quals: "
1290  ", ");
1291  }
1292  if (!ra_exe_unit.quals.empty()) {
1293  os << "\n\tQuals: "
1294  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1295  }
1296  if (!ra_exe_unit.join_quals.empty()) {
1297  os << "\n\tJoin Quals: ";
1298  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1299  const auto& join_condition = ra_exe_unit.join_quals[i];
1300  os << "\t\t" << std::to_string(i) << " "
1301  << join_type_to_string(join_condition.type);
1302  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1303  }
1304  }
1305  if (!ra_exe_unit.groupby_exprs.empty()) {
1306  os << "\n\tGroup By: "
1308  ", ");
1309  }
1310  os << "\n\tProjected targets: "
1312  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
1313  os << "\n\tSort Info: ";
1314  const auto& sort_info = ra_exe_unit.sort_info;
1315  os << "\n\t Order Entries: "
1316  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1317  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1318  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1319  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1320  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1321  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
1322  if (ra_exe_unit.union_all) {
1323  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1324  }
1325  return os;
1326 }
1327 
1328 namespace {
1329 
1330 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
1331  const size_t new_scan_limit) {
1332  return {ra_exe_unit_in.input_descs,
1333  ra_exe_unit_in.input_col_descs,
1334  ra_exe_unit_in.simple_quals,
1335  ra_exe_unit_in.quals,
1336  ra_exe_unit_in.join_quals,
1337  ra_exe_unit_in.groupby_exprs,
1338  ra_exe_unit_in.target_exprs,
1339  ra_exe_unit_in.estimator,
1340  ra_exe_unit_in.sort_info,
1341  new_scan_limit,
1342  ra_exe_unit_in.query_hint,
1343  ra_exe_unit_in.use_bump_allocator,
1344  ra_exe_unit_in.union_all,
1345  ra_exe_unit_in.query_state};
1346 }
1347 
1348 } // namespace
1349 
1350 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1351  const bool is_agg,
1352  const std::vector<InputTableInfo>& query_infos,
1353  const RelAlgExecutionUnit& ra_exe_unit_in,
1354  const CompilationOptions& co,
1355  const ExecutionOptions& eo,
1357  RenderInfo* render_info,
1358  const bool has_cardinality_estimation,
1359  ColumnCacheMap& column_cache) {
1360  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1361 
1362  ScopeGuard cleanup_post_execution = [this] {
1363  // cleanup/unpin GPU buffer allocations
1364  // TODO: separate out this state into a single object
1365  plan_state_.reset(nullptr);
1366  if (cgen_state_) {
1367  cgen_state_->in_values_bitmaps_.clear();
1368  }
1369  };
1370 
1371  try {
1372  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1373  is_agg,
1374  true,
1375  query_infos,
1376  ra_exe_unit_in,
1377  co,
1378  eo,
1379  cat,
1380  row_set_mem_owner_,
1381  render_info,
1382  has_cardinality_estimation,
1383  column_cache);
1384  if (result) {
1385  result->setKernelQueueTime(kernel_queue_time_ms_);
1386  result->addCompilationQueueTime(compilation_queue_time_ms_);
1387  if (eo.just_validate) {
1388  result->setValidationOnlyRes();
1389  }
1390  }
1391  return result;
1392  } catch (const CompilationRetryNewScanLimit& e) {
1393  auto result =
1394  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1395  is_agg,
1396  false,
1397  query_infos,
1398  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1399  co,
1400  eo,
1401  cat,
1402  row_set_mem_owner_,
1403  render_info,
1404  has_cardinality_estimation,
1405  column_cache);
1406  if (result) {
1407  result->setKernelQueueTime(kernel_queue_time_ms_);
1408  result->addCompilationQueueTime(compilation_queue_time_ms_);
1409  if (eo.just_validate) {
1410  result->setValidationOnlyRes();
1411  }
1412  }
1413  return result;
1414  }
1415 }
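// Editor's note: executeWorkUnit() retries at most once: if compilation throws
// CompilationRetryNewScanLimit, it re-runs executeWorkUnitImpl() with the execution unit's
// scan limit replaced by e.new_scan_limit_ via replace_scan_limit() (defined above), and
// in both paths the kernel and compilation queue times are attached to the result.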
1416 
1418  size_t& max_groups_buffer_entry_guess,
1419  const bool is_agg,
1420  const bool allow_single_frag_table_opt,
1421  const std::vector<InputTableInfo>& query_infos,
1422  const RelAlgExecutionUnit& ra_exe_unit_in,
1423  const CompilationOptions& co,
1424  const ExecutionOptions& eo,
1426  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1427  RenderInfo* render_info,
1428  const bool has_cardinality_estimation,
1429  ColumnCacheMap& column_cache) {
1430  INJECT_TIMER(Exec_executeWorkUnit);
1431  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1432  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1433  CHECK(!query_infos.empty());
1434  if (!max_groups_buffer_entry_guess) {
1435  // The query has failed the first execution attempt because of running out
1436  // of group by slots. Make the conservative choice: allocate fragment size
1437  // slots and run on the CPU.
1438  CHECK(device_type == ExecutorDeviceType::CPU);
1439  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1440  }
1441 
1442  int8_t crt_min_byte_width{MAX_BYTE_WIDTH_SUPPORTED};
1443  do {
1444  SharedKernelContext shared_context(query_infos);
1445  ColumnFetcher column_fetcher(this, column_cache);
1446  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1447  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1448  if (eo.executor_type == ExecutorType::Native) {
1449  try {
1450  INJECT_TIMER(query_step_compilation);
1451  auto clock_begin = timer_start();
1452  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1453  compilation_queue_time_ms_ += timer_stop(clock_begin);
1454 
1455  query_mem_desc_owned =
1456  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1457  crt_min_byte_width,
1458  has_cardinality_estimation,
1459  ra_exe_unit,
1460  query_infos,
1461  deleted_cols_map,
1462  column_fetcher,
1463  {device_type,
1464  co.hoist_literals,
1465  co.opt_level,
1467  co.allow_lazy_fetch,
1469  co.explain_type,
1471  eo,
1472  render_info,
1473  this);
1474  CHECK(query_mem_desc_owned);
1475  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1476  } catch (CompilationRetryNoCompaction&) {
1477  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1478  continue;
1479  }
1480  } else {
1481  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1482  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1483  CHECK(!query_mem_desc_owned);
1484  query_mem_desc_owned.reset(
1486  }
1487  if (eo.just_explain) {
1488  return executeExplain(*query_comp_desc_owned);
1489  }
1490 
1491  for (const auto target_expr : ra_exe_unit.target_exprs) {
1492  plan_state_->target_exprs_.push_back(target_expr);
1493  }
1494 
1495  if (!eo.just_validate) {
1496  int available_cpus = cpu_threads();
1497  auto available_gpus = get_available_gpus(cat);
1498 
1499  const auto context_count =
1500  get_context_count(device_type, available_cpus, available_gpus.size());
1501  try {
1502  auto kernels = createKernels(shared_context,
1503  ra_exe_unit,
1504  column_fetcher,
1505  query_infos,
1506  eo,
1507  is_agg,
1508  allow_single_frag_table_opt,
1509  context_count,
1510  *query_comp_desc_owned,
1511  *query_mem_desc_owned,
1512  render_info,
1513  available_gpus,
1514  available_cpus);
1515  if (g_use_tbb_pool) {
1516 #ifdef HAVE_TBB
1517  VLOG(1) << "Using TBB thread pool for kernel dispatch.";
1518  launchKernels<threadpool::TbbThreadPool<void>>(shared_context,
1519  std::move(kernels));
1520 #else
1521  throw std::runtime_error(
1522  "This build is not TBB enabled. Restart the server with "
1523  "\"enable-modern-thread-pool\" disabled.");
1524 #endif
1525  } else {
1526  launchKernels<threadpool::FuturesThreadPool<void>>(shared_context,
1527  std::move(kernels));
1528  }
1529  } catch (QueryExecutionError& e) {
1530  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1531  e.getErrorCode() == ERR_OUT_OF_TIME) {
1532  throw QueryExecutionError(ERR_INTERRUPTED);
1533  }
1534  if (e.getErrorCode() == ERR_INTERRUPTED) {
1535  throw QueryExecutionError(ERR_INTERRUPTED);
1536  }
1537  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1538  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1539  crt_min_byte_width <<= 1;
1540  continue;
1541  }
1542  throw;
1543  }
1544  }
1545  if (is_agg) {
1546  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
1547  // update query status to let user know we are now in the reduction phase
1548  std::string curRunningSession{""};
1549  std::string curRunningQuerySubmittedTime{""};
1550  bool sessionEnrolled = false;
1552  {
1553  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
1554  executor->getSessionLock());
1555  curRunningSession = executor->getCurrentQuerySession(session_read_lock);
1556  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
1557  sessionEnrolled =
1558  executor->checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
1559  }
1560  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
1561  sessionEnrolled) {
1562  executor->updateQuerySessionStatus(curRunningSession,
1563  curRunningQuerySubmittedTime,
1565  }
1566  }
1567  try {
1568  return collectAllDeviceResults(shared_context,
1569  ra_exe_unit,
1570  *query_mem_desc_owned,
1571  query_comp_desc_owned->getDeviceType(),
1572  row_set_mem_owner);
1573  } catch (ReductionRanOutOfSlots&) {
1574  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1575  } catch (OverflowOrUnderflow&) {
1576  crt_min_byte_width <<= 1;
1577  continue;
1578  } catch (QueryExecutionError& e) {
1579  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
1580  << ", what(): " << e.what();
1581  throw QueryExecutionError(e.getErrorCode());
1582  }
1583  }
1584  return resultsUnion(shared_context, ra_exe_unit);
1585 
1586  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1587 
1588  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1591  nullptr,
1592  catalog_,
1593  blockSize(),
1594  gridSize());
1595 }
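// Editor's note: the do/while loop above adapts the aggregate slot width: compilation may
// narrow crt_min_byte_width via getMinByteWidth(), and if a kernel later reports
// ERR_OVERFLOW_OR_UNDERFLOW the width is doubled and the unit is recompiled, until the
// width would exceed sizeof(int64_t); CompilationRetryNoCompaction instead resets it to
// MAX_BYTE_WIDTH_SUPPORTED and retries without compaction.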
1596 
1598  const RelAlgExecutionUnit& ra_exe_unit_in,
1599  const InputTableInfo& table_info,
1600  const CompilationOptions& co,
1601  const ExecutionOptions& eo,
1603  PerFragmentCallBack& cb,
1604  const std::set<size_t>& fragment_indexes_param) {
1605  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1606  ColumnCacheMap column_cache;
1607 
1608  std::vector<InputTableInfo> table_infos{table_info};
1609  SharedKernelContext kernel_context(table_infos);
1610 
1611  ColumnFetcher column_fetcher(this, column_cache);
1612  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1613  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1614  {
1615  auto clock_begin = timer_start();
1616  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1617  compilation_queue_time_ms_ += timer_stop(clock_begin);
1618  query_mem_desc_owned =
1619  query_comp_desc_owned->compile(0,
1620  8,
1621  /*has_cardinality_estimation=*/false,
1622  ra_exe_unit,
1623  table_infos,
1624  deleted_cols_map,
1625  column_fetcher,
1626  co,
1627  eo,
1628  nullptr,
1629  this);
1630  }
1631  CHECK(query_mem_desc_owned);
1632  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1633  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1634  const auto& outer_fragments = table_info.info.fragments;
1635 
1636  std::set<size_t> fragment_indexes;
1637  if (fragment_indexes_param.empty()) {
1638  // An empty `fragment_indexes_param` set implies executing
1639  // the query for all fragments in the table. In this
1640  // case, populate `fragment_indexes` with all fragment indexes.
1641  for (size_t i = 0; i < outer_fragments.size(); i++) {
1642  fragment_indexes.emplace(i);
1643  }
1644  } else {
1645  fragment_indexes = fragment_indexes_param;
1646  }
1647 
1648  {
1649  auto clock_begin = timer_start();
1650  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
1651  kernel_queue_time_ms_ += timer_stop(clock_begin);
1652 
1653  for (auto fragment_index : fragment_indexes) {
1654  // We may want to consider in the future allowing this to execute on devices other
1655  // than CPU
1656  FragmentsList fragments_list{{table_id, {fragment_index}}};
1657  ExecutionKernel kernel(ra_exe_unit,
1658  co.device_type,
1659  /*device_id=*/0,
1660  eo,
1661  column_fetcher,
1662  *query_comp_desc_owned,
1663  *query_mem_desc_owned,
1664  fragments_list,
1666  /*render_info=*/nullptr,
1667  /*rowid_lookup_key=*/-1);
1668  kernel.run(this, 0, kernel_context);
1669  }
1670  }
1671 
1672  const auto& all_fragment_results = kernel_context.getFragmentResults();
1673 
1674  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
1675  CHECK_EQ(result_fragment_indexes.size(), 1);
1676  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
1677  }
1678 }
1679 
1681  const TableFunctionExecutionUnit exe_unit,
1682  const std::vector<InputTableInfo>& table_infos,
1683  const CompilationOptions& co,
1684  const ExecutionOptions& eo,
1686  INJECT_TIMER(Exec_executeTableFunction);
1687 
1688  if (eo.just_validate) {
1690  /*entry_count=*/0,
1692  /*is_table_function=*/true);
1693  query_mem_desc.setOutputColumnar(true);
1694  return std::make_shared<ResultSet>(
1695  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
1696  co.device_type,
1697  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
1698  this->getRowSetMemoryOwner(),
1699  this->getCatalog(),
1700  this->blockSize(),
1701  this->gridSize());
1702  }
1703 
1704  nukeOldState(false, table_infos, PlanState::DeletedColumnsMap{}, nullptr);
1705 
1706  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1707  // framework, we may want to move this up a level
1708 
1709  ColumnFetcher column_fetcher(this, column_cache);
1710  TableFunctionCompilationContext compilation_context;
1711  compilation_context.compile(exe_unit, co, this);
1712 
1713  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
1714  return exe_context.execute(
1715  exe_unit, table_infos, &compilation_context, column_fetcher, co.device_type, this);
1716 }
1717 
1719  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1720 }
1721 
1723  const RelAlgExecutionUnit& ra_exe_unit,
1724  const ExecutorDeviceType requested_device_type) {
1725  for (const auto target_expr : ra_exe_unit.target_exprs) {
1726  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1727  if (!ra_exe_unit.groupby_exprs.empty() &&
1728  !isArchPascalOrLater(requested_device_type)) {
1729  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1730  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1731  return ExecutorDeviceType::CPU;
1732  }
1733  }
1734  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1735  return ExecutorDeviceType::CPU;
1736  }
1737  }
1738  return requested_device_type;
1739 }
1740 
1741 namespace {
1742 
1743 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1744  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1745  if (ti.is_fp()) {
1746  if (float_argument_input && ti.get_type() == kFLOAT) {
1747  int64_t float_null_val = 0;
1748  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1749  static_cast<float>(inline_fp_null_val(ti));
1750  return float_null_val;
1751  }
1752  const auto double_null_val = inline_fp_null_val(ti);
1753  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1754  }
1755  return inline_int_null_val(ti);
1756 }
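// A minimal standalone sketch of the bit-pattern packing above: when an aggregate
// takes a 32-bit float argument, the float NULL sentinel is copied bit for bit into
// the first four bytes of the 64-bit slot (the low bits on little-endian targets).
// The function name and sentinel argument below are illustrative only.
#include <cstdint>
#include <cstring>

inline int64_t pack_float_null_sketch(const float null_sentinel) {
  int64_t slot = 0;
  // memcpy is the portable way to reinterpret the float's bits without violating
  // strict aliasing -- the same intent as the may_alias_ptr casts above.
  std::memcpy(&slot, &null_sentinel, sizeof(float));
  return slot;
}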
1757 
1758 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1759  std::vector<int64_t>& entry,
1760  const std::vector<Analyzer::Expr*>& target_exprs,
1762  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1763  const auto target_expr = target_exprs[target_idx];
1764  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1765  CHECK(agg_info.is_agg);
1766  target_infos.push_back(agg_info);
1767  if (g_cluster) {
1768  const auto executor = query_mem_desc.getExecutor();
1769  CHECK(executor);
1770  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1771  CHECK(row_set_mem_owner);
1772  const auto& count_distinct_desc =
1773  query_mem_desc.getCountDistinctDescriptor(target_idx);
1774  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1775  CHECK(row_set_mem_owner);
1776  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
1777  count_distinct_desc.bitmapPaddedSizeBytes(),
1778  /*thread_idx=*/0); // TODO: can we detect thread idx here?
1779  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1780  continue;
1781  }
1782  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1783  auto count_distinct_set = new std::set<int64_t>();
1784  CHECK(row_set_mem_owner);
1785  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1786  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1787  continue;
1788  }
1789  }
1790  const bool float_argument_input = takes_float_argument(agg_info);
1791  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1792  entry.push_back(0);
1793  } else if (agg_info.agg_kind == kAVG) {
1794  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1795  entry.push_back(0);
1796  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
1797  if (agg_info.sql_type.is_geometry()) {
1798  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1799  entry.push_back(0);
1800  }
1801  } else if (agg_info.sql_type.is_varlen()) {
1802  entry.push_back(0);
1803  entry.push_back(0);
1804  } else {
1805  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1806  }
1807  } else {
1808  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1809  }
1810  }
1811 }
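// A simplified sketch of the per-target slot layout produced above for an empty
// input: COUNT-like targets get 0, AVG occupies two slots (a null sum followed by a
// zero count), and everything else gets the type's NULL sentinel. The enum and the
// helper below are illustrative stand-ins for the real TargetInfo machinery.
#include <cstdint>
#include <vector>

enum class SketchAgg { kCountLike, kAvgLike, kOther };

inline std::vector<int64_t> empty_input_entry_sketch(
    const std::vector<SketchAgg>& targets,
    const int64_t null_sentinel) {
  std::vector<int64_t> entry;
  for (const auto agg : targets) {
    switch (agg) {
      case SketchAgg::kCountLike:
        entry.push_back(0);  // an empty input counts zero rows
        break;
      case SketchAgg::kAvgLike:
        entry.push_back(null_sentinel);  // sum slot
        entry.push_back(0);              // count slot
        break;
      default:
        entry.push_back(null_sentinel);  // MIN/MAX/SUM etc. are NULL on empty input
    }
  }
  return entry;
}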
1812 
1814  const std::vector<Analyzer::Expr*>& target_exprs_in,
1816  const ExecutorDeviceType device_type) {
1817  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1818  std::vector<Analyzer::Expr*> target_exprs;
1819  for (const auto target_expr : target_exprs_in) {
1820  const auto target_expr_copy =
1821  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1822  CHECK(target_expr_copy);
1823  auto ti = target_expr->get_type_info();
1824  ti.set_notnull(false);
1825  target_expr_copy->set_type_info(ti);
1826  if (target_expr_copy->get_arg()) {
1827  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1828  arg_ti.set_notnull(false);
1829  target_expr_copy->get_arg()->set_type_info(arg_ti);
1830  }
1831  target_exprs_owned_copies.push_back(target_expr_copy);
1832  target_exprs.push_back(target_expr_copy.get());
1833  }
1834  std::vector<TargetInfo> target_infos;
1835  std::vector<int64_t> entry;
1836  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1837  const auto executor = query_mem_desc.getExecutor();
1838  CHECK(executor);
1839  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1840  CHECK(row_set_mem_owner);
1841  auto rs = std::make_shared<ResultSet>(target_infos,
1842  device_type,
1844  row_set_mem_owner,
1845  executor->getCatalog(),
1846  executor->blockSize(),
1847  executor->gridSize());
1848  rs->allocateStorage();
1849  rs->fillOneEntry(entry);
1850  return rs;
1851 }
1852 
1853 } // namespace
1854 
1856  SharedKernelContext& shared_context,
1857  const RelAlgExecutionUnit& ra_exe_unit,
1859  const ExecutorDeviceType device_type,
1860  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1861  auto timer = DEBUG_TIMER(__func__);
1862  auto& result_per_device = shared_context.getFragmentResults();
1863  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1866  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1867  }
1868  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1869  try {
1870  return reduceSpeculativeTopN(
1871  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1872  } catch (const std::bad_alloc&) {
1873  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1874  }
1875  }
1876  const auto shard_count =
1877  device_type == ExecutorDeviceType::GPU
1879  : 0;
1880 
1881  if (shard_count && !result_per_device.empty()) {
1882  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1883  }
1884  return reduceMultiDeviceResults(
1885  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1886 }
1887 
1888 namespace {
1899 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1900  const QueryMemoryDescriptor& input_query_mem_desc,
1901  const ResultSetStorage* output_storage,
1902  size_t output_row_index,
1903  const QueryMemoryDescriptor& output_query_mem_desc,
1904  const std::vector<uint32_t>& top_permutation) {
1905  const auto output_buffer = output_storage->getUnderlyingBuffer();
1906  const auto input_buffer = input_storage->getUnderlyingBuffer();
1907  for (const auto sorted_idx : top_permutation) {
1908  // permuting all group-columns in this result set into the final buffer:
1909  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1910  group_idx++) {
1911  const auto input_column_ptr =
1912  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1913  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1914  const auto output_column_ptr =
1915  output_buffer +
1916  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1917  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1918  memcpy(output_column_ptr,
1919  input_column_ptr,
1920  output_query_mem_desc.groupColWidth(group_idx));
1921  }
1922  // permuting all agg-columns in this result set into the final buffer:
1923  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1924  slot_idx++) {
1925  const auto input_column_ptr =
1926  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1927  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1928  const auto output_column_ptr =
1929  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1930  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1931  memcpy(output_column_ptr,
1932  input_column_ptr,
1933  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1934  }
1935  ++output_row_index;
1936  }
1937  return output_row_index;
1938 }
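// A self-contained sketch of the columnar permutation above on a toy layout: each
// column is a contiguous array, and the rows selected by `permutation` are copied
// to consecutive output rows, column by column. Uniform 8-byte slots are assumed
// here for brevity; the real code consults the QueryMemoryDescriptor for per-column
// widths and offsets.
#include <cstdint>
#include <vector>

inline size_t permute_columnar_sketch(const int64_t* input,   // column-major buffer
                                      const size_t input_rows,
                                      int64_t* output,        // column-major buffer
                                      const size_t output_rows,
                                      size_t output_row_index,
                                      const size_t num_columns,
                                      const std::vector<uint32_t>& permutation) {
  for (const auto sorted_idx : permutation) {
    for (size_t col = 0; col < num_columns; ++col) {
      // column `col` starts at col * rows; rows within a column are contiguous
      output[col * output_rows + output_row_index] =
          input[col * input_rows + sorted_idx];
    }
    ++output_row_index;
  }
  return output_row_index;
}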
1939 
1949 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
1950  const ResultSetStorage* output_storage,
1951  size_t output_row_index,
1952  const QueryMemoryDescriptor& output_query_mem_desc,
1953  const std::vector<uint32_t>& top_permutation) {
1954  const auto output_buffer = output_storage->getUnderlyingBuffer();
1955  const auto input_buffer = input_storage->getUnderlyingBuffer();
1956  for (const auto sorted_idx : top_permutation) {
1957  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
1958  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
1959  row_ptr,
1960  output_query_mem_desc.getRowSize());
1961  ++output_row_index;
1962  }
1963  return output_row_index;
1964 }
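// The row-wise counterpart is simpler still: each selected row is one fixed-size
// memcpy. A compact sketch, assuming a flat row-major buffer with `row_size` bytes
// per row:
#include <cstdint>
#include <cstring>
#include <vector>

inline size_t permute_row_wise_sketch(const int8_t* input,
                                      int8_t* output,
                                      size_t output_row_index,
                                      const size_t row_size,
                                      const std::vector<uint32_t>& permutation) {
  for (const auto sorted_idx : permutation) {
    std::memcpy(output + output_row_index * row_size,
                input + sorted_idx * row_size,
                row_size);
    ++output_row_index;
  }
  return output_row_index;
}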
1965 } // namespace
1966 
1967 // Collect top results from each device, stitch them together and sort. Partial
1968 // results from each device are guaranteed to be disjoint because we only take
1969 // this path when one of the columns involved is a shard key.
1971  SharedKernelContext& shared_context,
1972  const RelAlgExecutionUnit& ra_exe_unit) const {
1973  auto& result_per_device = shared_context.getFragmentResults();
1974  const auto first_result_set = result_per_device.front().first;
1975  CHECK(first_result_set);
1976  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1977  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1978  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1979  top_query_mem_desc.setEntryCount(0);
1980  for (auto& result : result_per_device) {
1981  const auto result_set = result.first;
1982  CHECK(result_set);
1983  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
1984  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1985  top_query_mem_desc.setEntryCount(new_entry_cnt);
1986  }
1987  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1988  first_result_set->getDeviceType(),
1989  top_query_mem_desc,
1990  first_result_set->getRowSetMemOwner(),
1991  catalog_,
1992  blockSize(),
1993  gridSize());
1994  auto top_storage = top_result_set->allocateStorage();
1995  size_t top_output_row_idx{0};
1996  for (auto& result : result_per_device) {
1997  const auto result_set = result.first;
1998  CHECK(result_set);
1999  const auto& top_permutation = result_set->getPermutationBuffer();
2000  CHECK_LE(top_permutation.size(), top_n);
2001  if (top_query_mem_desc.didOutputColumnar()) {
2002  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2003  result_set->getQueryMemDesc(),
2004  top_storage,
2005  top_output_row_idx,
2006  top_query_mem_desc,
2007  top_permutation);
2008  } else {
2009  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2010  top_storage,
2011  top_output_row_idx,
2012  top_query_mem_desc,
2013  top_permutation);
2014  }
2015  }
2016  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2017  return top_result_set;
2018 }
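// A toy sketch of the stitch-then-sort idea above, using plain vectors instead of
// ResultSets: each device contributes its own top-N candidates (disjoint by shard
// key), the candidates are concatenated, and a final sort plus truncation yields
// the global top-N. Values stand in for whole result rows here.
#include <algorithm>
#include <cstdint>
#include <vector>

inline std::vector<int64_t> sharded_top_n_sketch(
    std::vector<std::vector<int64_t>> per_device_results,
    const size_t top_n) {
  std::vector<int64_t> stitched;
  for (auto& partial : per_device_results) {
    std::sort(partial.begin(), partial.end());  // per-device sort, as above
    if (partial.size() > top_n) {
      partial.resize(top_n);  // keep only this device's best candidates
    }
    stitched.insert(stitched.end(), partial.begin(), partial.end());
  }
  std::sort(stitched.begin(), stitched.end());  // final global ordering
  if (stitched.size() > top_n) {
    stitched.resize(top_n);
  }
  return stitched;
}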
2019 
2020 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
2021  const {
2022  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
2023  const auto& join_info = plan_state_->join_info_;
2024  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2025  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2026  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
2027  id_to_cond.insert(
2028  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
2029  }
2030  return id_to_cond;
2031 }
2032 
2033 namespace {
2034 
2035 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2036  for (const auto& col : fetched_cols) {
2037  if (col.is_lazily_fetched) {
2038  return true;
2039  }
2040  }
2041  return false;
2042 }
2043 
2044 } // namespace
2045 
2046 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2047  SharedKernelContext& shared_context,
2048  const RelAlgExecutionUnit& ra_exe_unit,
2049  ColumnFetcher& column_fetcher,
2050  const std::vector<InputTableInfo>& table_infos,
2051  const ExecutionOptions& eo,
2052  const bool is_agg,
2053  const bool allow_single_frag_table_opt,
2054  const size_t context_count,
2055  const QueryCompilationDescriptor& query_comp_desc,
2057  RenderInfo* render_info,
2058  std::unordered_set<int>& available_gpus,
2059  int& available_cpus) {
2060  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2061 
2062  QueryFragmentDescriptor fragment_descriptor(
2063  ra_exe_unit,
2064  table_infos,
2065  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2066  ? catalog_->getDataMgr().getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
2067  : std::vector<Data_Namespace::MemoryInfo>{},
2070  CHECK(!ra_exe_unit.input_descs.empty());
2071 
2072  const auto device_type = query_comp_desc.getDeviceType();
2073  const bool uses_lazy_fetch =
2074  plan_state_->allow_lazy_fetch_ &&
2075  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2076  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2077  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2078  const auto device_count = deviceCount(device_type);
2079  CHECK_GT(device_count, 0);
2080 
2081  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2082  shared_context.getFragOffsets(),
2083  device_count,
2084  device_type,
2085  use_multifrag_kernel,
2087  this);
2088  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2089  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2090  }
2091 
2092  if (use_multifrag_kernel) {
2093  VLOG(1) << "Creating multifrag execution kernels";
2094  VLOG(1) << query_mem_desc.toString();
2095 
2096  // NB: We should never be on this path when the query is retried because of running
2097  // out of group-by slots; also, for scan-only queries on CPU we want the
2098  // high-granularity, fragment-by-fragment execution instead. For scan-only queries on
2099  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2100  // buffer per fragment.
2101  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2102  &execution_kernels,
2103  &column_fetcher,
2104  &eo,
2105  &query_comp_desc,
2106  &query_mem_desc,
2107  render_info](const int device_id,
2108  const FragmentsList& frag_list,
2109  const int64_t rowid_lookup_key) {
2110  execution_kernels.emplace_back(
2111  std::make_unique<ExecutionKernel>(ra_exe_unit,
2113  device_id,
2114  eo,
2115  column_fetcher,
2116  query_comp_desc,
2117  query_mem_desc,
2118  frag_list,
2120  render_info,
2121  rowid_lookup_key));
2122  };
2123  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2124  } else {
2125  VLOG(1) << "Creating one execution kernel per fragment";
2126  VLOG(1) << query_mem_desc.toString();
2127 
2128  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2130  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2131  const auto max_frag_size =
2132  table_infos.front().info.getFragmentNumTuplesUpperBound();
2133  if (max_frag_size < query_mem_desc.getEntryCount()) {
2134  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2135  << " to match max fragment size " << max_frag_size
2136  << " for kernel per fragment execution path.";
2137  throw CompilationRetryNewScanLimit(max_frag_size);
2138  }
2139  }
2140 
2141  size_t frag_list_idx{0};
2142  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2143  &execution_kernels,
2144  &column_fetcher,
2145  &eo,
2146  &frag_list_idx,
2147  &device_type,
2148  &query_comp_desc,
2149  &query_mem_desc,
2150  render_info](const int device_id,
2151  const FragmentsList& frag_list,
2152  const int64_t rowid_lookup_key) {
2153  if (!frag_list.size()) {
2154  return;
2155  }
2156  CHECK_GE(device_id, 0);
2157 
2158  execution_kernels.emplace_back(
2159  std::make_unique<ExecutionKernel>(ra_exe_unit,
2160  device_type,
2161  device_id,
2162  eo,
2163  column_fetcher,
2164  query_comp_desc,
2165  query_mem_desc,
2166  frag_list,
2168  render_info,
2169  rowid_lookup_key));
2170  ++frag_list_idx;
2171  };
2172 
2173  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2174  ra_exe_unit);
2175  }
2176 
2177  return execution_kernels;
2178 }
2179 
2180 template <typename THREAD_POOL>
2182  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels) {
2183  auto clock_begin = timer_start();
2184  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2185  kernel_queue_time_ms_ += timer_stop(clock_begin);
2186 
2187  THREAD_POOL thread_pool;
2188  VLOG(1) << "Launching " << kernels.size() << " kernels for query.";
2189  size_t kernel_idx = 1;
2190  for (auto& kernel : kernels) {
2191  thread_pool.spawn(
2192  [this, &shared_context, parent_thread_id = logger::thread_id()](
2193  ExecutionKernel* kernel, const size_t crt_kernel_idx) {
2194  CHECK(kernel);
2195  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
2196  const size_t thread_idx = crt_kernel_idx % cpu_threads();
2197  kernel->run(this, thread_idx, shared_context);
2198  },
2199  kernel.get(),
2200  kernel_idx++);
2201  }
2202  thread_pool.join();
2203 }
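// A minimal sketch of the fan-out/join pattern above, using std::async in place of
// the pluggable THREAD_POOL; the task signature and `num_workers` parameter are
// illustrative, not part of the executor's interface.
#include <cstddef>
#include <functional>
#include <future>
#include <vector>

inline void launch_tasks_sketch(const std::vector<std::function<void(size_t)>>& tasks,
                                const size_t num_workers) {
  std::vector<std::future<void>> futures;
  size_t task_idx = 0;
  for (const auto& task : tasks) {
    // round-robin a worker slot, mirroring crt_kernel_idx % cpu_threads() above
    const size_t worker_slot = task_idx++ % num_workers;
    futures.push_back(std::async(std::launch::async, task, worker_slot));
  }
  for (auto& future : futures) {
    future.get();  // the equivalent of thread_pool.join(): wait for every task
  }
}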
2204 
2206  const RelAlgExecutionUnit& ra_exe_unit,
2207  const ExecutorDeviceType device_type,
2208  const size_t table_idx,
2209  const size_t outer_frag_idx,
2210  std::map<int, const TableFragments*>& selected_tables_fragments,
2211  const std::unordered_map<int, const Analyzer::BinOper*>&
2212  inner_table_id_to_join_condition) {
2213  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2214  auto table_frags_it = selected_tables_fragments.find(table_id);
2215  CHECK(table_frags_it != selected_tables_fragments.end());
2216  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2217  const auto outer_table_fragments_it =
2218  selected_tables_fragments.find(outer_input_desc.getTableId());
2219  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2220  const auto outer_table_fragments = outer_table_fragments_it->second;
2221  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2222  if (!table_idx) {
2223  return {outer_frag_idx};
2224  }
2225  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2226  auto& inner_frags = table_frags_it->second;
2227  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2228  std::vector<size_t> all_frag_ids;
2229  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2230  ++inner_frag_idx) {
2231  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2232  if (skipFragmentPair(outer_fragment_info,
2233  inner_frag_info,
2234  table_idx,
2235  inner_table_id_to_join_condition,
2236  ra_exe_unit,
2237  device_type)) {
2238  continue;
2239  }
2240  all_frag_ids.push_back(inner_frag_idx);
2241  }
2242  return all_frag_ids;
2243 }
2244 
2245 // Returns true iff the join between two fragments cannot yield any results, per
2246 // shard information. The pair can be skipped to avoid full broadcast.
2248  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2249  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2250  const int table_idx,
2251  const std::unordered_map<int, const Analyzer::BinOper*>&
2252  inner_table_id_to_join_condition,
2253  const RelAlgExecutionUnit& ra_exe_unit,
2254  const ExecutorDeviceType device_type) {
2255  if (device_type != ExecutorDeviceType::GPU) {
2256  return false;
2257  }
2258  CHECK(table_idx >= 0 &&
2259  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2260  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2261  // Both tables need to be sharded the same way.
2262  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2263  outer_fragment_info.shard == inner_fragment_info.shard) {
2264  return false;
2265  }
2266  const Analyzer::BinOper* join_condition{nullptr};
2267  if (ra_exe_unit.join_quals.empty()) {
2268  CHECK(!inner_table_id_to_join_condition.empty());
2269  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2270  CHECK(condition_it != inner_table_id_to_join_condition.end());
2271  join_condition = condition_it->second;
2272  CHECK(join_condition);
2273  } else {
2274  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2275  plan_state_->join_info_.join_hash_tables_.size());
2276  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2277  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2278  table_idx) {
2279  CHECK(!join_condition);
2280  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2281  }
2282  }
2283  }
2284  if (!join_condition) {
2285  return false;
2286  }
2287  // TODO(adb): support fragment skipping based on the overlaps operator
2288  if (join_condition->is_overlaps_oper()) {
2289  return false;
2290  }
2291  size_t shard_count{0};
2292  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2293  join_condition->get_left_operand())) {
2294  auto inner_outer_pairs =
2295  normalize_column_pairs(join_condition, *getCatalog(), getTemporaryTables());
2297  join_condition, this, inner_outer_pairs);
2298  } else {
2299  shard_count = get_shard_count(join_condition, this);
2300  }
2301  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2302  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2303  }
2304  return shard_count;
2305 }
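// The core of the shard-based skip above reduces to a small predicate: a fragment
// pair can only produce join matches if either side is unsharded (shard == -1) or
// both sides live on the same shard. A standalone sketch of just that predicate:
#include <cstddef>

inline bool can_skip_fragment_pair_sketch(const int outer_shard,
                                          const int inner_shard,
                                          const size_t join_shard_count) {
  if (join_shard_count == 0) {
    return false;  // the join key is not sharded; no skipping is possible
  }
  if (outer_shard == -1 || inner_shard == -1) {
    return false;  // at least one side is unsharded; any pair may match
  }
  return outer_shard != inner_shard;  // different shards can never join
}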
2306 
2307 namespace {
2308 
2311  const int table_id = col_desc->getScanDesc().getTableId();
2312  const int col_id = col_desc->getColId();
2313  return get_column_descriptor_maybe(col_id, table_id, cat);
2314 }
2315 
2316 } // namespace
2317 
2318 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2319  const std::vector<InputDescriptor>& input_descs,
2320  const std::map<int, const TableFragments*>& all_tables_fragments) {
2321  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2322  for (auto& desc : input_descs) {
2323  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2324  CHECK(fragments_it != all_tables_fragments.end());
2325  const auto& fragments = *fragments_it->second;
2326  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2327  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2328  frag_offsets[i] = off;
2329  off += fragments[i].getNumTuples();
2330  }
2331  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2332  }
2333  return tab_id_to_frag_offsets;
2334 }
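// The offsets built above are an exclusive prefix sum over fragment row counts.
// A minimal sketch over plain counts:
#include <cstdint>
#include <vector>

inline std::vector<uint64_t> fragment_offsets_sketch(
    const std::vector<uint64_t>& fragment_row_counts) {
  std::vector<uint64_t> offsets(fragment_row_counts.size(), 0);
  uint64_t running = 0;
  for (size_t i = 0; i < fragment_row_counts.size(); ++i) {
    offsets[i] = running;  // offset of fragment i = total rows before it
    running += fragment_row_counts[i];
  }
  return offsets;
}
// e.g. fragment_offsets_sketch({100, 50, 75}) yields {0, 100, 150}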
2335 
2336 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2338  const RelAlgExecutionUnit& ra_exe_unit,
2339  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2340  const std::vector<InputDescriptor>& input_descs,
2341  const std::map<int, const TableFragments*>& all_tables_fragments) {
2342  std::vector<std::vector<int64_t>> all_num_rows;
2343  std::vector<std::vector<uint64_t>> all_frag_offsets;
2344  const auto tab_id_to_frag_offsets =
2345  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2346  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2347  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2348  std::vector<int64_t> num_rows;
2349  std::vector<uint64_t> frag_offsets;
2350  if (!ra_exe_unit.union_all) {
2351  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2352  }
2353  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2354  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2355  const auto fragments_it =
2356  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2357  CHECK(fragments_it != all_tables_fragments.end());
2358  const auto& fragments = *fragments_it->second;
2359  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2360  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2361  const auto& fragment = fragments[frag_id];
2362  num_rows.push_back(fragment.getNumTuples());
2363  } else {
2364  size_t total_row_count{0};
2365  for (const auto& fragment : fragments) {
2366  total_row_count += fragment.getNumTuples();
2367  }
2368  num_rows.push_back(total_row_count);
2369  }
2370  const auto frag_offsets_it =
2371  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2372  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2373  const auto& offsets = frag_offsets_it->second;
2374  CHECK_LT(frag_id, offsets.size());
2375  frag_offsets.push_back(offsets[frag_id]);
2376  }
2377  all_num_rows.push_back(num_rows);
2378  // Fragment offsets of the outer table should ONLY be used by rowid for now.
2379  all_frag_offsets.push_back(frag_offsets);
2380  }
2381  return {all_num_rows, all_frag_offsets};
2382 }
2383 
2384 // Only fetch the columns of a hash-joined inner fact table from all table fragments
2385 // when their fetch is not deferred.
2387  const RelAlgExecutionUnit& ra_exe_unit,
2388  const FragmentsList& selected_fragments) const {
2389  const auto& input_descs = ra_exe_unit.input_descs;
2390  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2391  if (nest_level < 1 ||
2392  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2393  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2394  (ra_exe_unit.join_quals.empty() &&
2395  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2396  return false;
2397  }
2398  const int table_id = inner_col_desc.getScanDesc().getTableId();
2399  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2400  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2401  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2402  return fragments.size() > 1;
2403 }
2404 
2406  const ColumnDescriptor* cd,
2407  const InputColDescriptor& inner_col_desc,
2408  const RelAlgExecutionUnit& ra_exe_unit,
2409  const FragmentsList& selected_fragments,
2410  const Data_Namespace::MemoryLevel memory_level) const {
2411  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2412  const int table_id = inner_col_desc.getScanDesc().getTableId();
2413  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2414  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2415  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2416  auto need_linearize = cd->columnType.is_fixlen_array();
2417  if (memory_level == MemoryLevel::GPU_LEVEL) {
2418  // we disable multi-frag linearization for the GPU case until we find the cause of
2419  // the CUDA 'misaligned address' issue, see #5245
2420  need_linearize = false;
2421  }
2422  return table_id > 0 && need_linearize && fragments.size() > 1;
2423 }
2424 
2425 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2426  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2427  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2428  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2429 }
2430 
2432  const ColumnFetcher& column_fetcher,
2433  const RelAlgExecutionUnit& ra_exe_unit,
2434  const int device_id,
2435  const Data_Namespace::MemoryLevel memory_level,
2436  const std::map<int, const TableFragments*>& all_tables_fragments,
2437  const FragmentsList& selected_fragments,
2439  std::list<ChunkIter>& chunk_iterators,
2440  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2441  DeviceAllocator* device_allocator,
2442  const size_t thread_idx,
2443  const bool allow_runtime_interrupt) {
2444  auto timer = DEBUG_TIMER(__func__);
2445  INJECT_TIMER(fetchChunks);
2446  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2447  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2448  std::vector<size_t> local_col_to_frag_pos;
2449  buildSelectedFragsMapping(selected_fragments_crossjoin,
2450  local_col_to_frag_pos,
2451  col_global_ids,
2452  selected_fragments,
2453  ra_exe_unit);
2454 
2456  selected_fragments_crossjoin);
2457 
2458  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2459  std::vector<std::vector<int64_t>> all_num_rows;
2460  std::vector<std::vector<uint64_t>> all_frag_offsets;
2461  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2462  std::vector<const int8_t*> frag_col_buffers(
2463  plan_state_->global_to_local_col_ids_.size());
2464  for (const auto& col_id : col_global_ids) {
2465  if (allow_runtime_interrupt) {
2466  bool isInterrupted = false;
2467  {
2468  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2469  const auto query_session = getCurrentQuerySession(session_read_lock);
2470  isInterrupted =
2471  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2472  }
2473  if (isInterrupted) {
2474  throw QueryExecutionError(ERR_INTERRUPTED);
2475  }
2476  }
2477  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2478  throw QueryExecutionError(ERR_INTERRUPTED);
2479  }
2480  CHECK(col_id);
2481  const int table_id = col_id->getScanDesc().getTableId();
2482  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2483  if (cd && cd->isVirtualCol) {
2484  CHECK_EQ("rowid", cd->columnName);
2485  continue;
2486  }
2487  const auto fragments_it = all_tables_fragments.find(table_id);
2488  CHECK(fragments_it != all_tables_fragments.end());
2489  const auto fragments = fragments_it->second;
2490  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2491  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2492  CHECK_LT(static_cast<size_t>(it->second),
2493  plan_state_->global_to_local_col_ids_.size());
2494  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2495  if (!fragments->size()) {
2496  return {};
2497  }
2498  CHECK_LT(frag_id, fragments->size());
2499  auto memory_level_for_column = memory_level;
2500  if (plan_state_->columns_to_fetch_.find(
2501  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2502  plan_state_->columns_to_fetch_.end()) {
2503  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2504  }
2505  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2506  frag_col_buffers[it->second] =
2507  column_fetcher.getResultSetColumn(col_id.get(),
2508  memory_level_for_column,
2509  device_id,
2510  device_allocator,
2511  thread_idx);
2512  } else {
2513  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2514  // determine if we need special treatment to linearize a multi-frag table,
2515  // i.e., a column that is classified as a varlen type, such as an array;
2516  // for now, we only support fixed-length arrays that contain
2517  // geo point coordinates, but we can support more types in this way
2518  if (needLinearizeAllFragments(cd,
2519  *col_id,
2520  ra_exe_unit,
2521  selected_fragments,
2522  memory_level_for_column)) {
2523  frag_col_buffers[it->second] =
2524  column_fetcher.linearizeColumnFragments(table_id,
2525  col_id->getColId(),
2526  all_tables_fragments,
2527  chunks,
2528  chunk_iterators,
2529  memory_level_for_column,
2530  device_id,
2531  device_allocator,
2532  thread_idx);
2533  } else {
2534  frag_col_buffers[it->second] =
2535  column_fetcher.getAllTableColumnFragments(table_id,
2536  col_id->getColId(),
2537  all_tables_fragments,
2538  memory_level_for_column,
2539  device_id,
2540  device_allocator,
2541  thread_idx);
2542  }
2543  } else {
2544  frag_col_buffers[it->second] =
2545  column_fetcher.getOneTableColumnFragment(table_id,
2546  frag_id,
2547  col_id->getColId(),
2548  all_tables_fragments,
2549  chunks,
2550  chunk_iterators,
2551  memory_level_for_column,
2552  device_id,
2553  device_allocator);
2554  }
2555  }
2556  }
2557  all_frag_col_buffers.push_back(frag_col_buffers);
2558  }
2559  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2560  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2561  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2562 }
2563 
2564 // fetchChunks() is written under the assumption that multiple inputs imply a JOIN.
2565 // fetchUnionChunks() below is written under the assumption that multiple inputs imply a UNION ALL.
2567  const ColumnFetcher& column_fetcher,
2568  const RelAlgExecutionUnit& ra_exe_unit,
2569  const int device_id,
2570  const Data_Namespace::MemoryLevel memory_level,
2571  const std::map<int, const TableFragments*>& all_tables_fragments,
2572  const FragmentsList& selected_fragments,
2574  std::list<ChunkIter>& chunk_iterators,
2575  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2576  DeviceAllocator* device_allocator,
2577  const size_t thread_idx,
2578  const bool allow_runtime_interrupt) {
2579  auto timer = DEBUG_TIMER(__func__);
2580  INJECT_TIMER(fetchUnionChunks);
2581 
2582  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2583  std::vector<std::vector<int64_t>> all_num_rows;
2584  std::vector<std::vector<uint64_t>> all_frag_offsets;
2585 
2586  CHECK(!selected_fragments.empty());
2587  CHECK_LE(2u, ra_exe_unit.input_descs.size());
2588  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
2589  using TableId = int;
2590  TableId const selected_table_id = selected_fragments.front().table_id;
2591  bool const input_descs_index =
2592  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2593  if (!input_descs_index) {
2594  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
2595  }
2596  bool const input_col_descs_index =
2597  selected_table_id ==
2598  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2599  if (!input_col_descs_index) {
2600  CHECK_EQ(selected_table_id,
2601  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
2602  }
2603  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2604  << " selected_table_id=" << selected_table_id
2605  << " input_descs_index=" << int(input_descs_index)
2606  << " input_col_descs_index=" << int(input_col_descs_index)
2607  << " ra_exe_unit.input_descs="
2608  << shared::printContainer(ra_exe_unit.input_descs)
2609  << " ra_exe_unit.input_col_descs="
2610  << shared::printContainer(ra_exe_unit.input_col_descs);
2611 
2612  // Partition col_global_ids by table_id
2613  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2614  table_id_to_input_col_descs;
2615  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
2616  TableId const table_id = input_col_desc->getScanDesc().getTableId();
2617  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2618  }
2619  for (auto const& pair : table_id_to_input_col_descs) {
2620  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2621  std::vector<size_t> local_col_to_frag_pos;
2622 
2623  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
2624  local_col_to_frag_pos,
2625  pair.second,
2626  selected_fragments,
2627  ra_exe_unit);
2628 
2630  selected_fragments_crossjoin);
2631 
2632  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2633  if (allow_runtime_interrupt) {
2634  bool isInterrupted = false;
2635  {
2636  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2637  const auto query_session = getCurrentQuerySession(session_read_lock);
2638  isInterrupted =
2639  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2640  }
2641  if (isInterrupted) {
2642  throw QueryExecutionError(ERR_INTERRUPTED);
2643  }
2644  }
2645  std::vector<const int8_t*> frag_col_buffers(
2646  plan_state_->global_to_local_col_ids_.size());
2647  for (const auto& col_id : pair.second) {
2648  CHECK(col_id);
2649  const int table_id = col_id->getScanDesc().getTableId();
2650  CHECK_EQ(table_id, pair.first);
2651  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2652  if (cd && cd->isVirtualCol) {
2653  CHECK_EQ("rowid", cd->columnName);
2654  continue;
2655  }
2656  const auto fragments_it = all_tables_fragments.find(table_id);
2657  CHECK(fragments_it != all_tables_fragments.end());
2658  const auto fragments = fragments_it->second;
2659  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2660  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2661  CHECK_LT(static_cast<size_t>(it->second),
2662  plan_state_->global_to_local_col_ids_.size());
2663  const size_t frag_id = ra_exe_unit.union_all
2664  ? 0
2665  : selected_frag_ids[local_col_to_frag_pos[it->second]];
2666  if (!fragments->size()) {
2667  return {};
2668  }
2669  CHECK_LT(frag_id, fragments->size());
2670  auto memory_level_for_column = memory_level;
2671  if (plan_state_->columns_to_fetch_.find(
2672  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2673  plan_state_->columns_to_fetch_.end()) {
2674  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2675  }
2676  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2677  frag_col_buffers[it->second] =
2678  column_fetcher.getResultSetColumn(col_id.get(),
2679  memory_level_for_column,
2680  device_id,
2681  device_allocator,
2682  thread_idx);
2683  } else {
2684  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2685  frag_col_buffers[it->second] =
2686  column_fetcher.getAllTableColumnFragments(table_id,
2687  col_id->getColId(),
2688  all_tables_fragments,
2689  memory_level_for_column,
2690  device_id,
2691  device_allocator,
2692  thread_idx);
2693  } else {
2694  frag_col_buffers[it->second] =
2695  column_fetcher.getOneTableColumnFragment(table_id,
2696  frag_id,
2697  col_id->getColId(),
2698  all_tables_fragments,
2699  chunks,
2700  chunk_iterators,
2701  memory_level_for_column,
2702  device_id,
2703  device_allocator);
2704  }
2705  }
2706  }
2707  all_frag_col_buffers.push_back(frag_col_buffers);
2708  }
2709  std::vector<std::vector<int64_t>> num_rows;
2710  std::vector<std::vector<uint64_t>> frag_offsets;
2711  std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags(
2712  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2713  all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
2714  all_frag_offsets.insert(
2715  all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
2716  }
2717  // The hack below assumes a particular table traversal order which is not
2718  // always achieved due to the unordered map in the outermost loop. According
2719  // to the code below, we expect NULLs in even positions of all_frag_col_buffers[0]
2720  // and odd positions of all_frag_col_buffers[1]. As an additional hack, we
2721  // swap these vectors if the NULLs are not in the expected positions.
2722  if (all_frag_col_buffers[0].size() > 1 && all_frag_col_buffers[0][0] &&
2723  !all_frag_col_buffers[0][1]) {
2724  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2725  }
2726  // UNION ALL hacks.
2727  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers);
2728  for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
2729  all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
2730  }
2731  if (input_descs_index == input_col_descs_index) {
2732  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2733  }
2734 
2735  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers)
2736  << " all_num_rows=" << shared::printContainer(all_num_rows)
2737  << " all_frag_offsets=" << shared::printContainer(all_frag_offsets)
2738  << " input_col_descs_index=" << input_col_descs_index;
2739  return {{all_frag_col_buffers[input_descs_index]},
2740  {{all_num_rows[0][input_descs_index]}},
2741  {{all_frag_offsets[0][input_descs_index]}}};
2742 }
2743 
2744 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2745  const size_t scan_idx,
2746  const RelAlgExecutionUnit& ra_exe_unit) {
2747  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2748  scan_idx > 0 &&
2749  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2750  !selected_fragments[scan_idx].fragment_ids.empty()) {
2751  // Fetch all fragments
2752  return {size_t(0)};
2753  }
2754 
2755  return selected_fragments[scan_idx].fragment_ids;
2756 }
2757 
2759  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2760  std::vector<size_t>& local_col_to_frag_pos,
2761  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2762  const FragmentsList& selected_fragments,
2763  const RelAlgExecutionUnit& ra_exe_unit) {
2764  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2765  size_t frag_pos{0};
2766  const auto& input_descs = ra_exe_unit.input_descs;
2767  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2768  const int table_id = input_descs[scan_idx].getTableId();
2769  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2770  selected_fragments_crossjoin.push_back(
2771  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2772  for (const auto& col_id : col_global_ids) {
2773  CHECK(col_id);
2774  const auto& input_desc = col_id->getScanDesc();
2775  if (input_desc.getTableId() != table_id ||
2776  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2777  continue;
2778  }
2779  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2780  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2781  CHECK_LT(static_cast<size_t>(it->second),
2782  plan_state_->global_to_local_col_ids_.size());
2783  local_col_to_frag_pos[it->second] = frag_pos;
2784  }
2785  ++frag_pos;
2786  }
2787 }
2788 
2790  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2791  std::vector<size_t>& local_col_to_frag_pos,
2792  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2793  const FragmentsList& selected_fragments,
2794  const RelAlgExecutionUnit& ra_exe_unit) {
2795  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2796  size_t frag_pos{0};
2797  const auto& input_descs = ra_exe_unit.input_descs;
2798  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2799  const int table_id = input_descs[scan_idx].getTableId();
2800  // selected_fragments here is from assignFragsToKernelDispatch
2801  // execution_kernel.fragments
2802  if (selected_fragments[0].table_id != table_id) { // TODO 0
2803  continue;
2804  }
2805  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2806  selected_fragments_crossjoin.push_back(
2807  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2808  {size_t(1)}); // TODO
2809  for (const auto& col_id : col_global_ids) {
2810  CHECK(col_id);
2811  const auto& input_desc = col_id->getScanDesc();
2812  if (input_desc.getTableId() != table_id ||
2813  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2814  continue;
2815  }
2816  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2817  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2818  CHECK_LT(static_cast<size_t>(it->second),
2819  plan_state_->global_to_local_col_ids_.size());
2820  local_col_to_frag_pos[it->second] = frag_pos;
2821  }
2822  ++frag_pos;
2823  }
2824 }
2825 
2826 namespace {
2827 
2828 class OutVecOwner {
2829  public:
2830  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2831  ~OutVecOwner() {
2832  for (auto out : out_vec_) {
2833  delete[] out;
2834  }
2835  }
2836 
2837  private:
2838  std::vector<int64_t*> out_vec_;
2839 };
2840 } // namespace
2841 
2843  const RelAlgExecutionUnit& ra_exe_unit,
2844  const CompilationResult& compilation_result,
2845  const bool hoist_literals,
2846  ResultSetPtr& results,
2847  const std::vector<Analyzer::Expr*>& target_exprs,
2848  const ExecutorDeviceType device_type,
2849  std::vector<std::vector<const int8_t*>>& col_buffers,
2850  QueryExecutionContext* query_exe_context,
2851  const std::vector<std::vector<int64_t>>& num_rows,
2852  const std::vector<std::vector<uint64_t>>& frag_offsets,
2853  Data_Namespace::DataMgr* data_mgr,
2854  const int device_id,
2855  const uint32_t start_rowid,
2856  const uint32_t num_tables,
2857  const bool allow_runtime_interrupt,
2858  RenderInfo* render_info) {
2859  INJECT_TIMER(executePlanWithoutGroupBy);
2860  auto timer = DEBUG_TIMER(__func__);
2861  CHECK(!results);
2862  if (col_buffers.empty()) {
2863  return 0;
2864  }
2865 
2866  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2867  if (render_info) {
2868  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2869  // here, we are in non-insitu mode.
2870  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2871  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2872  "currently unsupported.";
2873  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2874  }
2875 
2876  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2877  std::vector<int64_t*> out_vec;
2878  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2879  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2880  std::unique_ptr<OutVecOwner> output_memory_scope;
2881  if (allow_runtime_interrupt) {
2882  bool isInterrupted = false;
2883  {
2884  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2885  const auto query_session = getCurrentQuerySession(session_read_lock);
2886  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
2887  }
2888  if (isInterrupted) {
2889  throw QueryExecutionError(ERR_INTERRUPTED);
2890  }
2891  }
2892  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2893  throw QueryExecutionError(ERR_INTERRUPTED);
2894  }
2895  if (device_type == ExecutorDeviceType::CPU) {
2896  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2897  compilation_result.generated_code);
2898  CHECK(cpu_generated_code);
2899  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2900  cpu_generated_code.get(),
2901  hoist_literals,
2902  hoist_buf,
2903  col_buffers,
2904  num_rows,
2905  frag_offsets,
2906  0,
2907  &error_code,
2908  num_tables,
2909  join_hash_table_ptrs);
2910  output_memory_scope.reset(new OutVecOwner(out_vec));
2911  } else {
2912  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2913  compilation_result.generated_code);
2914  CHECK(gpu_generated_code);
2915  try {
2916  out_vec = query_exe_context->launchGpuCode(
2917  ra_exe_unit,
2918  gpu_generated_code.get(),
2919  hoist_literals,
2920  hoist_buf,
2921  col_buffers,
2922  num_rows,
2923  frag_offsets,
2924  0,
2925  data_mgr,
2926  blockSize(),
2927  gridSize(),
2928  device_id,
2929  compilation_result.gpu_smem_context.getSharedMemorySize(),
2930  &error_code,
2931  num_tables,
2932  allow_runtime_interrupt,
2933  join_hash_table_ptrs,
2934  render_allocator_map_ptr);
2935  output_memory_scope.reset(new OutVecOwner(out_vec));
2936  } catch (const OutOfMemory&) {
2937  return ERR_OUT_OF_GPU_MEM;
2938  } catch (const std::exception& e) {
2939  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2940  }
2941  }
2942  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2943  error_code == Executor::ERR_DIV_BY_ZERO ||
2944  error_code == Executor::ERR_OUT_OF_TIME ||
2945  error_code == Executor::ERR_INTERRUPTED ||
2947  error_code == Executor::ERR_GEOS) {
2948  return error_code;
2949  }
2950  if (ra_exe_unit.estimator) {
2951  CHECK(!error_code);
2952  results =
2953  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2954  return 0;
2955  }
2956  std::vector<int64_t> reduced_outs;
2957  const auto num_frags = col_buffers.size();
2958  const size_t entry_count =
2959  device_type == ExecutorDeviceType::GPU
2960  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
2961  ? 1
2962  : blockSize() * gridSize() * num_frags)
2963  : num_frags;
2964  if (size_t(1) == entry_count) {
2965  for (auto out : out_vec) {
2966  CHECK(out);
2967  reduced_outs.push_back(*out);
2968  }
2969  } else {
2970  size_t out_vec_idx = 0;
2971 
2972  for (const auto target_expr : target_exprs) {
2973  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2974  CHECK(agg_info.is_agg);
2975 
2976  const int num_iterations = agg_info.sql_type.is_geometry()
2977  ? agg_info.sql_type.get_physical_coord_cols()
2978  : 1;
2979 
2980  for (int i = 0; i < num_iterations; i++) {
2981  int64_t val1;
2982  const bool float_argument_input = takes_float_argument(agg_info);
2983  if (is_distinct_target(agg_info) || agg_info.agg_kind == kAPPROX_MEDIAN) {
2984  CHECK(agg_info.agg_kind == kCOUNT ||
2985  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT ||
2986  agg_info.agg_kind == kAPPROX_MEDIAN);
2987  val1 = out_vec[out_vec_idx][0];
2988  error_code = 0;
2989  } else {
2990  const auto chosen_bytes = static_cast<size_t>(
2991  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2992  std::tie(val1, error_code) = Executor::reduceResults(
2993  agg_info.agg_kind,
2994  agg_info.sql_type,
2995  query_exe_context->getAggInitValForIndex(out_vec_idx),
2996  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2997  out_vec[out_vec_idx],
2998  entry_count,
2999  false,
3000  float_argument_input);
3001  }
3002  if (error_code) {
3003  break;
3004  }
3005  reduced_outs.push_back(val1);
3006  if (agg_info.agg_kind == kAVG ||
3007  (agg_info.agg_kind == kSAMPLE &&
3008  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3009  const auto chosen_bytes = static_cast<size_t>(
3010  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3011  1));
3012  int64_t val2;
3013  std::tie(val2, error_code) = Executor::reduceResults(
3014  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3015  agg_info.sql_type,
3016  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3017  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3018  out_vec[out_vec_idx + 1],
3019  entry_count,
3020  false,
3021  false);
3022  if (error_code) {
3023  break;
3024  }
3025  reduced_outs.push_back(val2);
3026  ++out_vec_idx;
3027  }
3028  ++out_vec_idx;
3029  }
3030  }
3031  }
3032 
3033  if (error_code) {
3034  return error_code;
3035  }
3036 
3037  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3038  auto rows_ptr = std::shared_ptr<ResultSet>(
3039  query_exe_context->query_buffers_->result_sets_[0].release());
3040  rows_ptr->fillOneEntry(reduced_outs);
3041  results = std::move(rows_ptr);
3042  return error_code;
3043 }
3044 
3045 namespace {
3046 
3047 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
3048  CHECK(scan_limit);
3049  return results && results->rowCount() < scan_limit;
3050 }
3051 
3052 } // namespace
3053 
3055  const RelAlgExecutionUnit& ra_exe_unit,
3056  const CompilationResult& compilation_result,
3057  const bool hoist_literals,
3058  ResultSetPtr& results,
3059  const ExecutorDeviceType device_type,
3060  std::vector<std::vector<const int8_t*>>& col_buffers,
3061  const std::vector<size_t> outer_tab_frag_ids,
3062  QueryExecutionContext* query_exe_context,
3063  const std::vector<std::vector<int64_t>>& num_rows,
3064  const std::vector<std::vector<uint64_t>>& frag_offsets,
3065  Data_Namespace::DataMgr* data_mgr,
3066  const int device_id,
3067  const int outer_table_id,
3068  const int64_t scan_limit,
3069  const uint32_t start_rowid,
3070  const uint32_t num_tables,
3071  const bool allow_runtime_interrupt,
3072  RenderInfo* render_info) {
3073  auto timer = DEBUG_TIMER(__func__);
3074  INJECT_TIMER(executePlanWithGroupBy);
3075  CHECK(!results);
3076  if (col_buffers.empty()) {
3077  return 0;
3078  }
3079  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3080  // TODO(alex):
3081  // 1. Optimize size (make keys more compact).
3082  // 2. Resize on overflow.
3083  // 3. Optimize runtime.
3084  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3085  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3086  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3087  if (allow_runtime_interrupt) {
3088  bool isInterrupted = false;
3089  {
3090  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3091  const auto query_session = getCurrentQuerySession(session_read_lock);
3092  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3093  }
3094  if (isInterrupted) {
3095  throw QueryExecutionError(ERR_INTERRUPTED);
3096  }
3097  }
3098  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3099  return ERR_INTERRUPTED;
3100  }
3101 
3102  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3103  if (render_info && render_info->useCudaBuffers()) {
3104  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3105  }
3106 
3107  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3108  << " ra_exe_unit.input_descs="
3109  << shared::printContainer(ra_exe_unit.input_descs)
3110  << " ra_exe_unit.input_col_descs="
3111  << shared::printContainer(ra_exe_unit.input_col_descs)
3112  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3113  << " num_rows=" << shared::printContainer(num_rows)
3114  << " frag_offsets=" << shared::printContainer(frag_offsets)
3115  << " query_exe_context->query_buffers_->num_rows_="
3116  << query_exe_context->query_buffers_->num_rows_
3117  << " query_exe_context->query_mem_desc_.getEntryCount()="
3118  << query_exe_context->query_mem_desc_.getEntryCount()
3119  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3120  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3121  << " num_tables=" << num_tables;
3122 
3123  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3124  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3125  // with outer_table_id.
3126  if (ra_exe_unit_copy.union_all) {
3127  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
3128  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3129  ra_exe_unit_copy.input_descs.end(),
3130  [outer_table_id](auto const& a, auto const& b) {
3131  return a.getTableId() == outer_table_id &&
3132  b.getTableId() != outer_table_id;
3133  });
3134  while (!ra_exe_unit_copy.input_descs.empty() &&
3135  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3136  ra_exe_unit_copy.input_descs.pop_back();
3137  }
3138  // Filter ra_exe_unit_copy.input_col_descs.
3139  ra_exe_unit_copy.input_col_descs.remove_if(
3140  [outer_table_id](auto const& input_col_desc) {
3141  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3142  });
3143  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3144  }
3145 
3146  if (device_type == ExecutorDeviceType::CPU) {
3147  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
3148  compilation_result.generated_code);
3149  CHECK(cpu_generated_code);
3150  query_exe_context->launchCpuCode(
3151  ra_exe_unit_copy,
3152  cpu_generated_code.get(),
3153  hoist_literals,
3154  hoist_buf,
3155  col_buffers,
3156  num_rows,
3157  frag_offsets,
3158  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3159  &error_code,
3160  num_tables,
3161  join_hash_table_ptrs);
3162  } else {
3163  try {
3164  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
3165  compilation_result.generated_code);
3166  CHECK(gpu_generated_code);
3167  query_exe_context->launchGpuCode(
3168  ra_exe_unit_copy,
3169  gpu_generated_code.get(),
3170  hoist_literals,
3171  hoist_buf,
3172  col_buffers,
3173  num_rows,
3174  frag_offsets,
3175  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3176  data_mgr,
3177  blockSize(),
3178  gridSize(),
3179  device_id,
3180  compilation_result.gpu_smem_context.getSharedMemorySize(),
3181  &error_code,
3182  num_tables,
3183  allow_runtime_interrupt,
3184  join_hash_table_ptrs,
3185  render_allocator_map_ptr);
3186  } catch (const OutOfMemory&) {
3187  return ERR_OUT_OF_GPU_MEM;
3188  } catch (const OutOfRenderMemory&) {
3189  return ERR_OUT_OF_RENDER_MEM;
3190  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3191  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
3192  } catch (const std::exception& e) {
3193  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3194  }
3195  }
3196 
3197  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3198  error_code == Executor::ERR_DIV_BY_ZERO ||
3199  error_code == Executor::ERR_OUT_OF_TIME ||
3200  error_code == Executor::ERR_INTERRUPTED ||
3201  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
3202  error_code == Executor::ERR_GEOS) {
3203  return error_code;
3204  }
3205 
3206  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3207  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3208  results = query_exe_context->getRowSet(ra_exe_unit_copy,
3209  query_exe_context->query_mem_desc_);
3210  CHECK(results);
3211  VLOG(2) << "results->rowCount()=" << results->rowCount();
3212  results->holdLiterals(hoist_buf);
3213  }
3214  if (error_code < 0 && render_allocator_map_ptr) {
3215  auto const adjusted_scan_limit =
3216  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3217  // More rows passed the filter than available slots. We don't have a count to check,
3218  // so assume we met the limit if a scan limit is set
3219  if (adjusted_scan_limit != 0) {
3220  return 0;
3221  } else {
3222  return error_code;
3223  }
3224  }
3225  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
3226  return error_code; // unlucky, not enough results and we ran out of slots
3227  }
3228 
3229  return 0;
3230 }
3231 
3232 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3233  const int device_id) {
3234  std::vector<int64_t> table_ptrs;
3235  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3236  for (auto hash_table : join_hash_tables) {
3237  if (!hash_table) {
3238  CHECK(table_ptrs.empty());
3239  return {};
3240  }
3241  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3242  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3243  }
3244  return table_ptrs;
3245 }
3246 
3247 void Executor::nukeOldState(const bool allow_lazy_fetch,
3248  const std::vector<InputTableInfo>& query_infos,
3249  const PlanState::DeletedColumnsMap& deleted_cols_map,
3250  const RelAlgExecutionUnit* ra_exe_unit) {
3251  kernel_queue_time_ms_ = 0;
3252  compilation_queue_time_ms_ = 0;
3253  const bool contains_left_deep_outer_join =
3254  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3255  ra_exe_unit->join_quals.end(),
3256  [](const JoinCondition& join_condition) {
3257  return join_condition.type == JoinType::LEFT;
3258  }) != ra_exe_unit->join_quals.end();
3259  cgen_state_.reset(new CgenState(query_infos.size(), contains_left_deep_outer_join));
3260  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3261  query_infos,
3262  deleted_cols_map,
3263  this));
3264 }
3265 
3266 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3267  const std::vector<InputTableInfo>& query_infos) {
3268  AUTOMATIC_IR_METADATA(cgen_state_.get());
3269  const auto ld_count = input_descs.size();
3270  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3271  for (size_t i = 0; i < ld_count; ++i) {
3272  CHECK_LT(i, query_infos.size());
3273  const auto frag_count = query_infos[i].info.fragments.size();
3274  if (i > 0) {
3275  cgen_state_->frag_offsets_.push_back(nullptr);
3276  } else {
3277  if (frag_count > 1) {
3278  cgen_state_->frag_offsets_.push_back(
3279  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
3280  } else {
3281  cgen_state_->frag_offsets_.push_back(nullptr);
3282  }
3283  }
3284  }
3285 }
3286 
3287 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
3288  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3289  const std::vector<InputTableInfo>& query_infos,
3290  const MemoryLevel memory_level,
3291  const HashType preferred_hash_type,
3292  ColumnCacheMap& column_cache,
3293  const RegisteredQueryHint& query_hint) {
3294  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3295  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3296  }
3297  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3298  throw QueryExecutionError(ERR_INTERRUPTED);
3299  }
3300  try {
3301  auto tbl = HashJoin::getInstance(qual_bin_oper,
3302  query_infos,
3303  memory_level,
3304  preferred_hash_type,
3305  deviceCountForMemoryLevel(memory_level),
3306  column_cache,
3307  this,
3308  query_hint);
3309  return {tbl, ""};
3310  } catch (const HashJoinFail& e) {
3311  return {nullptr, e.what()};
3312  }
3313 }
3314 
3315 int8_t Executor::warpSize() const {
3316  CHECK(catalog_);
3317  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3318  CHECK(cuda_mgr);
3319  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3320  CHECK(!dev_props.empty());
3321  return dev_props.front().warpSize;
3322 }
3323 
3324 unsigned Executor::gridSize() const {
3325  CHECK(catalog_);
3326  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3327  if (!cuda_mgr) {
3328  return 0;
3329  }
3330  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3331 }
3332 
3333 unsigned Executor::numBlocksPerMP() const {
3334  CHECK(catalog_);
3335  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3336  CHECK(cuda_mgr);
3337  return grid_size_x_ ? std::ceil(grid_size_x_ / cuda_mgr->getMinNumMPsForAllDevices())
3338  : 2;
3339 }
3340 
3341 unsigned Executor::blockSize() const {
3342  CHECK(catalog_);
3343  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3344  if (!cuda_mgr) {
3345  return 0;
3346  }
3347  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3348  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3349 }
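A worked example of the defaults computed by gridSize(), numBlocksPerMP() and blockSize() above, with illustrative device numbers rather than values from a real device query:

// Assume grid_size_x_ == 0 and block_size_x_ == 0 (no explicit overrides), a GPU
// reporting 20 multiprocessors and maxThreadsPerBlock == 1024:
//   gridSize()       -> 2 * 20  == 40 blocks
//   numBlocksPerMP() -> 2 blocks per multiprocessor
//   blockSize()      -> 1024 threads per block
// Non-zero grid_size_x_ / block_size_x_ values take precedence over these defaults.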
3350 
3351 size_t Executor::maxGpuSlabSize() const {
3352  return max_gpu_slab_size_;
3353 }
3354 
3355 int64_t Executor::deviceCycles(int milliseconds) const {
3356  CHECK(catalog_);
3357  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3358  CHECK(cuda_mgr);
3359  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3360  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3361 }
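Since clockKhz is the device clock in kHz, i.e. cycles per millisecond, the product above is already a cycle count. A worked example with an illustrative clock value:

// clockKhz == 1,410,000 (a 1.41 GHz clock) means 1,410,000 cycles per millisecond,
// so deviceCycles(100) == 1,410,000 * 100 == 141,000,000 cycles.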
3362 
3363 llvm::Value* Executor::castToFP(llvm::Value* value,
3364  SQLTypeInfo const& from_ti,
3365  SQLTypeInfo const& to_ti) {
3366  AUTOMATIC_IR_METADATA(cgen_state_.get());
3367  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3368  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3369  llvm::Type* fp_type{nullptr};
3370  switch (to_ti.get_size()) {
3371  case 4:
3372  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3373  break;
3374  case 8:
3375  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3376  break;
3377  default:
3378  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3379  }
3380  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3381  if (from_ti.get_scale()) {
3382  value = cgen_state_->ir_builder_.CreateFDiv(
3383  value,
3384  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3385  }
3386  }
3387  return value;
3388 }
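castToFP emits an SIToFP conversion followed, for scaled (decimal) sources, by an FDiv with 10^scale, so the resulting float/double carries the logical value rather than the stored integer. A minimal CPU-side sketch of the same arithmetic, using a hypothetical helper that is not part of Execute.cpp:

#include <cmath>
#include <cstdint>
#include <iostream>

// Mirrors the IR emitted above: integer-to-FP conversion, then division by 10^scale.
double decimal_storage_to_fp(int64_t storage_value, int scale) {
  const double converted = static_cast<double>(storage_value);   // SIToFP
  return scale ? converted / std::pow(10.0, scale) : converted;  // FDiv by 10^scale
}

int main() {
  // A DECIMAL(10,2) column stores 123.45 as the integer 12345 with scale == 2.
  std::cout << decimal_storage_to_fp(12345, 2) << '\n';  // prints 123.45
}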
3389 
3390 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3391  AUTOMATIC_IR_METADATA(cgen_state_.get());
3392  CHECK(val->getType()->isPointerTy());
3393 
3394  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3395  const auto val_type = val_ptr_type->getElementType();
3396  size_t val_width = 0;
3397  if (val_type->isIntegerTy()) {
3398  val_width = val_type->getIntegerBitWidth();
3399  } else {
3400  if (val_type->isFloatTy()) {
3401  val_width = 32;
3402  } else {
3403  CHECK(val_type->isDoubleTy());
3404  val_width = 64;
3405  }
3406  }
3407  CHECK_LT(size_t(0), val_width);
3408  if (bitWidth == val_width) {
3409  return val;
3410  }
3411  return cgen_state_->ir_builder_.CreateBitCast(
3412  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3413 }
3414 
3415 #define EXECUTE_INCLUDE
3416 #include "ArrayOps.cpp"
3417 #include "DateAdd.cpp"
3418 #include "StringFunctions.cpp"
3419 #undef EXECUTE_INCLUDE
3420 
3421 namespace {
3422 void add_deleted_col_to_map(PlanState::DeletedColumnsMap& deleted_cols_map,
3423  const ColumnDescriptor* deleted_cd) {
3424  auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3425  if (deleted_cols_it == deleted_cols_map.end()) {
3426  CHECK(
3427  deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd)).second);
3428  } else {
3429  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3430  }
3431 }
3432 } // namespace
3433 
3434 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3435  const RelAlgExecutionUnit& ra_exe_unit,
3436  const CompilationOptions& co) {
3437  if (!co.filter_on_deleted_column) {
3438  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3439  }
3440  auto ra_exe_unit_with_deleted = ra_exe_unit;
3441  PlanState::DeletedColumnsMap deleted_cols_map;
3442  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3443  if (input_table.getSourceType() != InputSourceType::TABLE) {
3444  continue;
3445  }
3446  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3447  CHECK(td);
3448  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3449  if (!deleted_cd) {
3450  continue;
3451  }
3452  CHECK(deleted_cd->columnType.is_boolean());
3453  // check that the deleted column is not already present
3454  bool found = false;
3455  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3456  if (input_col.get()->getColId() == deleted_cd->columnId &&
3457  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3458  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3459  found = true;
3460  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3461  break;
3462  }
3463  }
3464  if (!found) {
3465  // add deleted column
3466  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3467  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3468  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3469  }
3470  }
3471  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3472 }
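Conceptually, when co.filter_on_deleted_column is set, each physical input table contributes its boolean deleted column as an extra InputColDescriptor, and deleted_cols_map records the tableId-to-deleted-column mapping so that later stages can filter out deleted rows; the exact downstream use happens outside this function.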
3473 
3474 namespace {
3475 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_value` will return `true` for safe
3476 // scaled epoch value and `false` for overflow/underflow values as the first argument of
3477 // return type.
3478 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3479  const int64_t chunk_min,
3480  const int64_t chunk_max,
3481  const SQLTypeInfo& lhs_type,
3482  const SQLTypeInfo& rhs_type) {
3483  const int32_t ldim = lhs_type.get_dimension();
3484  const int32_t rdim = rhs_type.get_dimension();
3485  CHECK(ldim != rdim);
3486  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
3487  if (ldim > rdim) {
3488  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
3489  return {true, chunk_min / scale, chunk_max / scale};
3490  }
3491 
3492  using checked_int64_t = boost::multiprecision::number<
3493  boost::multiprecision::cpp_int_backend<64,
3494  64,
3495  boost::multiprecision::signed_magnitude,
3496  boost::multiprecision::checked,
3497  void>>;
3498 
3499  try {
3500  auto ret =
3501  std::make_tuple(true,
3502  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
3503  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
3504  return ret;
3505  } catch (const std::overflow_error& e) {
3506  // noop
3507  }
3508  return std::make_tuple(false, chunk_min, chunk_max);
3509 }
3510 
3511 } // namespace
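The checked multiprecision alias above throws std::overflow_error instead of silently wrapping, which is what lets the helper report an unsafe scaling and fall back to the unscaled chunk stats. A standalone sketch of that pattern with illustrative values:

#include <boost/multiprecision/cpp_int.hpp>
#include <cstdint>
#include <iostream>
#include <limits>
#include <stdexcept>

using checked_int64_t = boost::multiprecision::number<
    boost::multiprecision::cpp_int_backend<64,
                                           64,
                                           boost::multiprecision::signed_magnitude,
                                           boost::multiprecision::checked,
                                           void>>;

int main() {
  const int64_t chunk_max = std::numeric_limits<int64_t>::max() / 10;
  const int64_t scale = 1000;  // e.g. rescaling seconds to milliseconds
  try {
    const int64_t scaled = int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale));
    std::cout << "safe: " << scaled << '\n';
  } catch (const std::overflow_error&) {
    std::cout << "overflow: keep the original chunk range and do not rescale\n";
  }
}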
3512 
3513 bool Executor::isFragmentFullyDeleted(
3514  const int table_id,
3515  const Fragmenter_Namespace::FragmentInfo& fragment) {
3516  // Skip temporary tables
3517  if (table_id < 0) {
3518  return false;
3519  }
3520 
3521  const auto td = catalog_->getMetadataForTable(fragment.physicalTableId);
3522  CHECK(td);
3523  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3524  if (!deleted_cd) {
3525  return false;
3526  }
3527 
3528  const auto& chunk_type = deleted_cd->columnType;
3529  CHECK(chunk_type.is_boolean());
3530 
3531  const auto deleted_col_id = deleted_cd->columnId;
3532  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
3533  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
3534  const int64_t chunk_min =
3535  extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3536  const int64_t chunk_max =
3537  extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3538  if (chunk_min == 1 && chunk_max == 1) { // Skip the fragment if metadata says the full
3539  // deleted-column bytemap is true (all rows deleted)
3540  return true;
3541  }
3542  }
3543  return false;
3544 }
3545 
3546 std::pair<bool, int64_t> Executor::skipFragment(
3547  const InputDescriptor& table_desc,
3548  const Fragmenter_Namespace::FragmentInfo& fragment,
3549  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3550  const std::vector<uint64_t>& frag_offsets,
3551  const size_t frag_idx) {
3552  const int table_id = table_desc.getTableId();
3553 
3554  // First check to see if the entire fragment is deleted, in which case we know we can skip it
3555  if (isFragmentFullyDeleted(table_id, fragment)) {
3556  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
3557  << ", fragment id: " << frag_idx;
3558  return {true, -1};
3559  }
3560 
3561  for (const auto& simple_qual : simple_quals) {
3562  const auto comp_expr =
3563  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3564  if (!comp_expr) {
3565  // is this possible?
3566  return {false, -1};
3567  }
3568  const auto lhs = comp_expr->get_left_operand();
3569  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3570  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3571  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3572  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3573  if (lhs_uexpr) {
3574  CHECK(lhs_uexpr->get_optype() ==
3575  kCAST); // We should have only been passed a cast expression
3576  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3577  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3578  continue;
3579  }
3580  } else {
3581  continue;
3582  }
3583  }
3584  const auto rhs = comp_expr->get_right_operand();
3585  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3586  if (!rhs_const) {
3587  // is this possible?
3588  return {false, -1};
3589  }
3590  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3591  continue;
3592  }
3593  const int col_id = lhs_col->get_column_id();
3594  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3595  int64_t chunk_min{0};
3596  int64_t chunk_max{0};
3597  bool is_rowid{false};
3598  size_t start_rowid{0};
3599  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3600  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3601  if (cd->isVirtualCol) {
3602  CHECK(cd->columnName == "rowid");
3603  const auto& table_generation = getTableGeneration(table_id);
3604  start_rowid = table_generation.start_rowid;
3605  chunk_min = frag_offsets[frag_idx] + start_rowid;
3606  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3607  is_rowid = true;
3608  }
3609  } else {
3610  const auto& chunk_type = lhs_col->get_type_info();
3611  chunk_min = extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3612  chunk_max = extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3613  }
3614  if (chunk_min > chunk_max) {
3615  // invalid metadata range, do not skip fragment
3616  return {false, -1};
3617  }
3618  if (lhs->get_type_info().is_timestamp() &&
3619  (lhs_col->get_type_info().get_dimension() !=
3620  rhs_const->get_type_info().get_dimension()) &&
3621  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3622  rhs_const->get_type_info().is_high_precision_timestamp())) {
3623  // If original timestamp lhs col has different precision,
3624  // column metadata holds value in original precision
3625  // therefore adjust rhs value to match lhs precision
3626 
3627  // Note(Wamsi): We adjust rhs const value instead of lhs value to not
3628  // artificially limit the lhs column range. RHS overflow/underflow has already
3629  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
3630  bool is_valid;
3631  std::tie(is_valid, chunk_min, chunk_max) =
3632  get_hpt_overflow_underflow_safe_scaled_values(
3633  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
3634  if (!is_valid) {
3635  VLOG(4) << "Overflow/underflow detected in fragment skipping logic.\nChunk min "
3636  "value: "
3637  << std::to_string(chunk_min)
3638  << "\nChunk max value: " << std::to_string(chunk_max)
3639  << "\nLHS col precision is: "
3640  << std::to_string(lhs_col->get_type_info().get_dimension())
3641  << "\nRHS precision is: "
3642  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
3643  return {false, -1};
3644  }
3645  }
3646  llvm::LLVMContext local_context;
3647  CgenState local_cgen_state(local_context);
3648  CodeGenerator code_generator(&local_cgen_state, nullptr);
3649 
3650  const auto rhs_val =
3651  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
3652 
3653  switch (comp_expr->get_optype()) {
3654  case kGE:
3655  if (chunk_max < rhs_val) {
3656  return {true, -1};
3657  }
3658  break;
3659  case kGT:
3660  if (chunk_max <= rhs_val) {
3661  return {true, -1};
3662  }
3663  break;
3664  case kLE:
3665  if (chunk_min > rhs_val) {
3666  return {true, -1};
3667  }
3668  break;
3669  case kLT:
3670  if (chunk_min >= rhs_val) {
3671  return {true, -1};
3672  }
3673  break;
3674  case kEQ:
3675  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3676  return {true, -1};
3677  } else if (is_rowid) {
3678  return {false, rhs_val - start_rowid};
3679  }
3680  break;
3681  default:
3682  break;
3683  }
3684  }
3685  return {false, -1};
3686 }
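The switch above is classic min/max zone-map pruning: a fragment can be skipped whenever the chunk-level [chunk_min, chunk_max] range of the filtered column cannot possibly satisfy the comparison against the constant. A compact restatement of just that decision as a hypothetical standalone helper:

#include <cstdint>

enum class CompareOp { GE, GT, LE, LT, EQ };

// True when no value in [chunk_min, chunk_max] can satisfy `col <op> rhs`,
// i.e. the whole fragment may be skipped for this qualifier.
bool can_skip_fragment(CompareOp op, int64_t chunk_min, int64_t chunk_max, int64_t rhs) {
  switch (op) {
    case CompareOp::GE: return chunk_max < rhs;
    case CompareOp::GT: return chunk_max <= rhs;
    case CompareOp::LE: return chunk_min > rhs;
    case CompareOp::LT: return chunk_min >= rhs;
    case CompareOp::EQ: return chunk_min > rhs || chunk_max < rhs;
  }
  return false;
}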
3687 
3688 /*
3689  * The skipFragmentInnerJoins function processes all quals stored in the execution unit's
3690  * join_quals and gathers all the ones that meet the "simple_qual" characteristics
3691  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
3692  * to decide whether the fragment should be skipped or not. The fragment will be skipped
3693  * if at least one of these skipFragment calls returns true in its first value.
3694  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
3695  * second value only if its first value is "false".
3696  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3697  * i.e., we only skip if none of the quals trigger the code to update the
3698  * rowid_lookup_key
3699  * - Only AND operations are valid and considered:
3700  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3701  * the skip
3702  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3703  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3704  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3705  * implemented differently, which causes the first one to skip correctly while the
3706  * second one will not skip.
3707  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3708  * possible
3709  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3710  * projection, no skipping
3711  */
3712 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3713  const InputDescriptor& table_desc,
3714  const RelAlgExecutionUnit& ra_exe_unit,
3715  const Fragmenter_Namespace::FragmentInfo& fragment,
3716  const std::vector<uint64_t>& frag_offsets,
3717  const size_t frag_idx) {
3718  std::pair<bool, int64_t> skip_frag{false, -1};
3719  for (auto& inner_join : ra_exe_unit.join_quals) {
3720  if (inner_join.type != JoinType::INNER) {
3721  continue;
3722  }
3723 
3724  // extracting all the conjunctive simple_quals from the quals stored for the inner
3725  // join
3726  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3727  for (auto& qual : inner_join.quals) {
3728  auto temp_qual = qual_to_conjunctive_form(qual);
3729  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3730  temp_qual.simple_quals.begin(),
3731  temp_qual.simple_quals.end());
3732  }
3733  auto temp_skip_frag = skipFragment(
3734  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3735  if (temp_skip_frag.second != -1) {
3736  skip_frag.second = temp_skip_frag.second;
3737  return skip_frag;
3738  } else {
3739  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3740  }
3741  }
3742  return skip_frag;
3743 }
3744 
3745 AggregatedColRange Executor::computeColRangesCache(
3746  const std::unordered_set<PhysicalInput>& phys_inputs) {
3747  AggregatedColRange agg_col_range_cache;
3748  CHECK(catalog_);
3749  std::unordered_set<int> phys_table_ids;
3750  for (const auto& phys_input : phys_inputs) {
3751  phys_table_ids.insert(phys_input.table_id);
3752  }
3753  std::vector<InputTableInfo> query_infos;
3754  for (const int table_id : phys_table_ids) {
3755  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3756  }
3757  for (const auto& phys_input : phys_inputs) {
3758  const auto cd =
3759  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3760  CHECK(cd);
3761  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3762  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3763  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3764  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3765  agg_col_range_cache.setColRange(phys_input, col_range);
3766  }
3767  }
3768  return agg_col_range_cache;
3769 }
3770 
3771 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3772  const std::unordered_set<PhysicalInput>& phys_inputs) {
3773  StringDictionaryGenerations string_dictionary_generations;
3774  CHECK(catalog_);
3775  for (const auto& phys_input : phys_inputs) {
3776  const auto cd =
3777  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3778  CHECK(cd);
3779  const auto& col_ti =
3780  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3781  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3782  const int dict_id = col_ti.get_comp_param();
3783  const auto dd = catalog_->getMetadataForDict(dict_id);
3784  CHECK(dd && dd->stringDict);
3785  string_dictionary_generations.setGeneration(dict_id,
3786  dd->stringDict->storageEntryCount());
3787  }
3788  }
3789  return string_dictionary_generations;
3790 }
3791 
3792 TableGenerations Executor::computeTableGenerations(
3793  std::unordered_set<int> phys_table_ids) {
3794  TableGenerations table_generations;
3795  for (const int table_id : phys_table_ids) {
3796  const auto table_info = getTableInfo(table_id);
3797  table_generations.setGeneration(
3798  table_id,
3799  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
3800  }
3801  return table_generations;
3802 }
3803 
3804 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3805  const std::unordered_set<int>& phys_table_ids) {
3806  CHECK(catalog_);
3807  row_set_mem_owner_ =
3808  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
3809  row_set_mem_owner_->setDictionaryGenerations(
3810  computeStringDictionaryGenerations(phys_inputs));
3811  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3812  table_generations_ = computeTableGenerations(phys_table_ids);
3813 }
3814 
3815 mapd_shared_mutex& Executor::getSessionLock() {
3816  return executor_session_mutex_;
3817 }
3818 
3819 QuerySessionId& Executor::getCurrentQuerySession(
3820  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3821  return current_query_session_;
3822 }
3823 
3824 void Executor::setCurrentQuerySession(const QuerySessionId& query_session,
3825  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3826  if (!query_session.empty()) {
3827  current_query_session_ = query_session;
3828  }
3829 }
3830 
3831 void Executor::setRunningExecutorId(const size_t id,
3832  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3833  running_query_executor_id_ = id;
3834 }
3835 
3836 size_t Executor::getRunningExecutorId(mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3837  return running_query_executor_id_;
3838 }
3839 
3840 bool Executor::checkCurrentQuerySession(const QuerySessionId& candidate_query_session,
3841  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3842  // a candidate query session is considered the current session only when it is
3843  // non-empty and equal to current_query_session_
3844  return !candidate_query_session.empty() &&
3845  (current_query_session_ == candidate_query_session);
3846 }
3847 
3848 void Executor::invalidateRunningQuerySession(
3849  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3850  current_query_session_ = "";
3851  running_query_executor_id_ = Executor::UNITARY_EXECUTOR_ID;
3852 }
3853 
3854 CurrentQueryStatus Executor::attachExecutorToQuerySession(
3855  const QuerySessionId& query_session_id,
3856  const std::string& query_str,
3857  const std::string& query_submitted_time) {
3858  if (!query_session_id.empty()) {
3859  // if the session is valid, update 1) the exact executor id and 2) the query status
3860  mapd_unique_lock<mapd_shared_mutex> write_lock(executor_session_mutex_);
3861  updateQuerySessionExecutorAssignment(
3862  query_session_id, query_submitted_time, executor_id_, write_lock);
3863  updateQuerySessionStatusWithLock(query_session_id,
3864  query_submitted_time,
3865  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
3866  write_lock);
3867  }
3868  return {query_session_id, query_str};
3869 }
3870 
3871 void Executor::checkPendingQueryStatus(const QuerySessionId& query_session) {
3872  // check whether we are okay to execute the "pending" query
3873  // i.e., before running the query check if this query session is "ALREADY" interrupted
3874  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3875  if (query_session.empty()) {
3876  return;
3877  }
3878  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
3879  // something went wrong, since we assume it is the caller's responsibility
3880  // to call this function only for an enrolled query session
3881  if (!queries_session_map_.count(query_session)) {
3882  VLOG(1) << "Interrupting pending query is not available since the query session is "
3883  "not enrolled";
3884  } else {
3885  // here the query session is enrolled but the interrupt flag is not registered
3886  VLOG(1)
3887  << "Interrupting pending query is not available since its interrupt flag is "
3888  "not registered";
3889  }
3890  return;
3891  }
3892  if (queries_interrupt_flag_[query_session]) {
3893  throw QueryExecutionError(ERR_INTERRUPTED);
3894  }
3895 }
3896 
3897 void Executor::clearQuerySessionStatus(const QuerySessionId& query_session,
3898  const std::string& submitted_time_str,
3899  const bool acquire_spin_lock) {
3900  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3901  // clear the interrupt-related info for a finished query
3902  if (query_session.empty()) {
3903  return;
3904  }
3905  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
3906  if (query_session.compare(current_query_session_) == 0 &&
3907  running_query_executor_id_ == executor_id_) {
3908  invalidateRunningQuerySession(session_write_lock);
3909  if (acquire_spin_lock) {
3910  // try to unlock executor's internal spin lock (let say "L") iff it is acquired
3911  // otherwise we do not need to care about the "L" lock
3912  // i.e., import table does not have a code path towards Executor
3913  // so we just exploit executor's session management code and also global interrupt
3914  // flag excepting this "L" lock
3915  execute_spin_lock_.clear(std::memory_order_release);
3916  }
3917  resetInterrupt();
3918  }
3919 }
3920 
3921 void Executor::updateQuerySessionStatus(
3922  std::shared_ptr<const query_state::QueryState>& query_state,
3923  const QuerySessionStatus::QueryStatus new_query_status) {
3924  // update the running query session's current status
3925  if (query_state) {
3926  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3927  auto query_session = query_state->getConstSessionInfo()->get_session_id();
3928  if (query_session.empty()) {
3929  return;
3930  }
3931  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING) {
3932  current_query_session_ = query_session;
3933  running_query_executor_id_ = executor_id_;
3934  }
3935  updateQuerySessionStatusWithLock(query_session,
3936  query_state->getQuerySubmittedTime(),
3937  new_query_status,
3938  session_write_lock);
3939  }
3940 }
3941 
3942 void Executor::updateQuerySessionStatus(
3943  const QuerySessionId& query_session,
3944  const std::string& submitted_time_str,
3945  const QuerySessionStatus::QueryStatus new_query_status) {
3946  // update the running query session's current status
3947  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3948  if (query_session.empty()) {
3949  return;
3950  }
3951  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING) {
3952  current_query_session_ = query_session;
3953  running_query_executor_id_ = executor_id_;
3954  }
3955  updateQuerySessionStatusWithLock(
3956  query_session, submitted_time_str, new_query_status, session_write_lock);
3957 }
3958 
3959 void Executor::enrollQuerySession(
3960  const QuerySessionId& query_session,
3961  const std::string& query_str,
3962  const std::string& submitted_time_str,
3963  const size_t executor_id,
3964  const QuerySessionStatus::QueryStatus query_session_status) {
3965  // enroll the query session into the Executor's session map
3966  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3967  if (query_session.empty()) {
3968  return;
3969  }
3970 
3971  addToQuerySessionList(query_session,
3972  query_str,
3973  submitted_time_str,
3974  executor_id,
3975  query_session_status,
3976  session_write_lock);
3977 
3978  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING) {
3979  current_query_session_ = query_session;
3980  running_query_executor_id_ = executor_id_;
3981  }
3982 }
3983 
3984 bool Executor::addToQuerySessionList(const QuerySessionId& query_session,
3985  const std::string& query_str,
3986  const std::string& submitted_time_str,
3987  const size_t executor_id,
3988  const QuerySessionStatus::QueryStatus query_status,
3989  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3990  // an internal API that enrolls the query session into the Executor's session map
3991  if (queries_session_map_.count(query_session)) {
3992  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
3993  queries_session_map_.at(query_session).erase(submitted_time_str);
3994  queries_session_map_.at(query_session)
3995  .emplace(submitted_time_str,
3996  QuerySessionStatus(query_session,
3997  executor_id,
3998  query_str,
3999  submitted_time_str,
4000  query_status));
4001  } else {
4002  queries_session_map_.at(query_session)
4003  .emplace(submitted_time_str,
4004  QuerySessionStatus(query_session,
4005  executor_id,
4006  query_str,
4007  submitted_time_str,
4008  query_status));
4009  }
4010  } else {
4011  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4012  executor_per_query_map.emplace(
4013  submitted_time_str,
4014  QuerySessionStatus(
4015  query_session, executor_id, query_str, submitted_time_str, query_status));
4016  queries_session_map_.emplace(query_session, executor_per_query_map);
4017  }
4018  return queries_interrupt_flag_.emplace(query_session, false).second;
4019 }
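The bookkeeping above is a two-level map: one entry per query session id, and within it one entry per submitted-time string, so a session can carry several enrolled queries at once while a single per-session flag drives interruption. A simplified sketch of that shape, with the status reduced to a string for brevity:

#include <map>
#include <string>

using SimplifiedSessionMap =
    std::map<std::string /*session id*/,
             std::map<std::string /*submitted time*/, std::string /*status*/>>;

int main() {
  SimplifiedSessionMap sessions;
  std::map<std::string, bool> interrupt_flag;

  sessions["session-abc"]["2021-06-01 10:00:00"] = "PENDING_EXECUTOR";
  sessions["session-abc"]["2021-06-01 10:00:05"] = "RUNNING";
  interrupt_flag["session-abc"] = false;  // flipped to true to interrupt both queries
}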
4020 
4021 bool Executor::updateQuerySessionStatusWithLock(
4022  const QuerySessionId& query_session,
4023  const std::string& submitted_time_str,
4024  const QuerySessionStatus::QueryStatus updated_query_status,
4025  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4026  // an internal API that updates query session status
4027  if (query_session.empty()) {
4028  return false;
4029  }
4030  if (queries_session_map_.count(query_session)) {
4031  for (auto& query_status : queries_session_map_.at(query_session)) {
4032  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4033  // no time difference --> found the target query status
4034  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4035  auto prev_status = query_status.second.getQueryStatus();
4036  if (prev_status == updated_query_status) {
4037  return false;
4038  }
4039  query_status.second.setQueryStatus(updated_query_status);
4040  return true;
4041  }
4042  }
4043  }
4044  return false;
4045 }
4046 
4047 bool Executor::updateQuerySessionExecutorAssignment(
4048  const QuerySessionId& query_session,
4049  const std::string& submitted_time_str,
4050  const size_t executor_id,
4051  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4052  // update the executor id of the query session
4053  if (query_session.empty()) {
4054  return false;
4055  }
4056  if (queries_session_map_.count(query_session)) {
4057  auto storage = queries_session_map_.at(query_session);
4058  for (auto it = storage.begin(); it != storage.end(); it++) {
4059  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4060  // no time difference --> found the target query status
4061  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4062  queries_session_map_.at(query_session)
4063  .at(submitted_time_str)
4064  .setExecutorId(executor_id);
4065  return true;
4066  }
4067  }
4068  }
4069  return false;
4070 }
4071 
4072 bool Executor::removeFromQuerySessionList(
4073  const QuerySessionId& query_session,
4074  const std::string& submitted_time_str,
4075  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4076  if (query_session.empty()) {
4077  return false;
4078  }
4079  if (queries_session_map_.count(query_session)) {
4080  auto& storage = queries_session_map_.at(query_session);
4081  if (storage.size() > 1) {
4082  // in this case we only remove query executor info
4083  for (auto it = storage.begin(); it != storage.end(); it++) {
4084  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4085  // no time difference && have the same executor id--> found the target query
4086  if (it->second.getExecutorId() == executor_id_ &&
4087  submitted_time_str.compare(target_submitted_t_str) == 0) {
4088  storage.erase(it);
4089  return true;
4090  }
4091  }
4092  } else if (storage.size() == 1) {
4093  // here this session only has a single query executor
4094  // so we clear both executor info and its interrupt flag
4095  queries_session_map_.erase(query_session);
4096  queries_interrupt_flag_.erase(query_session);
4097  if (interrupted_.load()) {
4098  interrupted_.store(false);
4099  VLOG(1) << "RESET Executor " << this << " that had previously been interrupted";
4100  }
4101  return true;
4102  }
4103  }
4104  return false;
4105 }
4106 
4107 void Executor::setQuerySessionAsInterrupted(
4108  const QuerySessionId& query_session,
4109  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4110  if (query_session.empty()) {
4111  return;
4112  }
4113  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
4114  queries_interrupt_flag_[query_session] = true;
4115  }
4116 }
4117 
4118 void Executor::resetQuerySessionInterruptFlag(
4119  const std::string& query_session,
4120  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4121  if (query_session.empty()) {
4122  return;
4123  }
4124  queries_interrupt_flag_[query_session] = false;
4125 }
4126 
4127 bool Executor::checkIsQuerySessionInterrupted(
4128  const QuerySessionId& query_session,
4129  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4130  if (query_session.empty()) {
4131  return false;
4132  }
4133  auto flag_it = queries_interrupt_flag_.find(query_session);
4134  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4135  flag_it->second;
4136 }
4137 
4138 bool Executor::checkIsRunningQuerySessionInterrupted() {
4139  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4140  return checkIsQuerySessionInterrupted(current_query_session_, session_read_lock);
4141 }
4142 
4143 bool Executor::checkIsQuerySessionEnrolled(
4144  const QuerySessionId& query_session,
4145  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4146  if (query_session.empty()) {
4147  return false;
4148  }
4149  return !query_session.empty() && queries_session_map_.count(query_session);
4150 }
4151 
4152 void Executor::enableRuntimeQueryInterrupt(
4153  const double runtime_query_check_freq,
4154  const unsigned pending_query_check_freq) const {
4155  // The only scenario in which we intentionally call this function is
4156  // to allow runtime query interrupt in QueryRunner for test cases.
4157  // Because the test machine's default settings do not allow runtime query interrupt,
4158  // we have to turn it on within test code when necessary.
4159  g_enable_runtime_query_interrupt = true;
4160  g_pending_query_interrupt_freq = pending_query_check_freq;
4161  g_running_query_interrupt_freq = runtime_query_check_freq;
4162  if (g_running_query_interrupt_freq) {
4163  g_enable_non_kernel_time_query_interrupt = true;
4164  }
4165 }
4166 
4167 void Executor::addToCardinalityCache(const std::string& cache_key,
4168  const size_t cache_value) {
4169  if (g_use_estimator_result_cache) {
4170  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
4171  cardinality_cache_[cache_key] = cache_value;
4172  VLOG(1) << "Put estimated cardinality to the cache";
4173  }
4174 }
4175 
4176 Executor::CachedCardinality Executor::getCachedCardinality(const std::string& cache_key) {
4177  mapd_shared_lock<mapd_shared_mutex> lock(recycler_mutex_);
4178  if (g_use_estimator_result_cache &&
4179  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
4180  VLOG(1) << "Reuse cached cardinality";
4181  return {true, cardinality_cache_[cache_key]};
4182  }
4183  return {false, -1};
4184 }
4185 
4186 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
4187  const QuerySessionId& query_session,
4188  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4189  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
4190  auto& query_infos = queries_session_map_.at(query_session);
4191  std::vector<QuerySessionStatus> ret;
4192  for (auto& info : query_infos) {
4193  ret.push_back(QuerySessionStatus(query_session,
4194  info.second.getExecutorId(),
4195  info.second.getQueryStr(),
4196  info.second.getQuerySubmittedTime(),
4197  info.second.getQueryStatus()));
4198  }
4199  return ret;
4200  }
4201  return {};
4202 }
4203 
4204 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
4205 
4206 std::atomic_flag Executor::execute_spin_lock_ = ATOMIC_FLAG_INIT;
4207 // current running query's session ID
4208 std::string Executor::current_query_session_{""};
4209 // running executor's id
4210 size_t Executor::running_query_executor_id_{Executor::UNITARY_EXECUTOR_ID};
4211 // contains the interrupt flag's status per query session
4212 InterruptFlagMap Executor::queries_interrupt_flag_;
4213 // contains a list of queries per query session
4214 QuerySessionMap Executor::queries_session_map_;
4215 // session lock
4216 mapd_shared_mutex Executor::executor_session_mutex_;
4217 
4218 mapd_shared_mutex Executor::execute_mutex_;
4219 mapd_shared_mutex Executor::executors_cache_mutex_;
4220 
4221 std::mutex Executor::gpu_active_modules_mutex_;
4222 uint32_t Executor::gpu_active_modules_device_mask_{0x0};
4223 void* Executor::gpu_active_modules_[max_gpu_count];
4224 std::atomic<bool> Executor::interrupted_{false};
4225 
4226 std::mutex Executor::compilation_mutex_;
4227 std::mutex Executor::kernel_mutex_;
4228 
4229 mapd_shared_mutex Executor::recycler_mutex_;
4230 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;
bool updateQuerySessionStatusWithLock(const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus updated_query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4021
int get_table_id() const
Definition: Analyzer.h:194
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
QuerySessionId & getCurrentQuerySession(mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3819
void executeWorkUnitPerFragment(const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb, const std::set< size_t > &fragment_indexes_param)
Compiles and dispatches a work unit per fragment processing results with the per fragment callback...
Definition: Execute.cpp:1597
bool is_agg(const Analyzer::Expr *expr)
catalog_(nullptr)
ALWAYS_INLINE int64_t agg_sum_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
std::vector< Analyzer::Expr * > target_exprs
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1068
AggregatedColRange computeColRangesCache(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:3745
void enableRuntimeQueryInterrupt(const double runtime_query_check_freq, const unsigned pending_query_check_freq) const
Definition: Execute.cpp:4152
constexpr size_t kArenaBlockOverhead
SQLAgg
Definition: sqldefs.h:71
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::vector< std::unique_ptr< ExecutionKernel > > createKernels(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
Definition: Execute.cpp:2046
bool g_enable_left_join_filter_hoisting
Definition: Execute.cpp:94
void resetQuerySessionInterruptFlag(const std::string &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4118
double g_running_query_interrupt_freq
Definition: Execute.cpp:118
bool g_enable_smem_group_by
void reduce(SpeculativeTopNMap &that)
float g_filter_push_down_low_frac
Definition: Execute.cpp:90
const ColumnDescriptor * try_get_column_descriptor(const InputColDescriptor *col_desc, const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:2309
static QuerySessionMap queries_session_map_
Definition: Execute.h:1076
static mapd_shared_mutex execute_mutex_
Definition: Execute.h:1083
void agg_max_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
JoinType
Definition: sqldefs.h:108
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
size_t maxGpuSlabSize() const
Definition: Execute.cpp:3351
size_t g_constrained_by_in_threshold
Definition: Execute.cpp:101
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int outer_table_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
Definition: Execute.cpp:3054
bool useCudaBuffers() const
Definition: RenderInfo.cpp:69
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool g_enable_watchdog
ExecutorDeviceType getDeviceType() const
double g_bump_allocator_step_reduction
Definition: Execute.cpp:110
std::string cat(Ts &&...args)
std::string toString(const ExtArgumentType &sig_type)
void invalidateRunningQuerySession(mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3848
block_size_x_(block_size_x)
void checkPendingQueryStatus(const QuerySessionId &query_session)
Definition: Execute.cpp:3871
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1229
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1116
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:102
void setEntryCount(const size_t val)
input_table_info_cache_(this)
Definition: Execute.cpp:161
bool is_trivial_loop_join(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1154
grid_size_x_(grid_size_x)
bool g_strip_join_covered_quals
Definition: Execute.cpp:100
std::unique_ptr< llvm::Module > udf_gpu_module
const std::vector< uint64_t > & getFragOffsets()
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 64, 64, boost::multiprecision::signed_magnitude, boost::multiprecision::checked, void >> checked_int64_t
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner() const
Definition: Execute.cpp:286
std::vector< size_t > getTableFragmentIndices(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< int, const TableFragments * > &selected_tables_fragments, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition)
Definition: Execute.cpp:2205
bool g_enable_direct_columnarization
Definition: Execute.cpp:111
std::tuple< bool, int64_t, int64_t > get_hpt_overflow_underflow_safe_scaled_values(const int64_t chunk_min, const int64_t chunk_max, const SQLTypeInfo &lhs_type, const SQLTypeInfo &rhs_type)
Definition: Execute.cpp:3478
ExecutorDeviceType
void agg_max_double_skip_val(int64_t *agg, const double val, const double skip_val)
std::string get_table_name(const InputDescriptor &input_desc, const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:1088
ResultSetPtr executeTableFunction(const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat)
Compiles and dispatches a table function; that is, a function that takes as input one or more columns...
Definition: Execute.cpp:1680
std::string toString() const
bool g_enable_lazy_fetch
Definition: Execute.cpp:113
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
std::unordered_set< int > get_available_gpus(const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:1034
static const int max_gpu_count
Definition: Execute.h:1027
GpuSharedMemoryContext gpu_smem_context
OutVecOwner(const std::vector< int64_t * > &out_vec)
Definition: Execute.cpp:2830
const std::optional< bool > union_all
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:117
int64_t float_to_double_bin(int32_t val, bool nullable=false)
std::map< const QuerySessionId, std::map< std::string, QuerySessionStatus >> QuerySessionMap
Definition: Execute.h:150
#define LOG(tag)
Definition: Logger.h:194
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:79
std::vector< size_t > outer_fragment_indices
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:53
static std::atomic_flag execute_spin_lock_
Definition: Execute.h:1079
void agg_min_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
bool is_fp() const
Definition: sqltypes.h:493
HOST DEVICE int get_scale() const
Definition: sqltypes.h:319
std::pair< QuerySessionId, std::string > CurrentQueryStatus
Definition: Execute.h:80
static mapd_shared_mutex executors_cache_mutex_
Definition: Execute.h:1100
Definition: sqldefs.h:35
int64_t getGeneration(const uint32_t id) const
const std::list< Analyzer::OrderEntry > order_entries
size_t getSharedMemorySize() const
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr * > &target_exprs) const
Definition: Execute.cpp:336
void clearMemory(const MemoryLevel memLevel)
Definition: DataMgr.cpp:384
Definition: sqldefs.h:36
tuple r
Definition: test_fsi.py:16
std::string join(T const &container, std::string const &delim)
std::string join_type_to_string(const JoinType type)
Definition: Execute.cpp:1201
std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > addDeletedColumn(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
Definition: Execute.cpp:3434
TableGenerations computeTableGenerations(std::unordered_set< int > phys_table_ids)
Definition: Execute.cpp:3792
std::vector< InputDescriptor > input_descs
#define UNREACHABLE()
Definition: Logger.h:247
void setOutputColumnar(const bool val)
const SortAlgorithm algorithm
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
debug_dir_(debug_dir)
#define CHECK_GE(x, y)
Definition: Logger.h:216
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
static std::pair< int64_t, int32_t > reduceResults(const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
Definition: Execute.cpp:662
Definition: sqldefs.h:49
Definition: sqldefs.h:30
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:324
bool checkCurrentQuerySession(const std::string &candidate_query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3840
void setRunningExecutorId(const size_t id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3831
size_t g_filter_push_down_passing_row_ubound
Definition: Execute.cpp:92
static const int32_t ERR_GEOS
Definition: Execute.h:1122
const int8_t * linearizeColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
std::shared_ptr< ResultSet > ResultSetPtr
static void * gpu_active_modules_[max_gpu_count]
Definition: Execute.h:1032
std::vector< FragmentInfo > fragments
Definition: Fragmenter.h:160
void fill_entries_for_empty_input(std::vector< TargetInfo > &target_infos, std::vector< int64_t > &entry, const std::vector< Analyzer::Expr * > &target_exprs, const QueryMemoryDescriptor &query_mem_desc)
Definition: Execute.cpp:1758
ExecutorOptLevel opt_level
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
void enrollQuerySession(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted_time_str, const size_t executor_id, const QuerySessionStatus::QueryStatus query_session_status)
Definition: Execute.cpp:3959
size_t compute_buffer_entry_guess(const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:1058
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
bool takes_float_argument(const TargetInfo &target_info)
Definition: TargetInfo.h:134
unsigned g_trivial_loop_join_threshold
Definition: Execute.cpp:83
static uint32_t gpu_active_modules_device_mask_
Definition: Execute.h:1031
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
static void invalidateCaches()
void buildSelectedFragsMappingForUnion(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2789
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:645
bool g_is_test_env
Definition: Execute.cpp:130
llvm::Value * castToIntPtrTyIn(llvm::Value *val, const size_t bit_width)
Definition: Execute.cpp:3390
size_t getNumBytesForFetchedRow(const std::set< int > &table_ids_to_fetch) const
Definition: Execute.cpp:306
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
static std::mutex kernel_mutex_
Definition: Execute.h:1125
unsigned numBlocksPerMP() const
Definition: Execute.cpp:3333
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
bool is_number() const
Definition: sqltypes.h:494
#define CHECK_GT(x, y)
Definition: Logger.h:215
Container for compilation results and assorted options for a single execution unit.
bool isCPUOnly() const
Definition: Execute.cpp:256
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::vector< QuerySessionStatus > getQuerySessionInfo(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4186
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1949
std::vector< FragmentsPerTable > FragmentsList
bool is_time() const
Definition: sqltypes.h:495
bool needFetchAllFragments(const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
Definition: Execute.cpp:2386
std::string to_string(char const *&&v)
static size_t literalBytes(const CgenState::LiteralValue &lit)
Definition: CgenState.h:357
bool checkIsQuerySessionInterrupted(const std::string &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4127
mapd_shared_mutex & getSessionLock()
Definition: Execute.cpp:3815
bool g_enable_overlaps_hashjoin
Definition: Execute.cpp:96
gpu_code_cache_(code_cache_size)
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:185
int8_t * getUnderlyingBuffer() const
bool g_inner_join_fragment_skipping
Definition: Execute.cpp:85
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:294
cpu_code_cache_(code_cache_size)
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const TableFunctionCompilationContext *compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:452
bool g_enable_smem_non_grouped_agg
Definition: Execute.cpp:127
ResultSetPtr get_merged_result(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Definition: Execute.cpp:858
Definition: sqldefs.h:73
std::unique_ptr< QueryMemoryInitializer > query_buffers_
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:163
std::vector< int64_t > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
Definition: Execute.cpp:3232
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:3266
bool isFragmentFullyDeleted(const int table_id, const Fragmenter_Namespace::FragmentInfo &fragment)
Definition: Execute.cpp:3513
bool g_null_div_by_zero
Definition: Execute.cpp:82
bool checkIsRunningQuerySessionInterrupted()
Definition: Execute.cpp:4138
bool checkIsQuerySessionEnrolled(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4143
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
FetchResult fetchUnionChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:2566
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:193
int8_t warpSize() const
Definition: Execute.cpp:3315
std::map< QuerySessionId, bool > InterruptFlagMap
Definition: Execute.h:81
const size_t limit
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1007
bool g_enable_columnar_output
Definition: Execute.cpp:93
ResultSetPtr collectAllDeviceResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
Definition: Execute.cpp:1855
const ColumnDescriptor * getPhysicalColumnDescriptor(const Analyzer::ColumnVar *, int) const
Definition: Execute.cpp:267
int8_t groupColWidth(const size_t key_idx) const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1108
bool is_fixlen_array() const
Definition: sqltypes.h:499
static SysCatalog & instance()
Definition: SysCatalog.h:292
max_gpu_slab_size_(max_gpu_slab_size)
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const int64_t generation)
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3984
bool g_from_table_reordering
Definition: Execute.cpp:84
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:167
Classes representing a parse tree.
void setGeneration(const uint32_t id, const uint64_t generation)
bool g_enable_hashjoin_many_to_many
Definition: Execute.cpp:97
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
int getDeviceCount() const
Definition: CudaMgr.h:86
size_t get_context_count(const ExecutorDeviceType device_type, const size_t cpu_count, const size_t gpu_count)
Definition: Execute.cpp:1046
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:221
int64_t deviceCycles(int milliseconds) const
Definition: Execute.cpp:3355
const SortInfo sort_info
ExecutorType executor_type
void init(LogOptions const &log_opts)
Definition: Logger.cpp:280
bool is_integer() const
Definition: sqltypes.h:491
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static size_t addAligned(const size_t off_in, const size_t alignment)
Definition: CgenState.h:388
#define CHECK_NE(x, y)
Definition: Logger.h:212
void setQuerySessionAsInterrupted(const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4107
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:225
std::unordered_map< TableId, const ColumnDescriptor * > DeletedColumnsMap
Definition: PlanState.h:44
const JoinQualsPerNestingLevel join_quals
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:900
std::vector< std::string > expr_container_to_string(const T &expr_container)
Definition: Execute.cpp:1179
std::shared_timed_mutex mapd_shared_mutex
SortAlgorithm
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
Definition: Execute.cpp:1970
CachedCardinality getCachedCardinality(const std::string &cache_key)
Definition: Execute.cpp:4176
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:77
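The fragment record above is what drives per-fragment work assignment. A minimal sketch of consuming such records, assuming a simplified stand-in type (FragmentStub below is illustrative, not the real Fragmenter_Namespace::FragmentInfo):
// Illustrative only: FragmentStub mimics the idea of a per-fragment record
// (fragment id plus tuple count); field names are assumptions.
#include <cstddef>
#include <vector>

struct FragmentStub {
  int fragmentId;    // id of the fragment within its table
  size_t numTuples;  // rows currently stored by the fragment
};

// Total row count across fragments, the way a table-info pass would compute it.
size_t total_tuples(const std::vector<FragmentStub>& fragments) {
  size_t total = 0;
  for (const auto& frag : fragments) {
    total += frag.numTuples;
  }
  return total;
}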
void agg_min_double_skip_val(int64_t *agg, const double val, const double skip_val)
void setCatalog(const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:282
std::map< size_t, std::vector< uint64_t > > get_table_id_to_frag_offsets(const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:2318
ExecutorExplainType explain_type
StringDictionaryProxy * getStringDictionaryProxy(const int dict_id, const bool with_generation) const
Definition: Execute.h:400
int getColId() const
int64_t inline_null_val(const SQLTypeInfo &ti, const bool float_argument_input)
Definition: Execute.cpp:1743
size_t g_big_group_threshold
Definition: Execute.cpp:102
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1114
static std::unordered_map< std::string, size_t > cardinality_cache_
Definition: Execute.h:1105
float g_filter_push_down_high_frac
Definition: Execute.cpp:91
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1074
int getTableId() const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1115
bool g_bigint_count
void agg_min_float_skip_val(int32_t *agg, const float val, const float skip_val)
const TableGeneration & getTableGeneration(const int table_id) const
Definition: Execute.cpp:298
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
llvm::Value * castToFP(llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
Definition: Execute.cpp:3363
std::pair< bool, size_t > CachedCardinality
Definition: Execute.h:989
size_t g_max_memory_allocation_size
Definition: Execute.cpp:105
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:130
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:705
double g_overlaps_target_entries_per_bin
Definition: Execute.cpp:99
size_t g_approx_quantile_buffer
Definition: Execute.cpp:133
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1108
void agg_max_float_skip_val(int32_t *agg, const float val, const float skip_val)
const ColumnDescriptor * getColumnDescriptor(const Analyzer::ColumnVar *) const
Definition: Execute.cpp:261
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1494
specifies the in-memory content of a row in the column metadata table
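As a rough illustration of what such a per-column metadata row carries, a simplified stand-in (this is not the real ColumnDescriptor layout; field names are assumptions):
// Simplified stand-in for a column metadata row; the real descriptor carries
// more fields (full SQLTypeInfo, defaults, virtual/system flags, ...).
#include <string>

struct ColumnMetadataRowStub {
  int tableId;       // table the column belongs to
  int columnId;      // ordinal id of the column within that table
  std::string name;  // column name as recorded in the catalog
  bool isSystemCol;  // whether this is a hidden/system column
};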
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
int64_t getAggInitValForIndex(const size_t index) const
const ChunkMetadataMap & getChunkMetadataMap() const
bool is_boolean() const
Definition: sqltypes.h:496
size_t g_overlaps_max_table_size_bytes
Definition: Execute.cpp:98
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1121
StringDictionaryGenerations string_dictionary_generations_
const std::shared_ptr< Analyzer::Estimator > estimator
static std::map< int, std::shared_ptr< Executor > > executors_
Definition: Execute.h:1078
#define AUTOMATIC_IR_METADATA(CGENSTATE)
const TemporaryTables * getTemporaryTables()
Definition: Execute.h:395
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const RegisteredQueryHint &query_hint)
Definition: Execute.cpp:3287
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
Definition: Execute.cpp:1330
QueryDescriptionType getQueryDescriptionType() const
std::shared_ptr< CompilationContext > generated_code
QueryMemoryDescriptor query_mem_desc_
const Catalog_Namespace::Catalog * getCatalog() const
Definition: Execute.cpp:278
ExecutorDeviceType device_type
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
executor_id_(executor_id)
bool g_enable_window_functions
Definition: Execute.cpp:103
bool updateQuerySessionExecutorAssignment(const QuerySessionId &query_session, const std::string &submitted_time_str, const size_t executor_id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4047
virtual ReductionCode codegen() const
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
std::string sort_algorithm_to_string(const SortAlgorithm algorithm)
Definition: Execute.cpp:1214
InputSourceType getSourceType() const
size_t g_min_memory_allocation_size
Definition: Execute.cpp:106
#define CHECK_LT(x, y)
Definition: Logger.h:213
ResultSetPtr resultsUnion(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:872
#define REGULAR_DICT(TRANSIENTID)
Definition: sqltypes.h:255
static QuerySessionId current_query_session_
Definition: Execute.h:1070
static llvm::ConstantInt * codegenIntConst(const Analyzer::Constant *constant, CgenState *cgen_state)
Definition: ConstantIR.cpp:85
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2744
size_t ExecutorId
Definition: Execute.h:365
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const RegisteredQueryHint &query_hint)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: HashJoin.cpp:238
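A hedged usage sketch of the factory above, using only the parameters shown in its signature; all arguments are assumed to have been prepared upstream, the OneToOne layout preference is an assumption, and failure is mapped to a null result so the caller can fall back to another join strategy:
// Sketch only; assumes the relevant OmniSciDB headers (HashJoin.h, Analyzer.h, ...)
// are available and that the caller prepared every argument as the executor would.
std::shared_ptr<HashJoin> try_build_hash_join(
    const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const RegisteredQueryHint& query_hint) {
  try {
    return HashJoin::getInstance(qual_bin_oper,
                                 query_infos,
                                 memory_level,
                                 HashType::OneToOne,  // assumed preferred layout
                                 device_count,
                                 column_cache,
                                 executor,
                                 query_hint);
  } catch (const std::exception&) {
    return nullptr;  // let the caller pick a non-hash join instead
  }
}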
#define CHECK_LE(x, y)
Definition: Logger.h:214
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
Definition: Execute.cpp:383
quantile::TDigest * nullTDigest()
Definition: Execute.cpp:248
Speculative top N algorithm.
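The core idea behind a speculative top-N reduction is to merge per-device partial results and re-rank them; below is a minimal, self-contained sketch of just that merge step (unrelated to the actual SpeculativeTopNMap internals, which also track whether values outside the per-device heaps could change the answer):
// Merge per-device candidate values and keep the n largest.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

std::vector<int64_t> merge_top_n(const std::vector<std::vector<int64_t>>& per_device,
                                 const size_t n) {
  std::vector<int64_t> merged;
  for (const auto& partial : per_device) {
    merged.insert(merged.end(), partial.begin(), partial.end());
  }
  const size_t k = std::min(n, merged.size());
  std::partial_sort(merged.begin(), merged.begin() + k, merged.end(),
                    std::greater<int64_t>());  // descending: largest k first
  merged.resize(k);
  return merged;
}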
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
void setGeneration(const uint32_t id, const TableGeneration &generation)
void launchKernels(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
bool g_enable_smem_grouped_non_count_agg
Definition: Execute.cpp:124
std::pair< bool, int64_t > skipFragment(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3546
bool g_enable_experimental_string_functions
bool g_enable_automatic_ir_metadata
Definition: Execute.cpp:136
unsigned gridSize() const
Definition: Execute.cpp:3324
std::unordered_map< int, CgenState::LiteralValues > literal_values
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:316
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags(const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:2337
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:32
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1899
StringDictionaryGenerations computeStringDictionaryGenerations(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:3771
std::unique_ptr< llvm::Module > udf_cpu_module
bool g_enable_filter_function
Definition: Execute.cpp:79
bool needLinearizeAllFragments(const ColumnDescriptor *cd, const InputColDescriptor &inner_col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments, const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:2405
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
Definition: ResultSet.cpp:486
void updateQuerySessionStatus(std::shared_ptr< const query_state::QueryState > &query_state, const QuerySessionStatus::QueryStatus new_query_status)
Definition: Execute.cpp:3921
int32_t executePlanWithoutGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr * > &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
Definition: Execute.cpp:2842
bool g_cache_string_hash
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:3247
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
Definition: Execute.cpp:3047
std::vector< std::vector< const int8_t * > > col_buffers
Definition: ColumnFetcher.h:42
std::pair< bool, int64_t > skipFragmentInnerJoins(const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3712
void buildSelectedFragsMapping(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2758
int64_t extract_min_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr * > &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
Definition: Execute.cpp:1813
std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> PerFragmentCallBack
Definition: Execute.h:537
ThreadId thread_id()
Definition: Logger.cpp:732
mapd_shared_lock< mapd_shared_mutex > read_lock
size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1100
bool g_enable_filter_push_down
Definition: Execute.cpp:89
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool g_use_estimator_result_cache
Definition: Execute.cpp:116
bool g_enable_bump_allocator
Definition: Execute.cpp:109
size_t getRunningExecutorId(mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3836
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::string &submitted_time_str, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4072
std::string QuerySessionId
Definition: Execute.h:79
ResultSetPtr reduceMultiDeviceResultSets(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:947
RegisteredQueryHint query_hint
void set_notnull(bool n)
Definition: sqltypes.h:411
void addToCardinalityCache(const std::string &cache_key, const size_t cache_value)
Definition: Execute.cpp:4167
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
static std::atomic< bool > interrupted_
Definition: Execute.h:1033
#define CHECK(condition)
Definition: Logger.h:203
#define DEBUG_TIMER(name)
Definition: Logger.h:319
void agg_sum_double_skip_val(int64_t *agg, const double val, const double skip_val)
constexpr int64_t get_timestamp_precision_scale(const int32_t dimen)
Definition: DateTimeUtils.h:51
uint64_t exp_to_scale(const unsigned exp)
double gpu_input_mem_limit_percent
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
void clearQuerySessionStatus(const QuerySessionId &query_session, const std::string &submitted_time_str, const bool acquire_spin_lock)
Definition: Execute.cpp:3897
bool g_cluster
static std::mutex compilation_mutex_
Definition: Execute.h:1124
bool g_allow_cpu_retry
Definition: Execute.cpp:81
std::vector< std::vector< int64_t > > num_rows
Definition: ColumnFetcher.h:43
size_t g_approx_quantile_centroids
Definition: Execute.cpp:134
constexpr int8_t MAX_BYTE_WIDTH_SUPPORTED
void setColRange(const PhysicalInput &, const ExpressionRange &)
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables)
const Expr * get_left_operand() const
Definition: Analyzer.h:442
FetchResult fetchChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:2431
static size_t running_query_executor_id_
Definition: Execute.h:1072
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1417
static bool typeSupportsRange(const SQLTypeInfo &ti)
std::unordered_map< int, const Analyzer::BinOper * > getInnerTabIdToJoinCond() const
Definition: Execute.cpp:2020
ExecutorDeviceType getDeviceTypeForTargets(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
Definition: Execute.cpp:1722
std::shared_ptr< const query_state::QueryState > query_state
ReductionCode get_reduction_code(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, int64_t *compilation_queue_time)
Definition: Execute.cpp:932
ResultSetPtr reduce_estimator_results(const RelAlgExecutionUnit &ra_exe_unit, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Executor(const ExecutorId id, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:144
ExpressionRange getColRange(const PhysicalInput &) const
Definition: Execute.cpp:302
CurrentQueryStatus attachExecutorToQuerySession(const QuerySessionId &query_session_id, const std::string &query_str, const std::string &query_submitted_time)
Definition: Execute.cpp:3854
mapd_unique_lock< mapd_shared_mutex > write_lock
std::vector< Analyzer::Expr * > target_exprs
SQLTypeInfo columnType
bool g_optimize_row_initialization
Definition: Execute.cpp:95
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:80
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
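A hedged lookup sketch against the accessor above, assuming (as the surrounding catalog entries suggest) that it is a member of Catalog_Namespace::Catalog; the helper name and the throw-on-missing policy are this example's own choices:
// Resolve a table by name and fail loudly if it does not exist.
#include <stdexcept>
#include <string>

const TableDescriptor* require_table(const Catalog_Namespace::Catalog& cat,
                                     const std::string& table_name) {
  const auto td = cat.getMetadataForTable(table_name, /*populateFragmenter=*/true);
  if (!td) {
    throw std::runtime_error("Table does not exist: " + table_name);
  }
  return td;
}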
debug_file_(debug_file)
int get_column_id() const
Definition: Analyzer.h:195
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const GpuCompilationContext *cu_functions, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const bool allow_runtime_interrupt, const std::vector< int64_t > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
bool is_string() const
Definition: sqltypes.h:489
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
unsigned blockSize() const
Definition: Execute.cpp:3341
std::vector< std::vector< uint64_t > > frag_offsets
Definition: ColumnFetcher.h:44
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:321
std::unique_ptr< ResultSet > estimator_result_set_
const size_t offset
unsigned g_dynamic_watchdog_time_limit
Definition: Execute.cpp:80
int cpu_threads()
Definition: thread_count.h:24
bool g_use_tbb_pool
Definition: Execute.cpp:78
bool is_decimal() const
Definition: sqltypes.h:492
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:553
void add_deleted_col_to_map(PlanState::DeletedColumnsMap &deleted_cols_map, const ColumnDescriptor *deleted_cd)
Definition: Execute.cpp:3422
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:655
void setCurrentQuerySession(const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3824
temporary_tables_(nullptr)
Descriptor for the fragments required for an execution kernel.
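A rough stand-in for such a per-kernel fragment assignment (types and field names below are illustrative, not the actual FragmentsList element type):
// Illustrative only: each execution kernel works on a subset of fragments per table.
#include <cstddef>
#include <vector>

struct KernelFragmentsStub {
  int tableId;                      // input table this entry refers to
  std::vector<size_t> fragmentIds;  // fragment ids assigned to this kernel
};

using KernelFragmentsListStub = std::vector<KernelFragmentsStub>;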
size_t getColOffInBytes(const size_t col_idx) const
void agg_sum_float_skip_val(int32_t *agg, const float val, const float skip_val)
static size_t getArenaBlockSize()
Definition: Execute.cpp:210
ResultSetPtr executeWorkUnit(size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1350
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:2035
int getNestLevel() const
HashType
Definition: HashTable.h:19
const InputDescriptor & getScanDesc() const
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:245
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:114
static mapd_shared_mutex recycler_mutex_
Definition: Execute.h:1104
#define VLOG(n)
Definition: Logger.h:297
Type timer_start()
Definition: measure.h:42
Functions to support array operations used by the executor.
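As a hedged illustration of the kind of helper such array support implies (this exact function is an assumption, not one of the real runtime entry points): deriving the element count of a fixed-length array value from its byte size.
// Hypothetical helper: element count of a fixed-length array chunk.
#include <cstddef>

inline size_t fixlen_array_num_elems(const size_t array_bytes,
                                     const size_t elem_width_bytes) {
  return elem_width_bytes == 0 ? 0 : array_bytes / elem_width_bytes;
}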
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
int64_t extract_max_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
const Executor * getExecutor() const
static std::mutex gpu_active_modules_mutex_
Definition: Execute.h:1030
void setupCaching(const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< int > &phys_table_ids)
Definition: Execute.cpp:3804
void clearMetaInfoCache()
Definition: Execute.cpp:377
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:192
bool g_enable_table_functions
Definition: Execute.cpp:104
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:366
size_t g_gpu_smem_threshold
Definition: Execute.cpp:119
bool skipFragmentPair(const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
Definition: Execute.cpp:2247
ResultSetPtr executeExplain(const QueryCompilationDescriptor &)
Definition: Execute.cpp:1718
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const
void compile(const TableFunctionExecutionUnit &exe_unit, const CompilationOptions &co, Executor *executor)