OmniSciDB  85c2d10cdc
Execute.cpp
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Execute.h"
18 
19 #include "AggregateUtils.h"
20 #include "CodeGenerator.h"
21 #include "ColumnFetcher.h"
24 #include "DynamicWatchdog.h"
25 #include "EquiJoinCondition.h"
26 #include "ErrorHandling.h"
27 #include "ExpressionRewrite.h"
29 #include "GpuMemUtils.h"
30 #include "InPlaceSort.h"
33 #include "JsonAccessors.h"
36 #include "QueryRewrite.h"
37 #include "QueryTemplateGenerator.h"
38 #include "ResultSetReductionJIT.h"
39 #include "RuntimeFunctions.h"
40 #include "SpeculativeTopN.h"
41 
44 
45 #include "CudaMgr/CudaMgr.h"
47 #include "Parser/ParserNode.h"
50 #include "Shared/checked_alloc.h"
51 #include "Shared/measure.h"
52 #include "Shared/misc.h"
53 #include "Shared/scope.h"
54 #include "Shared/shard_key.h"
55 #include "Shared/threadpool.h"
56 
57 #include "AggregatedColRange.h"
59 
60 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
61 #include <boost/filesystem/operations.hpp>
62 #include <boost/filesystem/path.hpp>
63 
64 #ifdef HAVE_CUDA
65 #include <cuda.h>
66 #endif // HAVE_CUDA
67 #include <chrono>
68 #include <ctime>
69 #include <future>
70 #include <iostream>
71 #include <memory>
72 #include <numeric>
73 #include <set>
74 #include <thread>
75 
76 bool g_enable_watchdog{false};
78 bool g_use_tbb_pool{false};
81 bool g_allow_cpu_retry{true};
82 bool g_null_div_by_zero{false};
86 extern bool g_enable_smem_group_by;
87 extern std::unique_ptr<llvm::Module> udf_gpu_module;
88 extern std::unique_ptr<llvm::Module> udf_cpu_module;
98 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
102 size_t g_big_group_threshold{20000};
105 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
106 size_t g_min_memory_allocation_size{
107  256}; // minimum memory allocation required for projection query output buffer
108  // without pre-flight count
119 size_t g_gpu_smem_threshold{
120  4096}; // GPU shared memory threshold (in bytes), if larger
121  // buffer sizes are required we do not use GPU shared
122  // memory optimizations. Setting this to 0 means unlimited
123  // (subject to other dynamically calculated caps)
124 bool g_enable_smem_grouped_non_count_agg{
125  true}; // enable use of shared memory when performing group-by with select non-count
126  // aggregates
127 bool g_enable_smem_non_grouped_agg{
128  true}; // enable optimizations for using GPU shared memory in implementation of
129  // non-grouped aggregates
130 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
131  // limits the allocation for the output buffer arena
132 
135 
136 extern bool g_cache_string_hash;
137 
138 int const Executor::max_gpu_count;
139 
141 
142 Executor::Executor(const ExecutorId executor_id,
143  const size_t block_size_x,
144  const size_t grid_size_x,
145  const size_t max_gpu_slab_size,
146  const std::string& debug_dir,
147  const std::string& debug_file)
148  : cgen_state_(new CgenState({}, false))
149  , cpu_code_cache_(code_cache_size)
150  , gpu_code_cache_(code_cache_size)
151  , block_size_x_(block_size_x)
152  , grid_size_x_(grid_size_x)
153  , max_gpu_slab_size_(max_gpu_slab_size)
154  , debug_dir_(debug_dir)
155  , debug_file_(debug_file)
156  , executor_id_(executor_id)
157  , catalog_(nullptr)
158  , temporary_tables_(nullptr)
159  , input_table_info_cache_(this) {}
160 
161 std::shared_ptr<Executor> Executor::getExecutor(
162  const ExecutorId executor_id,
163  const std::string& debug_dir,
164  const std::string& debug_file,
165  const SystemParameters& system_parameters) {
166  INJECT_TIMER(getExecutor);
167 
168  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
169  auto it = executors_.find(executor_id);
170  if (it != executors_.end()) {
171  return it->second;
172  }
173  auto executor = std::make_shared<Executor>(executor_id,
174  system_parameters.cuda_block_size,
175  system_parameters.cuda_grid_size,
176  system_parameters.max_gpu_slab_size,
177  debug_dir,
178  debug_file);
179  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
180  return executor;
181 }
182 
183 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
184  switch (memory_level) {
185  case Data_Namespace::MemoryLevel::CPU_LEVEL:
186  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
187  mapd_unique_lock<mapd_shared_mutex> flush_lock(
188  execute_mutex_); // Don't flush memory while queries are running
189 
190  Catalog_Namespace::SysCatalog::instance().getDataMgr().clearMemory(memory_level);
191  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
192  // The hash table cache uses CPU memory not managed by the buffer manager. In the
193  // future, we should manage these allocations with the buffer manager directly.
194  // For now, assume the user wants to purge the hash table cache when they clear
195  // CPU memory (currently used in ExecuteTest to lower memory pressure)
196  JoinHashTableCacheInvalidator::invalidateCaches();
197  }
198  break;
199  }
200  default: {
201  throw std::runtime_error(
202  "Clearing memory levels other than the CPU level or GPU level is not "
203  "supported.");
204  }
205  }
206 }
207 
208 size_t Executor::getArenaBlockSize() {
209  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
210 }
211 
212 StringDictionaryProxy* Executor::getStringDictionaryProxy(
213  const int dict_id_in,
214  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
215  const bool with_generation) const {
216  CHECK(row_set_mem_owner);
217  std::lock_guard<std::mutex> lock(
218  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
219  return row_set_mem_owner->getOrAddStringDictProxy(
220  dict_id_in, with_generation, catalog_);
221 }
222 
223 StringDictionaryProxy* RowSetMemoryOwner::getOrAddStringDictProxy(
224  const int dict_id_in,
225  const bool with_generation,
226  const Catalog_Namespace::Catalog* catalog) {
227  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
228  CHECK(catalog);
229  const auto dd = catalog->getMetadataForDict(dict_id);
230  if (dd) {
231  CHECK(dd->stringDict);
232  CHECK_LE(dd->dictNBits, 32);
233  const int64_t generation =
234  with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
235  return addStringDict(dd->stringDict, dict_id, generation);
236  }
237  CHECK_EQ(0, dict_id);
238  if (!lit_str_dict_proxy_) {
239  std::shared_ptr<StringDictionary> tsd =
240  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
241  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
242  }
243  return lit_str_dict_proxy_.get();
244 }
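// Note on the lookup above: a negative dict_id is translated through REGULAR_DICT,
// while dict_id 0 denotes the query-local literal dictionary; that dictionary is
// created lazily as an empty, transient StringDictionary and shared through
// lit_str_dict_proxy_ so literal strings can be resolved without a catalog entry.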
245 
247  std::lock_guard<std::mutex> lock(state_mutex_);
248  return t_digests_
249  .emplace_back(std::make_unique<quantile::TDigest>(
251  .get();
252 }
253 
254 bool Executor::isCPUOnly() const {
255  CHECK(catalog_);
256  return !catalog_->getDataMgr().getCudaMgr();
257 }
258 
259 const ColumnDescriptor* Executor::getColumnDescriptor(
260  const Analyzer::ColumnVar* col_var) const {
261  return get_column_descriptor_maybe(
262  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
263 }
264 
265 const ColumnDescriptor* Executor::getPhysicalColumnDescriptor(
266  const Analyzer::ColumnVar* col_var,
267  int n) const {
268  const auto cd = getColumnDescriptor(col_var);
269  if (!cd || n > cd->columnType.get_physical_cols()) {
270  return nullptr;
271  }
272  return get_column_descriptor_maybe(
273  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
274 }
275 
276 const Catalog_Namespace::Catalog* Executor::getCatalog() const {
277  return catalog_;
278 }
279 
280 void Executor::setCatalog(const Catalog_Namespace::Catalog* catalog) {
281  catalog_ = catalog;
282 }
283 
284 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
285  return row_set_mem_owner_;
286 }
287 
288 const TemporaryTables* Executor::getTemporaryTables() const {
289  return temporary_tables_;
290 }
291 
292 const Fragmenter_Namespace::TableInfo Executor::getTableInfo(const int table_id) const {
293  return input_table_info_cache_.getTableInfo(table_id);
294 }
295 
296 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
297  return table_generations_.getGeneration(table_id);
298 }
299 
300 ExpressionRange Executor::getColRange(const PhysicalInput& phys_input) const {
301  return agg_col_range_cache_.getColRange(phys_input);
302 }
303 
304 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
305  size_t num_bytes = 0;
306  if (!plan_state_) {
307  return 0;
308  }
309  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
310  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
311  continue;
312  }
313 
314  if (fetched_col_pair.first < 0) {
315  num_bytes += 8;
316  } else {
317  const auto cd =
318  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
319  const auto& ti = cd->columnType;
320  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
321  ? 4
322  : ti.get_size();
323  if (sz < 0) {
324  // for varlen types, only account for the pointer/size for each row, for now
325  num_bytes += 16;
326  } else {
327  num_bytes += sz;
328  }
329  }
330  }
331  return num_bytes;
332 }
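// Worked example of the per-row accounting above: a dict-encoded TEXT column counts
// 4 bytes, a BIGINT column 8 bytes, and a none-encoded varlen column 16 bytes (only the
// pointer/size pair is charged); columns from inputs with negative table ids are counted
// as 8 bytes each. Fetching all three would therefore add 28 bytes per row.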
333 
334 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
335  const std::vector<Analyzer::Expr*>& target_exprs) const {
336  CHECK(plan_state_);
337  CHECK(catalog_);
338  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
339  for (const auto target_expr : target_exprs) {
340  if (!plan_state_->isLazyFetchColumn(target_expr)) {
341  col_lazy_fetch_info.emplace_back(
342  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
343  } else {
344  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
345  CHECK(col_var);
346  auto col_id = col_var->get_column_id();
347  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
348  auto cd = (col_var->get_table_id() > 0)
349  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
350  : nullptr;
351  if (cd && IS_GEO(cd->columnType.get_type())) {
352  // Geo coords cols will be processed in sequence. So we only need to track the
353  // first coords col in lazy fetch info.
354  {
355  auto cd0 =
356  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
357  auto col0_ti = cd0->columnType;
358  CHECK(!cd0->isVirtualCol);
359  auto col0_var = makeExpr<Analyzer::ColumnVar>(
360  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
361  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
362  col_lazy_fetch_info.emplace_back(
363  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
364  }
365  } else {
366  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
367  const auto& col_ti = col_var->get_type_info();
368  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
369  }
370  }
371  }
372  return col_lazy_fetch_info;
373 }
374 
375 void Executor::clearMetaInfoCache() {
376  input_table_info_cache_.clear();
377  agg_col_range_cache_.clear();
378  table_generations_.clear();
379 }
380 
381 std::vector<int8_t> Executor::serializeLiterals(
382  const std::unordered_map<int, CgenState::LiteralValues>& literals,
383  const int device_id) {
384  if (literals.empty()) {
385  return {};
386  }
387  const auto dev_literals_it = literals.find(device_id);
388  CHECK(dev_literals_it != literals.end());
389  const auto& dev_literals = dev_literals_it->second;
390  size_t lit_buf_size{0};
391  std::vector<std::string> real_strings;
392  std::vector<std::vector<double>> double_array_literals;
393  std::vector<std::vector<int8_t>> align64_int8_array_literals;
394  std::vector<std::vector<int32_t>> int32_array_literals;
395  std::vector<std::vector<int8_t>> align32_int8_array_literals;
396  std::vector<std::vector<int8_t>> int8_array_literals;
397  for (const auto& lit : dev_literals) {
398  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
399  if (lit.which() == 7) {
400  const auto p = boost::get<std::string>(&lit);
401  CHECK(p);
402  real_strings.push_back(*p);
403  } else if (lit.which() == 8) {
404  const auto p = boost::get<std::vector<double>>(&lit);
405  CHECK(p);
406  double_array_literals.push_back(*p);
407  } else if (lit.which() == 9) {
408  const auto p = boost::get<std::vector<int32_t>>(&lit);
409  CHECK(p);
410  int32_array_literals.push_back(*p);
411  } else if (lit.which() == 10) {
412  const auto p = boost::get<std::vector<int8_t>>(&lit);
413  CHECK(p);
414  int8_array_literals.push_back(*p);
415  } else if (lit.which() == 11) {
416  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
417  CHECK(p);
418  if (p->second == 64) {
419  align64_int8_array_literals.push_back(p->first);
420  } else if (p->second == 32) {
421  align32_int8_array_literals.push_back(p->first);
422  } else {
423  CHECK(false);
424  }
425  }
426  }
427  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
428  throw TooManyLiterals();
429  }
430  int16_t crt_real_str_off = lit_buf_size;
431  for (const auto& real_str : real_strings) {
432  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
433  lit_buf_size += real_str.size();
434  }
435  if (double_array_literals.size() > 0) {
436  lit_buf_size = align(lit_buf_size, sizeof(double));
437  }
438  int16_t crt_double_arr_lit_off = lit_buf_size;
439  for (const auto& double_array_literal : double_array_literals) {
440  CHECK_LE(double_array_literal.size(),
441  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
442  lit_buf_size += double_array_literal.size() * sizeof(double);
443  }
444  if (align64_int8_array_literals.size() > 0) {
445  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
446  }
447  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
448  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
449  CHECK_LE(align64_int8_array_literals.size(),
450  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
451  lit_buf_size += align64_int8_array_literal.size();
452  }
453  if (int32_array_literals.size() > 0) {
454  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
455  }
456  int16_t crt_int32_arr_lit_off = lit_buf_size;
457  for (const auto& int32_array_literal : int32_array_literals) {
458  CHECK_LE(int32_array_literal.size(),
459  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
460  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
461  }
462  if (align32_int8_array_literals.size() > 0) {
463  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
464  }
465  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
466  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
467  CHECK_LE(align32_int8_array_literals.size(),
468  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
469  lit_buf_size += align32_int8_array_literal.size();
470  }
471  int16_t crt_int8_arr_lit_off = lit_buf_size;
472  for (const auto& int8_array_literal : int8_array_literals) {
473  CHECK_LE(int8_array_literal.size(),
474  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
475  lit_buf_size += int8_array_literal.size();
476  }
477  unsigned crt_real_str_idx = 0;
478  unsigned crt_double_arr_lit_idx = 0;
479  unsigned crt_align64_int8_arr_lit_idx = 0;
480  unsigned crt_int32_arr_lit_idx = 0;
481  unsigned crt_align32_int8_arr_lit_idx = 0;
482  unsigned crt_int8_arr_lit_idx = 0;
483  std::vector<int8_t> serialized(lit_buf_size);
484  size_t off{0};
485  for (const auto& lit : dev_literals) {
486  const auto lit_bytes = CgenState::literalBytes(lit);
487  off = CgenState::addAligned(off, lit_bytes);
488  switch (lit.which()) {
489  case 0: {
490  const auto p = boost::get<int8_t>(&lit);
491  CHECK(p);
492  serialized[off - lit_bytes] = *p;
493  break;
494  }
495  case 1: {
496  const auto p = boost::get<int16_t>(&lit);
497  CHECK(p);
498  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
499  break;
500  }
501  case 2: {
502  const auto p = boost::get<int32_t>(&lit);
503  CHECK(p);
504  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
505  break;
506  }
507  case 3: {
508  const auto p = boost::get<int64_t>(&lit);
509  CHECK(p);
510  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
511  break;
512  }
513  case 4: {
514  const auto p = boost::get<float>(&lit);
515  CHECK(p);
516  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
517  break;
518  }
519  case 5: {
520  const auto p = boost::get<double>(&lit);
521  CHECK(p);
522  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
523  break;
524  }
525  case 6: {
526  const auto p = boost::get<std::pair<std::string, int>>(&lit);
527  CHECK(p);
528  const auto str_id =
529  g_enable_experimental_string_functions
530  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
531  ->getOrAddTransient(p->first)
532  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
533  ->getIdOfString(p->first);
534  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
535  break;
536  }
537  case 7: {
538  const auto p = boost::get<std::string>(&lit);
539  CHECK(p);
540  int32_t off_and_len = crt_real_str_off << 16;
541  const auto& crt_real_str = real_strings[crt_real_str_idx];
542  off_and_len |= static_cast<int16_t>(crt_real_str.size());
543  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
544  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
545  ++crt_real_str_idx;
546  crt_real_str_off += crt_real_str.size();
547  break;
548  }
549  case 8: {
550  const auto p = boost::get<std::vector<double>>(&lit);
551  CHECK(p);
552  int32_t off_and_len = crt_double_arr_lit_off << 16;
553  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
554  int32_t len = crt_double_arr_lit.size();
555  CHECK_EQ((len >> 16), 0);
556  off_and_len |= static_cast<int16_t>(len);
557  int32_t double_array_bytesize = len * sizeof(double);
558  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
559  memcpy(&serialized[crt_double_arr_lit_off],
560  crt_double_arr_lit.data(),
561  double_array_bytesize);
562  ++crt_double_arr_lit_idx;
563  crt_double_arr_lit_off += double_array_bytesize;
564  break;
565  }
566  case 9: {
567  const auto p = boost::get<std::vector<int32_t>>(&lit);
568  CHECK(p);
569  int32_t off_and_len = crt_int32_arr_lit_off << 16;
570  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
571  int32_t len = crt_int32_arr_lit.size();
572  CHECK_EQ((len >> 16), 0);
573  off_and_len |= static_cast<int16_t>(len);
574  int32_t int32_array_bytesize = len * sizeof(int32_t);
575  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
576  memcpy(&serialized[crt_int32_arr_lit_off],
577  crt_int32_arr_lit.data(),
578  int32_array_bytesize);
579  ++crt_int32_arr_lit_idx;
580  crt_int32_arr_lit_off += int32_array_bytesize;
581  break;
582  }
583  case 10: {
584  const auto p = boost::get<std::vector<int8_t>>(&lit);
585  CHECK(p);
586  int32_t off_and_len = crt_int8_arr_lit_off << 16;
587  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
588  int32_t len = crt_int8_arr_lit.size();
589  CHECK_EQ((len >> 16), 0);
590  off_and_len |= static_cast<int16_t>(len);
591  int32_t int8_array_bytesize = len;
592  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
593  memcpy(&serialized[crt_int8_arr_lit_off],
594  crt_int8_arr_lit.data(),
595  int8_array_bytesize);
596  ++crt_int8_arr_lit_idx;
597  crt_int8_arr_lit_off += int8_array_bytesize;
598  break;
599  }
600  case 11: {
601  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
602  CHECK(p);
603  if (p->second == 64) {
604  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
605  const auto& crt_align64_int8_arr_lit =
606  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
607  int32_t len = crt_align64_int8_arr_lit.size();
608  CHECK_EQ((len >> 16), 0);
609  off_and_len |= static_cast<int16_t>(len);
610  int32_t align64_int8_array_bytesize = len;
611  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
612  memcpy(&serialized[crt_align64_int8_arr_lit_off],
613  crt_align64_int8_arr_lit.data(),
614  align64_int8_array_bytesize);
615  ++crt_align64_int8_arr_lit_idx;
616  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
617  } else if (p->second == 32) {
618  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
619  const auto& crt_align32_int8_arr_lit =
620  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
621  int32_t len = crt_align32_int8_arr_lit.size();
622  CHECK_EQ((len >> 16), 0);
623  off_and_len |= static_cast<int16_t>(len);
624  int32_t align32_int8_array_bytesize = len;
625  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
626  memcpy(&serialized[crt_align32_int8_arr_lit_off],
627  crt_align32_int8_arr_lit.data(),
628  align32_int8_array_bytesize);
629  ++crt_align32_int8_arr_lit_idx;
630  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
631  } else {
632  CHECK(false);
633  }
634  break;
635  }
636  default:
637  CHECK(false);
638  }
639  }
640  return serialized;
641 }
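// Layout note for the buffer produced above: scalar literals occupy their aligned slots
// in the fixed-size section, while strings and array literals are appended after it.
// Each variable-length literal is referenced from its slot by a packed 32-bit value,
// off_and_len = (offset << 16) | length, which is why the fixed section must fit in
// 16 bits (TooManyLiterals otherwise) and every payload length is checked against the
// same 16-bit bound.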
642 
643 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
644  if (device_type == ExecutorDeviceType::GPU) {
645  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
646  CHECK(cuda_mgr);
647  return cuda_mgr->getDeviceCount();
648  } else {
649  return 1;
650  }
651 }
652 
654  const Data_Namespace::MemoryLevel memory_level) const {
655  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
656  : deviceCount(ExecutorDeviceType::CPU);
657 }
658 
659 // TODO(alex): remove or split
660 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
661  const SQLTypeInfo& ti,
662  const int64_t agg_init_val,
663  const int8_t out_byte_width,
664  const int64_t* out_vec,
665  const size_t out_vec_sz,
666  const bool is_group_by,
667  const bool float_argument_input) {
668  switch (agg) {
669  case kAVG:
670  case kSUM:
671  if (0 != agg_init_val) {
672  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
673  int64_t agg_result = agg_init_val;
674  for (size_t i = 0; i < out_vec_sz; ++i) {
675  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
676  }
677  return {agg_result, 0};
678  } else {
679  CHECK(ti.is_fp());
680  switch (out_byte_width) {
681  case 4: {
682  int agg_result = static_cast<int32_t>(agg_init_val);
683  for (size_t i = 0; i < out_vec_sz; ++i) {
684  agg_sum_float_skip_val(
685  &agg_result,
686  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
687  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
688  }
689  const int64_t converted_bin =
690  float_argument_input
691  ? static_cast<int64_t>(agg_result)
692  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
693  return {converted_bin, 0};
694  break;
695  }
696  case 8: {
697  int64_t agg_result = agg_init_val;
698  for (size_t i = 0; i < out_vec_sz; ++i) {
699  agg_sum_double_skip_val(
700  &agg_result,
701  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
702  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
703  }
704  return {agg_result, 0};
705  break;
706  }
707  default:
708  CHECK(false);
709  }
710  }
711  }
712  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
713  int64_t agg_result = 0;
714  for (size_t i = 0; i < out_vec_sz; ++i) {
715  agg_result += out_vec[i];
716  }
717  return {agg_result, 0};
718  } else {
719  CHECK(ti.is_fp());
720  switch (out_byte_width) {
721  case 4: {
722  float r = 0.;
723  for (size_t i = 0; i < out_vec_sz; ++i) {
724  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
725  }
726  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
727  const int64_t converted_bin =
728  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
729  return {converted_bin, 0};
730  }
731  case 8: {
732  double r = 0.;
733  for (size_t i = 0; i < out_vec_sz; ++i) {
734  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
735  }
736  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
737  }
738  default:
739  CHECK(false);
740  }
741  }
742  break;
743  case kCOUNT: {
744  uint64_t agg_result = 0;
745  for (size_t i = 0; i < out_vec_sz; ++i) {
746  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
747  agg_result += out;
748  }
749  return {static_cast<int64_t>(agg_result), 0};
750  }
751  case kMIN: {
752  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
753  int64_t agg_result = agg_init_val;
754  for (size_t i = 0; i < out_vec_sz; ++i) {
755  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
756  }
757  return {agg_result, 0};
758  } else {
759  switch (out_byte_width) {
760  case 4: {
761  int32_t agg_result = static_cast<int32_t>(agg_init_val);
762  for (size_t i = 0; i < out_vec_sz; ++i) {
763  agg_min_float_skip_val(
764  &agg_result,
765  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
766  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
767  }
768  const int64_t converted_bin =
769  float_argument_input
770  ? static_cast<int64_t>(agg_result)
771  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
772  return {converted_bin, 0};
773  }
774  case 8: {
775  int64_t agg_result = agg_init_val;
776  for (size_t i = 0; i < out_vec_sz; ++i) {
777  agg_min_double_skip_val(
778  &agg_result,
779  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
780  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
781  }
782  return {agg_result, 0};
783  }
784  default:
785  CHECK(false);
786  }
787  }
788  }
789  case kMAX:
790  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
791  int64_t agg_result = agg_init_val;
792  for (size_t i = 0; i < out_vec_sz; ++i) {
793  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
794  }
795  return {agg_result, 0};
796  } else {
797  switch (out_byte_width) {
798  case 4: {
799  int32_t agg_result = static_cast<int32_t>(agg_init_val);
800  for (size_t i = 0; i < out_vec_sz; ++i) {
801  agg_max_float_skip_val(
802  &agg_result,
803  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
804  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
805  }
806  const int64_t converted_bin =
807  float_argument_input ? static_cast<int64_t>(agg_result)
808  : float_to_double_bin(agg_result, !ti.get_notnull());
809  return {converted_bin, 0};
810  }
811  case 8: {
812  int64_t agg_result = agg_init_val;
813  for (size_t i = 0; i < out_vec_sz; ++i) {
814  agg_max_double_skip_val(
815  &agg_result,
816  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
817  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
818  }
819  return {agg_result, 0};
820  }
821  default:
822  CHECK(false);
823  }
824  }
825  case kSINGLE_VALUE: {
826  int64_t agg_result = agg_init_val;
827  for (size_t i = 0; i < out_vec_sz; ++i) {
828  if (out_vec[i] != agg_init_val) {
829  if (agg_result == agg_init_val) {
830  agg_result = out_vec[i];
831  } else if (out_vec[i] != agg_result) {
832  return {agg_result, Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES};
833  }
834  }
835  }
836  return {agg_result, 0};
837  }
838  case kSAMPLE: {
839  int64_t agg_result = agg_init_val;
840  for (size_t i = 0; i < out_vec_sz; ++i) {
841  if (out_vec[i] != agg_init_val) {
842  agg_result = out_vec[i];
843  break;
844  }
845  }
846  return {agg_result, 0};
847  }
848  default:
849  CHECK(false);
850  }
851  abort();
852 }
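// Note on the float paths above: 4-byte partial results are reinterpreted as float
// "bins" stored in the int64_t output vector; when float_argument_input is false the
// reduced value is widened to a double bin via float_to_double_bin so callers can
// always read the result out of the first pair member as an int64_t bit pattern.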
853 
854 namespace {
855 
856 ResultSetPtr get_merged_result(
857  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
858  auto& first = results_per_device.front().first;
859  CHECK(first);
860  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
861  const auto& next = results_per_device[dev_idx].first;
862  CHECK(next);
863  first->append(*next);
864  }
865  return std::move(first);
866 }
867 
868 } // namespace
869 
870 ResultSetPtr Executor::resultsUnion(SharedKernelContext& shared_context,
871  const RelAlgExecutionUnit& ra_exe_unit) {
872  auto& results_per_device = shared_context.getFragmentResults();
873  if (results_per_device.empty()) {
874  std::vector<TargetInfo> targets;
875  for (const auto target_expr : ra_exe_unit.target_exprs) {
876  targets.push_back(get_target_info(target_expr, g_bigint_count));
877  }
878  return std::make_shared<ResultSet>(targets,
879  ExecutorDeviceType::CPU,
880  QueryMemoryDescriptor(),
881  row_set_mem_owner_,
882  catalog_,
883  blockSize(),
884  gridSize());
885  }
886  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
887  std::sort(results_per_device.begin(),
888  results_per_device.end(),
889  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
890  CHECK_GE(lhs.second.size(), size_t(1));
891  CHECK_GE(rhs.second.size(), size_t(1));
892  return lhs.second.front() < rhs.second.front();
893  });
894 
895  return get_merged_result(results_per_device);
896 }
897 
898 ResultSetPtr Executor::reduceMultiDeviceResults(
899  const RelAlgExecutionUnit& ra_exe_unit,
900  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
901  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
902  const QueryMemoryDescriptor& query_mem_desc) const {
903  auto timer = DEBUG_TIMER(__func__);
904  if (ra_exe_unit.estimator) {
905  return reduce_estimator_results(ra_exe_unit, results_per_device);
906  }
907 
908  if (results_per_device.empty()) {
909  std::vector<TargetInfo> targets;
910  for (const auto target_expr : ra_exe_unit.target_exprs) {
911  targets.push_back(get_target_info(target_expr, g_bigint_count));
912  }
913  return std::make_shared<ResultSet>(targets,
914  ExecutorDeviceType::CPU,
915  QueryMemoryDescriptor(),
916  nullptr,
917  catalog_,
918  blockSize(),
919  gridSize());
920  }
921 
922  return reduceMultiDeviceResultSets(
923  results_per_device,
924  row_set_mem_owner,
925  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
926 }
927 
928 namespace {
929 
930 ReductionCode get_reduction_code(
931  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
932  int64_t* compilation_queue_time) {
933  auto clock_begin = timer_start();
934  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
935  *compilation_queue_time = timer_stop(clock_begin);
936  const auto& this_result_set = results_per_device[0].first;
937  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
938  this_result_set->getTargetInfos(),
939  this_result_set->getTargetInitVals());
940  return reduction_jit.codegen();
941 };
942 
943 } // namespace
944 
945 ResultSetPtr Executor::reduceMultiDeviceResultSets(
946  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
947  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
948  const QueryMemoryDescriptor& query_mem_desc) const {
949  auto timer = DEBUG_TIMER(__func__);
950  std::shared_ptr<ResultSet> reduced_results;
951 
952  const auto& first = results_per_device.front().first;
953 
954  if (query_mem_desc.getQueryDescriptionType() ==
955  QueryDescriptionType::NonGroupedAggregate &&
956  results_per_device.size() > 1) {
957  const auto total_entry_count = std::accumulate(
958  results_per_device.begin(),
959  results_per_device.end(),
960  size_t(0),
961  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
962  const auto& r = rs.first;
963  return init + r->getQueryMemDesc().getEntryCount();
964  });
965  CHECK(total_entry_count);
966  auto query_mem_desc = first->getQueryMemDesc();
967  query_mem_desc.setEntryCount(total_entry_count);
968  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
969  ExecutorDeviceType::CPU,
970  query_mem_desc,
971  row_set_mem_owner,
972  catalog_,
973  blockSize(),
974  gridSize());
975  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
976  reduced_results->initializeStorage();
977  switch (query_mem_desc.getEffectiveKeyWidth()) {
978  case 4:
979  first->getStorage()->moveEntriesToBuffer<int32_t>(
980  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
981  break;
982  case 8:
983  first->getStorage()->moveEntriesToBuffer<int64_t>(
984  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
985  break;
986  default:
987  CHECK(false);
988  }
989  } else {
990  reduced_results = first;
991  }
992 
993  int64_t compilation_queue_time = 0;
994  const auto reduction_code =
995  get_reduction_code(results_per_device, &compilation_queue_time);
996 
997  for (size_t i = 1; i < results_per_device.size(); ++i) {
998  reduced_results->getStorage()->reduce(
999  *(results_per_device[i].first->getStorage()), {}, reduction_code);
1000  }
1001  reduced_results->addCompilationQueueTime(compilation_queue_time);
1002  return reduced_results;
1003 }
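// Reduction sketch for the function above: for non-grouped aggregates coming from
// several devices, all entries are first concatenated into one buffer sized to the
// combined entry count (moveEntriesToBuffer), and then every remaining per-device
// result is folded into it using the JITed reduction code from get_reduction_code;
// the time spent waiting on the compilation lock is reported separately.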
1004 
1005 ResultSetPtr Executor::reduceSpeculativeTopN(
1006  const RelAlgExecutionUnit& ra_exe_unit,
1007  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1008  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1009  const QueryMemoryDescriptor& query_mem_desc) const {
1010  if (results_per_device.size() == 1) {
1011  return std::move(results_per_device.front().first);
1012  }
1013  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1014  SpeculativeTopNMap m;
1015  for (const auto& result : results_per_device) {
1016  auto rows = result.first;
1017  CHECK(rows);
1018  if (!rows) {
1019  continue;
1020  }
1021  SpeculativeTopNMap that(
1022  *rows,
1023  ra_exe_unit.target_exprs,
1024  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1025  m.reduce(that);
1026  }
1027  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1028  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1029  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1030 }
1031 
1032 std::unordered_set<int> get_available_gpus(const Catalog_Namespace::Catalog& cat) {
1033  std::unordered_set<int> available_gpus;
1034  if (cat.getDataMgr().gpusPresent()) {
1035  int gpu_count = cat.getDataMgr().getCudaMgr()->getDeviceCount();
1036  CHECK_GT(gpu_count, 0);
1037  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1038  available_gpus.insert(gpu_id);
1039  }
1040  }
1041  return available_gpus;
1042 }
1043 
1044 size_t get_context_count(const ExecutorDeviceType device_type,
1045  const size_t cpu_count,
1046  const size_t gpu_count) {
1047  return device_type == ExecutorDeviceType::GPU ? gpu_count
1048  : static_cast<size_t>(cpu_count);
1049 }
1050 
1051 namespace {
1052 
1053 // Compute a very conservative entry count for the output buffer entry count using no
1054 // other information than the number of tuples in each table and multiplying them
1055 // together.
1056 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1057  using Fragmenter_Namespace::FragmentInfo;
1058  // Check for overflows since we're multiplying potentially big table sizes.
1059  using checked_size_t = boost::multiprecision::number<
1060  boost::multiprecision::cpp_int_backend<64,
1061  64,
1062  boost::multiprecision::unsigned_magnitude,
1063  boost::multiprecision::checked,
1064  void>>;
1065  checked_size_t max_groups_buffer_entry_guess = 1;
1066  for (const auto& query_info : query_infos) {
1067  CHECK(!query_info.info.fragments.empty());
1068  auto it = std::max_element(query_info.info.fragments.begin(),
1069  query_info.info.fragments.end(),
1070  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1071  return f1.getNumTuples() < f2.getNumTuples();
1072  });
1073  max_groups_buffer_entry_guess *= it->getNumTuples();
1074  }
1075  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1076  // baseline group layout with that many entries anyway.
1077  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1078  try {
1079  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1080  max_groups_buffer_entry_guess_cap);
1081  } catch (...) {
1082  return max_groups_buffer_entry_guess_cap;
1083  }
1084 }
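// Example of the guess above: with two inputs whose largest fragments hold 2,000,000 and
// 5,000 tuples, the product is 10^10, which exceeds the 100,000,000-entry cap, so the
// guess collapses to the cap; a checked multiply overflow falls back to the cap as well.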
1085 
1086 std::string get_table_name(const InputDescriptor& input_desc,
1087  const Catalog_Namespace::Catalog& cat) {
1088  const auto source_type = input_desc.getSourceType();
1089  if (source_type == InputSourceType::TABLE) {
1090  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1091  CHECK(td);
1092  return td->tableName;
1093  } else {
1094  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1095  }
1096 }
1097 
1098 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1099  const int device_count) {
1100  if (device_type == ExecutorDeviceType::GPU) {
1101  return device_count * Executor::high_scan_limit;
1102  }
1103  return Executor::high_scan_limit;
1104 }
1105 
1106 void checkWorkUnitWatchdog(const RelAlgExecutionUnit& ra_exe_unit,
1107  const std::vector<InputTableInfo>& table_infos,
1108  const Catalog_Namespace::Catalog& cat,
1109  const ExecutorDeviceType device_type,
1110  const int device_count) {
1111  for (const auto target_expr : ra_exe_unit.target_exprs) {
1112  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1113  return;
1114  }
1115  }
1116  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1117  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1118  // Allow a query with no scan limit to run on small tables
1119  return;
1120  }
1121  if (ra_exe_unit.use_bump_allocator) {
1122  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1123  // relative to the size of the input), so we bypass this check for now
1124  return;
1125  }
1126  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1127  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1128  (!ra_exe_unit.scan_limit ||
1129  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1130  std::vector<std::string> table_names;
1131  const auto& input_descs = ra_exe_unit.input_descs;
1132  for (const auto& input_desc : input_descs) {
1133  table_names.push_back(get_table_name(input_desc, cat));
1134  }
1135  if (!ra_exe_unit.scan_limit) {
1136  throw WatchdogException(
1137  "Projection query would require a scan without a limit on table(s): " +
1138  boost::algorithm::join(table_names, ", "));
1139  } else {
1140  throw WatchdogException(
1141  "Projection query output result set on table(s): " +
1142  boost::algorithm::join(table_names, ", ") + " would contain " +
1143  std::to_string(ra_exe_unit.scan_limit) +
1144  " rows, which is more than the current system limit of " +
1145  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1146  }
1147  }
1148 }
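// Example of the limit check above: on a 4-GPU run the projected-rows ceiling is
// 4 * Executor::high_scan_limit, while CPU execution uses a single high_scan_limit;
// a group-less projection whose scan_limit exceeds that ceiling, or that has no limit
// at all over a large table, triggers the WatchdogException thrown above.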
1149 
1150 } // namespace
1151 
1152 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1153  const RelAlgExecutionUnit& ra_exe_unit) {
1154  if (ra_exe_unit.input_descs.size() < 2) {
1155  return false;
1156  }
1157 
1158  // We only support loop join at the end of folded joins
1159  // where ra_exe_unit.input_descs.size() > 2 for now.
1160  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1161 
1162  std::optional<size_t> inner_table_idx;
1163  for (size_t i = 0; i < query_infos.size(); ++i) {
1164  if (query_infos[i].table_id == inner_table_id) {
1165  inner_table_idx = i;
1166  break;
1167  }
1168  }
1169  CHECK(inner_table_idx);
1170  return query_infos[*inner_table_idx].info.getNumTuples() <=
1171  g_trivial_loop_join_threshold;
1172 }
1173 
1174 namespace {
1175 
1176 template <typename T>
1177 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1178  std::vector<std::string> expr_strs;
1179  for (const auto& expr : expr_container) {
1180  if (!expr) {
1181  expr_strs.emplace_back("NULL");
1182  } else {
1183  expr_strs.emplace_back(expr->toString());
1184  }
1185  }
1186  return expr_strs;
1187 }
1188 
1189 template <>
1190 std::vector<std::string> expr_container_to_string(
1191  const std::list<Analyzer::OrderEntry>& expr_container) {
1192  std::vector<std::string> expr_strs;
1193  for (const auto& expr : expr_container) {
1194  expr_strs.emplace_back(expr.toString());
1195  }
1196  return expr_strs;
1197 }
1198 
1199 std::string join_type_to_string(const JoinType type) {
1200  switch (type) {
1201  case JoinType::INNER:
1202  return "INNER";
1203  case JoinType::LEFT:
1204  return "LEFT";
1205  case JoinType::INVALID:
1206  return "INVALID";
1207  }
1208  UNREACHABLE();
1209  return "";
1210 }
1211 
1212 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1213  switch (algorithm) {
1214  case SortAlgorithm::Default:
1215  return "ResultSet";
1216  case SortAlgorithm::SpeculativeTopN:
1217  return "Speculative Top N";
1218  case SortAlgorithm::StreamingTopN:
1219  return "Streaming Top N";
1220  }
1221  UNREACHABLE();
1222  return "";
1223 }
1224 
1225 } // namespace
1226 
1227 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
1228  // todo(yoonmin): replace a cache key as a DAG representation of a query plan
1229  // instead of ra_exec_unit description if possible
1230  std::ostringstream os;
1231  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1232  const auto& scan_desc = input_col_desc->getScanDesc();
1233  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1234  << scan_desc.getNestLevel();
1235  }
1236  if (!ra_exe_unit.simple_quals.empty()) {
1237  for (const auto& qual : ra_exe_unit.simple_quals) {
1238  if (qual) {
1239  os << qual->toString() << ",";
1240  }
1241  }
1242  }
1243  if (!ra_exe_unit.quals.empty()) {
1244  for (const auto& qual : ra_exe_unit.quals) {
1245  if (qual) {
1246  os << qual->toString() << ",";
1247  }
1248  }
1249  }
1250  if (!ra_exe_unit.join_quals.empty()) {
1251  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1252  const auto& join_condition = ra_exe_unit.join_quals[i];
1253  os << std::to_string(i) << join_type_to_string(join_condition.type);
1254  for (const auto& qual : join_condition.quals) {
1255  if (qual) {
1256  os << qual->toString() << ",";
1257  }
1258  }
1259  }
1260  }
1261  if (!ra_exe_unit.groupby_exprs.empty()) {
1262  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1263  if (qual) {
1264  os << qual->toString() << ",";
1265  }
1266  }
1267  }
1268  for (const auto& expr : ra_exe_unit.target_exprs) {
1269  if (expr) {
1270  os << expr->toString() << ",";
1271  }
1272  }
1273  os << ::toString(ra_exe_unit.estimator == nullptr);
1274  os << std::to_string(ra_exe_unit.scan_limit);
1275  return os.str();
1276 }
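// The cache key built above concatenates the input column descriptors, all simple
// qualifiers, qualifiers, join conditions (with join type), group-by and target
// expressions, whether an estimator is present, and the scan limit; any difference in
// these fields therefore produces a distinct key.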
1277 
1278 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1279  os << "\n\tTable/Col/Levels: ";
1280  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1281  const auto& scan_desc = input_col_desc->getScanDesc();
1282  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1283  << scan_desc.getNestLevel() << ") ";
1284  }
1285  if (!ra_exe_unit.simple_quals.empty()) {
1286  os << "\n\tSimple Quals: "
1287  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.simple_quals),
1288  ", ");
1289  }
1290  if (!ra_exe_unit.quals.empty()) {
1291  os << "\n\tQuals: "
1292  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1293  }
1294  if (!ra_exe_unit.join_quals.empty()) {
1295  os << "\n\tJoin Quals: ";
1296  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1297  const auto& join_condition = ra_exe_unit.join_quals[i];
1298  os << "\t\t" << std::to_string(i) << " "
1299  << join_type_to_string(join_condition.type);
1300  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1301  }
1302  }
1303  if (!ra_exe_unit.groupby_exprs.empty()) {
1304  os << "\n\tGroup By: "
1305  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.groupby_exprs),
1306  ", ");
1307  }
1308  os << "\n\tProjected targets: "
1309  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.target_exprs), ", ");
1310  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
1311  os << "\n\tSort Info: ";
1312  const auto& sort_info = ra_exe_unit.sort_info;
1313  os << "\n\t Order Entries: "
1314  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1315  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1316  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1317  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1318  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1319  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
1320  if (ra_exe_unit.union_all) {
1321  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1322  }
1323  return os;
1324 }
1325 
1326 namespace {
1327 
1328 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
1329  const size_t new_scan_limit) {
1330  return {ra_exe_unit_in.input_descs,
1331  ra_exe_unit_in.input_col_descs,
1332  ra_exe_unit_in.simple_quals,
1333  ra_exe_unit_in.quals,
1334  ra_exe_unit_in.join_quals,
1335  ra_exe_unit_in.groupby_exprs,
1336  ra_exe_unit_in.target_exprs,
1337  ra_exe_unit_in.estimator,
1338  ra_exe_unit_in.sort_info,
1339  new_scan_limit,
1340  ra_exe_unit_in.query_hint,
1341  ra_exe_unit_in.use_bump_allocator,
1342  ra_exe_unit_in.union_all,
1343  ra_exe_unit_in.query_state};
1344 }
1345 
1346 } // namespace
1347 
1348 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1349  const bool is_agg,
1350  const std::vector<InputTableInfo>& query_infos,
1351  const RelAlgExecutionUnit& ra_exe_unit_in,
1352  const CompilationOptions& co,
1353  const ExecutionOptions& eo,
1354  const Catalog_Namespace::Catalog& cat,
1355  RenderInfo* render_info,
1356  const bool has_cardinality_estimation,
1357  ColumnCacheMap& column_cache) {
1358  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1359 
1360  ScopeGuard cleanup_post_execution = [this] {
1361  // cleanup/unpin GPU buffer allocations
1362  // TODO: separate out this state into a single object
1363  plan_state_.reset(nullptr);
1364  if (cgen_state_) {
1365  cgen_state_->in_values_bitmaps_.clear();
1366  }
1367  };
1368 
1369  try {
1370  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1371  is_agg,
1372  true,
1373  query_infos,
1374  ra_exe_unit_in,
1375  co,
1376  eo,
1377  cat,
1378  row_set_mem_owner_,
1379  render_info,
1380  has_cardinality_estimation,
1381  column_cache);
1382  if (result) {
1383  result->setKernelQueueTime(kernel_queue_time_ms_);
1384  result->addCompilationQueueTime(compilation_queue_time_ms_);
1385  if (eo.just_validate) {
1386  result->setValidationOnlyRes();
1387  }
1388  }
1389  return result;
1390  } catch (const CompilationRetryNewScanLimit& e) {
1391  auto result =
1392  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1393  is_agg,
1394  false,
1395  query_infos,
1396  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1397  co,
1398  eo,
1399  cat,
1400  row_set_mem_owner_,
1401  render_info,
1402  has_cardinality_estimation,
1403  column_cache);
1404  if (result) {
1405  result->setKernelQueueTime(kernel_queue_time_ms_);
1406  result->addCompilationQueueTime(compilation_queue_time_ms_);
1407  if (eo.just_validate) {
1408  result->setValidationOnlyRes();
1409  }
1410  }
1411  return result;
1412  }
1413 }
1414 
1415 ResultSetPtr Executor::executeWorkUnitImpl(
1416  size_t& max_groups_buffer_entry_guess,
1417  const bool is_agg,
1418  const bool allow_single_frag_table_opt,
1419  const std::vector<InputTableInfo>& query_infos,
1420  const RelAlgExecutionUnit& ra_exe_unit_in,
1421  const CompilationOptions& co,
1422  const ExecutionOptions& eo,
1423  const Catalog_Namespace::Catalog& cat,
1424  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1425  RenderInfo* render_info,
1426  const bool has_cardinality_estimation,
1427  ColumnCacheMap& column_cache) {
1428  INJECT_TIMER(Exec_executeWorkUnit);
1429  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1430  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1431  CHECK(!query_infos.empty());
1432  if (!max_groups_buffer_entry_guess) {
1433  // The query has failed the first execution attempt because of running out
1434  // of group by slots. Make the conservative choice: allocate fragment size
1435  // slots and run on the CPU.
1436  CHECK(device_type == ExecutorDeviceType::CPU);
1437  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1438  }
1439 
1440  int8_t crt_min_byte_width{get_min_byte_width()};
1441  do {
1442  SharedKernelContext shared_context(query_infos);
1443  ColumnFetcher column_fetcher(this, column_cache);
1444  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1445  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1446  if (eo.executor_type == ExecutorType::Native) {
1447  try {
1448  INJECT_TIMER(query_step_compilation);
1449  auto clock_begin = timer_start();
1450  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1451  compilation_queue_time_ms_ += timer_stop(clock_begin);
1452 
1453  query_mem_desc_owned =
1454  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1455  crt_min_byte_width,
1456  has_cardinality_estimation,
1457  ra_exe_unit,
1458  query_infos,
1459  deleted_cols_map,
1460  column_fetcher,
1461  {device_type,
1462  co.hoist_literals,
1463  co.opt_level,
1464  g_enable_dynamic_watchdog,
1465  co.allow_lazy_fetch,
1466  co.filter_on_deleted_column,
1467  co.explain_type,
1468  co.register_intel_jit_listener},
1469  eo,
1470  render_info,
1471  this);
1472  CHECK(query_mem_desc_owned);
1473  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1474  } catch (CompilationRetryNoCompaction&) {
1475  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1476  continue;
1477  }
1478  } else {
1479  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1480  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1481  CHECK(!query_mem_desc_owned);
1482  query_mem_desc_owned.reset(
1484  }
1485  if (eo.just_explain) {
1486  return executeExplain(*query_comp_desc_owned);
1487  }
1488 
1489  for (const auto target_expr : ra_exe_unit.target_exprs) {
1490  plan_state_->target_exprs_.push_back(target_expr);
1491  }
1492 
1493  if (!eo.just_validate) {
1494  int available_cpus = cpu_threads();
1495  auto available_gpus = get_available_gpus(cat);
1496 
1497  const auto context_count =
1498  get_context_count(device_type, available_cpus, available_gpus.size());
1499  try {
1500  auto kernels = createKernels(shared_context,
1501  ra_exe_unit,
1502  column_fetcher,
1503  query_infos,
1504  eo,
1505  is_agg,
1506  allow_single_frag_table_opt,
1507  context_count,
1508  *query_comp_desc_owned,
1509  *query_mem_desc_owned,
1510  render_info,
1511  available_gpus,
1512  available_cpus);
1513  if (g_use_tbb_pool) {
1514 #ifdef HAVE_TBB
1515  VLOG(1) << "Using TBB thread pool for kernel dispatch.";
1516  launchKernels<threadpool::TbbThreadPool<void>>(shared_context,
1517  std::move(kernels));
1518 #else
1519  throw std::runtime_error(
1520  "This build is not TBB enabled. Restart the server with "
1521  "\"enable-modern-thread-pool\" disabled.");
1522 #endif
1523  } else {
1524  launchKernels<threadpool::FuturesThreadPool<void>>(shared_context,
1525  std::move(kernels));
1526  }
1527  } catch (QueryExecutionError& e) {
1528  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1529  e.getErrorCode() == ERR_OUT_OF_TIME) {
1530  throw QueryExecutionError(ERR_INTERRUPTED);
1531  }
1532  if (e.getErrorCode() == ERR_INTERRUPTED) {
1533  throw QueryExecutionError(ERR_INTERRUPTED);
1534  }
1535  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1536  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1537  crt_min_byte_width <<= 1;
1538  continue;
1539  }
1540  throw;
1541  }
1542  }
1543  if (is_agg) {
1544  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
1545  // update query status to let user know we are now in the reduction phase
1546  std::string curRunningSession{""};
1547  std::string curRunningQuerySubmittedTime{""};
1548  bool sessionEnrolled = false;
1550  {
1551  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
1552  executor->getSessionLock());
1553  curRunningSession = executor->getCurrentQuerySession(session_read_lock);
1554  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
1555  sessionEnrolled =
1556  executor->checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
1557  }
1558  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
1559  sessionEnrolled) {
1560  executor->updateQuerySessionStatus(curRunningSession,
1561  curRunningQuerySubmittedTime,
1563  }
1564  }
1565  try {
1566  return collectAllDeviceResults(shared_context,
1567  ra_exe_unit,
1568  *query_mem_desc_owned,
1569  query_comp_desc_owned->getDeviceType(),
1570  row_set_mem_owner);
1571  } catch (ReductionRanOutOfSlots&) {
1572  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1573  } catch (OverflowOrUnderflow&) {
1574  crt_min_byte_width <<= 1;
1575  continue;
1576  } catch (QueryExecutionError& e) {
1577  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
1578  << ", what(): " << e.what();
1579  throw QueryExecutionError(e.getErrorCode());
1580  }
1581  }
1582  return resultsUnion(shared_context, ra_exe_unit);
1583 
1584  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1585 
1586  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1587  ExecutorDeviceType::CPU,
1588  QueryMemoryDescriptor(),
1589  nullptr,
1590  catalog_,
1591  blockSize(),
1592  gridSize());
1593 }
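// Retry behavior of the loop above: an overflow/underflow during execution or reduction
// doubles crt_min_byte_width and recompiles, and CompilationRetryNoCompaction jumps
// straight to MAX_BYTE_WIDTH_SUPPORTED; only when the width would exceed
// sizeof(int64_t) does the loop exit and return the empty fallback ResultSet.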
1594 
1595 void Executor::executeWorkUnitPerFragment(
1596  const RelAlgExecutionUnit& ra_exe_unit_in,
1597  const InputTableInfo& table_info,
1598  const CompilationOptions& co,
1599  const ExecutionOptions& eo,
1600  const Catalog_Namespace::Catalog& cat,
1601  PerFragmentCallBack& cb,
1602  const std::set<size_t>& fragment_indexes_param) {
1603  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1604  ColumnCacheMap column_cache;
1605 
1606  std::vector<InputTableInfo> table_infos{table_info};
1607  SharedKernelContext kernel_context(table_infos);
1608 
1609  ColumnFetcher column_fetcher(this, column_cache);
1610  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1611  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1612  {
1613  auto clock_begin = timer_start();
1614  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1615  compilation_queue_time_ms_ += timer_stop(clock_begin);
1616  query_mem_desc_owned =
1617  query_comp_desc_owned->compile(0,
1618  8,
1619  /*has_cardinality_estimation=*/false,
1620  ra_exe_unit,
1621  table_infos,
1622  deleted_cols_map,
1623  column_fetcher,
1624  co,
1625  eo,
1626  nullptr,
1627  this);
1628  }
1629  CHECK(query_mem_desc_owned);
1630  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1631  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1632  const auto& outer_fragments = table_info.info.fragments;
1633 
1634  std::set<size_t> fragment_indexes;
1635  if (fragment_indexes_param.empty()) {
1636  // An empty `fragment_indexes_param` set implies executing
1637  // the query for all fragments in the table. In this
1638  // case, populate `fragment_indexes` with all fragment indexes.
1639  for (size_t i = 0; i < outer_fragments.size(); i++) {
1640  fragment_indexes.emplace(i);
1641  }
1642  } else {
1643  fragment_indexes = fragment_indexes_param;
1644  }
1645 
1646  {
1647  auto clock_begin = timer_start();
1648  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
1649  kernel_queue_time_ms_ += timer_stop(clock_begin);
1650 
1651  for (auto fragment_index : fragment_indexes) {
1652  // We may want to consider in the future allowing this to execute on devices other
1653  // than CPU
1654  FragmentsList fragments_list{{table_id, {fragment_index}}};
1655  ExecutionKernel kernel(ra_exe_unit,
1656  co.device_type,
1657  /*device_id=*/0,
1658  eo,
1659  column_fetcher,
1660  *query_comp_desc_owned,
1661  *query_mem_desc_owned,
1662  fragments_list,
1663  ExecutorDispatchMode::KernelPerFragment,
1664  /*render_info=*/nullptr,
1665  /*rowid_lookup_key=*/-1);
1666  kernel.run(this, 0, kernel_context);
1667  }
1668  }
1669 
1670  const auto& all_fragment_results = kernel_context.getFragmentResults();
1671 
1672  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
1673  CHECK_EQ(result_fragment_indexes.size(), 1);
1674  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
1675  }
1676 }
1677 
1678 ResultSetPtr Executor::executeTableFunction(
1679  const TableFunctionExecutionUnit exe_unit,
1680  const std::vector<InputTableInfo>& table_infos,
1681  const CompilationOptions& co,
1682  const ExecutionOptions& eo,
1683  const Catalog_Namespace::Catalog& cat) {
1684  INJECT_TIMER(Exec_executeTableFunction);
1685 
1686  if (eo.just_validate) {
1687  QueryMemoryDescriptor query_mem_desc(this,
1688  /*entry_count=*/0,
1689  QueryDescriptionType::Projection,
1690  /*is_table_function=*/true);
1691  query_mem_desc.setOutputColumnar(true);
1692  return std::make_shared<ResultSet>(
1693  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
1694  co.device_type,
1695  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
1696  this->getRowSetMemoryOwner(),
1697  this->getCatalog(),
1698  this->blockSize(),
1699  this->gridSize());
1700  }
1701 
1702  nukeOldState(false, table_infos, PlanState::DeletedColumnsMap{}, nullptr);
1703 
1704  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1705  // framework, we may want to move this up a level
1706 
1707  ColumnFetcher column_fetcher(this, column_cache);
1708  TableFunctionCompilationContext compilation_context;
1709  compilation_context.compile(exe_unit, co, this);
1710 
1711  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
1712  return exe_context.execute(
1713  exe_unit, table_infos, &compilation_context, column_fetcher, co.device_type, this);
1714 }
1715 
1716 ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
1717  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1718 }
1719 
1720 ExecutorDeviceType Executor::getDeviceTypeForTargets(
1721  const RelAlgExecutionUnit& ra_exe_unit,
1722  const ExecutorDeviceType requested_device_type) {
1723  for (const auto target_expr : ra_exe_unit.target_exprs) {
1724  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1725  if (!ra_exe_unit.groupby_exprs.empty() &&
1726  !isArchPascalOrLater(requested_device_type)) {
1727  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1728  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1729  return ExecutorDeviceType::CPU;
1730  }
1731  }
1732  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1733  return ExecutorDeviceType::CPU;
1734  }
1735  }
1736  return requested_device_type;
1737 }
1738 
1739 namespace {
1740 
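// Return the 64-bit in-register representation of the null value for the given type;
// floating point nulls are bit-cast into the integer slot (as float when the argument
// is taken as float, otherwise as double).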
1741 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1742  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1743  if (ti.is_fp()) {
1744  if (float_argument_input && ti.get_type() == kFLOAT) {
1745  int64_t float_null_val = 0;
1746  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1747  static_cast<float>(inline_fp_null_val(ti));
1748  return float_null_val;
1749  }
1750  const auto double_null_val = inline_fp_null_val(ti);
1751  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1752  }
1753  return inline_int_null_val(ti);
1754 }
1755 
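// Fill one entry worth of "empty input" aggregate values for the given target
// expressions: 0 for COUNT-like aggregates, NULL (plus a 0 count for AVG) otherwise.
// When g_cluster is set, count distinct targets get freshly allocated buffers or sets.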
1756 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1757  std::vector<int64_t>& entry,
1758  const std::vector<Analyzer::Expr*>& target_exprs,
1760  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1761  const auto target_expr = target_exprs[target_idx];
1762  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1763  CHECK(agg_info.is_agg);
1764  target_infos.push_back(agg_info);
1765  if (g_cluster) {
1766  const auto executor = query_mem_desc.getExecutor();
1767  CHECK(executor);
1768  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1769  CHECK(row_set_mem_owner);
1770  const auto& count_distinct_desc =
1771  query_mem_desc.getCountDistinctDescriptor(target_idx);
1772  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1773  CHECK(row_set_mem_owner);
1774  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
1775  count_distinct_desc.bitmapPaddedSizeBytes(),
1776  /*thread_idx=*/0); // TODO: can we detect thread idx here?
1777  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1778  continue;
1779  }
1780  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1781  auto count_distinct_set = new std::set<int64_t>();
1782  CHECK(row_set_mem_owner);
1783  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1784  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1785  continue;
1786  }
1787  }
1788  const bool float_argument_input = takes_float_argument(agg_info);
1789  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1790  entry.push_back(0);
1791  } else if (agg_info.agg_kind == kAVG) {
1792  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1793  entry.push_back(0);
1794  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
1795  if (agg_info.sql_type.is_geometry()) {
1796  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1797  entry.push_back(0);
1798  }
1799  } else if (agg_info.sql_type.is_varlen()) {
1800  entry.push_back(0);
1801  entry.push_back(0);
1802  } else {
1803  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1804  }
1805  } else {
1806  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1807  }
1808  }
1809 }
1810 
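// Build a one-row ResultSet holding the aggregate values for an empty input: the
// target expressions are deep-copied and marked nullable, then the single entry is
// filled via fill_entries_for_empty_input().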
1812  const std::vector<Analyzer::Expr*>& target_exprs_in,
1814  const ExecutorDeviceType device_type) {
1815  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1816  std::vector<Analyzer::Expr*> target_exprs;
1817  for (const auto target_expr : target_exprs_in) {
1818  const auto target_expr_copy =
1819  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1820  CHECK(target_expr_copy);
1821  auto ti = target_expr->get_type_info();
1822  ti.set_notnull(false);
1823  target_expr_copy->set_type_info(ti);
1824  if (target_expr_copy->get_arg()) {
1825  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1826  arg_ti.set_notnull(false);
1827  target_expr_copy->get_arg()->set_type_info(arg_ti);
1828  }
1829  target_exprs_owned_copies.push_back(target_expr_copy);
1830  target_exprs.push_back(target_expr_copy.get());
1831  }
1832  std::vector<TargetInfo> target_infos;
1833  std::vector<int64_t> entry;
1834  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1835  const auto executor = query_mem_desc.getExecutor();
1836  CHECK(executor);
1837  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1838  CHECK(row_set_mem_owner);
1839  auto rs = std::make_shared<ResultSet>(target_infos,
1840  device_type,
1842  row_set_mem_owner,
1843  executor->getCatalog(),
1844  executor->blockSize(),
1845  executor->gridSize());
1846  rs->allocateStorage();
1847  rs->fillOneEntry(entry);
1848  return rs;
1849 }
1850 
1851 } // namespace
1852 
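// Reduce the per-device partial results into a single ResultSet, handling the
// empty-input, speculative top-N and sharded top-N cases separately.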
1854  SharedKernelContext& shared_context,
1855  const RelAlgExecutionUnit& ra_exe_unit,
1857  const ExecutorDeviceType device_type,
1858  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1859  auto timer = DEBUG_TIMER(__func__);
1860  auto& result_per_device = shared_context.getFragmentResults();
1861  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1864  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1865  }
1866  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1867  try {
1868  return reduceSpeculativeTopN(
1869  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1870  } catch (const std::bad_alloc&) {
1871  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1872  }
1873  }
1874  const auto shard_count =
1875  device_type == ExecutorDeviceType::GPU
1877  : 0;
1878 
1879  if (shard_count && !result_per_device.empty()) {
1880  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1881  }
1882  return reduceMultiDeviceResults(
1883  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1884 }
1885 
1886 namespace {
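// Permute the group-by key columns and aggregate columns of a columnar result set
// storage into the output storage following the given top permutation; returns the
// next output row index.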
1897 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1898  const QueryMemoryDescriptor& input_query_mem_desc,
1899  const ResultSetStorage* output_storage,
1900  size_t output_row_index,
1901  const QueryMemoryDescriptor& output_query_mem_desc,
1902  const std::vector<uint32_t>& top_permutation) {
1903  const auto output_buffer = output_storage->getUnderlyingBuffer();
1904  const auto input_buffer = input_storage->getUnderlyingBuffer();
1905  for (const auto sorted_idx : top_permutation) {
1906  // permuting all group-columns in this result set into the final buffer:
1907  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1908  group_idx++) {
1909  const auto input_column_ptr =
1910  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1911  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1912  const auto output_column_ptr =
1913  output_buffer +
1914  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1915  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1916  memcpy(output_column_ptr,
1917  input_column_ptr,
1918  output_query_mem_desc.groupColWidth(group_idx));
1919  }
1920  // permuting all agg-columns in this result set into the final buffer:
1921  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1922  slot_idx++) {
1923  const auto input_column_ptr =
1924  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1925  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1926  const auto output_column_ptr =
1927  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1928  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1929  memcpy(output_column_ptr,
1930  input_column_ptr,
1931  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1932  }
1933  ++output_row_index;
1934  }
1935  return output_row_index;
1936 }
1937 
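// Permute the rows of a row-wise result set storage into the output storage following
// the given top permutation; returns the next output row index.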
1947 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
1948  const ResultSetStorage* output_storage,
1949  size_t output_row_index,
1950  const QueryMemoryDescriptor& output_query_mem_desc,
1951  const std::vector<uint32_t>& top_permutation) {
1952  const auto output_buffer = output_storage->getUnderlyingBuffer();
1953  const auto input_buffer = input_storage->getUnderlyingBuffer();
1954  for (const auto sorted_idx : top_permutation) {
1955  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
1956  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
1957  row_ptr,
1958  output_query_mem_desc.getRowSize());
1959  ++output_row_index;
1960  }
1961  return output_row_index;
1962 }
1963 } // namespace
1964 
1965 // Collect top results from each device, stitch them together and sort. Partial
1966 // results from each device are guaranteed to be disjoint because we only go on
1967 // this path when one of the columns involved is a shard key.
1969  SharedKernelContext& shared_context,
1970  const RelAlgExecutionUnit& ra_exe_unit) const {
1971  auto& result_per_device = shared_context.getFragmentResults();
1972  const auto first_result_set = result_per_device.front().first;
1973  CHECK(first_result_set);
1974  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1975  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1976  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1977  top_query_mem_desc.setEntryCount(0);
1978  for (auto& result : result_per_device) {
1979  const auto result_set = result.first;
1980  CHECK(result_set);
1981  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
1982  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1983  top_query_mem_desc.setEntryCount(new_entry_cnt);
1984  }
1985  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1986  first_result_set->getDeviceType(),
1987  top_query_mem_desc,
1988  first_result_set->getRowSetMemOwner(),
1989  catalog_,
1990  blockSize(),
1991  gridSize());
1992  auto top_storage = top_result_set->allocateStorage();
1993  size_t top_output_row_idx{0};
1994  for (auto& result : result_per_device) {
1995  const auto result_set = result.first;
1996  CHECK(result_set);
1997  const auto& top_permutation = result_set->getPermutationBuffer();
1998  CHECK_LE(top_permutation.size(), top_n);
1999  if (top_query_mem_desc.didOutputColumnar()) {
2000  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2001  result_set->getQueryMemDesc(),
2002  top_storage,
2003  top_output_row_idx,
2004  top_query_mem_desc,
2005  top_permutation);
2006  } else {
2007  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2008  top_storage,
2009  top_output_row_idx,
2010  top_query_mem_desc,
2011  top_permutation);
2012  }
2013  }
2014  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2015  return top_result_set;
2016 }
2017 
2018 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
2019  const {
2020  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
2021  const auto& join_info = plan_state_->join_info_;
2022  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2023  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2024  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
2025  id_to_cond.insert(
2026  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
2027  }
2028  return id_to_cond;
2029 }
2030 
2031 namespace {
2032 
2033 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2034  for (const auto& col : fetched_cols) {
2035  if (col.is_lazily_fetched) {
2036  return true;
2037  }
2038  }
2039  return false;
2040 }
2041 
2042 } // namespace
2043 
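// Build the ExecutionKernels for the query: either one multi-fragment kernel per
// device (GPU with allow_multifrag) or one kernel per fragment group, as assigned by
// the QueryFragmentDescriptor.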
2044 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2045  SharedKernelContext& shared_context,
2046  const RelAlgExecutionUnit& ra_exe_unit,
2047  ColumnFetcher& column_fetcher,
2048  const std::vector<InputTableInfo>& table_infos,
2049  const ExecutionOptions& eo,
2050  const bool is_agg,
2051  const bool allow_single_frag_table_opt,
2052  const size_t context_count,
2053  const QueryCompilationDescriptor& query_comp_desc,
2055  RenderInfo* render_info,
2056  std::unordered_set<int>& available_gpus,
2057  int& available_cpus) {
2058  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2059 
2060  QueryFragmentDescriptor fragment_descriptor(
2061  ra_exe_unit,
2062  table_infos,
2063  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2064  ? catalog_->getDataMgr().getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
2065  : std::vector<Data_Namespace::MemoryInfo>{},
2068  CHECK(!ra_exe_unit.input_descs.empty());
2069 
2070  const auto device_type = query_comp_desc.getDeviceType();
2071  const bool uses_lazy_fetch =
2072  plan_state_->allow_lazy_fetch_ &&
2073  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2074  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2075  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2076  const auto device_count = deviceCount(device_type);
2077  CHECK_GT(device_count, 0);
2078 
2079  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2080  shared_context.getFragOffsets(),
2081  device_count,
2082  device_type,
2083  use_multifrag_kernel,
2085  this);
2086  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2087  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2088  }
2089 
2090  if (use_multifrag_kernel) {
2091  VLOG(1) << "Creating multifrag execution kernels";
2092  VLOG(1) << query_mem_desc.toString();
2093 
2094  // NB: We should never be on this path when the query is retried because of running
2095  // out of group by slots; also, for scan only queries on CPU we want the
2096  // high-granularity, fragment by fragment execution instead. For scan only queries on
2097  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2098  // buffer per fragment.
2099  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2100  &execution_kernels,
2101  &column_fetcher,
2102  &eo,
2103  &query_comp_desc,
2104  &query_mem_desc,
2105  render_info](const int device_id,
2106  const FragmentsList& frag_list,
2107  const int64_t rowid_lookup_key) {
2108  execution_kernels.emplace_back(
2109  std::make_unique<ExecutionKernel>(ra_exe_unit,
2111  device_id,
2112  eo,
2113  column_fetcher,
2114  query_comp_desc,
2115  query_mem_desc,
2116  frag_list,
2118  render_info,
2119  rowid_lookup_key));
2120  };
2121  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2122  } else {
2123  VLOG(1) << "Creating one execution kernel per fragment";
2124  VLOG(1) << query_mem_desc.toString();
2125 
2126  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2128  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2129  const auto max_frag_size =
2130  table_infos.front().info.getFragmentNumTuplesUpperBound();
2131  if (max_frag_size < query_mem_desc.getEntryCount()) {
2132  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2133  << " to match max fragment size " << max_frag_size
2134  << " for kernel per fragment execution path.";
2135  throw CompilationRetryNewScanLimit(max_frag_size);
2136  }
2137  }
2138 
2139  size_t frag_list_idx{0};
2140  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2141  &execution_kernels,
2142  &column_fetcher,
2143  &eo,
2144  &frag_list_idx,
2145  &device_type,
2146  &query_comp_desc,
2147  &query_mem_desc,
2148  render_info](const int device_id,
2149  const FragmentsList& frag_list,
2150  const int64_t rowid_lookup_key) {
2151  if (!frag_list.size()) {
2152  return;
2153  }
2154  CHECK_GE(device_id, 0);
2155 
2156  execution_kernels.emplace_back(
2157  std::make_unique<ExecutionKernel>(ra_exe_unit,
2158  device_type,
2159  device_id,
2160  eo,
2161  column_fetcher,
2162  query_comp_desc,
2163  query_mem_desc,
2164  frag_list,
2166  render_info,
2167  rowid_lookup_key));
2168  ++frag_list_idx;
2169  };
2170 
2171  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2172  ra_exe_unit);
2173  }
2174 
2175  return execution_kernels;
2176 }
2177 
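// Launch the given execution kernels on a thread pool of type THREAD_POOL; each kernel
// runs on its own worker thread and reports its results through shared_context.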
2178 template <typename THREAD_POOL>
2180  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels) {
2181  auto clock_begin = timer_start();
2182  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2183  kernel_queue_time_ms_ += timer_stop(clock_begin);
2184 
2185  THREAD_POOL thread_pool;
2186  VLOG(1) << "Launching " << kernels.size() << " kernels for query.";
2187  size_t kernel_idx = 1;
2188  for (auto& kernel : kernels) {
2189  thread_pool.spawn(
2190  [this, &shared_context, parent_thread_id = logger::thread_id()](
2191  ExecutionKernel* kernel, const size_t crt_kernel_idx) {
2192  CHECK(kernel);
2193  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
2194  const size_t thread_idx = crt_kernel_idx % cpu_threads();
2195  kernel->run(this, thread_idx, shared_context);
2196  },
2197  kernel.get(),
2198  kernel_idx++);
2199  }
2200  thread_pool.join();
2201 }
2202 
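// For the given input table and outer fragment, return the fragment indexes to scan:
// the outer fragment itself for the outer table, or all inner-table fragments that
// cannot be skipped based on shard information.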
2204  const RelAlgExecutionUnit& ra_exe_unit,
2205  const ExecutorDeviceType device_type,
2206  const size_t table_idx,
2207  const size_t outer_frag_idx,
2208  std::map<int, const TableFragments*>& selected_tables_fragments,
2209  const std::unordered_map<int, const Analyzer::BinOper*>&
2210  inner_table_id_to_join_condition) {
2211  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2212  auto table_frags_it = selected_tables_fragments.find(table_id);
2213  CHECK(table_frags_it != selected_tables_fragments.end());
2214  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2215  const auto outer_table_fragments_it =
2216  selected_tables_fragments.find(outer_input_desc.getTableId());
2217  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2218  const auto outer_table_fragments = outer_table_fragments_it->second;
2219  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2220  if (!table_idx) {
2221  return {outer_frag_idx};
2222  }
2223  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2224  auto& inner_frags = table_frags_it->second;
2225  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2226  std::vector<size_t> all_frag_ids;
2227  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2228  ++inner_frag_idx) {
2229  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2230  if (skipFragmentPair(outer_fragment_info,
2231  inner_frag_info,
2232  table_idx,
2233  inner_table_id_to_join_condition,
2234  ra_exe_unit,
2235  device_type)) {
2236  continue;
2237  }
2238  all_frag_ids.push_back(inner_frag_idx);
2239  }
2240  return all_frag_ids;
2241 }
2242 
2243 // Returns true iff the join between two fragments cannot yield any results, per
2244 // shard information. The pair can be skipped to avoid full broadcast.
2246  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2247  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2248  const int table_idx,
2249  const std::unordered_map<int, const Analyzer::BinOper*>&
2250  inner_table_id_to_join_condition,
2251  const RelAlgExecutionUnit& ra_exe_unit,
2252  const ExecutorDeviceType device_type) {
2253  if (device_type != ExecutorDeviceType::GPU) {
2254  return false;
2255  }
2256  CHECK(table_idx >= 0 &&
2257  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2258  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2259  // Both tables need to be sharded the same way.
2260  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2261  outer_fragment_info.shard == inner_fragment_info.shard) {
2262  return false;
2263  }
2264  const Analyzer::BinOper* join_condition{nullptr};
2265  if (ra_exe_unit.join_quals.empty()) {
2266  CHECK(!inner_table_id_to_join_condition.empty());
2267  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2268  CHECK(condition_it != inner_table_id_to_join_condition.end());
2269  join_condition = condition_it->second;
2270  CHECK(join_condition);
2271  } else {
2272  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2273  plan_state_->join_info_.join_hash_tables_.size());
2274  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2275  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2276  table_idx) {
2277  CHECK(!join_condition);
2278  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2279  }
2280  }
2281  }
2282  if (!join_condition) {
2283  return false;
2284  }
2285  // TODO(adb): support fragment skipping based on the overlaps operator
2286  if (join_condition->is_overlaps_oper()) {
2287  return false;
2288  }
2289  size_t shard_count{0};
2290  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2291  join_condition->get_left_operand())) {
2292  auto inner_outer_pairs =
2293  normalize_column_pairs(join_condition, *getCatalog(), getTemporaryTables());
2295  join_condition, this, inner_outer_pairs);
2296  } else {
2297  shard_count = get_shard_count(join_condition, this);
2298  }
2299  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2300  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2301  }
2302  return shard_count;
2303 }
2304 
2305 namespace {
2306 
2309  const int table_id = col_desc->getScanDesc().getTableId();
2310  const int col_id = col_desc->getColId();
2311  return get_column_descriptor_maybe(col_id, table_id, cat);
2312 }
2313 
2314 } // namespace
2315 
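// For each input table, compute the starting row offset of every fragment as a running
// prefix sum of the fragments' tuple counts.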
2316 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2317  const std::vector<InputDescriptor>& input_descs,
2318  const std::map<int, const TableFragments*>& all_tables_fragments) {
2319  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2320  for (auto& desc : input_descs) {
2321  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2322  CHECK(fragments_it != all_tables_fragments.end());
2323  const auto& fragments = *fragments_it->second;
2324  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2325  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2326  frag_offsets[i] = off;
2327  off += fragments[i].getNumTuples();
2328  }
2329  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2330  }
2331  return tab_id_to_frag_offsets;
2332 }
2333 
2334 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2336  const RelAlgExecutionUnit& ra_exe_unit,
2337  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2338  const std::vector<InputDescriptor>& input_descs,
2339  const std::map<int, const TableFragments*>& all_tables_fragments) {
2340  std::vector<std::vector<int64_t>> all_num_rows;
2341  std::vector<std::vector<uint64_t>> all_frag_offsets;
2342  const auto tab_id_to_frag_offsets =
2343  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2344  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2345  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2346  std::vector<int64_t> num_rows;
2347  std::vector<uint64_t> frag_offsets;
2348  if (!ra_exe_unit.union_all) {
2349  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2350  }
2351  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2352  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2353  const auto fragments_it =
2354  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2355  CHECK(fragments_it != all_tables_fragments.end());
2356  const auto& fragments = *fragments_it->second;
2357  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2358  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2359  const auto& fragment = fragments[frag_id];
2360  num_rows.push_back(fragment.getNumTuples());
2361  } else {
2362  size_t total_row_count{0};
2363  for (const auto& fragment : fragments) {
2364  total_row_count += fragment.getNumTuples();
2365  }
2366  num_rows.push_back(total_row_count);
2367  }
2368  const auto frag_offsets_it =
2369  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2370  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2371  const auto& offsets = frag_offsets_it->second;
2372  CHECK_LT(frag_id, offsets.size());
2373  frag_offsets.push_back(offsets[frag_id]);
2374  }
2375  all_num_rows.push_back(num_rows);
2376  // Fragment offsets of the outer table should ONLY be used by rowid for now.
2377  all_frag_offsets.push_back(frag_offsets);
2378  }
2379  return {all_num_rows, all_frag_offsets};
2380 }
2381 
2382 // Only fetch columns of a hash-joined inner fact table from all the table fragments
2383 // when their fetch is not deferred (lazy).
2385  const RelAlgExecutionUnit& ra_exe_unit,
2386  const FragmentsList& selected_fragments) const {
2387  const auto& input_descs = ra_exe_unit.input_descs;
2388  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2389  if (nest_level < 1 ||
2390  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2391  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2392  (ra_exe_unit.join_quals.empty() &&
2393  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2394  return false;
2395  }
2396  const int table_id = inner_col_desc.getScanDesc().getTableId();
2397  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2398  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2399  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2400  return fragments.size() > 1;
2401 }
2402 
2404  const ColumnDescriptor* cd,
2405  const InputColDescriptor& inner_col_desc,
2406  const RelAlgExecutionUnit& ra_exe_unit,
2407  const FragmentsList& selected_fragments,
2408  const Data_Namespace::MemoryLevel memory_level) const {
2409  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2410  const int table_id = inner_col_desc.getScanDesc().getTableId();
2411  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2412  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2413  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2414  auto need_linearize = cd->columnType.is_fixlen_array();
2415  if (memory_level == MemoryLevel::GPU_LEVEL) {
2416  // we disable multi-frag linearization for the GPU case until we find the cause of
2417  // the CUDA 'misaligned address' issue, see #5245
2418  need_linearize = false;
2419  }
2420  return table_id > 0 && need_linearize && fragments.size() > 1;
2421 }
2422 
2423 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2424  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2425  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2426  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2427 }
2428 
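// Fetch the column buffers for the selected fragments of every input column. Columns
// not in the plan state's fetch set are kept at CPU memory level; inner-table columns
// spanning multiple fragments may be fetched (and, for fixed-length arrays,
// linearized) across all fragments.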
2430  const ColumnFetcher& column_fetcher,
2431  const RelAlgExecutionUnit& ra_exe_unit,
2432  const int device_id,
2433  const Data_Namespace::MemoryLevel memory_level,
2434  const std::map<int, const TableFragments*>& all_tables_fragments,
2435  const FragmentsList& selected_fragments,
2437  std::list<ChunkIter>& chunk_iterators,
2438  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2439  DeviceAllocator* device_allocator,
2440  const size_t thread_idx,
2441  const bool allow_runtime_interrupt) {
2442  auto timer = DEBUG_TIMER(__func__);
2443  INJECT_TIMER(fetchChunks);
2444  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2445  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2446  std::vector<size_t> local_col_to_frag_pos;
2447  buildSelectedFragsMapping(selected_fragments_crossjoin,
2448  local_col_to_frag_pos,
2449  col_global_ids,
2450  selected_fragments,
2451  ra_exe_unit);
2452 
2454  selected_fragments_crossjoin);
2455 
2456  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2457  std::vector<std::vector<int64_t>> all_num_rows;
2458  std::vector<std::vector<uint64_t>> all_frag_offsets;
2459  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2460  std::vector<const int8_t*> frag_col_buffers(
2461  plan_state_->global_to_local_col_ids_.size());
2462  for (const auto& col_id : col_global_ids) {
2463  if (allow_runtime_interrupt) {
2464  bool isInterrupted = false;
2465  {
2466  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2467  const auto query_session = getCurrentQuerySession(session_read_lock);
2468  isInterrupted =
2469  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2470  }
2471  if (isInterrupted) {
2472  throw QueryExecutionError(ERR_INTERRUPTED);
2473  }
2474  }
2475  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2476  throw QueryExecutionError(ERR_INTERRUPTED);
2477  }
2478  CHECK(col_id);
2479  const int table_id = col_id->getScanDesc().getTableId();
2480  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2481  if (cd && cd->isVirtualCol) {
2482  CHECK_EQ("rowid", cd->columnName);
2483  continue;
2484  }
2485  const auto fragments_it = all_tables_fragments.find(table_id);
2486  CHECK(fragments_it != all_tables_fragments.end());
2487  const auto fragments = fragments_it->second;
2488  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2489  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2490  CHECK_LT(static_cast<size_t>(it->second),
2491  plan_state_->global_to_local_col_ids_.size());
2492  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2493  if (!fragments->size()) {
2494  return {};
2495  }
2496  CHECK_LT(frag_id, fragments->size());
2497  auto memory_level_for_column = memory_level;
2498  if (plan_state_->columns_to_fetch_.find(
2499  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2500  plan_state_->columns_to_fetch_.end()) {
2501  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2502  }
2503  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2504  frag_col_buffers[it->second] =
2505  column_fetcher.getResultSetColumn(col_id.get(),
2506  memory_level_for_column,
2507  device_id,
2508  device_allocator,
2509  thread_idx);
2510  } else {
2511  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2512  // determine if we need special treatment to linearize a multi-frag table
2513  // column, i.e., a column that is classified as a varlen type such as an array.
2514  // For now, we only support fixed-length arrays that contain
2515  // geo point coordinates, but we can support more types in this way.
2516  if (needLinearizeAllFragments(cd,
2517  *col_id,
2518  ra_exe_unit,
2519  selected_fragments,
2520  memory_level_for_column)) {
2521  frag_col_buffers[it->second] =
2522  column_fetcher.linearizeColumnFragments(table_id,
2523  col_id->getColId(),
2524  all_tables_fragments,
2525  chunks,
2526  chunk_iterators,
2527  memory_level_for_column,
2528  device_id,
2529  device_allocator,
2530  thread_idx);
2531  } else {
2532  frag_col_buffers[it->second] =
2533  column_fetcher.getAllTableColumnFragments(table_id,
2534  col_id->getColId(),
2535  all_tables_fragments,
2536  memory_level_for_column,
2537  device_id,
2538  device_allocator,
2539  thread_idx);
2540  }
2541  } else {
2542  frag_col_buffers[it->second] =
2543  column_fetcher.getOneTableColumnFragment(table_id,
2544  frag_id,
2545  col_id->getColId(),
2546  all_tables_fragments,
2547  chunks,
2548  chunk_iterators,
2549  memory_level_for_column,
2550  device_id,
2551  device_allocator);
2552  }
2553  }
2554  }
2555  all_frag_col_buffers.push_back(frag_col_buffers);
2556  }
2557  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2558  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2559  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2560 }
2561 
2562 // fetchChunks() is written under the assumption that multiple inputs imply a JOIN.
2563 // fetchUnionChunks() is written under the assumption that multiple inputs imply a UNION ALL.
2565  const ColumnFetcher& column_fetcher,
2566  const RelAlgExecutionUnit& ra_exe_unit,
2567  const int device_id,
2568  const Data_Namespace::MemoryLevel memory_level,
2569  const std::map<int, const TableFragments*>& all_tables_fragments,
2570  const FragmentsList& selected_fragments,
2572  std::list<ChunkIter>& chunk_iterators,
2573  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2574  DeviceAllocator* device_allocator,
2575  const size_t thread_idx,
2576  const bool allow_runtime_interrupt) {
2577  auto timer = DEBUG_TIMER(__func__);
2578  INJECT_TIMER(fetchUnionChunks);
2579 
2580  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2581  std::vector<std::vector<int64_t>> all_num_rows;
2582  std::vector<std::vector<uint64_t>> all_frag_offsets;
2583 
2584  CHECK(!selected_fragments.empty());
2585  CHECK_LE(2u, ra_exe_unit.input_descs.size());
2586  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
2587  using TableId = int;
2588  TableId const selected_table_id = selected_fragments.front().table_id;
2589  bool const input_descs_index =
2590  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2591  if (!input_descs_index) {
2592  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
2593  }
2594  bool const input_col_descs_index =
2595  selected_table_id ==
2596  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2597  if (!input_col_descs_index) {
2598  CHECK_EQ(selected_table_id,
2599  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
2600  }
2601  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2602  << " selected_table_id=" << selected_table_id
2603  << " input_descs_index=" << int(input_descs_index)
2604  << " input_col_descs_index=" << int(input_col_descs_index)
2605  << " ra_exe_unit.input_descs="
2606  << shared::printContainer(ra_exe_unit.input_descs)
2607  << " ra_exe_unit.input_col_descs="
2608  << shared::printContainer(ra_exe_unit.input_col_descs);
2609 
2610  // Partition col_global_ids by table_id
2611  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2612  table_id_to_input_col_descs;
2613  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
2614  TableId const table_id = input_col_desc->getScanDesc().getTableId();
2615  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2616  }
2617  for (auto const& pair : table_id_to_input_col_descs) {
2618  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2619  std::vector<size_t> local_col_to_frag_pos;
2620 
2621  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
2622  local_col_to_frag_pos,
2623  pair.second,
2624  selected_fragments,
2625  ra_exe_unit);
2626 
2628  selected_fragments_crossjoin);
2629 
2630  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2631  if (allow_runtime_interrupt) {
2632  bool isInterrupted = false;
2633  {
2634  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2635  const auto query_session = getCurrentQuerySession(session_read_lock);
2636  isInterrupted =
2637  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2638  }
2639  if (isInterrupted) {
2640  throw QueryExecutionError(ERR_INTERRUPTED);
2641  }
2642  }
2643  std::vector<const int8_t*> frag_col_buffers(
2644  plan_state_->global_to_local_col_ids_.size());
2645  for (const auto& col_id : pair.second) {
2646  CHECK(col_id);
2647  const int table_id = col_id->getScanDesc().getTableId();
2648  CHECK_EQ(table_id, pair.first);
2649  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2650  if (cd && cd->isVirtualCol) {
2651  CHECK_EQ("rowid", cd->columnName);
2652  continue;
2653  }
2654  const auto fragments_it = all_tables_fragments.find(table_id);
2655  CHECK(fragments_it != all_tables_fragments.end());
2656  const auto fragments = fragments_it->second;
2657  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2658  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2659  CHECK_LT(static_cast<size_t>(it->second),
2660  plan_state_->global_to_local_col_ids_.size());
2661  const size_t frag_id = ra_exe_unit.union_all
2662  ? 0
2663  : selected_frag_ids[local_col_to_frag_pos[it->second]];
2664  if (!fragments->size()) {
2665  return {};
2666  }
2667  CHECK_LT(frag_id, fragments->size());
2668  auto memory_level_for_column = memory_level;
2669  if (plan_state_->columns_to_fetch_.find(
2670  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2671  plan_state_->columns_to_fetch_.end()) {
2672  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2673  }
2674  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2675  frag_col_buffers[it->second] =
2676  column_fetcher.getResultSetColumn(col_id.get(),
2677  memory_level_for_column,
2678  device_id,
2679  device_allocator,
2680  thread_idx);
2681  } else {
2682  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2683  frag_col_buffers[it->second] =
2684  column_fetcher.getAllTableColumnFragments(table_id,
2685  col_id->getColId(),
2686  all_tables_fragments,
2687  memory_level_for_column,
2688  device_id,
2689  device_allocator,
2690  thread_idx);
2691  } else {
2692  frag_col_buffers[it->second] =
2693  column_fetcher.getOneTableColumnFragment(table_id,
2694  frag_id,
2695  col_id->getColId(),
2696  all_tables_fragments,
2697  chunks,
2698  chunk_iterators,
2699  memory_level_for_column,
2700  device_id,
2701  device_allocator);
2702  }
2703  }
2704  }
2705  all_frag_col_buffers.push_back(frag_col_buffers);
2706  }
2707  std::vector<std::vector<int64_t>> num_rows;
2708  std::vector<std::vector<uint64_t>> frag_offsets;
2709  std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags(
2710  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2711  all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
2712  all_frag_offsets.insert(
2713  all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
2714  }
2715  // The hack below assumes a particular table traversal order which is not
2716  // always achieved due to unordered map in the outermost loop. According
2717  // to the code below we expect NULLs in even positions of all_frag_col_buffers[0]
2718  // and odd positions of all_frag_col_buffers[1]. As an additional hack we
2719  // swap these vectors if NULLs are not on expected positions.
2720  if (all_frag_col_buffers[0].size() > 1 && all_frag_col_buffers[0][0] &&
2721  !all_frag_col_buffers[0][1]) {
2722  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2723  }
2724  // UNION ALL hacks.
2725  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers);
2726  for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
2727  all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
2728  }
2729  if (input_descs_index == input_col_descs_index) {
2730  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2731  }
2732 
2733  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers)
2734  << " all_num_rows=" << shared::printContainer(all_num_rows)
2735  << " all_frag_offsets=" << shared::printContainer(all_frag_offsets)
2736  << " input_col_descs_index=" << input_col_descs_index;
2737  return {{all_frag_col_buffers[input_descs_index]},
2738  {{all_num_rows[0][input_descs_index]}},
2739  {{all_frag_offsets[0][input_descs_index]}}};
2740 }
2741 
2742 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2743  const size_t scan_idx,
2744  const RelAlgExecutionUnit& ra_exe_unit) {
2745  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2746  scan_idx > 0 &&
2747  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2748  !selected_fragments[scan_idx].fragment_ids.empty()) {
2749  // Fetch all fragments
2750  return {size_t(0)};
2751  }
2752 
2753  return selected_fragments[scan_idx].fragment_ids;
2754 }
2755 
2757  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2758  std::vector<size_t>& local_col_to_frag_pos,
2759  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2760  const FragmentsList& selected_fragments,
2761  const RelAlgExecutionUnit& ra_exe_unit) {
2762  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2763  size_t frag_pos{0};
2764  const auto& input_descs = ra_exe_unit.input_descs;
2765  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2766  const int table_id = input_descs[scan_idx].getTableId();
2767  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2768  selected_fragments_crossjoin.push_back(
2769  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2770  for (const auto& col_id : col_global_ids) {
2771  CHECK(col_id);
2772  const auto& input_desc = col_id->getScanDesc();
2773  if (input_desc.getTableId() != table_id ||
2774  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2775  continue;
2776  }
2777  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2778  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2779  CHECK_LT(static_cast<size_t>(it->second),
2780  plan_state_->global_to_local_col_ids_.size());
2781  local_col_to_frag_pos[it->second] = frag_pos;
2782  }
2783  ++frag_pos;
2784  }
2785 }
2786 
2788  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2789  std::vector<size_t>& local_col_to_frag_pos,
2790  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2791  const FragmentsList& selected_fragments,
2792  const RelAlgExecutionUnit& ra_exe_unit) {
2793  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2794  size_t frag_pos{0};
2795  const auto& input_descs = ra_exe_unit.input_descs;
2796  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2797  const int table_id = input_descs[scan_idx].getTableId();
2798  // selected_fragments here comes from assignFragsToKernelDispatch
2799  // (i.e., execution_kernel.fragments)
2800  if (selected_fragments[0].table_id != table_id) { // TODO 0
2801  continue;
2802  }
2803  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2804  selected_fragments_crossjoin.push_back(
2805  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2806  {size_t(1)}); // TODO
2807  for (const auto& col_id : col_global_ids) {
2808  CHECK(col_id);
2809  const auto& input_desc = col_id->getScanDesc();
2810  if (input_desc.getTableId() != table_id ||
2811  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2812  continue;
2813  }
2814  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2815  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2816  CHECK_LT(static_cast<size_t>(it->second),
2817  plan_state_->global_to_local_col_ids_.size());
2818  local_col_to_frag_pos[it->second] = frag_pos;
2819  }
2820  ++frag_pos;
2821  }
2822 }
2823 
2824 namespace {
2825 
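// RAII helper owning the raw output vectors returned by launchCpuCode()/launchGpuCode();
// the buffers are freed when the owner goes out of scope.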
2827  public:
2828  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2830  for (auto out : out_vec_) {
2831  delete[] out;
2832  }
2833  }
2834 
2835  private:
2836  std::vector<int64_t*> out_vec_;
2837 };
2838 } // namespace
2839 
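// Execute a compiled plan without a GROUP BY on a single device and reduce the
// per-fragment (and, on GPU, per-block) partial aggregates into a single result row.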
2841  const RelAlgExecutionUnit& ra_exe_unit,
2842  const CompilationResult& compilation_result,
2843  const bool hoist_literals,
2844  ResultSetPtr& results,
2845  const std::vector<Analyzer::Expr*>& target_exprs,
2846  const ExecutorDeviceType device_type,
2847  std::vector<std::vector<const int8_t*>>& col_buffers,
2848  QueryExecutionContext* query_exe_context,
2849  const std::vector<std::vector<int64_t>>& num_rows,
2850  const std::vector<std::vector<uint64_t>>& frag_offsets,
2851  Data_Namespace::DataMgr* data_mgr,
2852  const int device_id,
2853  const uint32_t start_rowid,
2854  const uint32_t num_tables,
2855  const bool allow_runtime_interrupt,
2856  RenderInfo* render_info) {
2857  INJECT_TIMER(executePlanWithoutGroupBy);
2858  auto timer = DEBUG_TIMER(__func__);
2859  CHECK(!results);
2860  if (col_buffers.empty()) {
2861  return 0;
2862  }
2863 
2864  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2865  if (render_info) {
2866  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2867  // here, we are in non-insitu mode.
2868  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2869  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2870  "currently unsupported.";
2871  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2872  }
2873 
2874  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2875  std::vector<int64_t*> out_vec;
2876  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2877  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2878  std::unique_ptr<OutVecOwner> output_memory_scope;
2879  if (allow_runtime_interrupt) {
2880  bool isInterrupted = false;
2881  {
2882  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2883  const auto query_session = getCurrentQuerySession(session_read_lock);
2884  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
2885  }
2886  if (isInterrupted) {
2887  throw QueryExecutionError(ERR_INTERRUPTED);
2888  }
2889  }
2890  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2891  throw QueryExecutionError(ERR_INTERRUPTED);
2892  }
2893  if (device_type == ExecutorDeviceType::CPU) {
2894  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2895  compilation_result.generated_code);
2896  CHECK(cpu_generated_code);
2897  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2898  cpu_generated_code.get(),
2899  hoist_literals,
2900  hoist_buf,
2901  col_buffers,
2902  num_rows,
2903  frag_offsets,
2904  0,
2905  &error_code,
2906  num_tables,
2907  join_hash_table_ptrs);
2908  output_memory_scope.reset(new OutVecOwner(out_vec));
2909  } else {
2910  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2911  compilation_result.generated_code);
2912  CHECK(gpu_generated_code);
2913  try {
2914  out_vec = query_exe_context->launchGpuCode(
2915  ra_exe_unit,
2916  gpu_generated_code.get(),
2917  hoist_literals,
2918  hoist_buf,
2919  col_buffers,
2920  num_rows,
2921  frag_offsets,
2922  0,
2923  data_mgr,
2924  blockSize(),
2925  gridSize(),
2926  device_id,
2927  compilation_result.gpu_smem_context.getSharedMemorySize(),
2928  &error_code,
2929  num_tables,
2930  allow_runtime_interrupt,
2931  join_hash_table_ptrs,
2932  render_allocator_map_ptr);
2933  output_memory_scope.reset(new OutVecOwner(out_vec));
2934  } catch (const OutOfMemory&) {
2935  return ERR_OUT_OF_GPU_MEM;
2936  } catch (const std::exception& e) {
2937  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2938  }
2939  }
2940  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2941  error_code == Executor::ERR_DIV_BY_ZERO ||
2942  error_code == Executor::ERR_OUT_OF_TIME ||
2943  error_code == Executor::ERR_INTERRUPTED ||
2945  error_code == Executor::ERR_GEOS) {
2946  return error_code;
2947  }
2948  if (ra_exe_unit.estimator) {
2949  CHECK(!error_code);
2950  results =
2951  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2952  return 0;
2953  }
2954  std::vector<int64_t> reduced_outs;
2955  const auto num_frags = col_buffers.size();
2956  const size_t entry_count =
2957  device_type == ExecutorDeviceType::GPU
2958  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
2959  ? 1
2960  : blockSize() * gridSize() * num_frags)
2961  : num_frags;
2962  if (size_t(1) == entry_count) {
2963  for (auto out : out_vec) {
2964  CHECK(out);
2965  reduced_outs.push_back(*out);
2966  }
2967  } else {
2968  size_t out_vec_idx = 0;
2969 
2970  for (const auto target_expr : target_exprs) {
2971  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2972  CHECK(agg_info.is_agg);
2973 
2974  const int num_iterations = agg_info.sql_type.is_geometry()
2975  ? agg_info.sql_type.get_physical_coord_cols()
2976  : 1;
2977 
2978  for (int i = 0; i < num_iterations; i++) {
2979  int64_t val1;
2980  const bool float_argument_input = takes_float_argument(agg_info);
2981  if (is_distinct_target(agg_info) || agg_info.agg_kind == kAPPROX_MEDIAN) {
2982  CHECK(agg_info.agg_kind == kCOUNT ||
2983  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT ||
2984  agg_info.agg_kind == kAPPROX_MEDIAN);
2985  val1 = out_vec[out_vec_idx][0];
2986  error_code = 0;
2987  } else {
2988  const auto chosen_bytes = static_cast<size_t>(
2989  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2990  std::tie(val1, error_code) = Executor::reduceResults(
2991  agg_info.agg_kind,
2992  agg_info.sql_type,
2993  query_exe_context->getAggInitValForIndex(out_vec_idx),
2994  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2995  out_vec[out_vec_idx],
2996  entry_count,
2997  false,
2998  float_argument_input);
2999  }
3000  if (error_code) {
3001  break;
3002  }
3003  reduced_outs.push_back(val1);
3004  if (agg_info.agg_kind == kAVG ||
3005  (agg_info.agg_kind == kSAMPLE &&
3006  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3007  const auto chosen_bytes = static_cast<size_t>(
3008  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3009  1));
3010  int64_t val2;
3011  std::tie(val2, error_code) = Executor::reduceResults(
3012  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3013  agg_info.sql_type,
3014  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3015  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3016  out_vec[out_vec_idx + 1],
3017  entry_count,
3018  false,
3019  false);
3020  if (error_code) {
3021  break;
3022  }
3023  reduced_outs.push_back(val2);
3024  ++out_vec_idx;
3025  }
3026  ++out_vec_idx;
3027  }
3028  }
3029  }
3030 
3031  if (error_code) {
3032  return error_code;
3033  }
3034 
3035  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3036  auto rows_ptr = std::shared_ptr<ResultSet>(
3037  query_exe_context->query_buffers_->result_sets_[0].release());
3038  rows_ptr->fillOneEntry(reduced_outs);
3039  results = std::move(rows_ptr);
3040  return error_code;
3041 }
3042 
3043 namespace {
3044 
3045 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
3046  CHECK(scan_limit);
3047  return results && results->rowCount() < scan_limit;
3048 }
3049 
3050 } // namespace
3051 
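// Execute a compiled GROUP BY plan on a single device; for UNION ALL, the execution
// unit is first narrowed to the input descriptors of the current outer table.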
3053  const RelAlgExecutionUnit& ra_exe_unit,
3054  const CompilationResult& compilation_result,
3055  const bool hoist_literals,
3056  ResultSetPtr& results,
3057  const ExecutorDeviceType device_type,
3058  std::vector<std::vector<const int8_t*>>& col_buffers,
3059  const std::vector<size_t> outer_tab_frag_ids,
3060  QueryExecutionContext* query_exe_context,
3061  const std::vector<std::vector<int64_t>>& num_rows,
3062  const std::vector<std::vector<uint64_t>>& frag_offsets,
3063  Data_Namespace::DataMgr* data_mgr,
3064  const int device_id,
3065  const int outer_table_id,
3066  const int64_t scan_limit,
3067  const uint32_t start_rowid,
3068  const uint32_t num_tables,
3069  const bool allow_runtime_interrupt,
3070  RenderInfo* render_info) {
3071  auto timer = DEBUG_TIMER(__func__);
3072  INJECT_TIMER(executePlanWithGroupBy);
3073  CHECK(!results);
3074  if (col_buffers.empty()) {
3075  return 0;
3076  }
3077  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3078  // TODO(alex):
3079  // 1. Optimize size (make keys more compact).
3080  // 2. Resize on overflow.
3081  // 3. Optimize runtime.
3082  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3083  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3084  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3085  if (allow_runtime_interrupt) {
3086  bool isInterrupted = false;
3087  {
3088  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3089  const auto query_session = getCurrentQuerySession(session_read_lock);
3090  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3091  }
3092  if (isInterrupted) {
3093  throw QueryExecutionError(ERR_INTERRUPTED);
3094  }
3095  }
3096  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3097  return ERR_INTERRUPTED;
3098  }
3099 
3100  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3101  if (render_info && render_info->useCudaBuffers()) {
3102  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3103  }
3104 
3105  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3106  << " ra_exe_unit.input_descs="
3107  << shared::printContainer(ra_exe_unit.input_descs)
3108  << " ra_exe_unit.input_col_descs="
3109  << shared::printContainer(ra_exe_unit.input_col_descs)
3110  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3111  << " num_rows=" << shared::printContainer(num_rows)
3112  << " frag_offsets=" << shared::printContainer(frag_offsets)
3113  << " query_exe_context->query_buffers_->num_rows_="
3114  << query_exe_context->query_buffers_->num_rows_
3115  << " query_exe_context->query_mem_desc_.getEntryCount()="
3116  << query_exe_context->query_mem_desc_.getEntryCount()
3117  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3118  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3119  << " num_tables=" << num_tables;
3120 
3121  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3122  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3123  // with outer_table_id.
3124  if (ra_exe_unit_copy.union_all) {
3125  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
3126  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3127  ra_exe_unit_copy.input_descs.end(),
3128  [outer_table_id](auto const& a, auto const& b) {
3129  return a.getTableId() == outer_table_id &&
3130  b.getTableId() != outer_table_id;
3131  });
3132  while (!ra_exe_unit_copy.input_descs.empty() &&
3133  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3134  ra_exe_unit_copy.input_descs.pop_back();
3135  }
3136  // Filter ra_exe_unit_copy.input_col_descs.
3137  ra_exe_unit_copy.input_col_descs.remove_if(
3138  [outer_table_id](auto const& input_col_desc) {
3139  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3140  });
3141  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3142  }
3143 
3144  if (device_type == ExecutorDeviceType::CPU) {
3145  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
3146  compilation_result.generated_code);
3147  CHECK(cpu_generated_code);
3148  query_exe_context->launchCpuCode(
3149  ra_exe_unit_copy,
3150  cpu_generated_code.get(),
3151  hoist_literals,
3152  hoist_buf,
3153  col_buffers,
3154  num_rows,
3155  frag_offsets,
3156  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3157  &error_code,
3158  num_tables,
3159  join_hash_table_ptrs);
3160  } else {
3161  try {
3162  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
3163  compilation_result.generated_code);
3164  CHECK(gpu_generated_code);
3165  query_exe_context->launchGpuCode(
3166  ra_exe_unit_copy,
3167  gpu_generated_code.get(),
3168  hoist_literals,
3169  hoist_buf,
3170  col_buffers,
3171  num_rows,
3172  frag_offsets,
3173  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3174  data_mgr,
3175  blockSize(),
3176  gridSize(),
3177  device_id,
3178  compilation_result.gpu_smem_context.getSharedMemorySize(),
3179  &error_code,
3180  num_tables,
3181  allow_runtime_interrupt,
3182  join_hash_table_ptrs,
3183  render_allocator_map_ptr);
3184  } catch (const OutOfMemory&) {
3185  return ERR_OUT_OF_GPU_MEM;
3186  } catch (const OutOfRenderMemory&) {
3187  return ERR_OUT_OF_RENDER_MEM;
3188  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3189  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
3190  } catch (const std::exception& e) {
3191  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3192  }
3193  }
3194 
3195  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3196  error_code == Executor::ERR_DIV_BY_ZERO ||
3197  error_code == Executor::ERR_OUT_OF_TIME ||
3198  error_code == Executor::ERR_INTERRUPTED ||
3199  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
3200  error_code == Executor::ERR_GEOS) {
3201  return error_code;
3202  }
3203 
3204  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3205  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3206  results = query_exe_context->getRowSet(ra_exe_unit_copy,
3207  query_exe_context->query_mem_desc_);
3208  CHECK(results);
3209  VLOG(2) << "results->rowCount()=" << results->rowCount();
3210  results->holdLiterals(hoist_buf);
3211  }
3212  if (error_code < 0 && render_allocator_map_ptr) {
3213  auto const adjusted_scan_limit =
3214  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3215  // More rows passed the filter than available slots. We don't have a count to check,
3216  // so assume we met the limit if a scan limit is set
3217  if (adjusted_scan_limit != 0) {
3218  return 0;
3219  } else {
3220  return error_code;
3221  }
3222  }
3223  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
3224  return error_code; // unlucky, not enough results and we ran out of slots
3225  }
3226 
3227  return 0;
3228 }
3229 
3230 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3231  const int device_id) {
3232  std::vector<int64_t> table_ptrs;
3233  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
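  // a missing (null) hash table means no device pointers can be passed to the kernel;
  // the CHECK below enforces that a null entry can only be the first one seen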
3234  for (auto hash_table : join_hash_tables) {
3235  if (!hash_table) {
3236  CHECK(table_ptrs.empty());
3237  return {};
3238  }
3239  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3240  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3241  }
3242  return table_ptrs;
3243 }
3244 
3245 void Executor::nukeOldState(const bool allow_lazy_fetch,
3246  const std::vector<InputTableInfo>& query_infos,
3247  const PlanState::DeletedColumnsMap& deleted_cols_map,
3248  const RelAlgExecutionUnit* ra_exe_unit) {
3249  kernel_queue_time_ms_ = 0;
3250  compilation_queue_time_ms_ = 0;
3251  const bool contains_left_deep_outer_join =
3252  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3253  ra_exe_unit->join_quals.end(),
3254  [](const JoinCondition& join_condition) {
3255  return join_condition.type == JoinType::LEFT;
3256  }) != ra_exe_unit->join_quals.end();
3257  cgen_state_.reset(new CgenState(query_infos.size(), contains_left_deep_outer_join));
3258  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3259  query_infos,
3260  deleted_cols_map,
3261  this));
3262 }
3263 
3264 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3265  const std::vector<InputTableInfo>& query_infos) {
3266  AUTOMATIC_IR_METADATA(cgen_state_.get());
3267  const auto ld_count = input_descs.size();
3268  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
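  // only the outermost input (i == 0) loads the frag_row_off argument, and only when
  // it spans more than one fragment; every other slot gets a null placeholder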
3269  for (size_t i = 0; i < ld_count; ++i) {
3270  CHECK_LT(i, query_infos.size());
3271  const auto frag_count = query_infos[i].info.fragments.size();
3272  if (i > 0) {
3273  cgen_state_->frag_offsets_.push_back(nullptr);
3274  } else {
3275  if (frag_count > 1) {
3276  cgen_state_->frag_offsets_.push_back(
3277  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
3278  } else {
3279  cgen_state_->frag_offsets_.push_back(nullptr);
3280  }
3281  }
3282  }
3283 }
3284 
3285 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
3286  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3287  const std::vector<InputTableInfo>& query_infos,
3288  const MemoryLevel memory_level,
3289  const HashType preferred_hash_type,
3290  ColumnCacheMap& column_cache,
3291  const QueryHint& query_hint) {
3292  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3293  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3294  }
3295  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3296  throw QueryExecutionError(ERR_INTERRUPTED);
3297  }
3298  try {
3299  auto tbl = HashJoin::getInstance(qual_bin_oper,
3300  query_infos,
3301  memory_level,
3302  preferred_hash_type,
3303  deviceCountForMemoryLevel(memory_level),
3304  column_cache,
3305  this,
3306  query_hint);
3307  return {tbl, ""};
3308  } catch (const HashJoinFail& e) {
3309  return {nullptr, e.what()};
3310  }
3311 }
3312 
3313 int8_t Executor::warpSize() const {
3314  CHECK(catalog_);
3315  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3316  CHECK(cuda_mgr);
3317  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3318  CHECK(!dev_props.empty());
3319  return dev_props.front().warpSize;
3320 }
3321 
3322 unsigned Executor::gridSize() const {
3323  CHECK(catalog_);
3324  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3325  if (!cuda_mgr) {
3326  return 0;
3327  }
3328  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3329 }
3330 
3331 unsigned Executor::numBlocksPerMP() const {
3332  CHECK(catalog_);
3333  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3334  CHECK(cuda_mgr);
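  // note: grid_size_x_ / getMinNumMPsForAllDevices() below is integer division, so the
  // quotient is truncated before std::ceil is applied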
3335  return grid_size_x_ ? std::ceil(grid_size_x_ / cuda_mgr->getMinNumMPsForAllDevices())
3336  : 2;
3337 }
3338 
3339 unsigned Executor::blockSize() const {
3340  CHECK(catalog_);
3341  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3342  if (!cuda_mgr) {
3343  return 0;
3344  }
3345  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3346  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3347 }
3348 
3349 size_t Executor::maxGpuSlabSize() const {
3350  return max_gpu_slab_size_;
3351 }
3352 
3353 int64_t Executor::deviceCycles(int milliseconds) const {
3354  CHECK(catalog_);
3355  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3356  CHECK(cuda_mgr);
3357  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3358  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3359 }
3360 
3361 llvm::Value* Executor::castToFP(llvm::Value* value,
3362  SQLTypeInfo const& from_ti,
3363  SQLTypeInfo const& to_ti) {
3364  AUTOMATIC_IR_METADATA(cgen_state_.get());
3365  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3366  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3367  llvm::Type* fp_type{nullptr};
3368  switch (to_ti.get_size()) {
3369  case 4:
3370  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3371  break;
3372  case 8:
3373  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3374  break;
3375  default:
3376  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3377  }
3378  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
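  // fixed-point sources carry an implicit scale; dividing by 10^scale restores the
  // intended value (e.g., a DECIMAL(10,2) raw value of 12345 becomes 123.45)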
3379  if (from_ti.get_scale()) {
3380  value = cgen_state_->ir_builder_.CreateFDiv(
3381  value,
3382  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3383  }
3384  }
3385  return value;
3386 }
3387 
3388 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3389  AUTOMATIC_IR_METADATA(cgen_state_.get());
3390  CHECK(val->getType()->isPointerTy());
3391 
3392  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3393  const auto val_type = val_ptr_type->getElementType();
3394  size_t val_width = 0;
3395  if (val_type->isIntegerTy()) {
3396  val_width = val_type->getIntegerBitWidth();
3397  } else {
3398  if (val_type->isFloatTy()) {
3399  val_width = 32;
3400  } else {
3401  CHECK(val_type->isDoubleTy());
3402  val_width = 64;
3403  }
3404  }
3405  CHECK_LT(size_t(0), val_width);
3406  if (bitWidth == val_width) {
3407  return val;
3408  }
3409  return cgen_state_->ir_builder_.CreateBitCast(
3410  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3411 }
3412 
3413 #define EXECUTE_INCLUDE
3414 #include "ArrayOps.cpp"
3415 #include "DateAdd.cpp"
3416 #include "StringFunctions.cpp"
3417 #undef EXECUTE_INCLUDE
3418 
3419 namespace {
3420 void add_deleted_col_to_map(PlanState::DeletedColumnsMap& deleted_cols_map,
3421  const ColumnDescriptor* deleted_cd) {
3422  auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3423  if (deleted_cols_it == deleted_cols_map.end()) {
3424  CHECK(
3425  deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd)).second);
3426  } else {
3427  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3428  }
3429 }
3430 } // namespace
3431 
3432 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3433  const RelAlgExecutionUnit& ra_exe_unit,
3434  const CompilationOptions& co) {
3435  if (!co.filter_on_deleted_column) {
3436  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3437  }
3438  auto ra_exe_unit_with_deleted = ra_exe_unit;
3439  PlanState::DeletedColumnsMap deleted_cols_map;
3440  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3441  if (input_table.getSourceType() != InputSourceType::TABLE) {
3442  continue;
3443  }
3444  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3445  CHECK(td);
3446  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3447  if (!deleted_cd) {
3448  continue;
3449  }
3450  CHECK(deleted_cd->columnType.is_boolean());
3451  // check deleted column is not already present
3452  bool found = false;
3453  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3454  if (input_col.get()->getColId() == deleted_cd->columnId &&
3455  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3456  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3457  found = true;
3458  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3459  break;
3460  }
3461  }
3462  if (!found) {
3463  // add deleted column
3464  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3465  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3466  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3467  }
3468  }
3469  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3470 }
3471 
3472 namespace {
3473 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` returns `true` as the
3474 // first element of its return tuple for a safely scaled epoch value and `false` when
3475 // the scaled value would overflow/underflow.
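// For example (illustrative values only): comparing TIMESTAMP(0) chunk stats against a
// TIMESTAMP(6) constant multiplies chunk_min/chunk_max by 10^6; if that product does not
// fit in 64 bits, the function returns {false, chunk_min, chunk_max}.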
3476 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3477  const int64_t chunk_min,
3478  const int64_t chunk_max,
3479  const SQLTypeInfo& lhs_type,
3480  const SQLTypeInfo& rhs_type) {
3481  const int32_t ldim = lhs_type.get_dimension();
3482  const int32_t rdim = rhs_type.get_dimension();
3483  CHECK(ldim != rdim);
3484  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
3485  if (ldim > rdim) {
3486  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
3487  return {true, chunk_min / scale, chunk_max / scale};
3488  }
3489 
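  // the checked backend throws std::overflow_error on 64-bit overflow instead of
  // silently wrapping, which the catch below uses to flag an unsafe scale-up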
3490  using checked_int64_t = boost::multiprecision::number<
3491  boost::multiprecision::cpp_int_backend<64,
3492  64,
3493  boost::multiprecision::signed_magnitude,
3494  boost::multiprecision::checked,
3495  void>>;
3496 
3497  try {
3498  auto ret =
3499  std::make_tuple(true,
3500  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
3501  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
3502  return ret;
3503  } catch (const std::overflow_error& e) {
3504  // noop
3505  }
3506  return std::make_tuple(false, chunk_min, chunk_max);
3507 }
3508 
3509 } // namespace
3510 
3511 bool Executor::isFragmentFullyDeleted(
3512  const int table_id,
3513  const Fragmenter_Namespace::FragmentInfo& fragment) {
3514  // Skip temporary tables
3515  if (table_id < 0) {
3516  return false;
3517  }
3518 
3519  const auto td = catalog_->getMetadataForTable(fragment.physicalTableId);
3520  CHECK(td);
3521  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3522  if (!deleted_cd) {
3523  return false;
3524  }
3525 
3526  const auto& chunk_type = deleted_cd->columnType;
3527  CHECK(chunk_type.is_boolean());
3528 
3529  const auto deleted_col_id = deleted_cd->columnId;
3530  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
3531  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
3532  const int64_t chunk_min =
3533  extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3534  const int64_t chunk_max =
3535  extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3536  if (chunk_min == 1 && chunk_max == 1) { // the fragment is fully deleted if the
3537  // delete-column metadata says the whole bytemap is true (all rows deleted)
3538  return true;
3539  }
3540  }
3541  return false;
3542 }
3543 
3544 std::pair<bool, int64_t> Executor::skipFragment(
3545  const InputDescriptor& table_desc,
3546  const Fragmenter_Namespace::FragmentInfo& fragment,
3547  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3548  const std::vector<uint64_t>& frag_offsets,
3549  const size_t frag_idx) {
3550  const int table_id = table_desc.getTableId();
3551 
3552  // First check to see if all of the fragment is deleted, in which case we know we can skip it
3553  if (isFragmentFullyDeleted(table_id, fragment)) {
3554  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
3555  << ", fragment id: " << frag_idx;
3556  return {true, -1};
3557  }
3558 
3559  for (const auto& simple_qual : simple_quals) {
3560  const auto comp_expr =
3561  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3562  if (!comp_expr) {
3563  // is this possible?
3564  return {false, -1};
3565  }
3566  const auto lhs = comp_expr->get_left_operand();
3567  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3568  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3569  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3570  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3571  if (lhs_uexpr) {
3572  CHECK(lhs_uexpr->get_optype() ==
3573  kCAST); // We should have only been passed a cast expression
3574  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3575  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3576  continue;
3577  }
3578  } else {
3579  continue;
3580  }
3581  }
3582  const auto rhs = comp_expr->get_right_operand();
3583  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3584  if (!rhs_const) {
3585  // is this possible?
3586  return {false, -1};
3587  }
3588  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3589  continue;
3590  }
3591  const int col_id = lhs_col->get_column_id();
3592  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3593  int64_t chunk_min{0};
3594  int64_t chunk_max{0};
3595  bool is_rowid{false};
3596  size_t start_rowid{0};
3597  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3598  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3599  if (cd->isVirtualCol) {
3600  CHECK(cd->columnName == "rowid");
3601  const auto& table_generation = getTableGeneration(table_id);
3602  start_rowid = table_generation.start_rowid;
3603  chunk_min = frag_offsets[frag_idx] + start_rowid;
3604  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3605  is_rowid = true;
3606  }
3607  } else {
3608  const auto& chunk_type = lhs_col->get_type_info();
3609  chunk_min = extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3610  chunk_max = extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3611  }
3612  if (chunk_min > chunk_max) {
3613  // invalid metadata range, do not skip fragment
3614  return {false, -1};
3615  }
3616  if (lhs->get_type_info().is_timestamp() &&
3617  (lhs_col->get_type_info().get_dimension() !=
3618  rhs_const->get_type_info().get_dimension()) &&
3619  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3620  rhs_const->get_type_info().is_high_precision_timestamp())) {
3621  // If the original timestamp lhs col has a different precision,
3622  // the column metadata holds values in the original precision,
3623  // therefore adjust the rhs value to match the lhs precision
3624 
3625  // Note(Wamsi): We adjust the rhs const value instead of the lhs value so as not to
3626  // artificially limit the lhs column range. RHS overflow/underflow has already
3627  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
3628  bool is_valid;
3629  std::tie(is_valid, chunk_min, chunk_max) =
3630  get_hpt_overflow_underflow_safe_scaled_values(
3631  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
3632  if (!is_valid) {
3633  VLOG(4) << "Overflow/Underflow detected in fragment skipping logic.\nChunk min "
3634  "value: "
3635  << std::to_string(chunk_min)
3636  << "\nChunk max value: " << std::to_string(chunk_max)
3637  << "\nLHS col precision is: "
3638  << std::to_string(lhs_col->get_type_info().get_dimension())
3639  << "\nRHS precision is: "
3640  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
3641  return {false, -1};
3642  }
3643  }
3644  llvm::LLVMContext local_context;
3645  CgenState local_cgen_state(local_context);
3646  CodeGenerator code_generator(&local_cgen_state, nullptr);
3647 
3648  const auto rhs_val =
3649  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
3650 
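  // prune the fragment when the constant lies entirely outside the chunk's [min, max]
  // range; for an equality predicate on rowid we can also return the exact row offset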
3651  switch (comp_expr->get_optype()) {
3652  case kGE:
3653  if (chunk_max < rhs_val) {
3654  return {true, -1};
3655  }
3656  break;
3657  case kGT:
3658  if (chunk_max <= rhs_val) {
3659  return {true, -1};
3660  }
3661  break;
3662  case kLE:
3663  if (chunk_min > rhs_val) {
3664  return {true, -1};
3665  }
3666  break;
3667  case kLT:
3668  if (chunk_min >= rhs_val) {
3669  return {true, -1};
3670  }
3671  break;
3672  case kEQ:
3673  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3674  return {true, -1};
3675  } else if (is_rowid) {
3676  return {false, rhs_val - start_rowid};
3677  }
3678  break;
3679  default:
3680  break;
3681  }
3682  }
3683  return {false, -1};
3684 }
3685 
3686 /*
3687  * The skipFragmentInnerJoins function processes all quals stored in the execution
3688  * unit's join_quals and gathers the ones that meet the "simple_qual" characteristics
3689  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
3690  * to decide whether the fragment should be skipped or not. The fragment will be skipped
3691  * if at least one of these skipFragment calls returns true in its first value.
3692  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
3693  * second value only if its first value is "false".
3694  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3695  * i.e., we only skip if none of the quals trigger the code to update the
3696  * rowid_lookup_key
3697  * - Only AND operations are valid and considered:
3698  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3699  * the skip
3700  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3701  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3702  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3703  * implemented differently, which causes the first one to skip correctly, while the
3704  * second one will not skip.
3705  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3706  * possible
3707  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3708  * projection, no skipping
3709  */
3710 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3711  const InputDescriptor& table_desc,
3712  const RelAlgExecutionUnit& ra_exe_unit,
3713  const Fragmenter_Namespace::FragmentInfo& fragment,
3714  const std::vector<uint64_t>& frag_offsets,
3715  const size_t frag_idx) {
3716  std::pair<bool, int64_t> skip_frag{false, -1};
3717  for (auto& inner_join : ra_exe_unit.join_quals) {
3718  if (inner_join.type != JoinType::INNER) {
3719  continue;
3720  }
3721 
3722  // extracting all the conjunctive simple_quals from the quals stored for the inner
3723  // join
3724  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3725  for (auto& qual : inner_join.quals) {
3726  auto temp_qual = qual_to_conjunctive_form(qual);
3727  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3728  temp_qual.simple_quals.begin(),
3729  temp_qual.simple_quals.end());
3730  }
3731  auto temp_skip_frag = skipFragment(
3732  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3733  if (temp_skip_frag.second != -1) {
3734  skip_frag.second = temp_skip_frag.second;
3735  return skip_frag;
3736  } else {
3737  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3738  }
3739  }
3740  return skip_frag;
3741 }
3742 
3743 AggregatedColRange Executor::computeColRangesCache(
3744  const std::unordered_set<PhysicalInput>& phys_inputs) {
3745  AggregatedColRange agg_col_range_cache;
3746  CHECK(catalog_);
3747  std::unordered_set<int> phys_table_ids;
3748  for (const auto& phys_input : phys_inputs) {
3749  phys_table_ids.insert(phys_input.table_id);
3750  }
3751  std::vector<InputTableInfo> query_infos;
3752  for (const int table_id : phys_table_ids) {
3753  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3754  }
3755  for (const auto& phys_input : phys_inputs) {
3756  const auto cd =
3757  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3758  CHECK(cd);
3759  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3760  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3761  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3762  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3763  agg_col_range_cache.setColRange(phys_input, col_range);
3764  }
3765  }
3766  return agg_col_range_cache;
3767 }
3768 
3769 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3770  const std::unordered_set<PhysicalInput>& phys_inputs) {
3771  StringDictionaryGenerations string_dictionary_generations;
3772  CHECK(catalog_);
3773  for (const auto& phys_input : phys_inputs) {
3774  const auto cd =
3775  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3776  CHECK(cd);
3777  const auto& col_ti =
3778  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3779  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3780  const int dict_id = col_ti.get_comp_param();
3781  const auto dd = catalog_->getMetadataForDict(dict_id);
3782  CHECK(dd && dd->stringDict);
3783  string_dictionary_generations.setGeneration(dict_id,
3784  dd->stringDict->storageEntryCount());
3785  }
3786  }
3787  return string_dictionary_generations;
3788 }
3789 
3790 TableGenerations Executor::computeTableGenerations(
3791  std::unordered_set<int> phys_table_ids) {
3792  TableGenerations table_generations;
3793  for (const int table_id : phys_table_ids) {
3794  const auto table_info = getTableInfo(table_id);
3795  table_generations.setGeneration(
3796  table_id,
3797  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
3798  }
3799  return table_generations;
3800 }
3801 
3802 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3803  const std::unordered_set<int>& phys_table_ids) {
3804  CHECK(catalog_);
3805  row_set_mem_owner_ =
3806  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
3807  row_set_mem_owner_->setDictionaryGenerations(
3808  computeStringDictionaryGenerations(phys_inputs));
3809  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3810  table_generations_ = computeTableGenerations(phys_table_ids);
3811 }
3812 
3813 mapd_shared_mutex& Executor::getSessionLock() {
3814  return executor_session_mutex_;
3815 }
3816 
3817 QuerySessionId& Executor::getCurrentQuerySession(
3818  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3819  return current_query_session_;
3820 }
3821 
3822 void Executor::setCurrentQuerySession(const QuerySessionId& query_session,
3823  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3824  if (!query_session.empty()) {
3825  current_query_session_ = query_session;
3826  }
3827 }
3828 
3829 void Executor::setRunningExecutorId(const size_t id,
3830  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3831  running_query_executor_id_ = id;
3832 }
3833 
3834 size_t Executor::getRunningExecutorId(mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3835  return running_query_executor_id_;
3836 }
3837 
3838 bool Executor::checkCurrentQuerySession(const QuerySessionId& candidate_query_session,
3839  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3840  // a candidate query session matches only when it is non-empty and equal to
3841  // the current query session
3842  return !candidate_query_session.empty() &&
3843  (current_query_session_ == candidate_query_session);
3844 }
3845 
3846 void Executor::invalidateRunningQuerySession(
3847  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3848  current_query_session_ = "";
3849  running_query_executor_id_ = Executor::UNITARY_EXECUTOR_ID;
3850 }
3851 
3853  const QuerySessionId& query_session_id,
3854  const std::string& query_str,
3855  const std::string& query_submitted_time) {
3856  if (!query_session_id.empty()) {
3857  // if the session is valid, update 1) the assigned executor id and 2) the query status
3858  mapd_unique_lock<mapd_shared_mutex> write_lock(executor_session_mutex_);
3859  updateQuerySessionExecutorAssignment(
3860  query_session_id, query_submitted_time, executor_id_, write_lock);
3861  updateQuerySessionStatusWithLock(query_session_id,
3862  query_submitted_time,
3863  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
3864  write_lock);
3865  }
3866  return {query_session_id, query_str};
3867 }
3868 
3869 void Executor::checkPendingQueryStatus(const QuerySessionId& query_session) {
3870  // check whether we are okay to execute the "pending" query,
3871  // i.e., before running the query, check whether this query session has ALREADY been interrupted
3872  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3873  if (query_session.empty()) {
3874  return;
3875  }
3876  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
3877  // something went wrong, since we assume it is the caller's responsibility
3878  // to call this function only for an enrolled query session
3879  if (!queries_session_map_.count(query_session)) {
3880  VLOG(1) << "Interrupting pending query is not available since the query session is "
3881  "not enrolled";
3882  } else {
3883  // here the query session is enrolled but the interrupt flag is not registered
3884  VLOG(1)
3885  << "Interrupting pending query is not available since its interrupt flag is "
3886  "not registered";
3887  }
3888  return;
3889  }
3890  if (queries_interrupt_flag_[query_session]) {
3891  throw QueryExecutionError(ERR_INTERRUPTED);
3892  }
3893 }
3894 
3895 void Executor::clearQuerySessionStatus(const QuerySessionId& query_session,
3896  const std::string& submitted_time_str,
3897  const bool acquire_spin_lock) {
3898  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3899  // clear the interrupt-related info for a finished query
3900  if (query_session.empty()) {
3901  return;
3902  }
3903  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
3904  if (query_session.compare(current_query_session_) == 0 &&
3905  running_query_executor_id_ == executor_id_) {
3906  invalidateRunningQuerySession(session_write_lock);
3907  if (acquire_spin_lock) {
3908  // try to unlock the executor's internal spin lock (call it "L") iff it was acquired;
3909  // otherwise we do not need to care about the "L" lock
3910  // i.e., table import does not have a code path through the Executor,
3911  // so it only uses the executor's session management code and the global interrupt
3912  // flag, not this "L" lock
3913  execute_spin_lock_.clear(std::memory_order_release);
3914  }
3915  resetInterrupt();
3916  }
3917 }
3918 
3919 void Executor::updateQuerySessionStatus(
3920  std::shared_ptr<const query_state::QueryState>& query_state,
3921  const QuerySessionStatus::QueryStatus new_query_status) {
3922  // update the running query session's current status
3923  if (query_state) {
3924  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3925  auto query_session = query_state->getConstSessionInfo()->get_session_id();
3926  if (query_session.empty()) {
3927  return;
3928  }
3929  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING) {
3930  current_query_session_ = query_session;
3931  running_query_executor_id_ = executor_id_;
3932  }
3933  updateQuerySessionStatusWithLock(query_session,
3934  query_state->getQuerySubmittedTime(),
3935  new_query_status,
3936  session_write_lock);
3937  }
3938 }
3939 
3940 void Executor::updateQuerySessionStatus(
3941  const QuerySessionId& query_session,
3942  const std::string& submitted_time_str,
3943  const QuerySessionStatus::QueryStatus new_query_status) {
3944  // update the running query session's current status
3945  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3946  if (query_session.empty()) {
3947  return;
3948  }
3949  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING) {
3950  current_query_session_ = query_session;
3951  running_query_executor_id_ = executor_id_;
3952  }
3953  updateQuerySessionStatusWithLock(
3954  query_session, submitted_time_str, new_query_status, session_write_lock);
3955 }
3956 
3957 void Executor::enrollQuerySession(
3958  const QuerySessionId& query_session,
3959  const std::string& query_str,
3960  const std::string& submitted_time_str,
3961  const size_t executor_id,
3962  const QuerySessionStatus::QueryStatus query_session_status) {
3963  // enroll the query session into the Executor's session map
3964  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3965  if (query_session.empty()) {
3966  return;
3967  }
3968 
3969  addToQuerySessionList(query_session,
3970  query_str,
3971  submitted_time_str,
3972  executor_id,
3973  query_session_status,
3974  session_write_lock);
3975 
3976  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING) {
3977  current_query_session_ = query_session;
3978  running_query_executor_id_ = executor_id_;
3979  }
3980 }
3981 
3982 bool Executor::addToQuerySessionList(const QuerySessionId& query_session,
3983  const std::string& query_str,
3984  const std::string& submitted_time_str,
3985  const size_t executor_id,
3986  const QuerySessionStatus::QueryStatus query_status,
3987  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3988  // an internal API that enrolls the query session into the Executor's session map
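  // if an entry with the same submission time already exists for this session, replace
  // it; otherwise insert a fresh QuerySessionStatus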
3989  if (queries_session_map_.count(query_session)) {
3990  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
3991  queries_session_map_.at(query_session).erase(submitted_time_str);
3992  queries_session_map_.at(query_session)
3993  .emplace(submitted_time_str,
3994  QuerySessionStatus(query_session,
3995  executor_id,
3996  query_str,
3997  submitted_time_str,
3998  query_status));
3999  } else {
4000  queries_session_map_.at(query_session)
4001  .emplace(submitted_time_str,
4002  QuerySessionStatus(query_session,
4003  executor_id,
4004  query_str,
4005  submitted_time_str,
4006  query_status));
4007  }
4008  } else {
4009  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4010  executor_per_query_map.emplace(
4011  submitted_time_str,
4012  QuerySessionStatus(
4013  query_session, executor_id, query_str, submitted_time_str, query_status));
4014  queries_session_map_.emplace(query_session, executor_per_query_map);
4015  }
4016  return queries_interrupt_flag_.emplace(query_session, false).second;
4017 }
4018 
4019 bool Executor::updateQuerySessionStatusWithLock(
4020  const QuerySessionId& query_session,
4021  const std::string& submitted_time_str,
4022  const QuerySessionStatus::QueryStatus updated_query_status,
4023  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4024  // an internal API that updates query session status
4025  if (query_session.empty()) {
4026  return false;
4027  }
4028  if (queries_session_map_.count(query_session)) {
4029  for (auto& query_status : queries_session_map_.at(query_session)) {
4030  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4031  // no time difference --> found the target query status
4032  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4033  auto prev_status = query_status.second.getQueryStatus();
4034  if (prev_status == updated_query_status) {
4035  return false;
4036  }
4037  query_status.second.setQueryStatus(updated_query_status);
4038  return true;
4039  }
4040  }
4041  }
4042  return false;
4043 }
4044 
4045 bool Executor::updateQuerySessionExecutorAssignment(
4046  const QuerySessionId& query_session,
4047  const std::string& submitted_time_str,
4048  const size_t executor_id,
4049  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4050  // update the executor id of the query session
4051  if (query_session.empty()) {
4052  return false;
4053  }
4054  if (queries_session_map_.count(query_session)) {
4055  auto storage = queries_session_map_.at(query_session);
4056  for (auto it = storage.begin(); it != storage.end(); it++) {
4057  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4058  // no time difference --> found the target query status
4059  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4060  queries_session_map_.at(query_session)
4061  .at(submitted_time_str)
4062  .setExecutorId(executor_id);
4063  return true;
4064  }
4065  }
4066  }
4067  return false;
4068 }
4069 
4070 bool Executor::removeFromQuerySessionList(
4071  const QuerySessionId& query_session,
4072  const std::string& submitted_time_str,
4073  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4074  if (query_session.empty()) {
4075  return false;
4076  }
4077  if (queries_session_map_.count(query_session)) {
4078  auto& storage = queries_session_map_.at(query_session);
4079  if (storage.size() > 1) {
4080  // in this case we only remove query executor info
4081  for (auto it = storage.begin(); it != storage.end(); it++) {
4082  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4083  // no time difference && have the same executor id--> found the target query
4084  if (it->second.getExecutorId() == executor_id_ &&
4085  submitted_time_str.compare(target_submitted_t_str) == 0) {
4086  storage.erase(it);
4087  return true;
4088  }
4089  }
4090  } else if (storage.size() == 1) {
4091  // here this session only has a single query executor
4092  // so we clear both executor info and its interrupt flag
4093  queries_session_map_.erase(query_session);
4094  queries_interrupt_flag_.erase(query_session);
4095  if (interrupted_.load()) {
4096  interrupted_.store(false);
4097  VLOG(1) << "RESET Executor " << this << " that had previously been interrupted";
4098  }
4099  return true;
4100  }
4101  }
4102  return false;
4103 }
4104 
4105 void Executor::setQuerySessionAsInterrupted(
4106  const QuerySessionId& query_session,
4107  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4108  if (query_session.empty()) {
4109  return;
4110  }
4111  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
4112  queries_interrupt_flag_[query_session] = true;
4113  }
4114 }
4115 
4116 void Executor::resetQuerySessionInterruptFlag(
4117  const std::string& query_session,
4118  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4119  if (query_session.empty()) {
4120  return;
4121  }
4122  queries_interrupt_flag_[query_session] = false;
4123 }
4124 
4125 bool Executor::checkIsQuerySessionInterrupted(
4126  const QuerySessionId& query_session,
4127  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4128  if (query_session.empty()) {
4129  return false;
4130  }
4131  auto flag_it = queries_interrupt_flag_.find(query_session);
4132  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4133  flag_it->second;
4134 }
4135 
4136 bool Executor::checkIsRunningQuerySessionInterrupted() {
4137  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4138  return checkIsQuerySessionInterrupted(current_query_session_, session_read_lock);
4139 }
4140 
4141 bool Executor::checkIsQuerySessionEnrolled(
4142  const QuerySessionId& query_session,
4143  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4144  if (query_session.empty()) {
4145  return false;
4146  }
4147  return !query_session.empty() && queries_session_map_.count(query_session);
4148 }
4149 
4150 void Executor::enableRuntimeQueryInterrupt(
4151  const double runtime_query_check_freq,
4152  const unsigned pending_query_check_freq) const {
4153  // The only scenario in which we intentionally call this function is
4154  // to allow runtime query interrupts in QueryRunner for test cases.
4155  // Test machines' default settings do not allow runtime query interrupts,
4156  // so we have to turn them on within test code if necessary.
4158  g_pending_query_interrupt_freq = pending_query_check_freq;
4159  g_running_query_interrupt_freq = runtime_query_check_freq;
4162  }
4163 }
4164 
4165 void Executor::addToCardinalityCache(const std::string& cache_key,
4166  const size_t cache_value) {
4168  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
4169  cardinality_cache_[cache_key] = cache_value;
4170  VLOG(1) << "Put estimated cardinality to the cache";
4171  }
4172 }
4173 
4175  mapd_shared_lock<mapd_shared_mutex> lock(recycler_mutex_);
4177  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
4178  VLOG(1) << "Reuse cached cardinality";
4179  return {true, cardinality_cache_[cache_key]};
4180  }
4181  return {false, -1};
4182 }
4183 
4184 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
4185  const QuerySessionId& query_session,
4186  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4187  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
4188  auto& query_infos = queries_session_map_.at(query_session);
4189  std::vector<QuerySessionStatus> ret;
4190  for (auto& info : query_infos) {
4191  ret.push_back(QuerySessionStatus(query_session,
4192  info.second.getExecutorId(),
4193  info.second.getQueryStr(),
4194  info.second.getQuerySubmittedTime(),
4195  info.second.getQueryStatus()));
4196  }
4197  return ret;
4198  }
4199  return {};
4200 }
4201 
4202 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
4203 
4204 std::atomic_flag Executor::execute_spin_lock_ = ATOMIC_FLAG_INIT;
4205 // current running query's session ID
4206 std::string Executor::current_query_session_{""};
4207 // running executor's id
4208 size_t Executor::running_query_executor_id_{Executor::UNITARY_EXECUTOR_ID};
4209 // contain the interrupt flag's status per query session
4210 InterruptFlagMap Executor::queries_interrupt_flag_;
4211 // contain a list of queries per query session
4212 QuerySessionMap Executor::queries_session_map_;
4213 // session lock
4214 mapd_shared_mutex Executor::executor_session_mutex_;
4215 
4218 
4221 void* Executor::gpu_active_modules_[max_gpu_count];
4222 std::atomic<bool> Executor::interrupted_{false};
4223 
4224 std::mutex Executor::compilation_mutex_;
4225 std::mutex Executor::kernel_mutex_;
4226 
4227 mapd_shared_mutex Executor::recycler_mutex_;
4228 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;
bool updateQuerySessionStatusWithLock(const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus updated_query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4019
int get_table_id() const
Definition: Analyzer.h:194
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
QuerySessionId & getCurrentQuerySession(mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3817
void executeWorkUnitPerFragment(const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb, const std::set< size_t > &fragment_indexes_param)
Compiles and dispatches a work unit per fragment processing results with the per fragment callback...
Definition: Execute.cpp:1595
bool is_agg(const Analyzer::Expr *expr)
catalog_(nullptr)
ALWAYS_INLINE int64_t agg_sum_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
std::vector< Analyzer::Expr * > target_exprs
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1069
AggregatedColRange computeColRangesCache(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:3743
void enableRuntimeQueryInterrupt(const double runtime_query_check_freq, const unsigned pending_query_check_freq) const
Definition: Execute.cpp:4150
constexpr size_t kArenaBlockOverhead
SQLAgg
Definition: sqldefs.h:71
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::unique_ptr< ExecutionKernel > > createKernels(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
Definition: Execute.cpp:2044
bool g_enable_left_join_filter_hoisting
Definition: Execute.cpp:94
void resetQuerySessionInterruptFlag(const std::string &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4116
double g_running_query_interrupt_freq
Definition: Execute.cpp:118
bool g_enable_smem_group_by
void reduce(SpeculativeTopNMap &that)
float g_filter_push_down_low_frac
Definition: Execute.cpp:90
const ColumnDescriptor * try_get_column_descriptor(const InputColDescriptor *col_desc, const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:2307
static QuerySessionMap queries_session_map_
Definition: Execute.h:1077
static mapd_shared_mutex execute_mutex_
Definition: Execute.h:1084
void agg_max_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
JoinType
Definition: sqldefs.h:108
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
size_t maxGpuSlabSize() const
Definition: Execute.cpp:3349
size_t g_constrained_by_in_threshold
Definition: Execute.cpp:101
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int outer_table_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
Definition: Execute.cpp:3052
bool useCudaBuffers() const
Definition: RenderInfo.cpp:69
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool g_enable_watchdog
ExecutorDeviceType getDeviceType() const
double g_bump_allocator_step_reduction
Definition: Execute.cpp:110
std::string cat(Ts &&...args)
std::string toString(const ExtArgumentType &sig_type)
void invalidateRunningQuerySession(mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3846
block_size_x_(block_size_x)
void checkPendingQueryStatus(const QuerySessionId &query_session)
Definition: Execute.cpp:3869
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1227
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1117
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
void setEntryCount(const size_t val)
input_table_info_cache_(this)
Definition: Execute.cpp:159
bool is_trivial_loop_join(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1152
grid_size_x_(grid_size_x)
bool g_strip_join_covered_quals
Definition: Execute.cpp:100
std::unique_ptr< llvm::Module > udf_gpu_module
const std::vector< uint64_t > & getFragOffsets()
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 64, 64, boost::multiprecision::signed_magnitude, boost::multiprecision::checked, void >> checked_int64_t
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner() const
Definition: Execute.cpp:284
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const QueryHint &query_hint)
Definition: Execute.cpp:3285
std::vector< size_t > getTableFragmentIndices(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< int, const TableFragments * > &selected_tables_fragments, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition)
Definition: Execute.cpp:2203
bool g_enable_direct_columnarization
Definition: Execute.cpp:111
std::tuple< bool, int64_t, int64_t > get_hpt_overflow_underflow_safe_scaled_values(const int64_t chunk_min, const int64_t chunk_max, const SQLTypeInfo &lhs_type, const SQLTypeInfo &rhs_type)
Definition: Execute.cpp:3476
ExecutorDeviceType
void agg_max_double_skip_val(int64_t *agg, const double val, const double skip_val)
std::string get_table_name(const InputDescriptor &input_desc, const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:1086
ResultSetPtr executeTableFunction(const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat)
Compiles and dispatches a table function; that is, a function that takes as input one or more columns...
Definition: Execute.cpp:1678
std::string toString() const
bool g_enable_lazy_fetch
Definition: Execute.cpp:113
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
std::unordered_set< int > get_available_gpus(const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:1032
static const int max_gpu_count
Definition: Execute.h:1028
GpuSharedMemoryContext gpu_smem_context
OutVecOwner(const std::vector< int64_t * > &out_vec)
Definition: Execute.cpp:2828
const std::optional< bool > union_all
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:117
int64_t float_to_double_bin(int32_t val, bool nullable=false)
std::map< const QuerySessionId, std::map< std::string, QuerySessionStatus >> QuerySessionMap
Definition: Execute.h:151
#define LOG(tag)
Definition: Logger.h:188
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:79
std::vector< size_t > outer_fragment_indices
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:53
static std::atomic_flag execute_spin_lock_
Definition: Execute.h:1080
void agg_min_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
bool is_fp() const
Definition: sqltypes.h:492
HOST DEVICE int get_scale() const
Definition: sqltypes.h:319
std::pair< QuerySessionId, std::string > CurrentQueryStatus
Definition: Execute.h:81
static mapd_shared_mutex executors_cache_mutex_
Definition: Execute.h:1101
Definition: sqldefs.h:35
int64_t getGeneration(const uint32_t id) const
const std::list< Analyzer::OrderEntry > order_entries
size_t getSharedMemorySize() const
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr * > &target_exprs) const
Definition: Execute.cpp:334
void clearMemory(const MemoryLevel memLevel)
Definition: DataMgr.cpp:384
Definition: sqldefs.h:36
tuple r
Definition: test_fsi.py:16
std::string join(T const &container, std::string const &delim)
std::string join_type_to_string(const JoinType type)
Definition: Execute.cpp:1199
std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > addDeletedColumn(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
Definition: Execute.cpp:3432
TableGenerations computeTableGenerations(std::unordered_set< int > phys_table_ids)
Definition: Execute.cpp:3790
std::vector< InputDescriptor > input_descs
#define UNREACHABLE()
Definition: Logger.h:241
void setOutputColumnar(const bool val)
const SortAlgorithm algorithm
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
debug_dir_(debug_dir)
#define CHECK_GE(x, y)
Definition: Logger.h:210
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
static std::pair< int64_t, int32_t > reduceResults(const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
Definition: Execute.cpp:660
Definition: sqldefs.h:49
Definition: sqldefs.h:30
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:318
bool checkCurrentQuerySession(const std::string &candidate_query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3838
void setRunningExecutorId(const size_t id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3829
size_t g_filter_push_down_passing_row_ubound
Definition: Execute.cpp:92
static const int32_t ERR_GEOS
Definition: Execute.h:1123
const int8_t * linearizeColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
std::shared_ptr< ResultSet > ResultSetPtr
static void * gpu_active_modules_[max_gpu_count]
Definition: Execute.h:1033
std::vector< FragmentInfo > fragments
Definition: Fragmenter.h:160
void fill_entries_for_empty_input(std::vector< TargetInfo > &target_infos, std::vector< int64_t > &entry, const std::vector< Analyzer::Expr * > &target_exprs, const QueryMemoryDescriptor &query_mem_desc)
Definition: Execute.cpp:1756
ExecutorOptLevel opt_level
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
void enrollQuerySession(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted_time_str, const size_t executor_id, const QuerySessionStatus::QueryStatus query_session_status)
Definition: Execute.cpp:3957
size_t compute_buffer_entry_guess(const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:1056
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
bool takes_float_argument(const TargetInfo &target_info)
Definition: TargetInfo.h:134
unsigned g_trivial_loop_join_threshold
Definition: Execute.cpp:83
static uint32_t gpu_active_modules_device_mask_
Definition: Execute.h:1032
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
static void invalidateCaches()
void buildSelectedFragsMappingForUnion(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2787
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:643
bool g_is_test_env
Definition: Execute.cpp:130
llvm::Value * castToIntPtrTyIn(llvm::Value *val, const size_t bit_width)
Definition: Execute.cpp:3388
size_t getNumBytesForFetchedRow(const std::set< int > &table_ids_to_fetch) const
Definition: Execute.cpp:304
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
static std::mutex kernel_mutex_
Definition: Execute.h:1126
unsigned numBlocksPerMP() const
Definition: Execute.cpp:3331
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
bool is_number() const
Definition: sqltypes.h:493
#define CHECK_GT(x, y)
Definition: Logger.h:209
Container for compilation results and assorted options for a single execution unit.
bool isCPUOnly() const
Definition: Execute.cpp:254
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::vector< QuerySessionStatus > getQuerySessionInfo(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4184
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1947
std::vector< FragmentsPerTable > FragmentsList
bool is_time() const
Definition: sqltypes.h:494
bool needFetchAllFragments(const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
Definition: Execute.cpp:2384
std::string to_string(char const *&&v)
static size_t literalBytes(const CgenState::LiteralValue &lit)
Definition: CgenState.h:357
bool checkIsQuerySessionInterrupted(const std::string &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4125
mapd_shared_mutex & getSessionLock()
Definition: Execute.cpp:3813
bool g_enable_overlaps_hashjoin
Definition: Execute.cpp:96
gpu_code_cache_(code_cache_size)
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:183
int8_t get_min_byte_width()
int8_t * getUnderlyingBuffer() const
bool g_inner_join_fragment_skipping
Definition: Execute.cpp:85
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:292
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
Definition: HashJoin.cpp:238
cpu_code_cache_(code_cache_size)
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const TableFunctionCompilationContext *compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:453
bool g_enable_smem_non_grouped_agg
Definition: Execute.cpp:127
ResultSetPtr get_merged_result(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Definition: Execute.cpp:856
Definition: sqldefs.h:73
std::unique_ptr< QueryMemoryInitializer > query_buffers_
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:161
std::vector< int64_t > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
Definition: Execute.cpp:3230
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:3264
bool isFragmentFullyDeleted(const int table_id, const Fragmenter_Namespace::FragmentInfo &fragment)
Definition: Execute.cpp:3511
bool g_null_div_by_zero
Definition: Execute.cpp:82
bool checkIsRunningQuerySessionInterrupted()
Definition: Execute.cpp:4136
bool checkIsQuerySessionEnrolled(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4141
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
FetchResult fetchUnionChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:2564
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:193
int8_t warpSize() const
Definition: Execute.cpp:3313
std::map< QuerySessionId, bool > InterruptFlagMap
Definition: Execute.h:82
const size_t limit
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1005
bool g_enable_columnar_output
Definition: Execute.cpp:93
ResultSetPtr collectAllDeviceResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
Definition: Execute.cpp:1853
const ColumnDescriptor * getPhysicalColumnDescriptor(const Analyzer::ColumnVar *, int) const
Definition: Execute.cpp:265
int8_t groupColWidth(const size_t key_idx) const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1109
bool is_fixlen_array() const
Definition: sqltypes.h:498
static SysCatalog & instance()
Definition: SysCatalog.h:292
max_gpu_slab_size_(max_gpu_slab_size)
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const int64_t generation)
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3982
bool g_from_table_reordering
Definition: Execute.cpp:84
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:168
Classes representing a parse tree.
void setGeneration(const uint32_t id, const uint64_t generation)
bool g_enable_hashjoin_many_to_many
Definition: Execute.cpp:97
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
int getDeviceCount() const
Definition: CudaMgr.h:86
size_t get_context_count(const ExecutorDeviceType device_type, const size_t cpu_count, const size_t gpu_count)
Definition: Execute.cpp:1044
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:222
int64_t deviceCycles(int milliseconds) const
Definition: Execute.cpp:3353
const SortInfo sort_info
ExecutorType executor_type
void init(LogOptions const &log_opts)
Definition: Logger.cpp:280
bool is_integer() const
Definition: sqltypes.h:490
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static size_t addAligned(const size_t off_in, const size_t alignment)
Definition: CgenState.h:388
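CgenState::addAligned is the usual offset-alignment arithmetic used when packing hoisted literals into the literal buffer (see serializeLiterals further down). A minimal sketch of rounding an offset up to an alignment boundary, not the exact member function, which may differ in details:

#include <cstddef>

// Round off_in up to the next multiple of alignment (alignment must be non-zero).
size_t add_aligned_sketch(const size_t off_in, const size_t alignment) {
  const size_t rem = off_in % alignment;
  return rem == 0 ? off_in : off_in + (alignment - rem);
}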
#define CHECK_NE(x, y)
Definition: Logger.h:206
void setQuerySessionAsInterrupted(const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4105
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:223
std::unordered_map< TableId, const ColumnDescriptor * > DeletedColumnsMap
Definition: PlanState.h:44
const JoinQualsPerNestingLevel join_quals
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:898
std::vector< std::string > expr_container_to_string(const T &expr_container)
Definition: Execute.cpp:1177
std::shared_timed_mutex mapd_shared_mutex
SortAlgorithm
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
Definition: Execute.cpp:1968
CachedCardinality getCachedCardinality(const std::string &cache_key)
Definition: Execute.cpp:4174
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples (rows) currently stored by that fragment.
Definition: Fragmenter.h:77
void agg_min_double_skip_val(int64_t *agg, const double val, const double skip_val)
void setCatalog(const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:280
std::map< size_t, std::vector< uint64_t > > get_table_id_to_frag_offsets(const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:2316
ExecutorExplainType explain_type
StringDictionaryProxy * getStringDictionaryProxy(const int dict_id, const bool with_generation) const
Definition: Execute.h:401
int getColId() const
int64_t inline_null_val(const SQLTypeInfo &ti, const bool float_argument_input)
Definition: Execute.cpp:1741
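inline_null_val and inline_int_null_val (further down) pick the sentinel that represents NULL inside aggregate buffers. A conceptual sketch, assuming the common convention of the integer type's minimum for integers and a reserved floating-point bit pattern otherwise; the engine's concrete sentinels are not reproduced here:

#include <cstdint>
#include <cstring>
#include <limits>

// Illustrative only: choose a 64-bit NULL sentinel for an aggregate slot.
int64_t inline_null_sketch(const bool is_integer_target) {
  if (is_integer_target) {
    return std::numeric_limits<int64_t>::min();  // integer NULL sentinel
  }
  // Floating-point targets store a reserved double value bit-copied into the slot.
  const double null_sentinel = std::numeric_limits<double>::lowest();
  int64_t bits;
  std::memcpy(&bits, &null_sentinel, sizeof(bits));
  return bits;
}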
size_t g_big_group_threshold
Definition: Execute.cpp:102
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1115
static std::unordered_map< std::string, size_t > cardinality_cache_
Definition: Execute.h:1106
float g_filter_push_down_high_frac
Definition: Execute.cpp:91
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1075
int getTableId() const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1116
bool g_bigint_count
void agg_min_float_skip_val(int32_t *agg, const float val, const float skip_val)
Definition: sqldefs.h:75
const TableGeneration & getTableGeneration(const int table_id) const
Definition: Execute.cpp:296
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
llvm::Value * castToFP(llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
Definition: Execute.cpp:3361
std::pair< bool, size_t > CachedCardinality
Definition: Execute.h:990
size_t g_max_memory_allocation_size
Definition: Execute.cpp:105
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:130
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:705
double g_overlaps_target_entries_per_bin
Definition: Execute.cpp:99
size_t g_approx_quantile_buffer
Definition: Execute.cpp:133
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1106
void agg_max_float_skip_val(int32_t *agg, const float val, const float skip_val)
const ColumnDescriptor * getColumnDescriptor(const Analyzer::ColumnVar *) const
Definition: Execute.cpp:259
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1444
Specifies the in-memory content of a row in the column metadata table.
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
int64_t getAggInitValForIndex(const size_t index) const
const ChunkMetadataMap & getChunkMetadataMap() const
bool is_boolean() const
Definition: sqltypes.h:495
size_t g_overlaps_max_table_size_bytes
Definition: Execute.cpp:98
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1122
StringDictionaryGenerations string_dictionary_generations_
const std::shared_ptr< Analyzer::Estimator > estimator
static std::map< int, std::shared_ptr< Executor > > executors_
Definition: Execute.h:1079
#define AUTOMATIC_IR_METADATA(CGENSTATE)
const TemporaryTables * getTemporaryTables()
Definition: Execute.h:396
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
Definition: Execute.cpp:1328
QueryDescriptionType getQueryDescriptionType() const
std::shared_ptr< CompilationContext > generated_code
QueryMemoryDescriptor query_mem_desc_
const Catalog_Namespace::Catalog * getCatalog() const
Definition: Execute.cpp:276
ExecutorDeviceType device_type
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
executor_id_(executor_id)
bool g_enable_window_functions
Definition: Execute.cpp:103
bool updateQuerySessionExecutorAssignment(const QuerySessionId &query_session, const std::string &submitted_time_str, const size_t executor_id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4045
virtual ReductionCode codegen() const
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
Definition: sqldefs.h:34
std::string sort_algorithm_to_string(const SortAlgorithm algorithm)
Definition: Execute.cpp:1212
InputSourceType getSourceType() const
size_t g_min_memory_allocation_size
Definition: Execute.cpp:106
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqltypes.h:51
ResultSetPtr resultsUnion(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:870
#define REGULAR_DICT(TRANSIENTID)
Definition: sqltypes.h:255
static QuerySessionId current_query_session_
Definition: Execute.h:1071
static llvm::ConstantInt * codegenIntConst(const Analyzer::Constant *constant, CgenState *cgen_state)
Definition: ConstantIR.cpp:85
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2742
size_t ExecutorId
Definition: Execute.h:366
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
Definition: Execute.cpp:381
quantile::TDigest * nullTDigest()
Definition: Execute.cpp:246
Speculative top N algorithm.
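reduceSpeculativeTopN (listed earlier) folds per-device speculative top-N candidates into a single ordered result. A hedged, self-contained sketch of that merge step, assuming plain int64_t sort keys and ignoring the truncation/overflow detection the real reduction must perform:

#include <algorithm>
#include <cstdint>
#include <vector>

// Concatenate per-device candidate lists, sort the first n entries, and keep them.
std::vector<int64_t> merge_top_n_sketch(
    const std::vector<std::vector<int64_t>>& per_device_candidates,
    const size_t n) {
  std::vector<int64_t> all;
  for (const auto& part : per_device_candidates) {
    all.insert(all.end(), part.begin(), part.end());
  }
  const size_t k = std::min(n, all.size());
  std::partial_sort(all.begin(), all.begin() + k, all.end());
  all.resize(k);
  return all;
}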
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
void setGeneration(const uint32_t id, const TableGeneration &generation)
void launchKernels(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
bool g_enable_smem_grouped_non_count_agg
Definition: Execute.cpp:124
std::pair< bool, int64_t > skipFragment(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3544
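skipFragment, together with the extract_min_stat / extract_max_stat helpers listed below, prunes fragments whose chunk metadata already rules out every simple qualifier. A hedged sketch of that min/max test for a single `col < literal` predicate (the struct and names are illustrative, not the engine's types):

#include <cstdint>

struct ChunkStatsSketch {
  int64_t min_val;  // smallest value stored in the fragment's chunk
  int64_t max_val;  // largest value stored in the fragment's chunk
};

// Returns true when `col < rhs_literal` cannot match any row of the fragment,
// so the fragment can be skipped without being fetched.
bool can_skip_fragment_sketch(const ChunkStatsSketch& stats, const int64_t rhs_literal) {
  return stats.min_val >= rhs_literal;
}

The real method also handles the other comparison operators and date/time scaled columns, and skipFragmentInnerJoins extends the idea to inner-join qualifiers.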
bool g_enable_experimental_string_functions
Definition: sqldefs.h:76
unsigned gridSize() const
Definition: Execute.cpp:3322
std::unordered_map< int, CgenState::LiteralValues > literal_values
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:316
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags(const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:2335
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:32
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1897
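permute_storage_columnar rewrites columnar result storage in the order given by a top-N permutation. The core operation is a per-column gather, sketched here in a generic form; the real function works slot-by-slot on ResultSetStorage buffers:

#include <cstdint>
#include <vector>

// Gather one column into permutation order; illustrative only.
std::vector<int64_t> permute_column_sketch(const std::vector<int64_t>& column,
                                           const std::vector<uint32_t>& top_permutation) {
  std::vector<int64_t> out;
  out.reserve(top_permutation.size());
  for (const auto row_idx : top_permutation) {
    out.push_back(column[row_idx]);
  }
  return out;
}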
StringDictionaryGenerations computeStringDictionaryGenerations(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:3769
std::unique_ptr< llvm::Module > udf_cpu_module
bool g_enable_filter_function
Definition: Execute.cpp:79
bool needLinearizeAllFragments(const ColumnDescriptor *cd, const InputColDescriptor &inner_col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments, const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:2403
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
Definition: ResultSet.cpp:486
void updateQuerySessionStatus(std::shared_ptr< const query_state::QueryState > &query_state, const QuerySessionStatus::QueryStatus new_query_status)
Definition: Execute.cpp:3919
int32_t executePlanWithoutGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr * > &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
Definition: Execute.cpp:2840
bool g_cache_string_hash
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:3245
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
Definition: Execute.cpp:3045
std::vector< std::vector< const int8_t * > > col_buffers
Definition: ColumnFetcher.h:42
std::pair< bool, int64_t > skipFragmentInnerJoins(const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3710
void buildSelectedFragsMapping(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2756
int64_t extract_min_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr * > &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
Definition: Execute.cpp:1811
std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> PerFragmentCallBack
Definition: Execute.h:538
ThreadId thread_id()
Definition: Logger.cpp:732
mapd_shared_lock< mapd_shared_mutex > read_lock
size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1098
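checkWorkUnitWatchdog (above) rejects work units that would scan too many rows without a LIMIT, and getDeviceBasedScanLimit relaxes the cap when more devices participate. A hedged sketch of that scaling, where base_limit stands in for the high_scan_limit constant and the linear scaling policy is an assumption:

#include <cstddef>

// Illustrative policy only: let the scan cap grow with the number of GPUs,
// keep the base cap otherwise.
size_t device_based_scan_limit_sketch(const bool is_gpu,
                                      const int device_count,
                                      const size_t base_limit) {
  return (is_gpu && device_count > 1) ? base_limit * static_cast<size_t>(device_count)
                                      : base_limit;
}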
bool g_enable_filter_push_down
Definition: Execute.cpp:89
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool g_use_estimator_result_cache
Definition: Execute.cpp:116
bool g_enable_bump_allocator
Definition: Execute.cpp:109
size_t getRunningExecutorId(mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3834
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::string &submitted_time_str, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4070
std::string QuerySessionId
Definition: Execute.h:80
ResultSetPtr reduceMultiDeviceResultSets(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:945
void set_notnull(bool n)
Definition: sqltypes.h:411
void addToCardinalityCache(const std::string &cache_key, const size_t cache_value)
Definition: Execute.cpp:4165
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
static std::atomic< bool > interrupted_
Definition: Execute.h:1034
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
void agg_sum_double_skip_val(int64_t *agg, const double val, const double skip_val)
constexpr int64_t get_timestamp_precision_scale(const int32_t dimen)
Definition: DateTimeUtils.h:51
uint64_t exp_to_scale(const unsigned exp)
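get_timestamp_precision_scale and exp_to_scale both reduce to a power-of-ten scale factor; for example, a TIMESTAMP(3) column stores milliseconds, so its scale is 10^3. A minimal sketch of that computation:

#include <cstdint>

// Compute 10^exp as the scale factor for a fractional-second precision.
constexpr int64_t pow10_scale_sketch(unsigned exp) {
  int64_t scale = 1;
  while (exp-- > 0) {
    scale *= 10;
  }
  return scale;
}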
double gpu_input_mem_limit_percent
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
void clearQuerySessionStatus(const QuerySessionId &query_session, const std::string &submitted_time_str, const bool acquire_spin_lock)
Definition: Execute.cpp:3895
bool g_cluster
static std::mutex compilation_mutex_
Definition: Execute.h:1125
bool g_allow_cpu_retry
Definition: Execute.cpp:81
std::vector< std::vector< int64_t > > num_rows
Definition: ColumnFetcher.h:43
Definition: sqldefs.h:33
size_t g_approx_quantile_centroids
Definition: Execute.cpp:134
constexpr int8_t MAX_BYTE_WIDTH_SUPPORTED
void setColRange(const PhysicalInput &, const ExpressionRange &)
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables)
const Expr * get_left_operand() const
Definition: Analyzer.h:442
FetchResult fetchChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:2429
static size_t running_query_executor_id_
Definition: Execute.h:1073
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1415
static bool typeSupportsRange(const SQLTypeInfo &ti)
std::unordered_map< int, const Analyzer::BinOper * > getInnerTabIdToJoinCond() const
Definition: Execute.cpp:2018
ExecutorDeviceType getDeviceTypeForTargets(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
Definition: Execute.cpp:1720
std::shared_ptr< const query_state::QueryState > query_state
ReductionCode get_reduction_code(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, int64_t *compilation_queue_time)
Definition: Execute.cpp:930
ResultSetPtr reduce_estimator_results(const RelAlgExecutionUnit &ra_exe_unit, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Executor(const ExecutorId id, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:142
ExpressionRange getColRange(const PhysicalInput &) const
Definition: Execute.cpp:300
CurrentQueryStatus attachExecutorToQuerySession(const QuerySessionId &query_session_id, const std::string &query_str, const std::string &query_submitted_time)
Definition: Execute.cpp:3852
mapd_unique_lock< mapd_shared_mutex > write_lock
std::vector< Analyzer::Expr * > target_exprs
SQLTypeInfo columnType
bool g_optimize_row_initialization
Definition: Execute.cpp:95
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
debug_file_(debug_file)
int get_column_id() const
Definition: Analyzer.h:195
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const GpuCompilationContext *cu_functions, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const bool allow_runtime_interrupt, const std::vector< int64_t > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
bool is_string() const
Definition: sqltypes.h:488
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
unsigned blockSize() const
Definition: Execute.cpp:3339
std::vector< std::vector< uint64_t > > frag_offsets
Definition: ColumnFetcher.h:44
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:321
std::unique_ptr< ResultSet > estimator_result_set_
const size_t offset
unsigned g_dynamic_watchdog_time_limit
Definition: Execute.cpp:80
Definition: sqldefs.h:74
int cpu_threads()
Definition: thread_count.h:24
bool g_use_tbb_pool
Definition: Execute.cpp:78
bool is_decimal() const
Definition: sqltypes.h:491
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:553
void add_deleted_col_to_map(PlanState::DeletedColumnsMap &deleted_cols_map, const ColumnDescriptor *deleted_cd)
Definition: Execute.cpp:3420
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:653
void setCurrentQuerySession(const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3822
temporary_tables_(nullptr)
Descriptor for the fragments required for an execution kernel.
Definition: sqldefs.h:72
size_t getColOffInBytes(const size_t col_idx) const
void agg_sum_float_skip_val(int32_t *agg, const float val, const float skip_val)
static size_t getArenaBlockSize()
Definition: Execute.cpp:208
ResultSetPtr executeWorkUnit(size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1348
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:2033
int getNestLevel() const
HashType
Definition: HashTable.h:19
const InputDescriptor & getScanDesc() const
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:245
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:114
static mapd_shared_mutex recycler_mutex_
Definition: Execute.h:1105
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:42
Functions to support array operations used by the executor.
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
int64_t extract_max_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
const Executor * getExecutor() const
static std::mutex gpu_active_modules_mutex_
Definition: Execute.h:1031
void setupCaching(const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< int > &phys_table_ids)
Definition: Execute.cpp:3802
void clearMetaInfoCache()
Definition: Execute.cpp:375
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:193
bool g_enable_table_functions
Definition: Execute.cpp:104
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:367
size_t g_gpu_smem_threshold
Definition: Execute.cpp:119
bool skipFragmentPair(const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
Definition: Execute.cpp:2245
ResultSetPtr executeExplain(const QueryCompilationDescriptor &)
Definition: Execute.cpp:1716
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const
void compile(const TableFunctionExecutionUnit &exe_unit, const CompilationOptions &co, Executor *executor)