OmniSciDB  06b3bd477c
Execute.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Execute.h"
18 
19 #include "AggregateUtils.h"
20 #include "BaselineJoinHashTable.h"
21 #include "CodeGenerator.h"
22 #include "ColumnFetcher.h"
25 #include "DynamicWatchdog.h"
26 #include "EquiJoinCondition.h"
27 #include "ErrorHandling.h"
28 #include "ExpressionRewrite.h"
30 #include "GpuMemUtils.h"
31 #include "InPlaceSort.h"
32 #include "JsonAccessors.h"
34 #include "OverlapsJoinHashTable.h"
36 #include "QueryRewrite.h"
37 #include "QueryTemplateGenerator.h"
38 #include "ResultSetReductionJIT.h"
39 #include "RuntimeFunctions.h"
40 #include "SpeculativeTopN.h"
41 
44 
45 #include "CudaMgr/CudaMgr.h"
47 #include "Parser/ParserNode.h"
50 #include "Shared/checked_alloc.h"
51 #include "Shared/measure.h"
52 #include "Shared/misc.h"
53 #include "Shared/scope.h"
54 #include "Shared/shard_key.h"
56 #include "Shared/threadpool.h"
57 
58 #include "AggregatedColRange.h"
60 
61 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
62 #include <boost/filesystem/operations.hpp>
63 #include <boost/filesystem/path.hpp>
64 
65 #ifdef HAVE_CUDA
66 #include <cuda.h>
67 #endif // HAVE_CUDA
68 #include <future>
69 #include <memory>
70 #include <numeric>
71 #include <set>
72 #include <thread>
73 
74 bool g_enable_watchdog{false};
76 bool g_use_tbb_pool{false};
78 bool g_allow_cpu_retry{true};
79 bool g_null_div_by_zero{false};
83 extern bool g_enable_smem_group_by;
84 extern std::unique_ptr<llvm::Module> udf_gpu_module;
85 extern std::unique_ptr<llvm::Module> udf_cpu_module;
94 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
97 size_t g_big_group_threshold{20000};
100 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
101 size_t g_min_memory_allocation_size{
102  256}; // minimum memory allocation required for projection query output buffer
103  // without pre-flight count
111 size_t g_gpu_smem_threshold{
112  4096}; // GPU shared memory threshold (in bytes), if larger
113  // buffer sizes are required we do not use GPU shared
114  // memory optimizations. Setting this to 0 means unlimited
115  // (subject to other dynamically calculated caps)
116 bool g_enable_smem_grouped_non_count_agg{
117  true}; // enable use of shared memory when performing group-by with select non-count
118  // aggregates
119 bool g_enable_smem_non_grouped_agg{
120  true}; // enable optimizations for using GPU shared memory in implementation of
121  // non-grouped aggregates
122 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
123  // limits the allocation for the output buffer arena
124 
125 int const Executor::max_gpu_count;
126 
128 
129 Executor::Executor(const ExecutorId executor_id,
130  const size_t block_size_x,
131  const size_t grid_size_x,
132  const size_t max_gpu_slab_size,
133  const std::string& debug_dir,
134  const std::string& debug_file)
135  : cgen_state_(new CgenState({}, false))
136  , cpu_code_cache_(code_cache_size)
137  , gpu_code_cache_(code_cache_size)
138  , block_size_x_(block_size_x)
139  , grid_size_x_(grid_size_x)
140  , max_gpu_slab_size_(max_gpu_slab_size)
141  , debug_dir_(debug_dir)
142  , debug_file_(debug_file)
143  , executor_id_(executor_id)
144  , catalog_(nullptr)
145  , temporary_tables_(nullptr)
146  , input_table_info_cache_(this) {}
147 
148 std::shared_ptr<Executor> Executor::getExecutor(
149  const ExecutorId executor_id,
150  const std::string& debug_dir,
151  const std::string& debug_file,
152  const SystemParameters system_parameters) {
153  INJECT_TIMER(getExecutor);
154 
155  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
156  auto it = executors_.find(executor_id);
157  if (it != executors_.end()) {
158  return it->second;
159  }
160  auto executor = std::make_shared<Executor>(executor_id,
161  system_parameters.cuda_block_size,
162  system_parameters.cuda_grid_size,
163  system_parameters.max_gpu_slab_size,
164  debug_dir,
165  debug_file);
166  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
167  return executor;
168 }
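// Executors are cached per ExecutorId under executors_cache_mutex_, so repeated calls
// with the same id hand back the same instance. Illustrative call-site sketch (not part
// of this file; it assumes the usual Executor::UNITARY_EXECUTOR_ID constant and a
// default-constructed SystemParameters):
//
//   SystemParameters sys_params;  // cuda_block_size / cuda_grid_size left at defaults
//   auto executor = Executor::getExecutor(
//       Executor::UNITARY_EXECUTOR_ID, /*debug_dir=*/"", /*debug_file=*/"", sys_params);
//   CHECK(executor);
//   // A second lookup with the same id returns the cached instance.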
169 
170 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
171  switch (memory_level) {
172  case Data_Namespace::MemoryLevel::CPU_LEVEL:
173  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
174  mapd_unique_lock<mapd_shared_mutex> flush_lock(
175  execute_mutex_); // Don't flush memory while queries are running
176 
178  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
179  // The hash table cache uses CPU memory not managed by the buffer manager. In the
180  // future, we should manage these allocations with the buffer manager directly.
181  // For now, assume the user wants to purge the hash table cache when they clear
182  // CPU memory (currently used in ExecuteTest to lower memory pressure)
184  }
185  break;
186  }
187  default: {
188  throw std::runtime_error(
189  "Clearing memory levels other than the CPU level or GPU level is not "
190  "supported.");
191  }
192  }
193 }
194 
196  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
197 }
198 
199 StringDictionaryProxy* Executor::getStringDictionaryProxy(
200  const int dict_id_in,
201  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
202  const bool with_generation) const {
203  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
204  CHECK(catalog_);
205  const auto dd = catalog_->getMetadataForDict(dict_id);
206  std::lock_guard<std::mutex> lock(str_dict_mutex_);
207  if (dd) {
208  CHECK(dd->stringDict);
209  CHECK_LE(dd->dictNBits, 32);
210  CHECK(row_set_mem_owner);
211  const auto generation = with_generation
212  ? string_dictionary_generations_.getGeneration(dict_id)
213  : ssize_t(-1);
214  return row_set_mem_owner->addStringDict(dd->stringDict, dict_id, generation);
215  }
216  CHECK_EQ(0, dict_id);
217  if (!lit_str_dict_proxy_) {
218  std::shared_ptr<StringDictionary> tsd =
219  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
220  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
221  }
222  return lit_str_dict_proxy_.get();
223 }
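// Note on the two paths above: a non-zero dict_id resolves to a catalog-backed
// dictionary wrapped in a proxy owned by the RowSetMemoryOwner, optionally capped at
// the recorded generation so transient ids added while the query runs stay invisible
// to it; dict_id 0 lazily creates the executor-local literal dictionary used for
// transient string literals.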
224 
225 bool Executor::isCPUOnly() const {
226  CHECK(catalog_);
227  return !catalog_->getDataMgr().getCudaMgr();
228 }
229 
231  const Analyzer::ColumnVar* col_var) const {
233  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
234 }
235 
237  const Analyzer::ColumnVar* col_var,
238  int n) const {
239  const auto cd = getColumnDescriptor(col_var);
240  if (!cd || n > cd->columnType.get_physical_cols()) {
241  return nullptr;
242  }
244  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
245 }
246 
247 const Catalog_Namespace::Catalog* Executor::getCatalog() const {
248  return catalog_;
249 }
250 
251 void Executor::setCatalog(const Catalog_Namespace::Catalog* catalog) {
252  catalog_ = catalog;
253 }
254 
255 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
256  return row_set_mem_owner_;
257 }
258 
259 const TemporaryTables* Executor::getTemporaryTables() const {
260  return temporary_tables_;
261 }
262 
263 Fragmenter_Namespace::TableInfo Executor::getTableInfo(const int table_id) const {
264  return input_table_info_cache_.getTableInfo(table_id);
265 }
266 
267 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
268  return table_generations_.getGeneration(table_id);
269 }
270 
271 ExpressionRange Executor::getColRange(const PhysicalInput& phys_input) const {
272  return agg_col_range_cache_.getColRange(phys_input);
273 }
274 
275 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
276  size_t num_bytes = 0;
277  if (!plan_state_) {
278  return 0;
279  }
280  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
281  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
282  continue;
283  }
284 
285  if (fetched_col_pair.first < 0) {
286  num_bytes += 8;
287  } else {
288  const auto cd =
289  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
290  const auto& ti = cd->columnType;
291  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
292  ? 4
293  : ti.get_size();
294  if (sz < 0) {
295  // for varlen types, only account for the pointer/size for each row, for now
296  num_bytes += 16;
297  } else {
298  num_bytes += sz;
299  }
300  }
301  }
302  return num_bytes;
303 }
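// Per-row accounting used above: iterator/virtual inputs (negative table ids) count a
// fixed 8 bytes, dictionary-encoded TEXT counts its 4-byte id, other fixed-width types
// count their physical size, and varlen types count only the 16-byte pointer/length
// pair. Worked example over a hypothetical schema: fetching an INT (4), a dict-encoded
// TEXT (4) and a none-encoded TEXT (16) yields 24 bytes per row.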
304 
305 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
306  const std::vector<Analyzer::Expr*>& target_exprs) const {
307  CHECK(plan_state_);
308  CHECK(catalog_);
309  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
310  for (const auto target_expr : target_exprs) {
311  if (!plan_state_->isLazyFetchColumn(target_expr)) {
312  col_lazy_fetch_info.emplace_back(
313  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
314  } else {
315  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
316  CHECK(col_var);
317  auto col_id = col_var->get_column_id();
318  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
319  auto cd = (col_var->get_table_id() > 0)
320  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
321  : nullptr;
322  if (cd && IS_GEO(cd->columnType.get_type())) {
323  // Geo coords cols will be processed in sequence. So we only need to track the
324  // first coords col in lazy fetch info.
325  {
326  auto cd0 =
327  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
328  auto col0_ti = cd0->columnType;
329  CHECK(!cd0->isVirtualCol);
330  auto col0_var = makeExpr<Analyzer::ColumnVar>(
331  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
332  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
333  col_lazy_fetch_info.emplace_back(
334  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
335  }
336  } else {
337  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
338  const auto& col_ti = col_var->get_type_info();
339  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
340  }
341  }
342  }
343  return col_lazy_fetch_info;
344 }
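// For lazily fetched geo targets only the first physical coords column is recorded,
// since the remaining physical columns are fetched in sequence from that starting
// point; non-geo lazy targets record their own local column id and type directly.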
345 
346 void Executor::clearMetaInfoCache() {
347  input_table_info_cache_.clear();
348  agg_col_range_cache_.clear();
349  string_dictionary_generations_.clear();
350  table_generations_.clear();
351 }
352 
353 std::vector<int8_t> Executor::serializeLiterals(
354  const std::unordered_map<int, CgenState::LiteralValues>& literals,
355  const int device_id) {
356  if (literals.empty()) {
357  return {};
358  }
359  const auto dev_literals_it = literals.find(device_id);
360  CHECK(dev_literals_it != literals.end());
361  const auto& dev_literals = dev_literals_it->second;
362  size_t lit_buf_size{0};
363  std::vector<std::string> real_strings;
364  std::vector<std::vector<double>> double_array_literals;
365  std::vector<std::vector<int8_t>> align64_int8_array_literals;
366  std::vector<std::vector<int32_t>> int32_array_literals;
367  std::vector<std::vector<int8_t>> align32_int8_array_literals;
368  std::vector<std::vector<int8_t>> int8_array_literals;
369  for (const auto& lit : dev_literals) {
370  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
371  if (lit.which() == 7) {
372  const auto p = boost::get<std::string>(&lit);
373  CHECK(p);
374  real_strings.push_back(*p);
375  } else if (lit.which() == 8) {
376  const auto p = boost::get<std::vector<double>>(&lit);
377  CHECK(p);
378  double_array_literals.push_back(*p);
379  } else if (lit.which() == 9) {
380  const auto p = boost::get<std::vector<int32_t>>(&lit);
381  CHECK(p);
382  int32_array_literals.push_back(*p);
383  } else if (lit.which() == 10) {
384  const auto p = boost::get<std::vector<int8_t>>(&lit);
385  CHECK(p);
386  int8_array_literals.push_back(*p);
387  } else if (lit.which() == 11) {
388  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
389  CHECK(p);
390  if (p->second == 64) {
391  align64_int8_array_literals.push_back(p->first);
392  } else if (p->second == 32) {
393  align32_int8_array_literals.push_back(p->first);
394  } else {
395  CHECK(false);
396  }
397  }
398  }
399  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
400  throw TooManyLiterals();
401  }
402  int16_t crt_real_str_off = lit_buf_size;
403  for (const auto& real_str : real_strings) {
404  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
405  lit_buf_size += real_str.size();
406  }
407  if (double_array_literals.size() > 0) {
408  lit_buf_size = align(lit_buf_size, sizeof(double));
409  }
410  int16_t crt_double_arr_lit_off = lit_buf_size;
411  for (const auto& double_array_literal : double_array_literals) {
412  CHECK_LE(double_array_literal.size(),
413  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
414  lit_buf_size += double_array_literal.size() * sizeof(double);
415  }
416  if (align64_int8_array_literals.size() > 0) {
417  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
418  }
419  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
420  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
421  CHECK_LE(align64_int8_array_literal.size(),
422  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
423  lit_buf_size += align64_int8_array_literal.size();
424  }
425  if (int32_array_literals.size() > 0) {
426  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
427  }
428  int16_t crt_int32_arr_lit_off = lit_buf_size;
429  for (const auto& int32_array_literal : int32_array_literals) {
430  CHECK_LE(int32_array_literal.size(),
431  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
432  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
433  }
434  if (align32_int8_array_literals.size() > 0) {
435  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
436  }
437  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
438  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
439  CHECK_LE(align32_int8_array_literal.size(),
440  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
441  lit_buf_size += align32_int8_array_literal.size();
442  }
443  int16_t crt_int8_arr_lit_off = lit_buf_size;
444  for (const auto& int8_array_literal : int8_array_literals) {
445  CHECK_LE(int8_array_literal.size(),
446  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
447  lit_buf_size += int8_array_literal.size();
448  }
449  unsigned crt_real_str_idx = 0;
450  unsigned crt_double_arr_lit_idx = 0;
451  unsigned crt_align64_int8_arr_lit_idx = 0;
452  unsigned crt_int32_arr_lit_idx = 0;
453  unsigned crt_align32_int8_arr_lit_idx = 0;
454  unsigned crt_int8_arr_lit_idx = 0;
455  std::vector<int8_t> serialized(lit_buf_size);
456  size_t off{0};
457  for (const auto& lit : dev_literals) {
458  const auto lit_bytes = CgenState::literalBytes(lit);
459  off = CgenState::addAligned(off, lit_bytes);
460  switch (lit.which()) {
461  case 0: {
462  const auto p = boost::get<int8_t>(&lit);
463  CHECK(p);
464  serialized[off - lit_bytes] = *p;
465  break;
466  }
467  case 1: {
468  const auto p = boost::get<int16_t>(&lit);
469  CHECK(p);
470  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
471  break;
472  }
473  case 2: {
474  const auto p = boost::get<int32_t>(&lit);
475  CHECK(p);
476  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
477  break;
478  }
479  case 3: {
480  const auto p = boost::get<int64_t>(&lit);
481  CHECK(p);
482  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
483  break;
484  }
485  case 4: {
486  const auto p = boost::get<float>(&lit);
487  CHECK(p);
488  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
489  break;
490  }
491  case 5: {
492  const auto p = boost::get<double>(&lit);
493  CHECK(p);
494  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
495  break;
496  }
497  case 6: {
498  const auto p = boost::get<std::pair<std::string, int>>(&lit);
499  CHECK(p);
500  const auto str_id =
502  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
503  ->getOrAddTransient(p->first)
504  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
505  ->getIdOfString(p->first);
506  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
507  break;
508  }
509  case 7: {
510  const auto p = boost::get<std::string>(&lit);
511  CHECK(p);
512  int32_t off_and_len = crt_real_str_off << 16;
513  const auto& crt_real_str = real_strings[crt_real_str_idx];
514  off_and_len |= static_cast<int16_t>(crt_real_str.size());
515  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
516  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
517  ++crt_real_str_idx;
518  crt_real_str_off += crt_real_str.size();
519  break;
520  }
521  case 8: {
522  const auto p = boost::get<std::vector<double>>(&lit);
523  CHECK(p);
524  int32_t off_and_len = crt_double_arr_lit_off << 16;
525  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
526  int32_t len = crt_double_arr_lit.size();
527  CHECK_EQ((len >> 16), 0);
528  off_and_len |= static_cast<int16_t>(len);
529  int32_t double_array_bytesize = len * sizeof(double);
530  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
531  memcpy(&serialized[crt_double_arr_lit_off],
532  crt_double_arr_lit.data(),
533  double_array_bytesize);
534  ++crt_double_arr_lit_idx;
535  crt_double_arr_lit_off += double_array_bytesize;
536  break;
537  }
538  case 9: {
539  const auto p = boost::get<std::vector<int32_t>>(&lit);
540  CHECK(p);
541  int32_t off_and_len = crt_int32_arr_lit_off << 16;
542  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
543  int32_t len = crt_int32_arr_lit.size();
544  CHECK_EQ((len >> 16), 0);
545  off_and_len |= static_cast<int16_t>(len);
546  int32_t int32_array_bytesize = len * sizeof(int32_t);
547  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
548  memcpy(&serialized[crt_int32_arr_lit_off],
549  crt_int32_arr_lit.data(),
550  int32_array_bytesize);
551  ++crt_int32_arr_lit_idx;
552  crt_int32_arr_lit_off += int32_array_bytesize;
553  break;
554  }
555  case 10: {
556  const auto p = boost::get<std::vector<int8_t>>(&lit);
557  CHECK(p);
558  int32_t off_and_len = crt_int8_arr_lit_off << 16;
559  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
560  int32_t len = crt_int8_arr_lit.size();
561  CHECK_EQ((len >> 16), 0);
562  off_and_len |= static_cast<int16_t>(len);
563  int32_t int8_array_bytesize = len;
564  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
565  memcpy(&serialized[crt_int8_arr_lit_off],
566  crt_int8_arr_lit.data(),
567  int8_array_bytesize);
568  ++crt_int8_arr_lit_idx;
569  crt_int8_arr_lit_off += int8_array_bytesize;
570  break;
571  }
572  case 11: {
573  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
574  CHECK(p);
575  if (p->second == 64) {
576  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
577  const auto& crt_align64_int8_arr_lit =
578  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
579  int32_t len = crt_align64_int8_arr_lit.size();
580  CHECK_EQ((len >> 16), 0);
581  off_and_len |= static_cast<int16_t>(len);
582  int32_t align64_int8_array_bytesize = len;
583  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
584  memcpy(&serialized[crt_align64_int8_arr_lit_off],
585  crt_align64_int8_arr_lit.data(),
586  align64_int8_array_bytesize);
587  ++crt_align64_int8_arr_lit_idx;
588  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
589  } else if (p->second == 32) {
590  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
591  const auto& crt_align32_int8_arr_lit =
592  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
593  int32_t len = crt_align32_int8_arr_lit.size();
594  CHECK_EQ((len >> 16), 0);
595  off_and_len |= static_cast<int16_t>(len);
596  int32_t align32_int8_array_bytesize = len;
597  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
598  memcpy(&serialized[crt_align32_int8_arr_lit_off],
599  crt_align32_int8_arr_lit.data(),
600  align32_int8_array_bytesize);
601  ++crt_align32_int8_arr_lit_idx;
602  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
603  } else {
604  CHECK(false);
605  }
606  break;
607  }
608  default:
609  CHECK(false);
610  }
611  }
612  return serialized;
613 }
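// Layout of the buffer produced above: the fixed-width literal slots come first, in
// device-literal order and aligned to their own width, followed by the payloads of the
// string and array literals. Each variable-length literal's slot holds a packed int32
// "off_and_len" with the payload offset in the high 16 bits and the length in the low
// 16 bits, which is why the total size is checked against numeric_limits<int16_t>::max().
// Minimal sketch of the packing, illustrative only (pack_off_and_len is not a function
// in this codebase):
//
//   int32_t pack_off_and_len(const int16_t off, const int16_t len) {
//     return (static_cast<int32_t>(off) << 16) | static_cast<uint16_t>(len);
//   }
//   // e.g. a 5-byte string payload stored at offset 40 -> (40 << 16) | 5 == 0x00280005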
614 
615 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
616  if (device_type == ExecutorDeviceType::GPU) {
617  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
618  CHECK(cuda_mgr);
619  return cuda_mgr->getDeviceCount();
620  } else {
621  return 1;
622  }
623 }
624 
626  const Data_Namespace::MemoryLevel memory_level) const {
627  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
628  : deviceCount(ExecutorDeviceType::CPU);
629 }
630 
631 // TODO(alex): remove or split
632 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
633  const SQLTypeInfo& ti,
634  const int64_t agg_init_val,
635  const int8_t out_byte_width,
636  const int64_t* out_vec,
637  const size_t out_vec_sz,
638  const bool is_group_by,
639  const bool float_argument_input) {
640  switch (agg) {
641  case kAVG:
642  case kSUM:
643  if (0 != agg_init_val) {
644  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
645  int64_t agg_result = agg_init_val;
646  for (size_t i = 0; i < out_vec_sz; ++i) {
647  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
648  }
649  return {agg_result, 0};
650  } else {
651  CHECK(ti.is_fp());
652  switch (out_byte_width) {
653  case 4: {
654  int agg_result = static_cast<int32_t>(agg_init_val);
655  for (size_t i = 0; i < out_vec_sz; ++i) {
657  &agg_result,
658  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
659  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
660  }
661  const int64_t converted_bin =
662  float_argument_input
663  ? static_cast<int64_t>(agg_result)
664  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
665  return {converted_bin, 0};
666  break;
667  }
668  case 8: {
669  int64_t agg_result = agg_init_val;
670  for (size_t i = 0; i < out_vec_sz; ++i) {
672  &agg_result,
673  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
674  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
675  }
676  return {agg_result, 0};
677  break;
678  }
679  default:
680  CHECK(false);
681  }
682  }
683  }
684  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
685  int64_t agg_result = 0;
686  for (size_t i = 0; i < out_vec_sz; ++i) {
687  agg_result += out_vec[i];
688  }
689  return {agg_result, 0};
690  } else {
691  CHECK(ti.is_fp());
692  switch (out_byte_width) {
693  case 4: {
694  float r = 0.;
695  for (size_t i = 0; i < out_vec_sz; ++i) {
696  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
697  }
698  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
699  const int64_t converted_bin =
700  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
701  return {converted_bin, 0};
702  }
703  case 8: {
704  double r = 0.;
705  for (size_t i = 0; i < out_vec_sz; ++i) {
706  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
707  }
708  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
709  }
710  default:
711  CHECK(false);
712  }
713  }
714  break;
715  case kCOUNT: {
716  uint64_t agg_result = 0;
717  for (size_t i = 0; i < out_vec_sz; ++i) {
718  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
719  agg_result += out;
720  }
721  return {static_cast<int64_t>(agg_result), 0};
722  }
723  case kMIN: {
724  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
725  int64_t agg_result = agg_init_val;
726  for (size_t i = 0; i < out_vec_sz; ++i) {
727  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
728  }
729  return {agg_result, 0};
730  } else {
731  switch (out_byte_width) {
732  case 4: {
733  int32_t agg_result = static_cast<int32_t>(agg_init_val);
734  for (size_t i = 0; i < out_vec_sz; ++i) {
736  &agg_result,
737  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
738  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
739  }
740  const int64_t converted_bin =
741  float_argument_input
742  ? static_cast<int64_t>(agg_result)
743  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
744  return {converted_bin, 0};
745  }
746  case 8: {
747  int64_t agg_result = agg_init_val;
748  for (size_t i = 0; i < out_vec_sz; ++i) {
750  &agg_result,
751  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
752  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
753  }
754  return {agg_result, 0};
755  }
756  default:
757  CHECK(false);
758  }
759  }
760  }
761  case kMAX:
762  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
763  int64_t agg_result = agg_init_val;
764  for (size_t i = 0; i < out_vec_sz; ++i) {
765  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
766  }
767  return {agg_result, 0};
768  } else {
769  switch (out_byte_width) {
770  case 4: {
771  int32_t agg_result = static_cast<int32_t>(agg_init_val);
772  for (size_t i = 0; i < out_vec_sz; ++i) {
774  &agg_result,
775  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
776  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
777  }
778  const int64_t converted_bin =
779  float_argument_input ? static_cast<int64_t>(agg_result)
780  : float_to_double_bin(agg_result, !ti.get_notnull());
781  return {converted_bin, 0};
782  }
783  case 8: {
784  int64_t agg_result = agg_init_val;
785  for (size_t i = 0; i < out_vec_sz; ++i) {
787  &agg_result,
788  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
789  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
790  }
791  return {agg_result, 0};
792  }
793  default:
794  CHECK(false);
795  }
796  }
797  case kSINGLE_VALUE: {
798  int64_t agg_result = agg_init_val;
799  for (size_t i = 0; i < out_vec_sz; ++i) {
800  if (out_vec[i] != agg_init_val) {
801  if (agg_result == agg_init_val) {
802  agg_result = out_vec[i];
803  } else if (out_vec[i] != agg_result) {
805  }
806  }
807  }
808  return {agg_result, 0};
809  }
810  case kSAMPLE: {
811  int64_t agg_result = agg_init_val;
812  for (size_t i = 0; i < out_vec_sz; ++i) {
813  if (out_vec[i] != agg_init_val) {
814  agg_result = out_vec[i];
815  break;
816  }
817  }
818  return {agg_result, 0};
819  }
820  default:
821  CHECK(false);
822  }
823  abort();
824 }
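// reduceResults() folds the per-fragment partial values in out_vec into a single
// (value, error_code) pair on the host, mirroring the device-side aggregate semantics:
// the *_skip_val variants treat agg_init_val as the null sentinel, and float/double
// partials are reinterpreted from their int64 storage via may_alias_ptr. Illustrative
// expectation for a kSUM over BIGINT with agg_init_val 0 (hypothetical inputs;
// bigint_ti stands for a BIGINT SQLTypeInfo):
//
//   const int64_t partials[] = {3, 7, 5};
//   // reduceResults(kSUM, bigint_ti, /*agg_init_val=*/0, /*out_byte_width=*/8,
//   //               partials, 3, /*is_group_by=*/false,
//   //               /*float_argument_input=*/false) would yield {15, 0}.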
825 
826 namespace {
827 
829  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
830  auto& first = results_per_device.front().first;
831  CHECK(first);
832  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
833  const auto& next = results_per_device[dev_idx].first;
834  CHECK(next);
835  first->append(*next);
836  }
837  return std::move(first);
838 }
839 
840 } // namespace
841 
843  const RelAlgExecutionUnit& ra_exe_unit) {
844  auto& results_per_device = shared_context.getFragmentResults();
845  if (results_per_device.empty()) {
846  std::vector<TargetInfo> targets;
847  for (const auto target_expr : ra_exe_unit.target_exprs) {
848  targets.push_back(get_target_info(target_expr, g_bigint_count));
849  }
850  return std::make_shared<ResultSet>(targets,
851  ExecutorDeviceType::CPU,
852  QueryMemoryDescriptor(),
853  row_set_mem_owner_,
854  this);
855  }
856  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
857  std::sort(results_per_device.begin(),
858  results_per_device.end(),
859  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
860  CHECK_GE(lhs.second.size(), size_t(1));
861  CHECK_GE(rhs.second.size(), size_t(1));
862  return lhs.second.front() < rhs.second.front();
863  });
864 
865  return get_merged_result(results_per_device);
866 }
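// The sort above orders the per-device results by the first fragment id each one
// covers, so the appended union comes back in fragment order rather than in
// device-completion order.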
867 
869  const RelAlgExecutionUnit& ra_exe_unit,
870  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
871  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
872  const QueryMemoryDescriptor& query_mem_desc) const {
873  auto timer = DEBUG_TIMER(__func__);
874  if (ra_exe_unit.estimator) {
875  return reduce_estimator_results(ra_exe_unit, results_per_device);
876  }
877 
878  if (results_per_device.empty()) {
879  std::vector<TargetInfo> targets;
880  for (const auto target_expr : ra_exe_unit.target_exprs) {
881  targets.push_back(get_target_info(target_expr, g_bigint_count));
882  }
883  return std::make_shared<ResultSet>(
884  targets, ExecutorDeviceType::CPU, QueryMemoryDescriptor(), nullptr, this);
885  }
886 
887  return reduceMultiDeviceResultSets(
888  results_per_device,
889  row_set_mem_owner,
890  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
891 }
892 
893 namespace {
894 
896  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
897  int64_t* compilation_queue_time) {
898  auto clock_begin = timer_start();
899  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
900  *compilation_queue_time = timer_stop(clock_begin);
901  const auto& this_result_set = results_per_device[0].first;
902  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
903  this_result_set->getTargetInfos(),
904  this_result_set->getTargetInitVals());
905  return reduction_jit.codegen();
906 };
907 
908 } // namespace
909 
911  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
912  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
913  const QueryMemoryDescriptor& query_mem_desc) const {
914  auto timer = DEBUG_TIMER(__func__);
915  std::shared_ptr<ResultSet> reduced_results;
916 
917  const auto& first = results_per_device.front().first;
918 
919  if (query_mem_desc.getQueryDescriptionType() ==
920  QueryDescriptionType::GroupByBaselineHash &&
921  results_per_device.size() > 1) {
922  const auto total_entry_count = std::accumulate(
923  results_per_device.begin(),
924  results_per_device.end(),
925  size_t(0),
926  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
927  const auto& r = rs.first;
928  return init + r->getQueryMemDesc().getEntryCount();
929  });
930  CHECK(total_entry_count);
931  auto query_mem_desc = first->getQueryMemDesc();
932  query_mem_desc.setEntryCount(total_entry_count);
933  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
936  row_set_mem_owner,
937  this);
938  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
939  reduced_results->initializeStorage();
940  switch (query_mem_desc.getEffectiveKeyWidth()) {
941  case 4:
942  first->getStorage()->moveEntriesToBuffer<int32_t>(
943  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
944  break;
945  case 8:
946  first->getStorage()->moveEntriesToBuffer<int64_t>(
947  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
948  break;
949  default:
950  CHECK(false);
951  }
952  } else {
953  reduced_results = first;
954  }
955 
956  int64_t compilation_queue_time = 0;
957  const auto reduction_code =
958  get_reduction_code(results_per_device, &compilation_queue_time);
959 
960  for (size_t i = 1; i < results_per_device.size(); ++i) {
961  reduced_results->getStorage()->reduce(
962  *(results_per_device[i].first->getStorage()), {}, reduction_code);
963  }
964  reduced_results->addCompilationQueueTime(compilation_queue_time);
965  return reduced_results;
966 }
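// Reduction strategy above: for multi-device baseline-hash results, a fresh ResultSet
// sized for the combined entry count is allocated and the first device's entries are
// moved into it; every other layout reduces directly into the first device's result.
// The remaining result sets are then reduced in with the JIT-generated reduction code,
// and the time spent waiting on the compilation lock is reported back on the result.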
967 
969  const RelAlgExecutionUnit& ra_exe_unit,
970  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
971  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
972  const QueryMemoryDescriptor& query_mem_desc) const {
973  if (results_per_device.size() == 1) {
974  return std::move(results_per_device.front().first);
975  }
976  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
978  for (const auto& result : results_per_device) {
979  auto rows = result.first;
980  CHECK(rows);
981  if (!rows) {
982  continue;
983  }
984  SpeculativeTopNMap that(
985  *rows,
986  ra_exe_unit.target_exprs,
987  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
988  m.reduce(that);
989  }
990  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
991  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
992  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
993 }
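// Speculative top-N reduction: each device's rows are folded into a SpeculativeTopNMap
// sized at roughly 10000 * log(top_n) entries (and at least top_n), and the merged map
// is materialized back into rows ordered by the single supported order entry. If the
// speculation turns out to be wrong, the speculative top-N machinery is expected to
// signal failure so the caller can fall back to a full sort.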
994 
995 std::unordered_set<int> get_available_gpus(const Catalog_Namespace::Catalog& cat) {
996  std::unordered_set<int> available_gpus;
997  if (cat.getDataMgr().gpusPresent()) {
998  int gpu_count = cat.getDataMgr().getCudaMgr()->getDeviceCount();
999  CHECK_GT(gpu_count, 0);
1000  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1001  available_gpus.insert(gpu_id);
1002  }
1003  }
1004  return available_gpus;
1005 }
1006 
1007 size_t get_context_count(const ExecutorDeviceType device_type,
1008  const size_t cpu_count,
1009  const size_t gpu_count) {
1010  return device_type == ExecutorDeviceType::GPU ? gpu_count
1011  : static_cast<size_t>(cpu_count);
1012 }
1013 
1014 namespace {
1015 
1016 // Compute a very conservative guess for the output buffer entry count, using no
1017 // information other than the number of tuples in each table and multiplying them
1018 // together.
1019 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1021  // Check for overflows since we're multiplying potentially big table sizes.
1022  using checked_size_t = boost::multiprecision::number<
1023  boost::multiprecision::cpp_int_backend<64,
1024  64,
1025  boost::multiprecision::unsigned_magnitude,
1026  boost::multiprecision::checked,
1027  void>>;
1028  checked_size_t max_groups_buffer_entry_guess = 1;
1029  for (const auto& query_info : query_infos) {
1030  CHECK(!query_info.info.fragments.empty());
1031  auto it = std::max_element(query_info.info.fragments.begin(),
1032  query_info.info.fragments.end(),
1033  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1034  return f1.getNumTuples() < f2.getNumTuples();
1035  });
1036  max_groups_buffer_entry_guess *= it->getNumTuples();
1037  }
1038  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1039  // baseline group layout with that many entries anyway.
1040  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1041  try {
1042  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1043  max_groups_buffer_entry_guess_cap);
1044  } catch (...) {
1045  return max_groups_buffer_entry_guess_cap;
1046  }
1047 }
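// Worked example of the guess (hypothetical tables): if the largest fragments of two
// joined tables hold 50,000 and 3,000 tuples, the guess is 150,000,000, which is then
// clamped to the 100M cap; a checked 64-bit overflow during the multiplication also
// falls back to the cap.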
1048 
1049 std::string get_table_name(const InputDescriptor& input_desc,
1051  const auto source_type = input_desc.getSourceType();
1052  if (source_type == InputSourceType::TABLE) {
1053  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1054  CHECK(td);
1055  return td->tableName;
1056  } else {
1057  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1058  }
1059 }
1060 
1061 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1062  const int device_count) {
1063  if (device_type == ExecutorDeviceType::GPU) {
1064  return device_count * Executor::high_scan_limit;
1065  }
1067 }
1068 
1070  const std::vector<InputTableInfo>& table_infos,
1072  const ExecutorDeviceType device_type,
1073  const int device_count) {
1074  for (const auto target_expr : ra_exe_unit.target_exprs) {
1075  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1076  return;
1077  }
1078  }
1079  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1080  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1081  // Allow a query with no scan limit to run on small tables
1082  return;
1083  }
1084  if (ra_exe_unit.use_bump_allocator) {
1085  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1086  // relative to the size of the input), so we bypass this check for now
1087  return;
1088  }
1089  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1090  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1091  (!ra_exe_unit.scan_limit ||
1092  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1093  std::vector<std::string> table_names;
1094  const auto& input_descs = ra_exe_unit.input_descs;
1095  for (const auto& input_desc : input_descs) {
1096  table_names.push_back(get_table_name(input_desc, cat));
1097  }
1098  if (!ra_exe_unit.scan_limit) {
1099  throw WatchdogException(
1100  "Projection query would require a scan without a limit on table(s): " +
1101  boost::algorithm::join(table_names, ", "));
1102  } else {
1103  throw WatchdogException(
1104  "Projection query output result set on table(s): " +
1105  boost::algorithm::join(table_names, ", ") + " would contain " +
1106  std::to_string(ra_exe_unit.scan_limit) +
1107  " rows, which is more than the current system limit of " +
1108  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1109  }
1110  }
1111 }
1112 
1113 } // namespace
1114 
1115 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1116  const RelAlgExecutionUnit& ra_exe_unit) {
1117  if (ra_exe_unit.input_descs.size() < 2) {
1118  return false;
1119  }
1120 
1121  // We only support loop join at the end of folded joins
1122  // where ra_exe_unit.input_descs.size() > 2 for now.
1123  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1124 
1125  ssize_t inner_table_idx = -1;
1126  for (size_t i = 0; i < query_infos.size(); ++i) {
1127  if (query_infos[i].table_id == inner_table_id) {
1128  inner_table_idx = i;
1129  break;
1130  }
1131  }
1132  CHECK_NE(ssize_t(-1), inner_table_idx);
1133  return query_infos[inner_table_idx].info.getNumTuples() <=
1134  g_trivial_loop_join_threshold;
1135 }
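// A loop join is treated as trivial when the inner (right-most) table is small enough
// that nested iteration is cheaper than building a hash table; the cutoff is the
// g_trivial_loop_join_threshold global compared against above.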
1136 
1137 namespace {
1138 
1139 template <typename T>
1140 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1141  std::vector<std::string> expr_strs;
1142  for (const auto& expr : expr_container) {
1143  if (!expr) {
1144  expr_strs.emplace_back("NULL");
1145  } else {
1146  expr_strs.emplace_back(expr->toString());
1147  }
1148  }
1149  return expr_strs;
1150 }
1151 
1152 template <>
1153 std::vector<std::string> expr_container_to_string(
1154  const std::list<Analyzer::OrderEntry>& expr_container) {
1155  std::vector<std::string> expr_strs;
1156  for (const auto& expr : expr_container) {
1157  expr_strs.emplace_back(expr.toString());
1158  }
1159  return expr_strs;
1160 }
1161 
1162 std::string join_type_to_string(const JoinType type) {
1163  switch (type) {
1164  case JoinType::INNER:
1165  return "INNER";
1166  case JoinType::LEFT:
1167  return "LEFT";
1168  case JoinType::INVALID:
1169  return "INVALID";
1170  }
1171  UNREACHABLE();
1172  return "";
1173 }
1174 
1175 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1176  switch (algorithm) {
1177  case SortAlgorithm::Default:
1178  return "ResultSet";
1179  case SortAlgorithm::SpeculativeTopN:
1180  return "Speculative Top N";
1181  case SortAlgorithm::StreamingTopN:
1182  return "Streaming Top N";
1183  }
1184  UNREACHABLE();
1185  return "";
1186 }
1187 
1188 } // namespace
1189 
1190 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
1191 // todo(yoonmin): replace the cache key with a DAG representation of the query plan
1192 // instead of the ra_exec_unit description, if possible
1193  std::ostringstream os;
1194  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1195  const auto& scan_desc = input_col_desc->getScanDesc();
1196  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1197  << scan_desc.getNestLevel();
1198  }
1199  if (!ra_exe_unit.simple_quals.empty()) {
1200  for (const auto& qual : ra_exe_unit.simple_quals) {
1201  if (qual) {
1202  os << qual->toString() << ",";
1203  }
1204  }
1205  }
1206  if (!ra_exe_unit.quals.empty()) {
1207  for (const auto& qual : ra_exe_unit.quals) {
1208  if (qual) {
1209  os << qual->toString() << ",";
1210  }
1211  }
1212  }
1213  if (!ra_exe_unit.join_quals.empty()) {
1214  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1215  const auto& join_condition = ra_exe_unit.join_quals[i];
1216  os << std::to_string(i) << join_type_to_string(join_condition.type);
1217  for (const auto& qual : join_condition.quals) {
1218  if (qual) {
1219  os << qual->toString() << ",";
1220  }
1221  }
1222  }
1223  }
1224  if (!ra_exe_unit.groupby_exprs.empty()) {
1225  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1226  if (qual) {
1227  os << qual->toString() << ",";
1228  }
1229  }
1230  }
1231  for (const auto& expr : ra_exe_unit.target_exprs) {
1232  if (expr) {
1233  os << expr->toString() << ",";
1234  }
1235  }
1236  os << bool_to_string(ra_exe_unit.estimator == nullptr);
1237  os << std::to_string(ra_exe_unit.scan_limit);
1238  return os.str();
1239 }
1240 
1241 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1242  os << "\n\tTable/Col/Levels: ";
1243  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1244  const auto& scan_desc = input_col_desc->getScanDesc();
1245  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1246  << scan_desc.getNestLevel() << ") ";
1247  }
1248  if (!ra_exe_unit.simple_quals.empty()) {
1249  os << "\n\tSimple Quals: "
1250  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.simple_quals),
1251  ", ");
1252  }
1253  if (!ra_exe_unit.quals.empty()) {
1254  os << "\n\tQuals: "
1255  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1256  }
1257  if (!ra_exe_unit.join_quals.empty()) {
1258  os << "\n\tJoin Quals: ";
1259  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1260  const auto& join_condition = ra_exe_unit.join_quals[i];
1261  os << "\t\t" << std::to_string(i) << " "
1262  << join_type_to_string(join_condition.type);
1263  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1264  }
1265  }
1266  if (!ra_exe_unit.groupby_exprs.empty()) {
1267  os << "\n\tGroup By: "
1268  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.groupby_exprs),
1269  ", ");
1270  }
1271  os << "\n\tProjected targets: "
1272  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.target_exprs), ", ");
1273  os << "\n\tHas Estimator: " << bool_to_string(ra_exe_unit.estimator != nullptr);
1274  os << "\n\tSort Info: ";
1275  const auto& sort_info = ra_exe_unit.sort_info;
1276  os << "\n\t Order Entries: "
1277  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1278  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1279  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1280  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1281  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1282  os << "\n\tBump Allocator: " << bool_to_string(ra_exe_unit.use_bump_allocator);
1283  if (ra_exe_unit.union_all) {
1284  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1285  }
1286  return os;
1287 }
1288 
1289 namespace {
1290 
1292  const size_t new_scan_limit) {
1293  return {ra_exe_unit_in.input_descs,
1294  ra_exe_unit_in.input_col_descs,
1295  ra_exe_unit_in.simple_quals,
1296  ra_exe_unit_in.quals,
1297  ra_exe_unit_in.join_quals,
1298  ra_exe_unit_in.groupby_exprs,
1299  ra_exe_unit_in.target_exprs,
1300  ra_exe_unit_in.estimator,
1301  ra_exe_unit_in.sort_info,
1302  new_scan_limit,
1303  ra_exe_unit_in.use_bump_allocator,
1304  ra_exe_unit_in.union_all,
1305  ra_exe_unit_in.query_state};
1306 }
1307 
1308 } // namespace
1309 
1311  size_t& max_groups_buffer_entry_guess,
1312  const bool is_agg,
1313  const std::vector<InputTableInfo>& query_infos,
1314  const RelAlgExecutionUnit& ra_exe_unit_in,
1315  const CompilationOptions& co,
1316  const ExecutionOptions& eo,
1318  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1319  RenderInfo* render_info,
1320  const bool has_cardinality_estimation,
1321  ColumnCacheMap& column_cache) {
1322  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1323 
1324  ScopeGuard cleanup_post_execution = [this] {
1325  // cleanup/unpin GPU buffer allocations
1326  // TODO: separate out this state into a single object
1327  plan_state_.reset(nullptr);
1328  if (cgen_state_) {
1329  cgen_state_->in_values_bitmaps_.clear();
1330  }
1331  };
1332 
1333  try {
1334  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1335  is_agg,
1336  true,
1337  query_infos,
1338  ra_exe_unit_in,
1339  co,
1340  eo,
1341  cat,
1342  row_set_mem_owner,
1343  render_info,
1344  has_cardinality_estimation,
1345  column_cache);
1346  if (result) {
1347  result->setKernelQueueTime(kernel_queue_time_ms_);
1348  result->addCompilationQueueTime(compilation_queue_time_ms_);
1349  }
1350  return result;
1351  } catch (const CompilationRetryNewScanLimit& e) {
1352  auto result =
1353  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1354  is_agg,
1355  false,
1356  query_infos,
1357  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1358  co,
1359  eo,
1360  cat,
1361  row_set_mem_owner,
1362  render_info,
1363  has_cardinality_estimation,
1364  column_cache);
1365  if (result) {
1366  result->setKernelQueueTime(kernel_queue_time_ms_);
1367  result->addCompilationQueueTime(compilation_queue_time_ms_);
1368  }
1369  return result;
1370  }
1371 }
1372 
1374  size_t& max_groups_buffer_entry_guess,
1375  const bool is_agg,
1376  const bool allow_single_frag_table_opt,
1377  const std::vector<InputTableInfo>& query_infos,
1378  const RelAlgExecutionUnit& ra_exe_unit_in,
1379  const CompilationOptions& co,
1380  const ExecutionOptions& eo,
1382  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1383  RenderInfo* render_info,
1384  const bool has_cardinality_estimation,
1385  ColumnCacheMap& column_cache) {
1386  INJECT_TIMER(Exec_executeWorkUnit);
1387  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in, co);
1388  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1389  CHECK(!query_infos.empty());
1390  if (!max_groups_buffer_entry_guess) {
1391  // The query has failed the first execution attempt because of running out
1392  // of group by slots. Make the conservative choice: allocate fragment size
1393  // slots and run on the CPU.
1394  CHECK(device_type == ExecutorDeviceType::CPU);
1395  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1396  }
1397 
1398  int8_t crt_min_byte_width{get_min_byte_width()};
1399  do {
1400  SharedKernelContext shared_context(query_infos);
1401  ColumnFetcher column_fetcher(this, column_cache);
1402  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1403  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1404  if (eo.executor_type == ExecutorType::Native) {
1405  try {
1406  INJECT_TIMER(query_step_compilation);
1407  auto clock_begin = timer_start();
1408  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1409  compilation_queue_time_ms_ += timer_stop(clock_begin);
1410 
1411  query_mem_desc_owned =
1412  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1413  crt_min_byte_width,
1414  has_cardinality_estimation,
1415  ra_exe_unit,
1416  query_infos,
1417  column_fetcher,
1418  {device_type,
1419  co.hoist_literals,
1420  co.opt_level,
1422  co.allow_lazy_fetch,
1424  co.explain_type,
1426  eo,
1427  render_info,
1428  this);
1429  CHECK(query_mem_desc_owned);
1430  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1431  } catch (CompilationRetryNoCompaction&) {
1432  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1433  continue;
1434  }
1435  } else {
1436  plan_state_.reset(new PlanState(false, this));
1437  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1438  CHECK(!query_mem_desc_owned);
1439  query_mem_desc_owned.reset(
1441  }
1442  if (eo.just_explain) {
1443  return executeExplain(*query_comp_desc_owned);
1444  }
1445 
1446  for (const auto target_expr : ra_exe_unit.target_exprs) {
1447  plan_state_->target_exprs_.push_back(target_expr);
1448  }
1449 
1450  if (!eo.just_validate) {
1451  int available_cpus = cpu_threads();
1452  auto available_gpus = get_available_gpus(cat);
1453 
1454  const auto context_count =
1455  get_context_count(device_type, available_cpus, available_gpus.size());
1456  try {
1457  auto kernels = createKernels(shared_context,
1458  ra_exe_unit,
1459  column_fetcher,
1460  query_infos,
1461  eo,
1462  is_agg,
1463  allow_single_frag_table_opt,
1464  context_count,
1465  *query_comp_desc_owned,
1466  *query_mem_desc_owned,
1467  render_info,
1468  available_gpus,
1469  available_cpus);
1470  if (g_use_tbb_pool) {
1471 #ifdef HAVE_TBB
1472  VLOG(1) << "Using TBB thread pool for kernel dispatch.";
1473  launchKernels<threadpool::TbbThreadPool<void>>(shared_context,
1474  std::move(kernels));
1475 #else
1476  throw std::runtime_error(
1477  "This build is not TBB enabled. Restart the server with "
1478  "\"enable-modern-thread-pool\" disabled.");
1479 #endif
1480  } else {
1481  launchKernels<threadpool::FuturesThreadPool<void>>(shared_context,
1482  std::move(kernels));
1483  }
1484  } catch (QueryExecutionError& e) {
1485  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1486  e.getErrorCode() == ERR_OUT_OF_TIME) {
1487  resetInterrupt();
1488  throw QueryExecutionError(ERR_INTERRUPTED);
1489  }
1490  if (eo.allow_runtime_query_interrupt && interrupted_.load()) {
1491  resetInterrupt();
1492  throw QueryExecutionError(ERR_INTERRUPTED);
1493  }
1494  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1495  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1496  crt_min_byte_width <<= 1;
1497  continue;
1498  }
1499  throw;
1500  }
1501  }
1502  if (is_agg) {
1503  try {
1504  return collectAllDeviceResults(shared_context,
1505  ra_exe_unit,
1506  *query_mem_desc_owned,
1507  query_comp_desc_owned->getDeviceType(),
1508  row_set_mem_owner);
1509  } catch (ReductionRanOutOfSlots&) {
1510  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1511  } catch (OverflowOrUnderflow&) {
1512  crt_min_byte_width <<= 1;
1513  continue;
1514  }
1515  }
1516  return resultsUnion(shared_context, ra_exe_unit);
1517 
1518  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1519 
1520  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1523  nullptr,
1524  this);
1525 }
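// Retry behaviour of the loop above: compilation starts from get_min_byte_width() and,
// on ERR_OVERFLOW_OR_UNDERFLOW (or an OverflowOrUnderflow during reduction), the
// minimum byte width is doubled and the work unit is recompiled, up to sizeof(int64_t);
// CompilationRetryNoCompaction jumps straight to MAX_BYTE_WIDTH_SUPPORTED. Only when
// every width has been exhausted does the function fall through to the empty ResultSet
// constructed at the end.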
1526 
1528  const InputTableInfo& table_info,
1529  const CompilationOptions& co,
1530  const ExecutionOptions& eo,
1532  PerFragmentCallBack& cb) {
1533  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in, co);
1534  ColumnCacheMap column_cache;
1535 
1536  std::vector<InputTableInfo> table_infos{table_info};
1537  SharedKernelContext kernel_context(table_infos);
1538 
1539  ColumnFetcher column_fetcher(this, column_cache);
1540  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1541  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1542  {
1543  auto clock_begin = timer_start();
1544  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1545  compilation_queue_time_ms_ += timer_stop(clock_begin);
1546  query_mem_desc_owned =
1547  query_comp_desc_owned->compile(0,
1548  8,
1549  /*has_cardinality_estimation=*/false,
1550  ra_exe_unit,
1551  table_infos,
1552  column_fetcher,
1553  co,
1554  eo,
1555  nullptr,
1556  this);
1557  }
1558  CHECK(query_mem_desc_owned);
1559  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1560  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1561  const auto& outer_fragments = table_info.info.fragments;
1562 
1563  {
1564  auto clock_begin = timer_start();
1565  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
1566  kernel_queue_time_ms_ += timer_stop(clock_begin);
1567 
1568  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1569  ++fragment_index) {
1570  // We may want to consider in the future allowing this to execute on devices other
1571  // than CPU
1572  FragmentsList fragments_list{{table_id, {fragment_index}}};
1573  ExecutionKernel kernel(ra_exe_unit,
1574  co.device_type,
1575  /*device_id=*/0,
1576  eo,
1577  column_fetcher,
1578  *query_comp_desc_owned,
1579  *query_mem_desc_owned,
1580  fragments_list,
1582  /*render_info=*/nullptr,
1583  /*rowid_lookup_key=*/-1,
1584  logger::thread_id());
1585  kernel.run(this, kernel_context);
1586  }
1587  }
1588 
1589  const auto& all_fragment_results = kernel_context.getFragmentResults();
1590 
1591  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1592  ++fragment_index) {
1593  const auto fragment_results = all_fragment_results[fragment_index];
1594  cb(fragment_results.first, outer_fragments[fragment_index]);
1595  }
1596 }
1597 
1599  const TableFunctionExecutionUnit exe_unit,
1600  const std::vector<InputTableInfo>& table_infos,
1601  const CompilationOptions& co,
1602  const ExecutionOptions& eo,
1604  INJECT_TIMER(Exec_executeTableFunction);
1605  nukeOldState(false, table_infos, nullptr);
1606 
1607  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1608  // framework, we may want to move this up a level
1609 
1610  ColumnFetcher column_fetcher(this, column_cache);
1611  TableFunctionCompilationContext compilation_context;
1612  compilation_context.compile(exe_unit, co, this);
1613 
1614  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
1615  CHECK_EQ(table_infos.size(), size_t(1));
1616  return exe_context.execute(exe_unit,
1617  table_infos.front(),
1618  &compilation_context,
1619  column_fetcher,
1620  co.device_type,
1621  this);
1622 }
1623 
1625  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1626 }
1627 
1629  const RelAlgExecutionUnit& ra_exe_unit,
1630  const ExecutorDeviceType requested_device_type) {
1631  for (const auto target_expr : ra_exe_unit.target_exprs) {
1632  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1633  if (!ra_exe_unit.groupby_exprs.empty() &&
1634  !isArchPascalOrLater(requested_device_type)) {
1635  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1636  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1637  return ExecutorDeviceType::CPU;
1638  }
1639  }
1640  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1641  return ExecutorDeviceType::CPU;
1642  }
1643  }
1644  return requested_device_type;
1645 }
1646 
1647 namespace {
1648 
1649 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1650  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1651  if (ti.is_fp()) {
1652  if (float_argument_input && ti.get_type() == kFLOAT) {
1653  int64_t float_null_val = 0;
1654  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1655  static_cast<float>(inline_fp_null_val(ti));
1656  return float_null_val;
1657  }
1658  const auto double_null_val = inline_fp_null_val(ti);
1659  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1660  }
1661  return inline_int_null_val(ti);
1662 }
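// The int64 returned above is a bit pattern, not a numeric value: for FLOAT targets fed
// float arguments the 32-bit float NULL is stored in the low bits, otherwise the double
// NULL's 64-bit pattern (or the integer NULL sentinel) is returned. Minimal check of the
// float case, illustrative only and reusing the inline_fp_null_val helper from above:
//
//   int64_t v = 0;
//   *reinterpret_cast<float*>(may_alias_ptr(&v)) =
//       static_cast<float>(inline_fp_null_val(SQLTypeInfo(kFLOAT, false)));
//   // v now holds the float NULL's bits in its low 32 bits and zero elsewhere.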
1663 
1664 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1665  std::vector<int64_t>& entry,
1666  const std::vector<Analyzer::Expr*>& target_exprs,
1668  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1669  const auto target_expr = target_exprs[target_idx];
1670  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1671  CHECK(agg_info.is_agg);
1672  target_infos.push_back(agg_info);
1673  if (g_cluster) {
1674  const auto executor = query_mem_desc.getExecutor();
1675  CHECK(executor);
1676  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1677  CHECK(row_set_mem_owner);
1678  const auto& count_distinct_desc =
1679  query_mem_desc.getCountDistinctDescriptor(target_idx);
1680  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1681  CHECK(row_set_mem_owner);
1682  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
1683  count_distinct_desc.bitmapPaddedSizeBytes());
1684  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1685  continue;
1686  }
1687  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1688  auto count_distinct_set = new std::set<int64_t>();
1689  CHECK(row_set_mem_owner);
1690  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1691  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1692  continue;
1693  }
1694  }
1695  const bool float_argument_input = takes_float_argument(agg_info);
1696  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1697  entry.push_back(0);
1698  } else if (agg_info.agg_kind == kAVG) {
1699  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1700  entry.push_back(0);
1701  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
1702  if (agg_info.sql_type.is_geometry()) {
1703  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1704  entry.push_back(0);
1705  }
1706  } else if (agg_info.sql_type.is_varlen()) {
1707  entry.push_back(0);
1708  entry.push_back(0);
1709  } else {
1710  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1711  }
1712  } else {
1713  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1714  }
1715  }
1716 }
1717 
1718 ResultSetPtr build_row_for_empty_input(
1719  const std::vector<Analyzer::Expr*>& target_exprs_in,
1720  const QueryMemoryDescriptor& query_mem_desc,
1721  const ExecutorDeviceType device_type) {
1722  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1723  std::vector<Analyzer::Expr*> target_exprs;
1724  for (const auto target_expr : target_exprs_in) {
1725  const auto target_expr_copy =
1726  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1727  CHECK(target_expr_copy);
1728  auto ti = target_expr->get_type_info();
1729  ti.set_notnull(false);
1730  target_expr_copy->set_type_info(ti);
1731  if (target_expr_copy->get_arg()) {
1732  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1733  arg_ti.set_notnull(false);
1734  target_expr_copy->get_arg()->set_type_info(arg_ti);
1735  }
1736  target_exprs_owned_copies.push_back(target_expr_copy);
1737  target_exprs.push_back(target_expr_copy.get());
1738  }
1739  std::vector<TargetInfo> target_infos;
1740  std::vector<int64_t> entry;
1741  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1742  const auto executor = query_mem_desc.getExecutor();
1743  CHECK(executor);
1744  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1745  CHECK(row_set_mem_owner);
1746  auto rs = std::make_shared<ResultSet>(
1747  target_infos, device_type, query_mem_desc, row_set_mem_owner, executor);
1748  rs->allocateStorage();
1749  rs->fillOneEntry(entry);
1750  return rs;
1751 }
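// [Editorial sketch -- not part of the original source.] For a query such as
//   SELECT COUNT(*), SUM(x), AVG(x) FROM t WHERE FALSE;
// the helpers above synthesize a single entry: COUNT -> 0, SUM -> the type's
// null sentinel, AVG -> (null accumulator, zero count). A simplified analogue of
// that entry layout, with null_sentinel standing in for inline_null_val()
// (requires <cstdint>, <vector>):
namespace editorial_sketch {
inline std::vector<int64_t> empty_input_entry(const int64_t null_sentinel) {
  return {/*COUNT(*)*/ 0,
          /*SUM(x)*/ null_sentinel,
          /*AVG(x) accumulator*/ null_sentinel,
          /*AVG(x) count*/ 0};
}
}  // namespace editorial_sketch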
1752 
1753 } // namespace
1754 
1755 ResultSetPtr Executor::collectAllDeviceResults(
1756  SharedKernelContext& shared_context,
1757  const RelAlgExecutionUnit& ra_exe_unit,
1758  const QueryMemoryDescriptor& query_mem_desc,
1759  const ExecutorDeviceType device_type,
1760  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1761  auto timer = DEBUG_TIMER(__func__);
1762  auto& result_per_device = shared_context.getFragmentResults();
1763  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1764  QueryDescriptionType::NonGroupedAggregate) {
1765  return build_row_for_empty_input(
1766  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1767  }
1768  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1769  try {
1770  return reduceSpeculativeTopN(
1771  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1772  } catch (const std::bad_alloc&) {
1773  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1774  }
1775  }
1776  const auto shard_count =
1777  device_type == ExecutorDeviceType::GPU
1778  ? shard_count_for_top_groups(ra_exe_unit, *catalog_)
1779  : 0;
1780 
1781  if (shard_count && !result_per_device.empty()) {
1782  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1783  }
1784  return reduceMultiDeviceResults(
1785  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1786 }
1787 
1788 namespace {
1799 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1800  const QueryMemoryDescriptor& input_query_mem_desc,
1801  const ResultSetStorage* output_storage,
1802  size_t output_row_index,
1803  const QueryMemoryDescriptor& output_query_mem_desc,
1804  const std::vector<uint32_t>& top_permutation) {
1805  const auto output_buffer = output_storage->getUnderlyingBuffer();
1806  const auto input_buffer = input_storage->getUnderlyingBuffer();
1807  for (const auto sorted_idx : top_permutation) {
1808  // permuting all group-columns in this result set into the final buffer:
1809  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1810  group_idx++) {
1811  const auto input_column_ptr =
1812  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1813  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1814  const auto output_column_ptr =
1815  output_buffer +
1816  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1817  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1818  memcpy(output_column_ptr,
1819  input_column_ptr,
1820  output_query_mem_desc.groupColWidth(group_idx));
1821  }
1822  // permuting all agg-columns in this result set into the final buffer:
1823  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1824  slot_idx++) {
1825  const auto input_column_ptr =
1826  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1827  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1828  const auto output_column_ptr =
1829  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1830  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1831  memcpy(output_column_ptr,
1832  input_column_ptr,
1833  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1834  }
1835  ++output_row_index;
1836  }
1837  return output_row_index;
1838 }
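// [Editorial sketch -- not part of the original source.] The columnar permutation
// above addresses every value as: buffer + per-column byte offset + row index *
// column width, and copies one value per (row, column) pair. A standalone
// analogue for a single fixed-width column, assuming the input and output
// descriptors use the same width (copy_columnar_value is a hypothetical name;
// requires <cstdint>, <cstring>):
namespace editorial_sketch {
inline void copy_columnar_value(int8_t* output_buffer,
                                const int8_t* input_buffer,
                                const size_t output_col_off,  // column start in output buffer
                                const size_t input_col_off,   // column start in input buffer
                                const size_t col_width,       // padded width of one value
                                const size_t output_row,
                                const size_t input_row) {
  std::memcpy(output_buffer + output_col_off + output_row * col_width,
              input_buffer + input_col_off + input_row * col_width,
              col_width);
}
}  // namespace editorial_sketch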
1839 
1849 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
1850  const ResultSetStorage* output_storage,
1851  size_t output_row_index,
1852  const QueryMemoryDescriptor& output_query_mem_desc,
1853  const std::vector<uint32_t>& top_permutation) {
1854  const auto output_buffer = output_storage->getUnderlyingBuffer();
1855  const auto input_buffer = input_storage->getUnderlyingBuffer();
1856  for (const auto sorted_idx : top_permutation) {
1857  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
1858  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
1859  row_ptr,
1860  output_query_mem_desc.getRowSize());
1861  ++output_row_index;
1862  }
1863  return output_row_index;
1864 }
1865 } // namespace
1866 
1867 // Collect top results from each device, stitch them together and sort. Partial
1868 // results from each device are guaranteed to be disjoint because we only take
1869 // this path when one of the columns involved is a shard key.
1870 ResultSetPtr Executor::collectAllDeviceShardedTopResults(
1871  SharedKernelContext& shared_context,
1872  const RelAlgExecutionUnit& ra_exe_unit) const {
1873  auto& result_per_device = shared_context.getFragmentResults();
1874  const auto first_result_set = result_per_device.front().first;
1875  CHECK(first_result_set);
1876  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1877  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1878  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1879  top_query_mem_desc.setEntryCount(0);
1880  for (auto& result : result_per_device) {
1881  const auto result_set = result.first;
1882  CHECK(result_set);
1883  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n);
1884  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1885  top_query_mem_desc.setEntryCount(new_entry_cnt);
1886  }
1887  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1888  first_result_set->getDeviceType(),
1889  top_query_mem_desc,
1890  first_result_set->getRowSetMemOwner(),
1891  this);
1892  auto top_storage = top_result_set->allocateStorage();
1893  size_t top_output_row_idx{0};
1894  for (auto& result : result_per_device) {
1895  const auto result_set = result.first;
1896  CHECK(result_set);
1897  const auto& top_permutation = result_set->getPermutationBuffer();
1898  CHECK_LE(top_permutation.size(), top_n);
1899  if (top_query_mem_desc.didOutputColumnar()) {
1900  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
1901  result_set->getQueryMemDesc(),
1902  top_storage,
1903  top_output_row_idx,
1904  top_query_mem_desc,
1905  top_permutation);
1906  } else {
1907  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
1908  top_storage,
1909  top_output_row_idx,
1910  top_query_mem_desc,
1911  top_permutation);
1912  }
1913  }
1914  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
1915  return top_result_set;
1916 }
1917 
1918 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
1919  const {
1920  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
1921  const auto& join_info = plan_state_->join_info_;
1922  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
1923  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
1924  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
1925  id_to_cond.insert(
1926  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
1927  }
1928  return id_to_cond;
1929 }
1930 
1931 namespace {
1932 
1933 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
1934  for (const auto& col : fetched_cols) {
1935  if (col.is_lazily_fetched) {
1936  return true;
1937  }
1938  }
1939  return false;
1940 }
1941 
1942 } // namespace
1943 
1944 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
1945  SharedKernelContext& shared_context,
1946  const RelAlgExecutionUnit& ra_exe_unit,
1947  ColumnFetcher& column_fetcher,
1948  const std::vector<InputTableInfo>& table_infos,
1949  const ExecutionOptions& eo,
1950  const bool is_agg,
1951  const bool allow_single_frag_table_opt,
1952  const size_t context_count,
1953  const QueryCompilationDescriptor& query_comp_desc,
1954  const QueryMemoryDescriptor& query_mem_desc,
1955  RenderInfo* render_info,
1956  std::unordered_set<int>& available_gpus,
1957  int& available_cpus) {
1958  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
1959 
1960  QueryFragmentDescriptor fragment_descriptor(
1961  ra_exe_unit,
1962  table_infos,
1963  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
1964  ? catalog_->getDataMgr().getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
1965  : std::vector<Data_Namespace::MemoryInfo>{},
1966  eo.gpu_input_mem_limit_percent,
1967  eo.outer_fragment_indices);
1968  CHECK(!ra_exe_unit.input_descs.empty());
1969 
1970  const auto device_type = query_comp_desc.getDeviceType();
1971  const bool uses_lazy_fetch =
1972  plan_state_->allow_lazy_fetch_ &&
1973  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
1974  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
1975  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
1976  const auto device_count = deviceCount(device_type);
1977  CHECK_GT(device_count, 0);
1978 
1979  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
1980  shared_context.getFragOffsets(),
1981  device_count,
1982  device_type,
1983  use_multifrag_kernel,
1984  g_inner_join_fragment_skipping,
1985  this);
1986  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
1987  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
1988  }
1989 
1990  if (use_multifrag_kernel) {
1991  VLOG(1) << "Creating multifrag execution kernels";
1992  VLOG(1) << query_mem_desc.toString();
1993 
1994  // NB: We should never be on this path when the query is retried because of running
1995  // out of group by slots; also, for scan only queries on CPU we want the
1996  // high-granularity, fragment by fragment execution instead. For scan only queries on
1997  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
1998  // buffer per fragment.
1999  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2000  &execution_kernels,
2001  &column_fetcher,
2002  &eo,
2003  &query_comp_desc,
2004  &query_mem_desc,
2005  render_info,
2006  parent_thread_id = logger::thread_id()](
2007  const int device_id,
2008  const FragmentsList& frag_list,
2009  const int64_t rowid_lookup_key) {
2010  execution_kernels.emplace_back(
2011  std::make_unique<ExecutionKernel>(ra_exe_unit,
2012  ExecutorDeviceType::GPU,
2013  device_id,
2014  eo,
2015  column_fetcher,
2016  query_comp_desc,
2017  query_mem_desc,
2018  frag_list,
2019  ExecutorDispatchMode::MultifragmentKernel,
2020  render_info,
2021  rowid_lookup_key,
2022  parent_thread_id));
2023  };
2024  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2025  } else {
2026  VLOG(1) << "Creating one execution kernel per fragment";
2027  VLOG(1) << query_mem_desc.toString();
2028 
2029  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2030  (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
2031  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2032  const auto max_frag_size =
2033  table_infos.front().info.getFragmentNumTuplesUpperBound();
2034  if (max_frag_size < query_mem_desc.getEntryCount()) {
2035  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2036  << " to match max fragment size " << max_frag_size
2037  << " for kernel per fragment execution path.";
2038  throw CompilationRetryNewScanLimit(max_frag_size);
2039  }
2040  }
2041 
2042  size_t frag_list_idx{0};
2043  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2044  &execution_kernels,
2045  &column_fetcher,
2046  &eo,
2047  &frag_list_idx,
2048  &device_type,
2049  &query_comp_desc,
2050  &query_mem_desc,
2051  render_info,
2052  parent_thread_id = logger::thread_id()](
2053  const int device_id,
2054  const FragmentsList& frag_list,
2055  const int64_t rowid_lookup_key) {
2056  if (!frag_list.size()) {
2057  return;
2058  }
2059  CHECK_GE(device_id, 0);
2060 
2061  execution_kernels.emplace_back(
2062  std::make_unique<ExecutionKernel>(ra_exe_unit,
2063  device_type,
2064  device_id,
2065  eo,
2066  column_fetcher,
2067  query_comp_desc,
2068  query_mem_desc,
2069  frag_list,
2070  ExecutorDispatchMode::KernelPerFragment,
2071  render_info,
2072  rowid_lookup_key,
2073  parent_thread_id));
2074  ++frag_list_idx;
2075  };
2076 
2077  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2078  ra_exe_unit);
2079  }
2080 
2081  return execution_kernels;
2082 }
2083 
2084 template <typename THREAD_POOL>
2085 void Executor::launchKernels(SharedKernelContext& shared_context,
2086  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels) {
2087  auto clock_begin = timer_start();
2088  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2089  kernel_queue_time_ms_ += timer_stop(clock_begin);
2090 
2091  THREAD_POOL thread_pool;
2092  VLOG(1) << "Launching " << kernels.size() << " kernels for query.";
2093  for (auto& kernel : kernels) {
2094  thread_pool.append(
2095  [this, &shared_context](ExecutionKernel* kernel) {
2096  CHECK(kernel);
2097  kernel->run(this, shared_context);
2098  },
2099  kernel.get());
2100  }
2101  thread_pool.join();
2102 }
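// [Editorial sketch -- not part of the original source.] The THREAD_POOL
// parameter above only needs an "append a task, then join" interface. A minimal
// analogue built on std::async for void-returning tasks, sufficient to
// illustrate the launch pattern (AsyncPool is a hypothetical name; requires
// <future>, <utility>, <vector>):
namespace editorial_sketch {
class AsyncPool {
 public:
  template <typename Fn, typename Arg>
  void append(Fn&& fn, Arg arg) {
    futures_.push_back(std::async(std::launch::async, std::forward<Fn>(fn), arg));
  }
  void join() {
    for (auto& f : futures_) {
      f.get();  // blocks until the task finishes; rethrows any worker exception
    }
  }

 private:
  std::vector<std::future<void>> futures_;  // tasks must return void
};
}  // namespace editorial_sketch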
2103 
2104 std::vector<size_t> Executor::getTableFragmentIndices(
2105  const RelAlgExecutionUnit& ra_exe_unit,
2106  const ExecutorDeviceType device_type,
2107  const size_t table_idx,
2108  const size_t outer_frag_idx,
2109  std::map<int, const TableFragments*>& selected_tables_fragments,
2110  const std::unordered_map<int, const Analyzer::BinOper*>&
2111  inner_table_id_to_join_condition) {
2112  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2113  auto table_frags_it = selected_tables_fragments.find(table_id);
2114  CHECK(table_frags_it != selected_tables_fragments.end());
2115  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2116  const auto outer_table_fragments_it =
2117  selected_tables_fragments.find(outer_input_desc.getTableId());
2118  const auto outer_table_fragments = outer_table_fragments_it->second;
2119  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2120  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2121  if (!table_idx) {
2122  return {outer_frag_idx};
2123  }
2124  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2125  auto& inner_frags = table_frags_it->second;
2126  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2127  std::vector<size_t> all_frag_ids;
2128  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2129  ++inner_frag_idx) {
2130  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2131  if (skipFragmentPair(outer_fragment_info,
2132  inner_frag_info,
2133  table_idx,
2134  inner_table_id_to_join_condition,
2135  ra_exe_unit,
2136  device_type)) {
2137  continue;
2138  }
2139  all_frag_ids.push_back(inner_frag_idx);
2140  }
2141  return all_frag_ids;
2142 }
2143 
2144 // Returns true iff the join between two fragments cannot yield any results, per
2145 // shard information. The pair can be skipped to avoid full broadcast.
2146 bool Executor::skipFragmentPair(
2147  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2148  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2149  const int table_idx,
2150  const std::unordered_map<int, const Analyzer::BinOper*>&
2151  inner_table_id_to_join_condition,
2152  const RelAlgExecutionUnit& ra_exe_unit,
2153  const ExecutorDeviceType device_type) {
2154  if (device_type != ExecutorDeviceType::GPU) {
2155  return false;
2156  }
2157  CHECK(table_idx >= 0 &&
2158  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2159  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2160  // Both tables need to be sharded the same way.
2161  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2162  outer_fragment_info.shard == inner_fragment_info.shard) {
2163  return false;
2164  }
2165  const Analyzer::BinOper* join_condition{nullptr};
2166  if (ra_exe_unit.join_quals.empty()) {
2167  CHECK(!inner_table_id_to_join_condition.empty());
2168  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2169  CHECK(condition_it != inner_table_id_to_join_condition.end());
2170  join_condition = condition_it->second;
2171  CHECK(join_condition);
2172  } else {
2173  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2174  plan_state_->join_info_.join_hash_tables_.size());
2175  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2176  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2177  table_idx) {
2178  CHECK(!join_condition);
2179  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2180  }
2181  }
2182  }
2183  if (!join_condition) {
2184  return false;
2185  }
2186  // TODO(adb): support fragment skipping based on the overlaps operator
2187  if (join_condition->is_overlaps_oper()) {
2188  return false;
2189  }
2190  size_t shard_count{0};
2191  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2192  join_condition->get_left_operand())) {
2193  auto inner_outer_pairs =
2194  normalize_column_pairs(join_condition, *getCatalog(), getTemporaryTables());
2195  shard_count = BaselineJoinHashTable::getShardCountForCondition(
2196  join_condition, this, inner_outer_pairs);
2197  } else {
2198  shard_count = get_shard_count(join_condition, this);
2199  }
2200  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2201  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2202  }
2203  return shard_count;
2204 }
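// [Editorial sketch -- not part of the original source.] The shard test above in
// isolation: a fragment pair is only skippable when both fragments carry a shard
// id and the ids differ, e.g. outer shard 2 joined to inner shard 5 on the shard
// key can never produce a match (shards_cannot_match is a hypothetical name):
namespace editorial_sketch {
inline bool shards_cannot_match(const int outer_shard, const int inner_shard) {
  // -1 means "not sharded"; only a definite shard mismatch justifies skipping.
  return outer_shard != -1 && inner_shard != -1 && outer_shard != inner_shard;
}
}  // namespace editorial_sketch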
2205 
2206 namespace {
2207 
2208 const ColumnDescriptor* try_get_column_descriptor(const InputColDescriptor* col_desc,
2209  const Catalog_Namespace::Catalog& cat) {
2210  const int table_id = col_desc->getScanDesc().getTableId();
2211  const int col_id = col_desc->getColId();
2212  return get_column_descriptor_maybe(col_id, table_id, cat);
2213 }
2214 
2215 } // namespace
2216 
2217 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2218  const std::vector<InputDescriptor>& input_descs,
2219  const std::map<int, const TableFragments*>& all_tables_fragments) {
2220  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2221  for (auto& desc : input_descs) {
2222  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2223  CHECK(fragments_it != all_tables_fragments.end());
2224  const auto& fragments = *fragments_it->second;
2225  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2226  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2227  frag_offsets[i] = off;
2228  off += fragments[i].getNumTuples();
2229  }
2230  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2231  }
2232  return tab_id_to_frag_offsets;
2233 }
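// [Editorial sketch -- not part of the original source.] The per-table offsets
// built above are an exclusive prefix sum of fragment sizes: fragments holding
// 100, 50 and 75 tuples map to offsets {0, 100, 150}. A standalone analogue
// (exclusive_prefix_sum is a hypothetical name; requires <cstdint>, <vector>):
namespace editorial_sketch {
inline std::vector<uint64_t> exclusive_prefix_sum(const std::vector<uint64_t>& sizes) {
  std::vector<uint64_t> offsets(sizes.size(), 0);
  for (size_t i = 1; i < sizes.size(); ++i) {
    offsets[i] = offsets[i - 1] + sizes[i - 1];  // offset = tuples in all prior fragments
  }
  return offsets;
}
}  // namespace editorial_sketch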
2234 
2235 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2236 Executor::getRowCountAndOffsetForAllFrags(
2237  const RelAlgExecutionUnit& ra_exe_unit,
2238  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2239  const std::vector<InputDescriptor>& input_descs,
2240  const std::map<int, const TableFragments*>& all_tables_fragments) {
2241  std::vector<std::vector<int64_t>> all_num_rows;
2242  std::vector<std::vector<uint64_t>> all_frag_offsets;
2243  const auto tab_id_to_frag_offsets =
2244  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2245  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2246  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2247  std::vector<int64_t> num_rows;
2248  std::vector<uint64_t> frag_offsets;
2249  if (!ra_exe_unit.union_all) {
2250  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2251  }
2252  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2253  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2254  const auto fragments_it =
2255  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2256  CHECK(fragments_it != all_tables_fragments.end());
2257  const auto& fragments = *fragments_it->second;
2258  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2259  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2260  const auto& fragment = fragments[frag_id];
2261  num_rows.push_back(fragment.getNumTuples());
2262  } else {
2263  size_t total_row_count{0};
2264  for (const auto& fragment : fragments) {
2265  total_row_count += fragment.getNumTuples();
2266  }
2267  num_rows.push_back(total_row_count);
2268  }
2269  const auto frag_offsets_it =
2270  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2271  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2272  const auto& offsets = frag_offsets_it->second;
2273  CHECK_LT(frag_id, offsets.size());
2274  frag_offsets.push_back(offsets[frag_id]);
2275  }
2276  all_num_rows.push_back(num_rows);
2277  // Fragment offsets of the outer table should ONLY be used by rowid for now.
2278  all_frag_offsets.push_back(frag_offsets);
2279  }
2280  return {all_num_rows, all_frag_offsets};
2281 }
2282 
2283 // Determines whether a column of a hash-joined inner fact table must be fetched from
2284 // all of the table's fragments, i.e. its fetch is not deferred.
2285 bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
2286  const RelAlgExecutionUnit& ra_exe_unit,
2287  const FragmentsList& selected_fragments) const {
2288  const auto& input_descs = ra_exe_unit.input_descs;
2289  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2290  if (nest_level < 1 ||
2291  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2292  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2293  (ra_exe_unit.join_quals.empty() &&
2294  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2295  return false;
2296  }
2297  const int table_id = inner_col_desc.getScanDesc().getTableId();
2298  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2299  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2300  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2301  return fragments.size() > 1;
2302 }
2303 
2304 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2305  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2306  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2307  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2308 }
2309 
2310 Executor::FetchResult Executor::fetchChunks(
2311  const ColumnFetcher& column_fetcher,
2312  const RelAlgExecutionUnit& ra_exe_unit,
2313  const int device_id,
2314  const Data_Namespace::MemoryLevel memory_level,
2315  const std::map<int, const TableFragments*>& all_tables_fragments,
2316  const FragmentsList& selected_fragments,
2317  const Catalog_Namespace::Catalog& cat,
2318  std::list<ChunkIter>& chunk_iterators,
2319  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2320  DeviceAllocator* device_allocator) {
2321  auto timer = DEBUG_TIMER(__func__);
2322  INJECT_TIMER(fetchChunks);
2323  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2324  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2325  std::vector<size_t> local_col_to_frag_pos;
2326  buildSelectedFragsMapping(selected_fragments_crossjoin,
2327  local_col_to_frag_pos,
2328  col_global_ids,
2329  selected_fragments,
2330  ra_exe_unit);
2331 
2332  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2333  selected_fragments_crossjoin);
2334 
2335  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2336  std::vector<std::vector<int64_t>> all_num_rows;
2337  std::vector<std::vector<uint64_t>> all_frag_offsets;
2338 
2339  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2340  std::vector<const int8_t*> frag_col_buffers(
2341  plan_state_->global_to_local_col_ids_.size());
2342  for (const auto& col_id : col_global_ids) {
2343  // check whether the interrupt flag has been set (non-kernel-time query interrupt)
2344  if (g_enable_runtime_query_interrupt &&
2345  interrupted_.load()) {
2346  resetInterrupt();
2347  throw QueryExecutionError(ERR_INTERRUPTED);
2348  }
2349  CHECK(col_id);
2350  const int table_id = col_id->getScanDesc().getTableId();
2351  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2352  if (cd && cd->isVirtualCol) {
2353  CHECK_EQ("rowid", cd->columnName);
2354  continue;
2355  }
2356  const auto fragments_it = all_tables_fragments.find(table_id);
2357  CHECK(fragments_it != all_tables_fragments.end());
2358  const auto fragments = fragments_it->second;
2359  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2360  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2361  CHECK_LT(static_cast<size_t>(it->second),
2362  plan_state_->global_to_local_col_ids_.size());
2363  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2364  if (!fragments->size()) {
2365  return {};
2366  }
2367  CHECK_LT(frag_id, fragments->size());
2368  auto memory_level_for_column = memory_level;
2369  if (plan_state_->columns_to_fetch_.find(
2370  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2371  plan_state_->columns_to_fetch_.end()) {
2372  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2373  }
2374  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2375  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2376  col_id.get(), memory_level_for_column, device_id, device_allocator);
2377  } else {
2378  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2379  frag_col_buffers[it->second] =
2380  column_fetcher.getAllTableColumnFragments(table_id,
2381  col_id->getColId(),
2382  all_tables_fragments,
2383  memory_level_for_column,
2384  device_id,
2385  device_allocator);
2386  } else {
2387  frag_col_buffers[it->second] =
2388  column_fetcher.getOneTableColumnFragment(table_id,
2389  frag_id,
2390  col_id->getColId(),
2391  all_tables_fragments,
2392  chunks,
2393  chunk_iterators,
2394  memory_level_for_column,
2395  device_id,
2396  device_allocator);
2397  }
2398  }
2399  }
2400  all_frag_col_buffers.push_back(frag_col_buffers);
2401  }
2402  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2403  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2404  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2405 }
2406 
2407 // fetchChunks() is written under the assumption that multiple inputs imply a JOIN.
2408 // fetchUnionChunks() is written under the assumption that multiple inputs imply a UNION ALL.
2409 Executor::FetchResult Executor::fetchUnionChunks(
2410  const ColumnFetcher& column_fetcher,
2411  const RelAlgExecutionUnit& ra_exe_unit,
2412  const int device_id,
2413  const Data_Namespace::MemoryLevel memory_level,
2414  const std::map<int, const TableFragments*>& all_tables_fragments,
2415  const FragmentsList& selected_fragments,
2416  const Catalog_Namespace::Catalog& cat,
2417  std::list<ChunkIter>& chunk_iterators,
2418  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2419  DeviceAllocator* device_allocator) {
2420  auto timer = DEBUG_TIMER(__func__);
2421  INJECT_TIMER(fetchUnionChunks);
2422 
2423  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2424  std::vector<std::vector<int64_t>> all_num_rows;
2425  std::vector<std::vector<uint64_t>> all_frag_offsets;
2426 
2427  CHECK(!selected_fragments.empty());
2428  CHECK_LE(2u, ra_exe_unit.input_descs.size());
2429  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
2430  using TableId = int;
2431  TableId const selected_table_id = selected_fragments.front().table_id;
2432  bool const input_descs_index =
2433  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2434  if (!input_descs_index) {
2435  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
2436  }
2437  bool const input_col_descs_index =
2438  selected_table_id ==
2439  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2440  if (!input_col_descs_index) {
2441  CHECK_EQ(selected_table_id,
2442  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
2443  }
2444  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2445  << " selected_table_id=" << selected_table_id
2446  << " input_descs_index=" << int(input_descs_index)
2447  << " input_col_descs_index=" << int(input_col_descs_index)
2448  << " ra_exe_unit.input_descs="
2449  << shared::printContainer(ra_exe_unit.input_descs)
2450  << " ra_exe_unit.input_col_descs="
2451  << shared::printContainer(ra_exe_unit.input_col_descs);
2452 
2453  // Partition col_global_ids by table_id
2454  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2455  table_id_to_input_col_descs;
2456  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
2457  TableId const table_id = input_col_desc->getScanDesc().getTableId();
2458  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2459  }
2460  for (auto const& pair : table_id_to_input_col_descs) {
2461  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2462  std::vector<size_t> local_col_to_frag_pos;
2463 
2464  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
2465  local_col_to_frag_pos,
2466  pair.second,
2467  selected_fragments,
2468  ra_exe_unit);
2469 
2470  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2471  selected_fragments_crossjoin);
2472 
2473  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2474  std::vector<const int8_t*> frag_col_buffers(
2475  plan_state_->global_to_local_col_ids_.size());
2476  for (const auto& col_id : pair.second) {
2477  CHECK(col_id);
2478  const int table_id = col_id->getScanDesc().getTableId();
2479  CHECK_EQ(table_id, pair.first);
2480  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2481  if (cd && cd->isVirtualCol) {
2482  CHECK_EQ("rowid", cd->columnName);
2483  continue;
2484  }
2485  const auto fragments_it = all_tables_fragments.find(table_id);
2486  CHECK(fragments_it != all_tables_fragments.end());
2487  const auto fragments = fragments_it->second;
2488  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2489  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2490  CHECK_LT(static_cast<size_t>(it->second),
2491  plan_state_->global_to_local_col_ids_.size());
2492  const size_t frag_id = ra_exe_unit.union_all
2493  ? 0
2494  : selected_frag_ids[local_col_to_frag_pos[it->second]];
2495  if (!fragments->size()) {
2496  return {};
2497  }
2498  CHECK_LT(frag_id, fragments->size());
2499  auto memory_level_for_column = memory_level;
2500  if (plan_state_->columns_to_fetch_.find(
2501  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2502  plan_state_->columns_to_fetch_.end()) {
2503  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2504  }
2505  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2506  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2507  col_id.get(), memory_level_for_column, device_id, device_allocator);
2508  } else {
2509  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2510  frag_col_buffers[it->second] =
2511  column_fetcher.getAllTableColumnFragments(table_id,
2512  col_id->getColId(),
2513  all_tables_fragments,
2514  memory_level_for_column,
2515  device_id,
2516  device_allocator);
2517  } else {
2518  frag_col_buffers[it->second] =
2519  column_fetcher.getOneTableColumnFragment(table_id,
2520  frag_id,
2521  col_id->getColId(),
2522  all_tables_fragments,
2523  chunks,
2524  chunk_iterators,
2525  memory_level_for_column,
2526  device_id,
2527  device_allocator);
2528  }
2529  }
2530  }
2531  all_frag_col_buffers.push_back(frag_col_buffers);
2532  }
2533  std::vector<std::vector<int64_t>> num_rows;
2534  std::vector<std::vector<uint64_t>> frag_offsets;
2535  std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags(
2536  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2537  all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
2538  all_frag_offsets.insert(
2539  all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
2540  }
2541  // UNION ALL hacks.
2542  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers);
2543  for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
2544  all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
2545  }
2546  if (input_descs_index == input_col_descs_index) {
2547  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2548  }
2549 
2550  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers)
2551  << " all_num_rows=" << shared::printContainer(all_num_rows)
2552  << " all_frag_offsets=" << shared::printContainer(all_frag_offsets)
2553  << " input_col_descs_index=" << input_col_descs_index;
2554  return {{all_frag_col_buffers[input_descs_index]},
2555  {{all_num_rows[0][input_descs_index]}},
2556  {{all_frag_offsets[0][input_descs_index]}}};
2557 }
2558 
2559 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2560  const size_t scan_idx,
2561  const RelAlgExecutionUnit& ra_exe_unit) {
2562  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2563  scan_idx > 0 &&
2564  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2565  !selected_fragments[scan_idx].fragment_ids.empty()) {
2566  // Fetch all fragments
2567  return {size_t(0)};
2568  }
2569 
2570  return selected_fragments[scan_idx].fragment_ids;
2571 }
2572 
2573 void Executor::buildSelectedFragsMapping(
2574  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2575  std::vector<size_t>& local_col_to_frag_pos,
2576  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2577  const FragmentsList& selected_fragments,
2578  const RelAlgExecutionUnit& ra_exe_unit) {
2579  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2580  size_t frag_pos{0};
2581  const auto& input_descs = ra_exe_unit.input_descs;
2582  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2583  const int table_id = input_descs[scan_idx].getTableId();
2584  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2585  selected_fragments_crossjoin.push_back(
2586  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2587  for (const auto& col_id : col_global_ids) {
2588  CHECK(col_id);
2589  const auto& input_desc = col_id->getScanDesc();
2590  if (input_desc.getTableId() != table_id ||
2591  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2592  continue;
2593  }
2594  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2595  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2596  CHECK_LT(static_cast<size_t>(it->second),
2597  plan_state_->global_to_local_col_ids_.size());
2598  local_col_to_frag_pos[it->second] = frag_pos;
2599  }
2600  ++frag_pos;
2601  }
2602 }
2603 
2604 void Executor::buildSelectedFragsMappingForUnion(
2605  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2606  std::vector<size_t>& local_col_to_frag_pos,
2607  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2608  const FragmentsList& selected_fragments,
2609  const RelAlgExecutionUnit& ra_exe_unit) {
2610  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2611  size_t frag_pos{0};
2612  const auto& input_descs = ra_exe_unit.input_descs;
2613  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2614  const int table_id = input_descs[scan_idx].getTableId();
2615  // selected_fragments here is from assignFragsToKernelDispatch
2616  // execution_kernel.fragments
2617  if (selected_fragments[0].table_id != table_id) { // TODO 0
2618  continue;
2619  }
2620  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2621  selected_fragments_crossjoin.push_back(
2622  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2623  {size_t(1)}); // TODO
2624  for (const auto& col_id : col_global_ids) {
2625  CHECK(col_id);
2626  const auto& input_desc = col_id->getScanDesc();
2627  if (input_desc.getTableId() != table_id ||
2628  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2629  continue;
2630  }
2631  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2632  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2633  CHECK_LT(static_cast<size_t>(it->second),
2634  plan_state_->global_to_local_col_ids_.size());
2635  local_col_to_frag_pos[it->second] = frag_pos;
2636  }
2637  ++frag_pos;
2638  }
2639 }
2640 
2641 namespace {
2642 
2643 class OutVecOwner {
2644  public:
2645  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2646  ~OutVecOwner() {
2647  for (auto out : out_vec_) {
2648  delete[] out;
2649  }
2650  }
2651 
2652  private:
2653  std::vector<int64_t*> out_vec_;
2654 };
2655 } // namespace
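// [Editorial sketch -- not part of the original source.] OutVecOwner is a scope
// guard for the raw new[]-allocated int64_t buffers returned by launchCpuCode /
// launchGpuCode, so every exit path (including the early error returns below)
// releases them. If the launch interface ever returned owning handles, the same
// guarantee could be expressed directly with smart pointers (OwnedOutVec is a
// hypothetical alias; requires <cstdint>, <memory>, <vector>):
namespace editorial_sketch {
using OwnedOutVec = std::vector<std::unique_ptr<int64_t[]>>;  // frees each buffer automatically
}  // namespace editorial_sketch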
2656 
2657 int32_t Executor::executePlanWithoutGroupBy(
2658  const RelAlgExecutionUnit& ra_exe_unit,
2659  const CompilationResult& compilation_result,
2660  const bool hoist_literals,
2661  ResultSetPtr& results,
2662  const std::vector<Analyzer::Expr*>& target_exprs,
2663  const ExecutorDeviceType device_type,
2664  std::vector<std::vector<const int8_t*>>& col_buffers,
2665  QueryExecutionContext* query_exe_context,
2666  const std::vector<std::vector<int64_t>>& num_rows,
2667  const std::vector<std::vector<uint64_t>>& frag_offsets,
2668  Data_Namespace::DataMgr* data_mgr,
2669  const int device_id,
2670  const uint32_t start_rowid,
2671  const uint32_t num_tables,
2672  RenderInfo* render_info) {
2673  INJECT_TIMER(executePlanWithoutGroupBy);
2674  auto timer = DEBUG_TIMER(__func__);
2675  CHECK(!results);
2676  if (col_buffers.empty()) {
2677  return 0;
2678  }
2679 
2680  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2681  if (render_info) {
2682  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2683  // here, we are in non-insitu mode.
2684  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2685  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2686  "currently unsupported.";
2687  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2688  }
2689 
2690  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2691  std::vector<int64_t*> out_vec;
2692  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2693  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2694  std::unique_ptr<OutVecOwner> output_memory_scope;
2695  if (g_enable_runtime_query_interrupt &&
2696  interrupted_.load()) {
2697  resetInterrupt();
2698  throw QueryExecutionError(ERR_INTERRUPTED);
2699  }
2700  if (device_type == ExecutorDeviceType::CPU) {
2701  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2702  compilation_result.generated_code);
2703  CHECK(cpu_generated_code);
2704  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2705  cpu_generated_code.get(),
2706  hoist_literals,
2707  hoist_buf,
2708  col_buffers,
2709  num_rows,
2710  frag_offsets,
2711  0,
2712  &error_code,
2713  num_tables,
2714  join_hash_table_ptrs);
2715  output_memory_scope.reset(new OutVecOwner(out_vec));
2716  } else {
2717  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2718  compilation_result.generated_code);
2719  CHECK(gpu_generated_code);
2720  try {
2721  out_vec = query_exe_context->launchGpuCode(
2722  ra_exe_unit,
2723  gpu_generated_code.get(),
2724  hoist_literals,
2725  hoist_buf,
2726  col_buffers,
2727  num_rows,
2728  frag_offsets,
2729  0,
2730  data_mgr,
2731  blockSize(),
2732  gridSize(),
2733  device_id,
2734  compilation_result.gpu_smem_context.getSharedMemorySize(),
2735  &error_code,
2736  num_tables,
2737  join_hash_table_ptrs,
2738  render_allocator_map_ptr);
2739  output_memory_scope.reset(new OutVecOwner(out_vec));
2740  } catch (const OutOfMemory&) {
2741  return ERR_OUT_OF_GPU_MEM;
2742  } catch (const std::exception& e) {
2743  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2744  }
2745  }
2746  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2747  error_code == Executor::ERR_DIV_BY_ZERO ||
2748  error_code == Executor::ERR_OUT_OF_TIME ||
2749  error_code == Executor::ERR_INTERRUPTED ||
2750  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
2751  error_code == Executor::ERR_GEOS) {
2752  return error_code;
2753  }
2754  if (ra_exe_unit.estimator) {
2755  CHECK(!error_code);
2756  results =
2757  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2758  return 0;
2759  }
2760  std::vector<int64_t> reduced_outs;
2761  const auto num_frags = col_buffers.size();
2762  const size_t entry_count =
2763  device_type == ExecutorDeviceType::GPU
2764  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
2765  ? 1
2766  : blockSize() * gridSize() * num_frags)
2767  : num_frags;
2768  if (size_t(1) == entry_count) {
2769  for (auto out : out_vec) {
2770  CHECK(out);
2771  reduced_outs.push_back(*out);
2772  }
2773  } else {
2774  size_t out_vec_idx = 0;
2775 
2776  for (const auto target_expr : target_exprs) {
2777  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2778  CHECK(agg_info.is_agg);
2779 
2780  const int num_iterations = agg_info.sql_type.is_geometry()
2781  ? agg_info.sql_type.get_physical_coord_cols()
2782  : 1;
2783 
2784  for (int i = 0; i < num_iterations; i++) {
2785  int64_t val1;
2786  const bool float_argument_input = takes_float_argument(agg_info);
2787  if (is_distinct_target(agg_info)) {
2788  CHECK(agg_info.agg_kind == kCOUNT ||
2789  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT);
2790  val1 = out_vec[out_vec_idx][0];
2791  error_code = 0;
2792  } else {
2793  const auto chosen_bytes = static_cast<size_t>(
2794  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2795  std::tie(val1, error_code) = Executor::reduceResults(
2796  agg_info.agg_kind,
2797  agg_info.sql_type,
2798  query_exe_context->getAggInitValForIndex(out_vec_idx),
2799  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2800  out_vec[out_vec_idx],
2801  entry_count,
2802  false,
2803  float_argument_input);
2804  }
2805  if (error_code) {
2806  break;
2807  }
2808  reduced_outs.push_back(val1);
2809  if (agg_info.agg_kind == kAVG ||
2810  (agg_info.agg_kind == kSAMPLE &&
2811  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
2812  const auto chosen_bytes = static_cast<size_t>(
2813  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
2814  1));
2815  int64_t val2;
2816  std::tie(val2, error_code) = Executor::reduceResults(
2817  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
2818  agg_info.sql_type,
2819  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
2820  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2821  out_vec[out_vec_idx + 1],
2822  entry_count,
2823  false,
2824  false);
2825  if (error_code) {
2826  break;
2827  }
2828  reduced_outs.push_back(val2);
2829  ++out_vec_idx;
2830  }
2831  ++out_vec_idx;
2832  }
2833  }
2834  }
2835 
2836  if (error_code) {
2837  return error_code;
2838  }
2839 
2840  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
2841  auto rows_ptr = std::shared_ptr<ResultSet>(
2842  query_exe_context->query_buffers_->result_sets_[0].release());
2843  rows_ptr->fillOneEntry(reduced_outs);
2844  results = std::move(rows_ptr);
2845  return error_code;
2846 }
2847 
2848 namespace {
2849 
2850 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
2851  CHECK(scan_limit);
2852  return results && results->rowCount() < scan_limit;
2853 }
2854 
2855 } // namespace
2856 
2857 int32_t Executor::executePlanWithGroupBy(
2858  const RelAlgExecutionUnit& ra_exe_unit,
2859  const CompilationResult& compilation_result,
2860  const bool hoist_literals,
2861  ResultSetPtr& results,
2862  const ExecutorDeviceType device_type,
2863  std::vector<std::vector<const int8_t*>>& col_buffers,
2864  const std::vector<size_t> outer_tab_frag_ids,
2865  QueryExecutionContext* query_exe_context,
2866  const std::vector<std::vector<int64_t>>& num_rows,
2867  const std::vector<std::vector<uint64_t>>& frag_offsets,
2868  Data_Namespace::DataMgr* data_mgr,
2869  const int device_id,
2870  const int outer_table_id,
2871  const int64_t scan_limit,
2872  const uint32_t start_rowid,
2873  const uint32_t num_tables,
2874  RenderInfo* render_info) {
2875  auto timer = DEBUG_TIMER(__func__);
2876  INJECT_TIMER(executePlanWithGroupBy);
2877  CHECK(!results);
2878  if (col_buffers.empty()) {
2879  return 0;
2880  }
2881  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
2882  // TODO(alex):
2883  // 1. Optimize size (make keys more compact).
2884  // 2. Resize on overflow.
2885  // 3. Optimize runtime.
2886  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2887  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2888  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2889  if (g_enable_runtime_query_interrupt &&
2890  interrupted_.load()) {
2891  return ERR_INTERRUPTED;
2892  }
2893 
2894  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2895  if (render_info && render_info->useCudaBuffers()) {
2896  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2897  }
2898 
2899  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
2900  << " ra_exe_unit.input_descs="
2901  << shared::printContainer(ra_exe_unit.input_descs)
2902  << " ra_exe_unit.input_col_descs="
2903  << shared::printContainer(ra_exe_unit.input_col_descs)
2904  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
2905  << " num_rows=" << shared::printContainer(num_rows)
2906  << " frag_offsets=" << shared::printContainer(frag_offsets)
2907  << " query_exe_context->query_buffers_->num_rows_="
2908  << query_exe_context->query_buffers_->num_rows_
2909  << " query_exe_context->query_mem_desc_.getEntryCount()="
2910  << query_exe_context->query_mem_desc_.getEntryCount()
2911  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
2912  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
2913  << " num_tables=" << num_tables;
2914 
2915  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
2916  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
2917  // with outer_table_id.
2918  if (ra_exe_unit_copy.union_all) {
2919  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
2920  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
2921  ra_exe_unit_copy.input_descs.end(),
2922  [outer_table_id](auto const& a, auto const& b) {
2923  return a.getTableId() == outer_table_id &&
2924  b.getTableId() != outer_table_id;
2925  });
2926  while (!ra_exe_unit_copy.input_descs.empty() &&
2927  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
2928  ra_exe_unit_copy.input_descs.pop_back();
2929  }
2930  // Filter ra_exe_unit_copy.input_col_descs.
2931  ra_exe_unit_copy.input_col_descs.remove_if(
2932  [outer_table_id](auto const& input_col_desc) {
2933  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
2934  });
2935  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
2936  }
2937 
2938  if (device_type == ExecutorDeviceType::CPU) {
2939  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2940  compilation_result.generated_code);
2941  CHECK(cpu_generated_code);
2942  query_exe_context->launchCpuCode(
2943  ra_exe_unit_copy,
2944  cpu_generated_code.get(),
2945  hoist_literals,
2946  hoist_buf,
2947  col_buffers,
2948  num_rows,
2949  frag_offsets,
2950  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
2951  &error_code,
2952  num_tables,
2953  join_hash_table_ptrs);
2954  } else {
2955  try {
2956  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2957  compilation_result.generated_code);
2958  CHECK(gpu_generated_code);
2959  query_exe_context->launchGpuCode(
2960  ra_exe_unit_copy,
2961  gpu_generated_code.get(),
2962  hoist_literals,
2963  hoist_buf,
2964  col_buffers,
2965  num_rows,
2966  frag_offsets,
2967  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
2968  data_mgr,
2969  blockSize(),
2970  gridSize(),
2971  device_id,
2972  compilation_result.gpu_smem_context.getSharedMemorySize(),
2973  &error_code,
2974  num_tables,
2975  join_hash_table_ptrs,
2976  render_allocator_map_ptr);
2977  } catch (const OutOfMemory&) {
2978  return ERR_OUT_OF_GPU_MEM;
2979  } catch (const OutOfRenderMemory&) {
2980  return ERR_OUT_OF_RENDER_MEM;
2981  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
2982  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
2983  } catch (const std::exception& e) {
2984  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2985  }
2986  }
2987 
2988  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2989  error_code == Executor::ERR_DIV_BY_ZERO ||
2990  error_code == Executor::ERR_OUT_OF_TIME ||
2991  error_code == Executor::ERR_INTERRUPTED ||
2992  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
2993  error_code == Executor::ERR_GEOS) {
2994  return error_code;
2995  }
2996 
2997  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
2998  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
2999  results = query_exe_context->getRowSet(ra_exe_unit_copy,
3000  query_exe_context->query_mem_desc_);
3001  CHECK(results);
3002  VLOG(2) << "results->rowCount()=" << results->rowCount();
3003  results->holdLiterals(hoist_buf);
3004  }
3005  if (error_code < 0 && render_allocator_map_ptr) {
3006  auto const adjusted_scan_limit =
3007  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3008  // More rows passed the filter than available slots. We don't have a count to check,
3009  // so assume we met the limit if a scan limit is set
3010  if (adjusted_scan_limit != 0) {
3011  return 0;
3012  } else {
3013  return error_code;
3014  }
3015  }
3016  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
3017  return error_code; // unlucky, not enough results and we ran out of slots
3018  }
3019 
3020  return 0;
3021 }
3022 
3023 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3024  const int device_id) {
3025  std::vector<int64_t> table_ptrs;
3026  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3027  for (auto hash_table : join_hash_tables) {
3028  if (!hash_table) {
3029  CHECK(table_ptrs.empty());
3030  return {};
3031  }
3032  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3033  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3034  }
3035  return table_ptrs;
3036 }
3037 
3038 void Executor::nukeOldState(const bool allow_lazy_fetch,
3039  const std::vector<InputTableInfo>& query_infos,
3040  const RelAlgExecutionUnit* ra_exe_unit) {
3041  kernel_queue_time_ms_ = 0;
3042  compilation_queue_time_ms_ = 0;
3043  const bool contains_left_deep_outer_join =
3044  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3045  ra_exe_unit->join_quals.end(),
3046  [](const JoinCondition& join_condition) {
3047  return join_condition.type == JoinType::LEFT;
3048  }) != ra_exe_unit->join_quals.end();
3049  cgen_state_.reset(new CgenState(query_infos, contains_left_deep_outer_join));
3050  plan_state_.reset(
3051  new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join, this));
3052 }
3053 
3054 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3055  const std::vector<InputTableInfo>& query_infos) {
3056  const auto ld_count = input_descs.size();
3057  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3058  for (size_t i = 0; i < ld_count; ++i) {
3059  CHECK_LT(i, query_infos.size());
3060  const auto frag_count = query_infos[i].info.fragments.size();
3061  if (i > 0) {
3062  cgen_state_->frag_offsets_.push_back(nullptr);
3063  } else {
3064  if (frag_count > 1) {
3065  cgen_state_->frag_offsets_.push_back(
3066  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
3067  } else {
3068  cgen_state_->frag_offsets_.push_back(nullptr);
3069  }
3070  }
3071  }
3072 }
3073 
3074 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
3075  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3076  const std::vector<InputTableInfo>& query_infos,
3077  const MemoryLevel memory_level,
3078  const JoinHashTableInterface::HashType preferred_hash_type,
3079  ColumnCacheMap& column_cache) {
3080  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3081  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3082  }
3083  // check whether the interrupt flag has been set (non-kernel-time query interrupt)
3084  if (g_enable_runtime_query_interrupt &&
3085  interrupted_.load()) {
3086  resetInterrupt();
3087  throw QueryExecutionError(ERR_INTERRUPTED);
3088  }
3089  try {
3090  auto tbl =
3091  JoinHashTableInterface::getInstance(qual_bin_oper,
3092  query_infos,
3093  memory_level,
3094  preferred_hash_type,
3095  deviceCountForMemoryLevel(memory_level),
3096  column_cache,
3097  this);
3098  return {tbl, ""};
3099  } catch (const HashJoinFail& e) {
3100  return {nullptr, e.what()};
3101  }
3102 }
3103 
3104 int8_t Executor::warpSize() const {
3105  CHECK(catalog_);
3106  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3107  CHECK(cuda_mgr);
3108  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3109  CHECK(!dev_props.empty());
3110  return dev_props.front().warpSize;
3111 }
3112 
3113 unsigned Executor::gridSize() const {
3114  CHECK(catalog_);
3115  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3116  CHECK(cuda_mgr);
3117  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3118 }
3119 
3120 unsigned Executor::numBlocksPerMP() const {
3121  CHECK(catalog_);
3122  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3123  CHECK(cuda_mgr);
3124  return grid_size_x_ ? std::ceil(grid_size_x_ / cuda_mgr->getMinNumMPsForAllDevices())
3125  : 2;
3126 }
3127 
3128 unsigned Executor::blockSize() const {
3129  CHECK(catalog_);
3130  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3131  CHECK(cuda_mgr);
3132  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3133  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3134 }
3135 
3136 size_t Executor::maxGpuSlabSize() const {
3137  return max_gpu_slab_size_;
3138 }
3139 
3140 int64_t Executor::deviceCycles(int milliseconds) const {
3141  CHECK(catalog_);
3142  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3143  CHECK(cuda_mgr);
3144  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3145  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3146 }
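// [Editorial sketch -- not part of the original source.] deviceCycles() relies on
// clockKhz being cycles per millisecond (1 kHz = 1000 cycles/s = 1 cycle/ms), so
// a 1.41 GHz device (1'410'000 kHz) over 100 ms yields 141'000'000 cycles
// (cycles_for is a hypothetical name; requires <cstdint>):
namespace editorial_sketch {
inline int64_t cycles_for(const int64_t clock_khz, const int milliseconds) {
  return clock_khz * milliseconds;  // kHz == cycles per millisecond
}
}  // namespace editorial_sketch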
3147 
3148 llvm::Value* Executor::castToFP(llvm::Value* val) {
3149  if (!val->getType()->isIntegerTy()) {
3150  return val;
3151  }
3152 
3153  auto val_width = static_cast<llvm::IntegerType*>(val->getType())->getBitWidth();
3154  llvm::Type* dest_ty{nullptr};
3155  switch (val_width) {
3156  case 32:
3157  dest_ty = llvm::Type::getFloatTy(cgen_state_->context_);
3158  break;
3159  case 64:
3160  dest_ty = llvm::Type::getDoubleTy(cgen_state_->context_);
3161  break;
3162  default:
3163  LOG(FATAL) << "Unsupported FP width: " << std::to_string(val_width);
3164  }
3165  return cgen_state_->ir_builder_.CreateSIToFP(val, dest_ty);
3166 }
3167 
3168 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3169  CHECK(val->getType()->isPointerTy());
3170 
3171  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3172  const auto val_type = val_ptr_type->getElementType();
3173  size_t val_width = 0;
3174  if (val_type->isIntegerTy()) {
3175  val_width = val_type->getIntegerBitWidth();
3176  } else {
3177  if (val_type->isFloatTy()) {
3178  val_width = 32;
3179  } else {
3180  CHECK(val_type->isDoubleTy());
3181  val_width = 64;
3182  }
3183  }
3184  CHECK_LT(size_t(0), val_width);
3185  if (bitWidth == val_width) {
3186  return val;
3187  }
3188  return cgen_state_->ir_builder_.CreateBitCast(
3189  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3190 }
3191 
3192 #define EXECUTE_INCLUDE
3193 #include "ArrayOps.cpp"
3194 #include "DateAdd.cpp"
3195 #include "StringFunctions.cpp"
3196 #undef EXECUTE_INCLUDE
3197 
3198 RelAlgExecutionUnit Executor::addDeletedColumn(const RelAlgExecutionUnit& ra_exe_unit,
3199  const CompilationOptions& co) {
3200  if (!co.filter_on_deleted_column) {
3201  return ra_exe_unit;
3202  }
3203  auto ra_exe_unit_with_deleted = ra_exe_unit;
3204  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3205  if (input_table.getSourceType() != InputSourceType::TABLE) {
3206  continue;
3207  }
3208  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3209  CHECK(td);
3210  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3211  if (!deleted_cd) {
3212  continue;
3213  }
3214  CHECK(deleted_cd->columnType.is_boolean());
3215  // check deleted column is not already present
3216  bool found = false;
3217  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3218  if (input_col.get()->getColId() == deleted_cd->columnId &&
3219  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3220  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3221  found = true;
3222  }
3223  }
3224  if (!found) {
3225  // add deleted column
3226  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3227  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3228  }
3229  }
3230  return ra_exe_unit_with_deleted;
3231 }
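// Illustrative sketch, not part of the original file: when filter_on_deleted_column is
// set, the execution unit returned above also fetches the hidden boolean "deleted"
// column for every scanned table that has deleted rows, so the generated filter can
// drop those rows. Conceptually (field/variable names other than those used above are
// placeholders):
//
//   CompilationOptions co{...};           // device type etc. elided
//   co.filter_on_deleted_column = true;   // the flag checked at the top of the function
//   const auto with_deleted = executor->addDeletedColumn(ra_exe_unit, co);
//   // with_deleted.input_col_descs now includes an InputColDescriptor for
//   // deleted_cd->columnId at the table's nest level, unless it was already present.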
3232 
3233 namespace {
3234 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` returns `true` as the
3235 // first element of the tuple when the epoch value can be scaled safely, and `false`
3236 // when the scaling would overflow or underflow.
3237 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3238  const int64_t chunk_min,
3239  const int64_t chunk_max,
3240  const SQLTypeInfo& lhs_type,
3241  const SQLTypeInfo& rhs_type) {
3242  const int32_t ldim = lhs_type.get_dimension();
3243  const int32_t rdim = rhs_type.get_dimension();
3244  CHECK(ldim != rdim);
3245  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
3246  if (ldim > rdim) {
3247  // The LHS type precision is higher than the RHS type's; downscaling cannot overflow/underflow.
3248  return {true, chunk_min / scale, chunk_max / scale};
3249  }
3250 
3251  int64_t upscaled_chunk_min;
3252  int64_t upscaled_chunk_max;
3253 
3254  if (__builtin_mul_overflow(chunk_min, scale, &upscaled_chunk_min) ||
3255  __builtin_mul_overflow(chunk_max, scale, &upscaled_chunk_max)) {
3256  return std::make_tuple(false, chunk_min, chunk_max);
3257  }
3258 
3259  return std::make_tuple(true, upscaled_chunk_min, upscaled_chunk_max);
3260 }
3261 
3262 } // namespace
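// Worked example, illustrative only: comparing a TIMESTAMP(0) column against a
// TIMESTAMP(9) constant gives ldim = 0 and rdim = 9, so scale == 10^9. A chunk_min of
// 1'600'000'000 (seconds) upscales to 1'600'000'000'000'000'000, which still fits in
// int64_t, so the helper returns {true, ...}; a chunk_min of 10'000'000'000 would
// overflow the multiply, __builtin_mul_overflow reports it, and {false, chunk_min,
// chunk_max} is returned so the caller gives up on pruning that fragment.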
3263 
3264 std::pair<bool, int64_t> Executor::skipFragment(
3265  const InputDescriptor& table_desc,
3266  const Fragmenter_Namespace::FragmentInfo& fragment,
3267  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3268  const std::vector<uint64_t>& frag_offsets,
3269  const size_t frag_idx) {
3270  const int table_id = table_desc.getTableId();
3271  for (const auto& simple_qual : simple_quals) {
3272  const auto comp_expr =
3273  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3274  if (!comp_expr) {
3275  // is this possible?
3276  return {false, -1};
3277  }
3278  const auto lhs = comp_expr->get_left_operand();
3279  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3280  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3281  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3282  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3283  if (lhs_uexpr) {
3284  CHECK(lhs_uexpr->get_optype() ==
3285  kCAST); // We should have only been passed a cast expression
3286  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3287  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3288  continue;
3289  }
3290  } else {
3291  continue;
3292  }
3293  }
3294  const auto rhs = comp_expr->get_right_operand();
3295  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3296  if (!rhs_const) {
3297  // is this possible?
3298  return {false, -1};
3299  }
3300  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3301  continue;
3302  }
3303  const int col_id = lhs_col->get_column_id();
3304  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3305  int64_t chunk_min{0};
3306  int64_t chunk_max{0};
3307  bool is_rowid{false};
3308  size_t start_rowid{0};
3309  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3310  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3311  if (cd->isVirtualCol) {
3312  CHECK(cd->columnName == "rowid");
3313  const auto& table_generation = getTableGeneration(table_id);
3314  start_rowid = table_generation.start_rowid;
3315  chunk_min = frag_offsets[frag_idx] + start_rowid;
3316  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3317  is_rowid = true;
3318  }
3319  } else {
3320  const auto& chunk_type = lhs_col->get_type_info();
3321  chunk_min = extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3322  chunk_max = extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3323  }
3324  if (lhs->get_type_info().is_timestamp() &&
3325  (lhs_col->get_type_info().get_dimension() !=
3326  rhs_const->get_type_info().get_dimension()) &&
3327  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3328  rhs_const->get_type_info().is_high_precision_timestamp())) {
3329  // If the original timestamp lhs col has a different precision,
3330  // the column metadata holds values in the original precision;
3331  // therefore adjust the rhs value to match the lhs precision
3332 
3333  // Note(Wamsi): We adjust the rhs const value instead of the lhs value so as not to
3334  // artificially limit the lhs column range. RHS overflow/underflow has already
3335  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
3336  bool is_valid;
3337  std::tie(is_valid, chunk_min, chunk_max) =
3338      get_hpt_overflow_underflow_safe_scaled_values(
3339  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
3340  if (!is_valid) {
3341  VLOG(4) << "Overflow/Underflow detected in fragment skipping logic.\nChunk min "
3342  "value: "
3343  << std::to_string(chunk_min)
3344  << "\nChunk max value: " << std::to_string(chunk_max)
3345  << "\nLHS col precision is: "
3346  << std::to_string(lhs_col->get_type_info().get_dimension())
3347  << "\nRHS precision is: "
3348  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
3349  return {false, -1};
3350  }
3351  }
3352  CodeGenerator code_generator(this);
3353  const auto rhs_val = code_generator.codegenIntConst(rhs_const)->getSExtValue();
3354 
3355  switch (comp_expr->get_optype()) {
3356  case kGE:
3357  if (chunk_max < rhs_val) {
3358  return {true, -1};
3359  }
3360  break;
3361  case kGT:
3362  if (chunk_max <= rhs_val) {
3363  return {true, -1};
3364  }
3365  break;
3366  case kLE:
3367  if (chunk_min > rhs_val) {
3368  return {true, -1};
3369  }
3370  break;
3371  case kLT:
3372  if (chunk_min >= rhs_val) {
3373  return {true, -1};
3374  }
3375  break;
3376  case kEQ:
3377  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3378  return {true, -1};
3379  } else if (is_rowid) {
3380  return {false, rhs_val - start_rowid};
3381  }
3382  break;
3383  default:
3384  break;
3385  }
3386  }
3387  return {false, -1};
3388 }
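// Illustrative example, not part of the original file: for a fragment whose chunk
// metadata records chunk_min = 10 and chunk_max = 20 for column x, a simple qual
// "x > 25" reaches the kGT case with chunk_max (20) <= 25 and the fragment is skipped
// ({true, -1}), while "x = 15" falls inside [10, 20] and the fragment is kept
// ({false, -1}). For the virtual rowid column, whose range comes from the fragment
// offsets rather than chunk metadata, an equality qual additionally returns the row
// offset ({false, rhs_val - start_rowid}) so execution can jump to the matching row.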
3389 
3390 /*
3391  * The skipFragmentInnerJoins function processes all quals stored in the execution
3392  * unit's join_quals and gathers the ones that meet the "simple_qual" characteristics
3393  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
3394  * to decide whether the fragment should be skipped or not. The fragment will be skipped
3395  * if at least one of these skipFragment calls returns true in its first value.
3396  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
3397  * second value only if its first value is "false".
3398  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3399  * i.e., we only skip if none of the quals trigger the code to update the
3400  * rowid_lookup_key
3401  * - Only AND operations are valid and considered:
3402  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3403  * the skip
3404  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3405  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3406  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3407  * implemented differently, which causes the first one to skip when possible, while the
3408  * second one will not skip.
3409  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3410  * possible
3411  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3412  * projection, no skipping
3413  */
3414 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3415  const InputDescriptor& table_desc,
3416  const RelAlgExecutionUnit& ra_exe_unit,
3417  const Fragmenter_Namespace::FragmentInfo& fragment,
3418  const std::vector<uint64_t>& frag_offsets,
3419  const size_t frag_idx) {
3420  std::pair<bool, int64_t> skip_frag{false, -1};
3421  for (auto& inner_join : ra_exe_unit.join_quals) {
3422  if (inner_join.type != JoinType::INNER) {
3423  continue;
3424  }
3425 
3426  // extracting all the conjunctive simple_quals from the quals stored for the inner
3427  // join
3428  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3429  for (auto& qual : inner_join.quals) {
3430  auto temp_qual = qual_to_conjunctive_form(qual);
3431  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3432  temp_qual.simple_quals.begin(),
3433  temp_qual.simple_quals.end());
3434  }
3435  auto temp_skip_frag = skipFragment(
3436  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3437  if (temp_skip_frag.second != -1) {
3438  skip_frag.second = temp_skip_frag.second;
3439  return skip_frag;
3440  } else {
3441  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3442  }
3443  }
3444  return skip_frag;
3445 }
3446 
3447 AggregatedColRange Executor::computeColRangesCache(
3448  const std::unordered_set<PhysicalInput>& phys_inputs) {
3449  AggregatedColRange agg_col_range_cache;
3450  CHECK(catalog_);
3451  std::unordered_set<int> phys_table_ids;
3452  for (const auto& phys_input : phys_inputs) {
3453  phys_table_ids.insert(phys_input.table_id);
3454  }
3455  std::vector<InputTableInfo> query_infos;
3456  for (const int table_id : phys_table_ids) {
3457  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3458  }
3459  for (const auto& phys_input : phys_inputs) {
3460  const auto cd =
3461  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3462  CHECK(cd);
3463  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3464  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3465  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3466  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3467  agg_col_range_cache.setColRange(phys_input, col_range);
3468  }
3469  }
3470  return agg_col_range_cache;
3471 }
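// Illustrative note, not part of the original file: the cache built above maps each
// PhysicalInput (table id / column id pair) to the ExpressionRange computed from the
// leaf fragments via getLeafColumnRange(), so later steps can look up a column's
// min/max-style range without rescanning table metadata.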
3472 
3473 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3474  const std::unordered_set<PhysicalInput>& phys_inputs) {
3475  StringDictionaryGenerations string_dictionary_generations;
3476  CHECK(catalog_);
3477  for (const auto& phys_input : phys_inputs) {
3478  const auto cd =
3479  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3480  CHECK(cd);
3481  const auto& col_ti =
3482  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3483  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3484  const int dict_id = col_ti.get_comp_param();
3485  const auto dd = catalog_->getMetadataForDict(dict_id);
3486  CHECK(dd && dd->stringDict);
3487  string_dictionary_generations.setGeneration(dict_id,
3488  dd->stringDict->storageEntryCount());
3489  }
3490  }
3491  return string_dictionary_generations;
3492 }
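// Illustrative note, not part of the original file: the "generation" recorded above is
// simply the dictionary's storageEntryCount() at setup time, i.e. a snapshot of how
// many entries the string dictionary held when the query started, which later lookups
// can use as a consistent upper bound even if new strings are appended concurrently.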
3493 
3494 TableGenerations Executor::computeTableGenerations(
3495  std::unordered_set<int> phys_table_ids) {
3496  TableGenerations table_generations;
3497  for (const int table_id : phys_table_ids) {
3498  const auto table_info = getTableInfo(table_id);
3499  table_generations.setGeneration(
3500  table_id, TableGeneration{table_info.getPhysicalNumTuples(), 0});
3501  }
3502  return table_generations;
3503 }
3504 
3505 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3506  const std::unordered_set<int>& phys_table_ids) {
3507  CHECK(catalog_);
3508  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize());
3509  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3510  string_dictionary_generations_ = computeStringDictionaryGenerations(phys_inputs);
3511  table_generations_ = computeTableGenerations(phys_table_ids);
3512 }
3513 
3514 mapd_shared_mutex& Executor::getSessionLock() {
3515  return executor_session_mutex_;
3516 }
3517 
3518 template <>
3519 void Executor::setCurrentQuerySession(const std::string& query_session,
3520  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3521  if (current_query_session_ == "") {
3522  // only set the query session if it currently has an invalid session
3523  current_query_session_ = query_session;
3524  }
3525 }
3526 
3527 template <>
3528 std::string& Executor::getCurrentQuerySession(
3529  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3530  return current_query_session_;
3531 }
3532 
3533 template <>
3534 bool Executor::checkCurrentQuerySession(const std::string& candidate_query_session,
3535  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3536  // if current_query_session is equal to the candidate_query_session
3537  // we consider the candidate to be the current (valid) session
3538  return (current_query_session_ == candidate_query_session);
3539 }
3540 
3541 template <>
3542 void Executor::invalidateQuerySession(mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3543  current_query_session_ = "";
3544 }
3545 
3546 template <>
3547 bool Executor::addToQuerySessionList(const std::string& query_session,
3548  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3549  return queries_interrupt_flag_.emplace(query_session, false).second;
3550 }
3551 
3552 template <>
3553 bool Executor::removeFromQuerySessionList(
3554  const std::string& query_session,
3555  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3556  return queries_interrupt_flag_.erase(query_session);
3557 }
3558 
3559 template <>
3560 void Executor::setQuerySessionAsInterrupted(
3561  const std::string& query_session,
3562  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3563  queries_interrupt_flag_[query_session] = true;
3564 }
3565 
3566 template <>
3567 bool Executor::checkIsQuerySessionInterrupted(
3568  const std::string& query_session,
3569  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3570  auto flag_it = queries_interrupt_flag_.find(query_session);
3571  return flag_it != queries_interrupt_flag_.end() && flag_it->second;
3572 }
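// Illustrative sequence, not part of the original file, showing how the session
// helpers above are meant to combine; lock construction is elided and the variable
// names are placeholders:
//
//   executor->setCurrentQuerySession(session_id, session_write_lock);
//   executor->addToQuerySessionList(session_id, session_write_lock);
//   ...
//   executor->setQuerySessionAsInterrupted(session_id, session_write_lock);
//   if (executor->checkIsQuerySessionInterrupted(session_id, session_read_lock)) {
//     // the running query observes the flag and raises ERR_INTERRUPTED
//   }
//   executor->removeFromQuerySessionList(session_id, session_write_lock);
//   executor->invalidateQuerySession(session_write_lock);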
3573 
3574 void Executor::enableRuntimeQueryInterrupt(const unsigned interrupt_freq) const {
3575  // The only scenario in which we intentionally call this function is
3576  // to allow runtime query interrupt in QueryRunner for test cases.
3577  // Because the test machine's default setting does not allow runtime query interrupt,
3578  // we have to turn it on within test code when necessary.
3579  g_enable_runtime_query_interrupt = true;
3580  g_runtime_query_interrupt_frequency = interrupt_freq;
3581 }
3582 
3583 void Executor::addToCardinalityCache(const std::string& cache_key,
3584  const size_t cache_value) {
3585  if (g_use_estimator_result_cache) {
3586  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
3587  cardinality_cache_[cache_key] = cache_value;
3588  VLOG(1) << "Put estimated cardinality to the cache";
3589  }
3590 }
3591 
3592 Executor::CachedCardinality Executor::getCachedCardinality(const std::string& cache_key) {
3593  mapd_shared_lock<mapd_shared_mutex> lock(recycler_mutex_);
3594  if (g_use_estimator_result_cache &&
3595  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
3596  VLOG(1) << "Reuse cached cardinality";
3597  return {true, cardinality_cache_[cache_key]};
3598  }
3599  return {false, -1};
3600 }
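// Illustrative usage sketch, not part of the original file, assuming
// g_use_estimator_result_cache is enabled:
//
//   executor->addToCardinalityCache(cache_key, 12345);
//   const auto cached = executor->getCachedCardinality(cache_key);
//   if (cached.first) {
//     // cached.second == 12345; reuse it instead of re-running the estimator query
//   }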
3601 
3602 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
3603 
3604 std::atomic_flag Executor::execute_spin_lock_ = ATOMIC_FLAG_INIT;
3605 std::string Executor::current_query_session_{""};
3606 InterruptFlagMap Executor::queries_interrupt_flag_;
3607 mapd_shared_mutex Executor::executor_session_mutex_;
3608 
3609 mapd_shared_mutex Executor::execute_mutex_;
3610 mapd_shared_mutex Executor::executors_cache_mutex_;
3611 
3612 std::mutex Executor::gpu_active_modules_mutex_;
3613 uint32_t Executor::gpu_active_modules_device_mask_{0x0};
3614 void* Executor::gpu_active_modules_[max_gpu_count];
3615 std::atomic<bool> Executor::interrupted_{false};
3616 
3617 std::mutex Executor::compilation_mutex_;
3618 std::mutex Executor::kernel_mutex_;
3619 
3620 mapd_shared_mutex Executor::recycler_mutex_;
3621 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;