OmniSciDB  21ac014ffc
Execute.cpp
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 
23 #ifdef HAVE_CUDA
24 #include <cuda.h>
25 #endif // HAVE_CUDA
26 #include <chrono>
27 #include <ctime>
28 #include <future>
29 #include <iostream>
30 #include <memory>
31 #include <numeric>
32 #include <set>
33 #include <thread>
34 
35 #include "Catalog/Catalog.h"
36 #include "CudaMgr/CudaMgr.h"
38 #include "Parser/ParserNode.h"
68 #include "Shared/checked_alloc.h"
69 #include "Shared/measure.h"
70 #include "Shared/misc.h"
71 #include "Shared/scope.h"
72 #include "Shared/shard_key.h"
73 #include "Shared/threadpool.h"
74 
75 bool g_enable_watchdog{false};
77 bool g_use_tbb_pool{false};
80 bool g_allow_cpu_retry{true};
81 bool g_null_div_by_zero{false};
85 extern bool g_enable_smem_group_by;
86 extern std::unique_ptr<llvm::Module> udf_gpu_module;
87 extern std::unique_ptr<llvm::Module> udf_cpu_module;
97 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
105 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
106 size_t g_min_memory_allocation_size{
107  256}; // minimum memory allocation required for projection query output buffer
108  // without pre-flight count
119 size_t g_gpu_smem_threshold{
120  4096}; // GPU shared memory threshold (in bytes), if larger
121  // buffer sizes are required we do not use GPU shared
122  // memory optimizations Setting this to 0 means unlimited
123  // (subject to other dynamically calculated caps)
124 bool g_enable_smem_grouped_non_count_agg{
125  true}; // enable use of shared memory when performing group-by with select non-count
126  // aggregates
127 bool g_enable_smem_non_grouped_agg{
128  true}; // enable optimizations for using GPU shared memory in implementation of
129  // non-grouped aggregates
130 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
131  // limits the allocation for the output buffer arena
132 size_t g_enable_parallel_linearization{
133  10000}; // # rows that we are trying to linearize varlen col in parallel
134 
137 
139 
140 extern bool g_cache_string_hash;
141 
142 int const Executor::max_gpu_count;
143 
145 
146 Executor::Executor(const ExecutorId executor_id,
147  Data_Namespace::DataMgr* data_mgr,
148  const size_t block_size_x,
149  const size_t grid_size_x,
150  const size_t max_gpu_slab_size,
151  const std::string& debug_dir,
152  const std::string& debug_file)
153  : cgen_state_(new CgenState({}, false))
154  , cpu_code_cache_(code_cache_size)
155  , gpu_code_cache_(code_cache_size)
156  , block_size_x_(block_size_x)
157  , grid_size_x_(grid_size_x)
158  , max_gpu_slab_size_(max_gpu_slab_size)
159  , debug_dir_(debug_dir)
160  , debug_file_(debug_file)
161  , executor_id_(executor_id)
162  , catalog_(nullptr)
163  , data_mgr_(data_mgr)
164  , temporary_tables_(nullptr)
166 
167 std::shared_ptr<Executor> Executor::getExecutor(
168  const ExecutorId executor_id,
169  const std::string& debug_dir,
170  const std::string& debug_file,
171  const SystemParameters& system_parameters) {
172  INJECT_TIMER(getExecutor);
173 
174  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
175  auto it = executors_.find(executor_id);
176  if (it != executors_.end()) {
177  return it->second;
178  }
179 
180  auto& data_mgr = Catalog_Namespace::SysCatalog::instance().getDataMgr();
181  auto executor = std::make_shared<Executor>(executor_id,
182  &data_mgr,
183  system_parameters.cuda_block_size,
184  system_parameters.cuda_grid_size,
185  system_parameters.max_gpu_slab_size,
186  debug_dir,
187  debug_file);
188  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
189  return executor;
190 }
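Usage note: callers are expected to obtain Executor instances through this accessor rather than constructing them directly; a second call with the same executor_id returns the instance cached in executors_. A minimal sketch of a call site, assuming a DataMgr has already been registered with the SysCatalog and that default-constructed SystemParameters are acceptable (the id and empty debug paths below are illustrative only):

    SystemParameters sys_params;  // assumed defaults
    auto executor = Executor::getExecutor(/*executor_id=*/1,
                                          /*debug_dir=*/"",
                                          /*debug_file=*/"",
                                          sys_params);
    // Repeating the lookup with the same id returns the same shared_ptr.
    CHECK(executor == Executor::getExecutor(1, "", "", sys_params));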
191 
192 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
193  switch (memory_level) {
194  case Data_Namespace::MemoryLevel::CPU_LEVEL:
195  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
196  mapd_unique_lock<mapd_shared_mutex> flush_lock(
197  execute_mutex_); // Don't flush memory while queries are running
198 
200  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
201  // The hash table cache uses CPU memory not managed by the buffer manager. In the
202  // future, we should manage these allocations with the buffer manager directly.
203  // For now, assume the user wants to purge the hash table cache when they clear
204  // CPU memory (currently used in ExecuteTest to lower memory pressure)
206  }
207  break;
208  }
209  default: {
210  throw std::runtime_error(
211  "Clearing memory levels other than the CPU level or GPU level is not "
212  "supported.");
213  }
214  }
215 }
216 
217 size_t Executor::getArenaBlockSize() {
218  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
219 }
220 
221 StringDictionaryProxy* Executor::getStringDictionaryProxy(
222  const int dict_id_in,
223  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
224  const bool with_generation) const {
225  CHECK(row_set_mem_owner);
226  std::lock_guard<std::mutex> lock(
227  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
228  return row_set_mem_owner->getOrAddStringDictProxy(
229  dict_id_in, with_generation, catalog_);
230 }
231 
232 StringDictionaryProxy* RowSetMemoryOwner::getOrAddStringDictProxy(
233  const int dict_id_in,
234  const bool with_generation,
235  const Catalog_Namespace::Catalog* catalog) {
236  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
237  CHECK(catalog);
238  const auto dd = catalog->getMetadataForDict(dict_id);
239  if (dd) {
240  CHECK(dd->stringDict);
241  CHECK_LE(dd->dictNBits, 32);
242  const int64_t generation =
243  with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
244  return addStringDict(dd->stringDict, dict_id, generation);
245  }
246  CHECK_EQ(0, dict_id);
247  if (!lit_str_dict_proxy_) {
248  std::shared_ptr<StringDictionary> tsd =
249  std::make_shared<StringDictionary>("", false, true, g_cache_string_hash);
250  lit_str_dict_proxy_.reset(new StringDictionaryProxy(tsd, 0));
251  }
252  return lit_str_dict_proxy_.get();
253 }
254 
256  std::lock_guard<std::mutex> lock(state_mutex_);
257  return t_digests_
258  .emplace_back(std::make_unique<quantile::TDigest>(
260  .get();
261 }
262 
263 bool Executor::isCPUOnly() const {
264  CHECK(data_mgr_);
265  return !data_mgr_->getCudaMgr();
266 }
267 
269  const Analyzer::ColumnVar* col_var) const {
271  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
272 }
273 
275  const Analyzer::ColumnVar* col_var,
276  int n) const {
277  const auto cd = getColumnDescriptor(col_var);
278  if (!cd || n > cd->columnType.get_physical_cols()) {
279  return nullptr;
280  }
282  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
283 }
284 
285 const Catalog_Namespace::Catalog* Executor::getCatalog() const {
286  return catalog_;
287 }
288 
289 void Executor::setCatalog(const Catalog_Namespace::Catalog* catalog) {
290  catalog_ = catalog;
291 }
292 
293 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
294  return row_set_mem_owner_;
295 }
296 
297 const TemporaryTables* Executor::getTemporaryTables() const {
298  return temporary_tables_;
299 }
300 
302  return input_table_info_cache_.getTableInfo(table_id);
303 }
304 
305 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
306  return table_generations_.getGeneration(table_id);
307 }
308 
310  return agg_col_range_cache_.getColRange(phys_input);
311 }
312 
313 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
314  size_t num_bytes = 0;
315  if (!plan_state_) {
316  return 0;
317  }
318  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
319  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
320  continue;
321  }
322 
323  if (fetched_col_pair.first < 0) {
324  num_bytes += 8;
325  } else {
326  const auto cd =
327  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
328  const auto& ti = cd->columnType;
329  const auto sz = ti.get_type() == kTEXT && ti.get_compression() == kENCODING_DICT
330  ? 4
331  : ti.get_size();
332  if (sz < 0) {
333  // for varlen types, only account for the pointer/size for each row, for now
334  num_bytes += 16;
335  } else {
336  num_bytes += sz;
337  }
338  }
339  }
340  return num_bytes;
341 }
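For reference, the accounting above charges a flat 8 bytes for negative (virtual) table ids, 4 bytes for dictionary-encoded kTEXT, the declared size for other fixed-width types, and 16 bytes (pointer plus length) per varlen value. A minimal standalone sketch of the same arithmetic, using hypothetical pre-resolved widths instead of real ColumnDescriptor lookups:

    // Illustrative only: widths already resolved the way getNumBytesForFetchedRow()
    // resolves them (dict-encoded text -> 4, varlen -> negative).
    size_t bytes_per_row(const std::vector<int>& resolved_widths) {
      size_t num_bytes = 0;
      for (const int sz : resolved_widths) {
        num_bytes += (sz < 0) ? 16 : static_cast<size_t>(sz);  // varlen: pointer + size
      }
      return num_bytes;
    }
    // bytes_per_row({4, 8, -1}) == 28: dict text (4) + BIGINT (8) + varlen (16).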
342 
343 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
344  const std::vector<Analyzer::Expr*>& target_exprs) const {
345  CHECK(plan_state_);
346  CHECK(catalog_);
347  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
348  for (const auto target_expr : target_exprs) {
349  if (!plan_state_->isLazyFetchColumn(target_expr)) {
350  col_lazy_fetch_info.emplace_back(
351  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
352  } else {
353  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
354  CHECK(col_var);
355  auto col_id = col_var->get_column_id();
356  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
357  auto cd = (col_var->get_table_id() > 0)
358  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
359  : nullptr;
360  if (cd && IS_GEO(cd->columnType.get_type())) {
361  // Geo coords cols will be processed in sequence. So we only need to track the
362  // first coords col in lazy fetch info.
363  {
364  auto cd0 =
365  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
366  auto col0_ti = cd0->columnType;
367  CHECK(!cd0->isVirtualCol);
368  auto col0_var = makeExpr<Analyzer::ColumnVar>(
369  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
370  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
371  col_lazy_fetch_info.emplace_back(
372  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
373  }
374  } else {
375  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
376  const auto& col_ti = col_var->get_type_info();
377  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
378  }
379  }
380  }
381  return col_lazy_fetch_info;
382 }
383 
385  input_table_info_cache_.clear();
386  agg_col_range_cache_.clear();
387  table_generations_.clear();
388 }
389 
390 std::vector<int8_t> Executor::serializeLiterals(
391  const std::unordered_map<int, CgenState::LiteralValues>& literals,
392  const int device_id) {
393  if (literals.empty()) {
394  return {};
395  }
396  const auto dev_literals_it = literals.find(device_id);
397  CHECK(dev_literals_it != literals.end());
398  const auto& dev_literals = dev_literals_it->second;
399  size_t lit_buf_size{0};
400  std::vector<std::string> real_strings;
401  std::vector<std::vector<double>> double_array_literals;
402  std::vector<std::vector<int8_t>> align64_int8_array_literals;
403  std::vector<std::vector<int32_t>> int32_array_literals;
404  std::vector<std::vector<int8_t>> align32_int8_array_literals;
405  std::vector<std::vector<int8_t>> int8_array_literals;
406  for (const auto& lit : dev_literals) {
407  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
408  if (lit.which() == 7) {
409  const auto p = boost::get<std::string>(&lit);
410  CHECK(p);
411  real_strings.push_back(*p);
412  } else if (lit.which() == 8) {
413  const auto p = boost::get<std::vector<double>>(&lit);
414  CHECK(p);
415  double_array_literals.push_back(*p);
416  } else if (lit.which() == 9) {
417  const auto p = boost::get<std::vector<int32_t>>(&lit);
418  CHECK(p);
419  int32_array_literals.push_back(*p);
420  } else if (lit.which() == 10) {
421  const auto p = boost::get<std::vector<int8_t>>(&lit);
422  CHECK(p);
423  int8_array_literals.push_back(*p);
424  } else if (lit.which() == 11) {
425  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
426  CHECK(p);
427  if (p->second == 64) {
428  align64_int8_array_literals.push_back(p->first);
429  } else if (p->second == 32) {
430  align32_int8_array_literals.push_back(p->first);
431  } else {
432  CHECK(false);
433  }
434  }
435  }
436  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
437  throw TooManyLiterals();
438  }
439  int16_t crt_real_str_off = lit_buf_size;
440  for (const auto& real_str : real_strings) {
441  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
442  lit_buf_size += real_str.size();
443  }
444  if (double_array_literals.size() > 0) {
445  lit_buf_size = align(lit_buf_size, sizeof(double));
446  }
447  int16_t crt_double_arr_lit_off = lit_buf_size;
448  for (const auto& double_array_literal : double_array_literals) {
449  CHECK_LE(double_array_literal.size(),
450  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
451  lit_buf_size += double_array_literal.size() * sizeof(double);
452  }
453  if (align64_int8_array_literals.size() > 0) {
454  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
455  }
456  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
457  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
458  CHECK_LE(align64_int8_array_literal.size(),
459  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
460  lit_buf_size += align64_int8_array_literal.size();
461  }
462  if (int32_array_literals.size() > 0) {
463  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
464  }
465  int16_t crt_int32_arr_lit_off = lit_buf_size;
466  for (const auto& int32_array_literal : int32_array_literals) {
467  CHECK_LE(int32_array_literal.size(),
468  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
469  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
470  }
471  if (align32_int8_array_literals.size() > 0) {
472  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
473  }
474  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
475  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
476  CHECK_LE(align32_int8_array_literal.size(),
477  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
478  lit_buf_size += align32_int8_array_literal.size();
479  }
480  int16_t crt_int8_arr_lit_off = lit_buf_size;
481  for (const auto& int8_array_literal : int8_array_literals) {
482  CHECK_LE(int8_array_literal.size(),
483  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
484  lit_buf_size += int8_array_literal.size();
485  }
486  unsigned crt_real_str_idx = 0;
487  unsigned crt_double_arr_lit_idx = 0;
488  unsigned crt_align64_int8_arr_lit_idx = 0;
489  unsigned crt_int32_arr_lit_idx = 0;
490  unsigned crt_align32_int8_arr_lit_idx = 0;
491  unsigned crt_int8_arr_lit_idx = 0;
492  std::vector<int8_t> serialized(lit_buf_size);
493  size_t off{0};
494  for (const auto& lit : dev_literals) {
495  const auto lit_bytes = CgenState::literalBytes(lit);
496  off = CgenState::addAligned(off, lit_bytes);
497  switch (lit.which()) {
498  case 0: {
499  const auto p = boost::get<int8_t>(&lit);
500  CHECK(p);
501  serialized[off - lit_bytes] = *p;
502  break;
503  }
504  case 1: {
505  const auto p = boost::get<int16_t>(&lit);
506  CHECK(p);
507  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
508  break;
509  }
510  case 2: {
511  const auto p = boost::get<int32_t>(&lit);
512  CHECK(p);
513  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
514  break;
515  }
516  case 3: {
517  const auto p = boost::get<int64_t>(&lit);
518  CHECK(p);
519  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
520  break;
521  }
522  case 4: {
523  const auto p = boost::get<float>(&lit);
524  CHECK(p);
525  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
526  break;
527  }
528  case 5: {
529  const auto p = boost::get<double>(&lit);
530  CHECK(p);
531  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
532  break;
533  }
534  case 6: {
535  const auto p = boost::get<std::pair<std::string, int>>(&lit);
536  CHECK(p);
537  const auto str_id =
539  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
540  ->getOrAddTransient(p->first)
541  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
542  ->getIdOfString(p->first);
543  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
544  break;
545  }
546  case 7: {
547  const auto p = boost::get<std::string>(&lit);
548  CHECK(p);
549  int32_t off_and_len = crt_real_str_off << 16;
550  const auto& crt_real_str = real_strings[crt_real_str_idx];
551  off_and_len |= static_cast<int16_t>(crt_real_str.size());
552  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
553  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
554  ++crt_real_str_idx;
555  crt_real_str_off += crt_real_str.size();
556  break;
557  }
558  case 8: {
559  const auto p = boost::get<std::vector<double>>(&lit);
560  CHECK(p);
561  int32_t off_and_len = crt_double_arr_lit_off << 16;
562  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
563  int32_t len = crt_double_arr_lit.size();
564  CHECK_EQ((len >> 16), 0);
565  off_and_len |= static_cast<int16_t>(len);
566  int32_t double_array_bytesize = len * sizeof(double);
567  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
568  memcpy(&serialized[crt_double_arr_lit_off],
569  crt_double_arr_lit.data(),
570  double_array_bytesize);
571  ++crt_double_arr_lit_idx;
572  crt_double_arr_lit_off += double_array_bytesize;
573  break;
574  }
575  case 9: {
576  const auto p = boost::get<std::vector<int32_t>>(&lit);
577  CHECK(p);
578  int32_t off_and_len = crt_int32_arr_lit_off << 16;
579  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
580  int32_t len = crt_int32_arr_lit.size();
581  CHECK_EQ((len >> 16), 0);
582  off_and_len |= static_cast<int16_t>(len);
583  int32_t int32_array_bytesize = len * sizeof(int32_t);
584  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
585  memcpy(&serialized[crt_int32_arr_lit_off],
586  crt_int32_arr_lit.data(),
587  int32_array_bytesize);
588  ++crt_int32_arr_lit_idx;
589  crt_int32_arr_lit_off += int32_array_bytesize;
590  break;
591  }
592  case 10: {
593  const auto p = boost::get<std::vector<int8_t>>(&lit);
594  CHECK(p);
595  int32_t off_and_len = crt_int8_arr_lit_off << 16;
596  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
597  int32_t len = crt_int8_arr_lit.size();
598  CHECK_EQ((len >> 16), 0);
599  off_and_len |= static_cast<int16_t>(len);
600  int32_t int8_array_bytesize = len;
601  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
602  memcpy(&serialized[crt_int8_arr_lit_off],
603  crt_int8_arr_lit.data(),
604  int8_array_bytesize);
605  ++crt_int8_arr_lit_idx;
606  crt_int8_arr_lit_off += int8_array_bytesize;
607  break;
608  }
609  case 11: {
610  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
611  CHECK(p);
612  if (p->second == 64) {
613  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
614  const auto& crt_align64_int8_arr_lit =
615  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
616  int32_t len = crt_align64_int8_arr_lit.size();
617  CHECK_EQ((len >> 16), 0);
618  off_and_len |= static_cast<int16_t>(len);
619  int32_t align64_int8_array_bytesize = len;
620  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
621  memcpy(&serialized[crt_align64_int8_arr_lit_off],
622  crt_align64_int8_arr_lit.data(),
623  align64_int8_array_bytesize);
624  ++crt_align64_int8_arr_lit_idx;
625  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
626  } else if (p->second == 32) {
627  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
628  const auto& crt_align32_int8_arr_lit =
629  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
630  int32_t len = crt_align32_int8_arr_lit.size();
631  CHECK_EQ((len >> 16), 0);
632  off_and_len |= static_cast<int16_t>(len);
633  int32_t align32_int8_array_bytesize = len;
634  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
635  memcpy(&serialized[crt_align32_int8_arr_lit_off],
636  crt_align32_int8_arr_lit.data(),
637  align32_int8_array_bytesize);
638  ++crt_align32_int8_arr_lit_idx;
639  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
640  } else {
641  CHECK(false);
642  }
643  break;
644  }
645  default:
646  CHECK(false);
647  }
648  }
649  return serialized;
650 }
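Each variable-length literal serialized above is referenced from its fixed-width slot through a packed 32-bit value: the upper 16 bits carry the payload's offset within the literal buffer and the lower 16 bits its length, which is why lit_buf_size is capped at numeric_limits<int16_t>::max(). A minimal sketch of that packing and unpacking; the helper names are illustrative and not part of the executor:

    int32_t pack_off_and_len(const int16_t off, const int16_t len) {
      return (static_cast<int32_t>(off) << 16) | static_cast<uint16_t>(len);
    }
    std::pair<int16_t, int16_t> unpack_off_and_len(const int32_t off_and_len) {
      return {static_cast<int16_t>(off_and_len >> 16),
              static_cast<int16_t>(off_and_len & 0xFFFF)};
    }
    // pack_off_and_len(64, 12) describes a 12-byte payload starting at byte 64
    // of the serialized literal buffer.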
651 
652 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
653  if (device_type == ExecutorDeviceType::GPU) {
654  return cudaMgr()->getDeviceCount();
655  } else {
656  return 1;
657  }
658 }
659 
660 int Executor::deviceCountForMemoryLevel(
661  const Data_Namespace::MemoryLevel memory_level) const {
662  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
663  : deviceCount(ExecutorDeviceType::CPU);
664 }
665 
666 // TODO(alex): remove or split
667 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
668  const SQLTypeInfo& ti,
669  const int64_t agg_init_val,
670  const int8_t out_byte_width,
671  const int64_t* out_vec,
672  const size_t out_vec_sz,
673  const bool is_group_by,
674  const bool float_argument_input) {
675  switch (agg) {
676  case kAVG:
677  case kSUM:
678  if (0 != agg_init_val) {
679  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
680  int64_t agg_result = agg_init_val;
681  for (size_t i = 0; i < out_vec_sz; ++i) {
682  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
683  }
684  return {agg_result, 0};
685  } else {
686  CHECK(ti.is_fp());
687  switch (out_byte_width) {
688  case 4: {
689  int agg_result = static_cast<int32_t>(agg_init_val);
690  for (size_t i = 0; i < out_vec_sz; ++i) {
691  agg_sum_float_skip_val(
692  &agg_result,
693  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
694  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
695  }
696  const int64_t converted_bin =
697  float_argument_input
698  ? static_cast<int64_t>(agg_result)
699  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
700  return {converted_bin, 0};
701  break;
702  }
703  case 8: {
704  int64_t agg_result = agg_init_val;
705  for (size_t i = 0; i < out_vec_sz; ++i) {
706  agg_sum_double_skip_val(
707  &agg_result,
708  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
709  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
710  }
711  return {agg_result, 0};
712  break;
713  }
714  default:
715  CHECK(false);
716  }
717  }
718  }
719  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
720  int64_t agg_result = 0;
721  for (size_t i = 0; i < out_vec_sz; ++i) {
722  agg_result += out_vec[i];
723  }
724  return {agg_result, 0};
725  } else {
726  CHECK(ti.is_fp());
727  switch (out_byte_width) {
728  case 4: {
729  float r = 0.;
730  for (size_t i = 0; i < out_vec_sz; ++i) {
731  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
732  }
733  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
734  const int64_t converted_bin =
735  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
736  return {converted_bin, 0};
737  }
738  case 8: {
739  double r = 0.;
740  for (size_t i = 0; i < out_vec_sz; ++i) {
741  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
742  }
743  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
744  }
745  default:
746  CHECK(false);
747  }
748  }
749  break;
750  case kCOUNT: {
751  uint64_t agg_result = 0;
752  for (size_t i = 0; i < out_vec_sz; ++i) {
753  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
754  agg_result += out;
755  }
756  return {static_cast<int64_t>(agg_result), 0};
757  }
758  case kMIN: {
759  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
760  int64_t agg_result = agg_init_val;
761  for (size_t i = 0; i < out_vec_sz; ++i) {
762  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
763  }
764  return {agg_result, 0};
765  } else {
766  switch (out_byte_width) {
767  case 4: {
768  int32_t agg_result = static_cast<int32_t>(agg_init_val);
769  for (size_t i = 0; i < out_vec_sz; ++i) {
770  agg_min_float_skip_val(
771  &agg_result,
772  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
773  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
774  }
775  const int64_t converted_bin =
776  float_argument_input
777  ? static_cast<int64_t>(agg_result)
778  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
779  return {converted_bin, 0};
780  }
781  case 8: {
782  int64_t agg_result = agg_init_val;
783  for (size_t i = 0; i < out_vec_sz; ++i) {
784  agg_min_double_skip_val(
785  &agg_result,
786  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
787  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
788  }
789  return {agg_result, 0};
790  }
791  default:
792  CHECK(false);
793  }
794  }
795  }
796  case kMAX:
797  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
798  int64_t agg_result = agg_init_val;
799  for (size_t i = 0; i < out_vec_sz; ++i) {
800  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
801  }
802  return {agg_result, 0};
803  } else {
804  switch (out_byte_width) {
805  case 4: {
806  int32_t agg_result = static_cast<int32_t>(agg_init_val);
807  for (size_t i = 0; i < out_vec_sz; ++i) {
808  agg_max_float_skip_val(
809  &agg_result,
810  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
811  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
812  }
813  const int64_t converted_bin =
814  float_argument_input ? static_cast<int64_t>(agg_result)
815  : float_to_double_bin(agg_result, !ti.get_notnull());
816  return {converted_bin, 0};
817  }
818  case 8: {
819  int64_t agg_result = agg_init_val;
820  for (size_t i = 0; i < out_vec_sz; ++i) {
821  agg_max_double_skip_val(
822  &agg_result,
823  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
824  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
825  }
826  return {agg_result, 0};
827  }
828  default:
829  CHECK(false);
830  }
831  }
832  case kSINGLE_VALUE: {
833  int64_t agg_result = agg_init_val;
834  for (size_t i = 0; i < out_vec_sz; ++i) {
835  if (out_vec[i] != agg_init_val) {
836  if (agg_result == agg_init_val) {
837  agg_result = out_vec[i];
838  } else if (out_vec[i] != agg_result) {
839  return {agg_result, Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES};
840  }
841  }
842  }
843  return {agg_result, 0};
844  }
845  case kSAMPLE: {
846  int64_t agg_result = agg_init_val;
847  for (size_t i = 0; i < out_vec_sz; ++i) {
848  if (out_vec[i] != agg_init_val) {
849  agg_result = out_vec[i];
850  break;
851  }
852  }
853  return {agg_result, 0};
854  }
855  default:
856  CHECK(false);
857  }
858  abort();
859 }
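Note that reduceResults() hands back floating-point partial aggregates as bit-copied int64_t "bins": 8-byte accumulators are copied verbatim, while 4-byte floats are widened to a double bin unless float_argument_input is set. A minimal sketch, under that assumption, of how a caller can read a double value back out of a returned bin (memcpy from <cstring> plays the role of the may_alias_ptr casts above):

    // Illustrative only: reinterpret a double-width aggregate bin.
    double bin_to_double(const int64_t bin) {
      double val;
      memcpy(&val, &bin, sizeof(double));  // bit-copy, no numeric conversion
      return val;
    }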
860 
861 namespace {
862 
863 ResultSetPtr get_merged_result(
864  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
865  auto& first = results_per_device.front().first;
866  CHECK(first);
867  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
868  const auto& next = results_per_device[dev_idx].first;
869  CHECK(next);
870  first->append(*next);
871  }
872  return std::move(first);
873 }
874 
875 } // namespace
876 
877 ResultSetPtr Executor::resultsUnion(SharedKernelContext& shared_context,
878  const RelAlgExecutionUnit& ra_exe_unit) {
879  auto& results_per_device = shared_context.getFragmentResults();
880  if (results_per_device.empty()) {
881  std::vector<TargetInfo> targets;
882  for (const auto target_expr : ra_exe_unit.target_exprs) {
883  targets.push_back(get_target_info(target_expr, g_bigint_count));
884  }
885  return std::make_shared<ResultSet>(targets,
888  row_set_mem_owner_,
889  catalog_,
890  blockSize(),
891  gridSize());
892  }
893  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
894  std::sort(results_per_device.begin(),
895  results_per_device.end(),
896  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
897  CHECK_GE(lhs.second.size(), size_t(1));
898  CHECK_GE(rhs.second.size(), size_t(1));
899  return lhs.second.front() < rhs.second.front();
900  });
901 
902  return get_merged_result(results_per_device);
903 }
904 
905 ResultSetPtr Executor::reduceMultiDeviceResults(
906  const RelAlgExecutionUnit& ra_exe_unit,
907  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
908  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
909  const QueryMemoryDescriptor& query_mem_desc) const {
910  auto timer = DEBUG_TIMER(__func__);
911  if (ra_exe_unit.estimator) {
912  return reduce_estimator_results(ra_exe_unit, results_per_device);
913  }
914 
915  if (results_per_device.empty()) {
916  std::vector<TargetInfo> targets;
917  for (const auto target_expr : ra_exe_unit.target_exprs) {
918  targets.push_back(get_target_info(target_expr, g_bigint_count));
919  }
920  return std::make_shared<ResultSet>(targets,
923  nullptr,
924  catalog_,
925  blockSize(),
926  gridSize());
927  }
928 
929  return reduceMultiDeviceResultSets(
930  results_per_device,
931  row_set_mem_owner,
932  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
933 }
934 
935 namespace {
936 
937 ReductionCode get_reduction_code(
938  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
939  int64_t* compilation_queue_time) {
940  auto clock_begin = timer_start();
941  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
942  *compilation_queue_time = timer_stop(clock_begin);
943  const auto& this_result_set = results_per_device[0].first;
944  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
945  this_result_set->getTargetInfos(),
946  this_result_set->getTargetInitVals());
947  return reduction_jit.codegen();
948 };
949 
950 } // namespace
951 
952 ResultSetPtr Executor::reduceMultiDeviceResultSets(
953  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
954  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
955  const QueryMemoryDescriptor& query_mem_desc) const {
956  auto timer = DEBUG_TIMER(__func__);
957  std::shared_ptr<ResultSet> reduced_results;
958 
959  const auto& first = results_per_device.front().first;
960 
961  if (query_mem_desc.getQueryDescriptionType() ==
962  QueryDescriptionType::NonGroupedAggregate &&
963  results_per_device.size() > 1) {
964  const auto total_entry_count = std::accumulate(
965  results_per_device.begin(),
966  results_per_device.end(),
967  size_t(0),
968  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
969  const auto& r = rs.first;
970  return init + r->getQueryMemDesc().getEntryCount();
971  });
972  CHECK(total_entry_count);
973  auto query_mem_desc = first->getQueryMemDesc();
974  query_mem_desc.setEntryCount(total_entry_count);
975  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
978  row_set_mem_owner,
979  catalog_,
980  blockSize(),
981  gridSize());
982  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
983  reduced_results->initializeStorage();
984  switch (query_mem_desc.getEffectiveKeyWidth()) {
985  case 4:
986  first->getStorage()->moveEntriesToBuffer<int32_t>(
987  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
988  break;
989  case 8:
990  first->getStorage()->moveEntriesToBuffer<int64_t>(
991  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
992  break;
993  default:
994  CHECK(false);
995  }
996  } else {
997  reduced_results = first;
998  }
999 
1000  int64_t compilation_queue_time = 0;
1001  const auto reduction_code =
1002  get_reduction_code(results_per_device, &compilation_queue_time);
1003 
1004  for (size_t i = 1; i < results_per_device.size(); ++i) {
1005  reduced_results->getStorage()->reduce(
1006  *(results_per_device[i].first->getStorage()), {}, reduction_code);
1007  }
1008  reduced_results->addCompilationQueueTime(compilation_queue_time);
1009  return reduced_results;
1010 }
1011 
1012 ResultSetPtr Executor::reduceSpeculativeTopN(
1013  const RelAlgExecutionUnit& ra_exe_unit,
1014  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1015  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1016  const QueryMemoryDescriptor& query_mem_desc) const {
1017  if (results_per_device.size() == 1) {
1018  return std::move(results_per_device.front().first);
1019  }
1020  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1021  SpeculativeTopNMap m;
1022  for (const auto& result : results_per_device) {
1023  auto rows = result.first;
1024  CHECK(rows);
1025  if (!rows) {
1026  continue;
1027  }
1028  SpeculativeTopNMap that(
1029  *rows,
1030  ra_exe_unit.target_exprs,
1031  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1032  m.reduce(that);
1033  }
1034  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1035  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1036  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1037 }
1038 
1039 std::unordered_set<int> get_available_gpus(const Data_Namespace::DataMgr* data_mgr) {
1040  CHECK(data_mgr);
1041  std::unordered_set<int> available_gpus;
1042  if (data_mgr->gpusPresent()) {
1043  CHECK(data_mgr->getCudaMgr());
1044  const int gpu_count = data_mgr->getCudaMgr()->getDeviceCount();
1045  CHECK_GT(gpu_count, 0);
1046  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1047  available_gpus.insert(gpu_id);
1048  }
1049  }
1050  return available_gpus;
1051 }
1052 
1053 size_t get_context_count(const ExecutorDeviceType device_type,
1054  const size_t cpu_count,
1055  const size_t gpu_count) {
1056  return device_type == ExecutorDeviceType::GPU ? gpu_count
1057  : static_cast<size_t>(cpu_count);
1058 }
1059 
1060 namespace {
1061 
1062 // Compute a very conservative entry count for the output buffer entry count using no
1063 // other information than the number of tuples in each table and multiplying them
1064 // together.
1065 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1067  // Check for overflows since we're multiplying potentially big table sizes.
1068  using checked_size_t = boost::multiprecision::number<
1069  boost::multiprecision::cpp_int_backend<64,
1070  64,
1071  boost::multiprecision::unsigned_magnitude,
1072  boost::multiprecision::checked,
1073  void>>;
1074  checked_size_t max_groups_buffer_entry_guess = 1;
1075  for (const auto& query_info : query_infos) {
1076  CHECK(!query_info.info.fragments.empty());
1077  auto it = std::max_element(query_info.info.fragments.begin(),
1078  query_info.info.fragments.end(),
1079  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1080  return f1.getNumTuples() < f2.getNumTuples();
1081  });
1082  max_groups_buffer_entry_guess *= it->getNumTuples();
1083  }
1084  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1085  // baseline group layout with that many entries anyway.
1086  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1087  try {
1088  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1089  max_groups_buffer_entry_guess_cap);
1090  } catch (...) {
1091  return max_groups_buffer_entry_guess_cap;
1092  }
1093 }
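The checked_size_t alias above uses boost::multiprecision's checked, 64-bit, unsigned cpp_int backend, so a product that no longer fits in 64 bits throws instead of silently wrapping; the surrounding try/catch turns that into the 100M-entry cap. A minimal sketch of the same idea with made-up table cardinalities:

    // Illustrative only; mirrors the overflow handling in compute_buffer_entry_guess().
    size_t guess_with_overflow_cap() {
      checked_size_t guess = 1;
      try {
        guess *= 5'000'000'000ULL;  // table one: ~5B rows
        guess *= 5'000'000'000ULL;  // table two: product exceeds 64 bits and throws
        return static_cast<size_t>(guess);
      } catch (...) {
        return size_t(100000000);   // fall back to the hard cap
      }
    }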
1094 
1095 std::string get_table_name(const InputDescriptor& input_desc,
1096  const Catalog_Namespace::Catalog& cat) {
1097  const auto source_type = input_desc.getSourceType();
1098  if (source_type == InputSourceType::TABLE) {
1099  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1100  CHECK(td);
1101  return td->tableName;
1102  } else {
1103  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1104  }
1105 }
1106 
1107 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1108  const int device_count) {
1109  if (device_type == ExecutorDeviceType::GPU) {
1110  return device_count * Executor::high_scan_limit;
1111  }
1112  return Executor::high_scan_limit;
1113 }
1114 
1115 void checkWorkUnitWatchdog(const RelAlgExecutionUnit& ra_exe_unit,
1116  const std::vector<InputTableInfo>& table_infos,
1117  const Catalog_Namespace::Catalog& cat,
1118  const ExecutorDeviceType device_type,
1119  const int device_count) {
1120  for (const auto target_expr : ra_exe_unit.target_exprs) {
1121  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1122  return;
1123  }
1124  }
1125  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1126  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1127  // Allow a query with no scan limit to run on small tables
1128  return;
1129  }
1130  if (ra_exe_unit.use_bump_allocator) {
1131  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1132  // relative to the size of the input), so we bypass this check for now
1133  return;
1134  }
1135  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1136  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1137  (!ra_exe_unit.scan_limit ||
1138  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1139  std::vector<std::string> table_names;
1140  const auto& input_descs = ra_exe_unit.input_descs;
1141  for (const auto& input_desc : input_descs) {
1142  table_names.push_back(get_table_name(input_desc, cat));
1143  }
1144  if (!ra_exe_unit.scan_limit) {
1145  throw WatchdogException(
1146  "Projection query would require a scan without a limit on table(s): " +
1147  boost::algorithm::join(table_names, ", "));
1148  } else {
1149  throw WatchdogException(
1150  "Projection query output result set on table(s): " +
1151  boost::algorithm::join(table_names, ", ") + " would contain " +
1152  std::to_string(ra_exe_unit.scan_limit) +
1153  " rows, which is more than the current system limit of " +
1154  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1155  }
1156  }
1157 }
1158 
1159 } // namespace
1160 
1161 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1162  const RelAlgExecutionUnit& ra_exe_unit) {
1163  if (ra_exe_unit.input_descs.size() < 2) {
1164  return false;
1165  }
1166 
1167  // We only support loop join at the end of folded joins
1168  // where ra_exe_unit.input_descs.size() > 2 for now.
1169  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1170 
1171  std::optional<size_t> inner_table_idx;
1172  for (size_t i = 0; i < query_infos.size(); ++i) {
1173  if (query_infos[i].table_id == inner_table_id) {
1174  inner_table_idx = i;
1175  break;
1176  }
1177  }
1178  CHECK(inner_table_idx);
1179  return query_infos[*inner_table_idx].info.getNumTuples() <=
1180  g_trivial_loop_join_threshold;
1181 }
1182 
1183 namespace {
1184 
1185 template <typename T>
1186 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1187  std::vector<std::string> expr_strs;
1188  for (const auto& expr : expr_container) {
1189  if (!expr) {
1190  expr_strs.emplace_back("NULL");
1191  } else {
1192  expr_strs.emplace_back(expr->toString());
1193  }
1194  }
1195  return expr_strs;
1196 }
1197 
1198 template <>
1199 std::vector<std::string> expr_container_to_string(
1200  const std::list<Analyzer::OrderEntry>& expr_container) {
1201  std::vector<std::string> expr_strs;
1202  for (const auto& expr : expr_container) {
1203  expr_strs.emplace_back(expr.toString());
1204  }
1205  return expr_strs;
1206 }
1207 
1208 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1209  switch (algorithm) {
1210  case SortAlgorithm::Default:
1211  return "ResultSet";
1212  case SortAlgorithm::SpeculativeTopN:
1213  return "Speculative Top N";
1214  case SortAlgorithm::StreamingTopN:
1215  return "Streaming Top N";
1216  }
1217  UNREACHABLE();
1218  return "";
1219 }
1220 
1221 } // namespace
1222 
1223 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
1224  // todo(yoonmin): replace a cache key as a DAG representation of a query plan
1225  // instead of ra_exec_unit description if possible
1226  std::ostringstream os;
1227  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1228  const auto& scan_desc = input_col_desc->getScanDesc();
1229  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1230  << scan_desc.getNestLevel();
1231  }
1232  if (!ra_exe_unit.simple_quals.empty()) {
1233  for (const auto& qual : ra_exe_unit.simple_quals) {
1234  if (qual) {
1235  os << qual->toString() << ",";
1236  }
1237  }
1238  }
1239  if (!ra_exe_unit.quals.empty()) {
1240  for (const auto& qual : ra_exe_unit.quals) {
1241  if (qual) {
1242  os << qual->toString() << ",";
1243  }
1244  }
1245  }
1246  if (!ra_exe_unit.join_quals.empty()) {
1247  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1248  const auto& join_condition = ra_exe_unit.join_quals[i];
1249  os << std::to_string(i) << ::toString(join_condition.type);
1250  for (const auto& qual : join_condition.quals) {
1251  if (qual) {
1252  os << qual->toString() << ",";
1253  }
1254  }
1255  }
1256  }
1257  if (!ra_exe_unit.groupby_exprs.empty()) {
1258  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1259  if (qual) {
1260  os << qual->toString() << ",";
1261  }
1262  }
1263  }
1264  for (const auto& expr : ra_exe_unit.target_exprs) {
1265  if (expr) {
1266  os << expr->toString() << ",";
1267  }
1268  }
1269  os << ::toString(ra_exe_unit.estimator == nullptr);
1270  os << std::to_string(ra_exe_unit.scan_limit);
1271  return os.str();
1272 }
1273 
1274 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1275  auto query_plan_dag =
1276  ra_exe_unit.query_plan_dag == EMPTY_QUERY_PLAN ? "N/A" : ra_exe_unit.query_plan_dag;
1277  os << "\n\tExtracted Query Plan Dag: " << query_plan_dag;
1278  os << "\n\tTable/Col/Levels: ";
1279  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1280  const auto& scan_desc = input_col_desc->getScanDesc();
1281  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1282  << scan_desc.getNestLevel() << ") ";
1283  }
1284  if (!ra_exe_unit.simple_quals.empty()) {
1285  os << "\n\tSimple Quals: "
1286  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.simple_quals),
1287  ", ");
1288  }
1289  if (!ra_exe_unit.quals.empty()) {
1290  os << "\n\tQuals: "
1291  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1292  }
1293  if (!ra_exe_unit.join_quals.empty()) {
1294  os << "\n\tJoin Quals: ";
1295  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1296  const auto& join_condition = ra_exe_unit.join_quals[i];
1297  os << "\t\t" << std::to_string(i) << " " << ::toString(join_condition.type);
1298  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1299  }
1300  }
1301  if (!ra_exe_unit.groupby_exprs.empty()) {
1302  os << "\n\tGroup By: "
1303  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.groupby_exprs),
1304  ", ");
1305  }
1306  os << "\n\tProjected targets: "
1308  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
1309  os << "\n\tSort Info: ";
1310  const auto& sort_info = ra_exe_unit.sort_info;
1311  os << "\n\t Order Entries: "
1312  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1313  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1314  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1315  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1316  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1317  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
1318  if (ra_exe_unit.union_all) {
1319  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1320  }
1321  return os;
1322 }
1323 
1324 namespace {
1325 
1327  const size_t new_scan_limit) {
1328  return {ra_exe_unit_in.input_descs,
1329  ra_exe_unit_in.input_col_descs,
1330  ra_exe_unit_in.simple_quals,
1331  ra_exe_unit_in.quals,
1332  ra_exe_unit_in.join_quals,
1333  ra_exe_unit_in.groupby_exprs,
1334  ra_exe_unit_in.target_exprs,
1335  ra_exe_unit_in.estimator,
1336  ra_exe_unit_in.sort_info,
1337  new_scan_limit,
1338  ra_exe_unit_in.query_hint,
1339  ra_exe_unit_in.query_plan_dag,
1340  ra_exe_unit_in.hash_table_build_plan_dag,
1341  ra_exe_unit_in.use_bump_allocator,
1342  ra_exe_unit_in.union_all,
1343  ra_exe_unit_in.query_state};
1344 }
1345 
1346 } // namespace
1347 
1348 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1349  const bool is_agg,
1350  const std::vector<InputTableInfo>& query_infos,
1351  const RelAlgExecutionUnit& ra_exe_unit_in,
1352  const CompilationOptions& co,
1353  const ExecutionOptions& eo,
1355  RenderInfo* render_info,
1356  const bool has_cardinality_estimation,
1357  ColumnCacheMap& column_cache) {
1358  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1359 
1360  ScopeGuard cleanup_post_execution = [this] {
1361  // cleanup/unpin GPU buffer allocations
1362  // TODO: separate out this state into a single object
1363  plan_state_.reset(nullptr);
1364  if (cgen_state_) {
1365  cgen_state_->in_values_bitmaps_.clear();
1366  }
1367  };
1368 
1369  try {
1370  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1371  is_agg,
1372  true,
1373  query_infos,
1374  ra_exe_unit_in,
1375  co,
1376  eo,
1377  cat,
1378  row_set_mem_owner_,
1379  render_info,
1380  has_cardinality_estimation,
1381  column_cache);
1382  if (result) {
1383  result->setKernelQueueTime(kernel_queue_time_ms_);
1384  result->addCompilationQueueTime(compilation_queue_time_ms_);
1385  if (eo.just_validate) {
1386  result->setValidationOnlyRes();
1387  }
1388  }
1389  return result;
1390  } catch (const CompilationRetryNewScanLimit& e) {
1391  auto result =
1392  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1393  is_agg,
1394  false,
1395  query_infos,
1396  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1397  co,
1398  eo,
1399  cat,
1400  row_set_mem_owner_,
1401  render_info,
1402  has_cardinality_estimation,
1403  column_cache);
1404  if (result) {
1405  result->setKernelQueueTime(kernel_queue_time_ms_);
1406  result->addCompilationQueueTime(compilation_queue_time_ms_);
1407  if (eo.just_validate) {
1408  result->setValidationOnlyRes();
1409  }
1410  }
1411  return result;
1412  }
1413 }
1414 
1415 ResultSetPtr Executor::executeWorkUnitImpl(
1416  size_t& max_groups_buffer_entry_guess,
1417  const bool is_agg,
1418  const bool allow_single_frag_table_opt,
1419  const std::vector<InputTableInfo>& query_infos,
1420  const RelAlgExecutionUnit& ra_exe_unit_in,
1421  const CompilationOptions& co,
1422  const ExecutionOptions& eo,
1424  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1425  RenderInfo* render_info,
1426  const bool has_cardinality_estimation,
1427  ColumnCacheMap& column_cache) {
1428  INJECT_TIMER(Exec_executeWorkUnit);
1429  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1430  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1431  CHECK(!query_infos.empty());
1432  if (!max_groups_buffer_entry_guess) {
1433  // The query has failed the first execution attempt because of running out
1434  // of group by slots. Make the conservative choice: allocate fragment size
1435  // slots and run on the CPU.
1436  CHECK(device_type == ExecutorDeviceType::CPU);
1437  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1438  }
1439 
1440  int8_t crt_min_byte_width{MAX_BYTE_WIDTH_SUPPORTED};
1441  do {
1442  SharedKernelContext shared_context(query_infos);
1443  ColumnFetcher column_fetcher(this, column_cache);
1444  ScopeGuard scope_guard = [&column_fetcher, &device_type] {
1445  column_fetcher.freeLinearizedBuf();
1446  if (device_type == ExecutorDeviceType::GPU) {
1447  column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
1448  }
1449  };
1450  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1451  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1452  if (eo.executor_type == ExecutorType::Native) {
1453  try {
1454  INJECT_TIMER(query_step_compilation);
1455  auto clock_begin = timer_start();
1456  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1457  compilation_queue_time_ms_ += timer_stop(clock_begin);
1458 
1459  query_mem_desc_owned =
1460  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1461  crt_min_byte_width,
1462  has_cardinality_estimation,
1463  ra_exe_unit,
1464  query_infos,
1465  deleted_cols_map,
1466  column_fetcher,
1467  {device_type,
1468  co.hoist_literals,
1469  co.opt_level,
1471  co.allow_lazy_fetch,
1473  co.explain_type,
1475  eo,
1476  render_info,
1477  this);
1478  CHECK(query_mem_desc_owned);
1479  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1480  } catch (CompilationRetryNoCompaction&) {
1481  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1482  continue;
1483  }
1484  } else {
1485  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1486  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1487  CHECK(!query_mem_desc_owned);
1488  query_mem_desc_owned.reset(
1490  }
1491  if (eo.just_explain) {
1492  return executeExplain(*query_comp_desc_owned);
1493  }
1494 
1495  for (const auto target_expr : ra_exe_unit.target_exprs) {
1496  plan_state_->target_exprs_.push_back(target_expr);
1497  }
1498 
1499  if (!eo.just_validate) {
1500  int available_cpus = cpu_threads();
1501  auto available_gpus = get_available_gpus(data_mgr_);
1502 
1503  const auto context_count =
1504  get_context_count(device_type, available_cpus, available_gpus.size());
1505  try {
1506  auto kernels = createKernels(shared_context,
1507  ra_exe_unit,
1508  column_fetcher,
1509  query_infos,
1510  eo,
1511  is_agg,
1512  allow_single_frag_table_opt,
1513  context_count,
1514  *query_comp_desc_owned,
1515  *query_mem_desc_owned,
1516  render_info,
1517  available_gpus,
1518  available_cpus);
1519  if (g_use_tbb_pool) {
1520 #ifdef HAVE_TBB
1521  VLOG(1) << "Using TBB thread pool for kernel dispatch.";
1522  launchKernels<threadpool::TbbThreadPool<void>>(shared_context,
1523  std::move(kernels));
1524 #else
1525  throw std::runtime_error(
1526  "This build is not TBB enabled. Restart the server with "
1527  "\"enable-modern-thread-pool\" disabled.");
1528 #endif
1529  } else {
1530  launchKernels<threadpool::FuturesThreadPool<void>>(shared_context,
1531  std::move(kernels));
1532  }
1533  } catch (QueryExecutionError& e) {
1534  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1535  e.getErrorCode() == ERR_OUT_OF_TIME) {
1536  throw QueryExecutionError(ERR_INTERRUPTED);
1537  }
1538  if (e.getErrorCode() == ERR_INTERRUPTED) {
1539  throw QueryExecutionError(ERR_INTERRUPTED);
1540  }
1541  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1542  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1543  crt_min_byte_width <<= 1;
1544  continue;
1545  }
1546  throw;
1547  }
1548  }
1549  if (is_agg) {
1550  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
1551  // update query status to let user know we are now in the reduction phase
1552  std::string curRunningSession{""};
1553  std::string curRunningQuerySubmittedTime{""};
1554  bool sessionEnrolled = false;
1556  {
1557  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
1558  executor->getSessionLock());
1559  curRunningSession = executor->getCurrentQuerySession(session_read_lock);
1560  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
1561  sessionEnrolled =
1562  executor->checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
1563  }
1564  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
1565  sessionEnrolled) {
1566  executor->updateQuerySessionStatus(curRunningSession,
1567  curRunningQuerySubmittedTime,
1569  }
1570  }
1571  try {
1572  return collectAllDeviceResults(shared_context,
1573  ra_exe_unit,
1574  *query_mem_desc_owned,
1575  query_comp_desc_owned->getDeviceType(),
1576  row_set_mem_owner);
1577  } catch (ReductionRanOutOfSlots&) {
1578  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1579  } catch (OverflowOrUnderflow&) {
1580  crt_min_byte_width <<= 1;
1581  continue;
1582  } catch (QueryExecutionError& e) {
1583  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
1584  << ", what(): " << e.what();
1585  throw QueryExecutionError(e.getErrorCode());
1586  }
1587  }
1588  return resultsUnion(shared_context, ra_exe_unit);
1589 
1590  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1591 
1592  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1595  nullptr,
1596  catalog_,
1597  blockSize(),
1598  gridSize());
1599 }
1600 
1602  const RelAlgExecutionUnit& ra_exe_unit_in,
1603  const InputTableInfo& table_info,
1604  const CompilationOptions& co,
1605  const ExecutionOptions& eo,
1607  PerFragmentCallBack& cb,
1608  const std::set<size_t>& fragment_indexes_param) {
1609  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1610  ColumnCacheMap column_cache;
1611 
1612  std::vector<InputTableInfo> table_infos{table_info};
1613  SharedKernelContext kernel_context(table_infos);
1614 
1615  ColumnFetcher column_fetcher(this, column_cache);
1616  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1617  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1618  {
1619  auto clock_begin = timer_start();
1620  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1621  compilation_queue_time_ms_ += timer_stop(clock_begin);
1622  query_mem_desc_owned =
1623  query_comp_desc_owned->compile(0,
1624  8,
1625  /*has_cardinality_estimation=*/false,
1626  ra_exe_unit,
1627  table_infos,
1628  deleted_cols_map,
1629  column_fetcher,
1630  co,
1631  eo,
1632  nullptr,
1633  this);
1634  }
1635  CHECK(query_mem_desc_owned);
1636  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1637  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1638  const auto& outer_fragments = table_info.info.fragments;
1639 
1640  std::set<size_t> fragment_indexes;
1641  if (fragment_indexes_param.empty()) {
1642  // An empty `fragment_indexes_param` set implies executing
1643  // the query for all fragments in the table. In this
1644  // case, populate `fragment_indexes` with all fragment indexes.
1645  for (size_t i = 0; i < outer_fragments.size(); i++) {
1646  fragment_indexes.emplace(i);
1647  }
1648  } else {
1649  fragment_indexes = fragment_indexes_param;
1650  }
1651 
1652  {
1653  auto clock_begin = timer_start();
1654  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
1655  kernel_queue_time_ms_ += timer_stop(clock_begin);
1656 
1657  for (auto fragment_index : fragment_indexes) {
1658  // We may want to consider in the future allowing this to execute on devices other
1659  // than CPU
1660  FragmentsList fragments_list{{table_id, {fragment_index}}};
1661  ExecutionKernel kernel(ra_exe_unit,
1662  co.device_type,
1663  /*device_id=*/0,
1664  eo,
1665  column_fetcher,
1666  *query_comp_desc_owned,
1667  *query_mem_desc_owned,
1668  fragments_list,
1670  /*render_info=*/nullptr,
1671  /*rowid_lookup_key=*/-1);
1672  kernel.run(this, 0, kernel_context);
1673  }
1674  }
1675 
1676  const auto& all_fragment_results = kernel_context.getFragmentResults();
1677 
1678  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
1679  CHECK_EQ(result_fragment_indexes.size(), 1);
1680  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
1681  }
1682 }
1683 
1684 ResultSetPtr Executor::executeTableFunction(
1685  const TableFunctionExecutionUnit exe_unit,
1686  const std::vector<InputTableInfo>& table_infos,
1687  const CompilationOptions& co,
1688  const ExecutionOptions& eo,
1690  INJECT_TIMER(Exec_executeTableFunction);
1691 
1692  if (eo.just_validate) {
1694  /*entry_count=*/0,
1696  /*is_table_function=*/true);
1697  query_mem_desc.setOutputColumnar(true);
1698  return std::make_shared<ResultSet>(
1699  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
1700  co.device_type,
1701  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
1702  this->getRowSetMemoryOwner(),
1703  this->getCatalog(),
1704  this->blockSize(),
1705  this->gridSize());
1706  }
1707 
1708  nukeOldState(false, table_infos, PlanState::DeletedColumnsMap{}, nullptr);
1709 
1710  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1711  // framework, we may want to move this up a level
1712 
1713  ColumnFetcher column_fetcher(this, column_cache);
1714  TableFunctionCompilationContext compilation_context;
1715  compilation_context.compile(exe_unit, co, this);
1716 
1717  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
1718  return exe_context.execute(
1719  exe_unit, table_infos, &compilation_context, column_fetcher, co.device_type, this);
1720 }
1721 
1722 ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
1723  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1724 }
1725 
1726 void Executor::addTransientStringLiterals(
1727  const RelAlgExecutionUnit& ra_exe_unit,
1728  const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
1729  TransientDictIdVisitor dict_id_visitor;
1730 
1731  auto visit_expr =
1732  [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
1733  if (!expr) {
1734  return;
1735  }
1736  const auto dict_id = dict_id_visitor.visit(expr);
1737  if (dict_id >= 0) {
1738  auto sdp = getStringDictionaryProxy(dict_id, row_set_mem_owner, true);
1739  CHECK(sdp);
1740  TransientStringLiteralsVisitor visitor(sdp);
1741  visitor.visit(expr);
1742  }
1743  };
1744 
1745  for (const auto& group_expr : ra_exe_unit.groupby_exprs) {
1746  visit_expr(group_expr.get());
1747  }
1748 
1749  for (const auto& group_expr : ra_exe_unit.quals) {
1750  visit_expr(group_expr.get());
1751  }
1752 
1753  for (const auto& group_expr : ra_exe_unit.simple_quals) {
1754  visit_expr(group_expr.get());
1755  }
1756 
1757  for (const auto target_expr : ra_exe_unit.target_exprs) {
1758  const auto& target_type = target_expr->get_type_info();
1759  if (target_type.is_string() && target_type.get_compression() != kENCODING_DICT) {
1760  continue;
1761  }
1762  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
1763  if (agg_expr) {
1764  if (agg_expr->get_aggtype() == kSINGLE_VALUE ||
1765  agg_expr->get_aggtype() == kSAMPLE) {
1766  visit_expr(agg_expr->get_arg());
1767  }
1768  } else {
1769  visit_expr(target_expr);
1770  }
1771  }
1772 }
1773 
1775  const RelAlgExecutionUnit& ra_exe_unit,
1776  const ExecutorDeviceType requested_device_type) {
1777  for (const auto target_expr : ra_exe_unit.target_exprs) {
1778  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1779  if (!ra_exe_unit.groupby_exprs.empty() &&
1780  !isArchPascalOrLater(requested_device_type)) {
1781  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1782  agg_info.agg_arg_type.get_type() == kDOUBLE) {
1783  return ExecutorDeviceType::CPU;
1784  }
1785  }
1786  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1787  return ExecutorDeviceType::CPU;
1788  }
1789  }
1790  return requested_device_type;
1791 }
1792 
1793 namespace {
1794 
1795 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
1796  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
1797  if (ti.is_fp()) {
1798  if (float_argument_input && ti.get_type() == kFLOAT) {
1799  int64_t float_null_val = 0;
1800  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1801  static_cast<float>(inline_fp_null_val(ti));
1802  return float_null_val;
1803  }
1804  const auto double_null_val = inline_fp_null_val(ti);
1805  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
1806  }
1807  return inline_int_null_val(ti);
1808 }
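// Illustrative sketch: the float branch above stores a 32-bit null sentinel in the low
// bytes of a 64-bit slot via may_alias_ptr. The same bit-punning expressed with memcpy,
// the portable idiom; the sentinel value is assumed to come from inline_fp_null_val as
// in the code above.
#include <cstdint>
#include <cstring>

inline int64_t pack_float_null_into_slot(const float null_sentinel) {
  int64_t slot = 0;
  std::memcpy(&slot, &null_sentinel, sizeof(float));  // low 4 bytes hold the float bits
  return slot;
}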
1809 
1810 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
1811  std::vector<int64_t>& entry,
1812  const std::vector<Analyzer::Expr*>& target_exprs,
1814  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1815  const auto target_expr = target_exprs[target_idx];
1816  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1817  CHECK(agg_info.is_agg);
1818  target_infos.push_back(agg_info);
1819  if (g_cluster) {
1820  const auto executor = query_mem_desc.getExecutor();
1821  CHECK(executor);
1822  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1823  CHECK(row_set_mem_owner);
1824  const auto& count_distinct_desc =
1825  query_mem_desc.getCountDistinctDescriptor(target_idx);
1826  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
1827  CHECK(row_set_mem_owner);
1828  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
1829  count_distinct_desc.bitmapPaddedSizeBytes(),
1830  /*thread_idx=*/0); // TODO: can we detect thread idx here?
1831  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1832  continue;
1833  }
1834  if (count_distinct_desc.impl_type_ == CountDistinctImplType::StdSet) {
1835  auto count_distinct_set = new std::set<int64_t>();
1836  CHECK(row_set_mem_owner);
1837  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1838  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1839  continue;
1840  }
1841  }
1842  const bool float_argument_input = takes_float_argument(agg_info);
1843  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
1844  entry.push_back(0);
1845  } else if (agg_info.agg_kind == kAVG) {
1846  entry.push_back(0);
1847  entry.push_back(0);
1848  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
1849  if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
1850  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1851  entry.push_back(0);
1852  }
1853  } else if (agg_info.sql_type.is_varlen()) {
1854  entry.push_back(0);
1855  entry.push_back(0);
1856  } else {
1857  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1858  }
1859  } else {
1860  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1861  }
1862  }
1863 }
1864 
1866  const std::vector<Analyzer::Expr*>& target_exprs_in,
1868  const ExecutorDeviceType device_type) {
1869  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1870  std::vector<Analyzer::Expr*> target_exprs;
1871  for (const auto target_expr : target_exprs_in) {
1872  const auto target_expr_copy =
1873  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
1874  CHECK(target_expr_copy);
1875  auto ti = target_expr->get_type_info();
1876  ti.set_notnull(false);
1877  target_expr_copy->set_type_info(ti);
1878  if (target_expr_copy->get_arg()) {
1879  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1880  arg_ti.set_notnull(false);
1881  target_expr_copy->get_arg()->set_type_info(arg_ti);
1882  }
1883  target_exprs_owned_copies.push_back(target_expr_copy);
1884  target_exprs.push_back(target_expr_copy.get());
1885  }
1886  std::vector<TargetInfo> target_infos;
1887  std::vector<int64_t> entry;
1888  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
1889  const auto executor = query_mem_desc.getExecutor();
1890  CHECK(executor);
1891  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1892  CHECK(row_set_mem_owner);
1893  auto rs = std::make_shared<ResultSet>(target_infos,
1894  device_type,
1896  row_set_mem_owner,
1897  executor->getCatalog(),
1898  executor->blockSize(),
1899  executor->gridSize());
1900  rs->allocateStorage();
1901  rs->fillOneEntry(entry);
1902  return rs;
1903 }
1904 
1905 } // namespace
1906 
1908  SharedKernelContext& shared_context,
1909  const RelAlgExecutionUnit& ra_exe_unit,
1911  const ExecutorDeviceType device_type,
1912  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1913  auto timer = DEBUG_TIMER(__func__);
1914  auto& result_per_device = shared_context.getFragmentResults();
1915  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1918  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1919  }
1920  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1921  try {
1922  return reduceSpeculativeTopN(
1923  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1924  } catch (const std::bad_alloc&) {
1925  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1926  }
1927  }
1928  const auto shard_count =
1929  device_type == ExecutorDeviceType::GPU
1931  : 0;
1932 
1933  if (shard_count && !result_per_device.empty()) {
1934  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1935  }
1936  return reduceMultiDeviceResults(
1937  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1938 }
1939 
1940 namespace {
1951 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
1952  const QueryMemoryDescriptor& input_query_mem_desc,
1953  const ResultSetStorage* output_storage,
1954  size_t output_row_index,
1955  const QueryMemoryDescriptor& output_query_mem_desc,
1956  const std::vector<uint32_t>& top_permutation) {
1957  const auto output_buffer = output_storage->getUnderlyingBuffer();
1958  const auto input_buffer = input_storage->getUnderlyingBuffer();
1959  for (const auto sorted_idx : top_permutation) {
1960  // permuting all group-columns in this result set into the final buffer:
1961  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1962  group_idx++) {
1963  const auto input_column_ptr =
1964  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1965  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
1966  const auto output_column_ptr =
1967  output_buffer +
1968  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
1969  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1970  memcpy(output_column_ptr,
1971  input_column_ptr,
1972  output_query_mem_desc.groupColWidth(group_idx));
1973  }
1974  // permuting all agg-columns in this result set into the final buffer:
1975  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1976  slot_idx++) {
1977  const auto input_column_ptr =
1978  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
1979  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1980  const auto output_column_ptr =
1981  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
1982  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
1983  memcpy(output_column_ptr,
1984  input_column_ptr,
1985  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
1986  }
1987  ++output_row_index;
1988  }
1989  return output_row_index;
1990 }
1991 
2001 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
2002  const ResultSetStorage* output_storage,
2003  size_t output_row_index,
2004  const QueryMemoryDescriptor& output_query_mem_desc,
2005  const std::vector<uint32_t>& top_permutation) {
2006  const auto output_buffer = output_storage->getUnderlyingBuffer();
2007  const auto input_buffer = input_storage->getUnderlyingBuffer();
2008  for (const auto sorted_idx : top_permutation) {
2009  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
2010  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
2011  row_ptr,
2012  output_query_mem_desc.getRowSize());
2013  ++output_row_index;
2014  }
2015  return output_row_index;
2016 }
2017 } // namespace
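// Illustrative sketch: both helpers above copy entries into the output buffer in the
// order given by a permutation of sorted indexes. The row-wise case reduces to the loop
// below over a flat byte buffer; permute_rows is a hypothetical stand-in that models
// only the memcpy pattern, not ResultSetStorage itself.
#include <cstdint>
#include <cstring>
#include <vector>

inline size_t permute_rows(const int8_t* input,
                           int8_t* output,
                           size_t output_row_index,
                           const size_t row_size,
                           const std::vector<uint32_t>& top_permutation) {
  for (const auto sorted_idx : top_permutation) {
    std::memcpy(output + output_row_index * row_size,
                input + sorted_idx * row_size,
                row_size);
    ++output_row_index;
  }
  return output_row_index;
}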
2018 
2019 // Collect top results from each device, stitch them together and sort. Partial
2020 // results from each device are guaranteed to be disjoint because we only go on
2021 // this path when one of the columns involved is a shard key.
2023  SharedKernelContext& shared_context,
2024  const RelAlgExecutionUnit& ra_exe_unit) const {
2025  auto& result_per_device = shared_context.getFragmentResults();
2026  const auto first_result_set = result_per_device.front().first;
2027  CHECK(first_result_set);
2028  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2029  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2030  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
2031  top_query_mem_desc.setEntryCount(0);
2032  for (auto& result : result_per_device) {
2033  const auto result_set = result.first;
2034  CHECK(result_set);
2035  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
2036  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2037  top_query_mem_desc.setEntryCount(new_entry_cnt);
2038  }
2039  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2040  first_result_set->getDeviceType(),
2041  top_query_mem_desc,
2042  first_result_set->getRowSetMemOwner(),
2043  catalog_,
2044  blockSize(),
2045  gridSize());
2046  auto top_storage = top_result_set->allocateStorage();
2047  size_t top_output_row_idx{0};
2048  for (auto& result : result_per_device) {
2049  const auto result_set = result.first;
2050  CHECK(result_set);
2051  const auto& top_permutation = result_set->getPermutationBuffer();
2052  CHECK_LE(top_permutation.size(), top_n);
2053  if (top_query_mem_desc.didOutputColumnar()) {
2054  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2055  result_set->getQueryMemDesc(),
2056  top_storage,
2057  top_output_row_idx,
2058  top_query_mem_desc,
2059  top_permutation);
2060  } else {
2061  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2062  top_storage,
2063  top_output_row_idx,
2064  top_query_mem_desc,
2065  top_permutation);
2066  }
2067  }
2068  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2069  return top_result_set;
2070 }
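// Illustrative sketch: collectAllDeviceShardedTopResults() concatenates per-device
// top-n buffers (each already sorted on its device) into one result set. A toy model
// of stitching disjoint per-device top-n lists and re-selecting a global top-n, with
// int64_t keys standing in for rows:
#include <algorithm>
#include <cstdint>
#include <vector>

inline std::vector<int64_t> stitch_top_n(
    const std::vector<std::vector<int64_t>>& per_device_top,
    const size_t top_n) {
  std::vector<int64_t> merged;
  for (const auto& partial : per_device_top) {
    merged.insert(merged.end(), partial.begin(), partial.end());
  }
  const size_t k = std::min(top_n, merged.size());
  std::partial_sort(merged.begin(), merged.begin() + k, merged.end());
  merged.resize(k);
  return merged;
}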
2071 
2072 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
2073  const {
2074  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
2075  const auto& join_info = plan_state_->join_info_;
2076  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2077  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2078  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
2079  id_to_cond.insert(
2080  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
2081  }
2082  return id_to_cond;
2083 }
2084 
2085 namespace {
2086 
2087 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2088  for (const auto& col : fetched_cols) {
2089  if (col.is_lazily_fetched) {
2090  return true;
2091  }
2092  }
2093  return false;
2094 }
2095 
2096 } // namespace
2097 
2098 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2099  SharedKernelContext& shared_context,
2100  const RelAlgExecutionUnit& ra_exe_unit,
2101  ColumnFetcher& column_fetcher,
2102  const std::vector<InputTableInfo>& table_infos,
2103  const ExecutionOptions& eo,
2104  const bool is_agg,
2105  const bool allow_single_frag_table_opt,
2106  const size_t context_count,
2107  const QueryCompilationDescriptor& query_comp_desc,
2109  RenderInfo* render_info,
2110  std::unordered_set<int>& available_gpus,
2111  int& available_cpus) {
2112  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2113 
2114  QueryFragmentDescriptor fragment_descriptor(
2115  ra_exe_unit,
2116  table_infos,
2117  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2119  : std::vector<Data_Namespace::MemoryInfo>{},
2122  CHECK(!ra_exe_unit.input_descs.empty());
2123 
2124  const auto device_type = query_comp_desc.getDeviceType();
2125  const bool uses_lazy_fetch =
2126  plan_state_->allow_lazy_fetch_ &&
2127  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2128  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2129  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2130  const auto device_count = deviceCount(device_type);
2131  CHECK_GT(device_count, 0);
2132 
2133  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2134  shared_context.getFragOffsets(),
2135  device_count,
2136  device_type,
2137  use_multifrag_kernel,
2139  this);
2140  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2141  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2142  }
2143 
2144  if (use_multifrag_kernel) {
2145  VLOG(1) << "Creating multifrag execution kernels";
2146  VLOG(1) << query_mem_desc.toString();
2147 
2148  // NB: We should never be on this path when the query is retried because of running
2149  // out of group-by slots; also, for scan-only queries on CPU we want the
2150  // high-granularity, fragment-by-fragment execution instead. For scan-only queries on
2151  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2152  // buffer per fragment.
2153  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2154  &execution_kernels,
2155  &column_fetcher,
2156  &eo,
2157  &query_comp_desc,
2158  &query_mem_desc,
2159  render_info](const int device_id,
2160  const FragmentsList& frag_list,
2161  const int64_t rowid_lookup_key) {
2162  execution_kernels.emplace_back(
2163  std::make_unique<ExecutionKernel>(ra_exe_unit,
2165  device_id,
2166  eo,
2167  column_fetcher,
2168  query_comp_desc,
2169  query_mem_desc,
2170  frag_list,
2172  render_info,
2173  rowid_lookup_key));
2174  };
2175  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2176  } else {
2177  VLOG(1) << "Creating one execution kernel per fragment";
2178  VLOG(1) << query_mem_desc.toString();
2179 
2180  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2182  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2183  const auto max_frag_size =
2184  table_infos.front().info.getFragmentNumTuplesUpperBound();
2185  if (max_frag_size < query_mem_desc.getEntryCount()) {
2186  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2187  << " to match max fragment size " << max_frag_size
2188  << " for kernel per fragment execution path.";
2189  throw CompilationRetryNewScanLimit(max_frag_size);
2190  }
2191  }
2192 
2193  size_t frag_list_idx{0};
2194  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2195  &execution_kernels,
2196  &column_fetcher,
2197  &eo,
2198  &frag_list_idx,
2199  &device_type,
2200  &query_comp_desc,
2201  &query_mem_desc,
2202  render_info](const int device_id,
2203  const FragmentsList& frag_list,
2204  const int64_t rowid_lookup_key) {
2205  if (!frag_list.size()) {
2206  return;
2207  }
2208  CHECK_GE(device_id, 0);
2209 
2210  execution_kernels.emplace_back(
2211  std::make_unique<ExecutionKernel>(ra_exe_unit,
2212  device_type,
2213  device_id,
2214  eo,
2215  column_fetcher,
2216  query_comp_desc,
2217  query_mem_desc,
2218  frag_list,
2220  render_info,
2221  rowid_lookup_key));
2222  ++frag_list_idx;
2223  };
2224 
2225  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2226  ra_exe_unit);
2227  }
2228 
2229  return execution_kernels;
2230 }
2231 
2232 template <typename THREAD_POOL>
2234  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels) {
2235  auto clock_begin = timer_start();
2236  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2237  kernel_queue_time_ms_ += timer_stop(clock_begin);
2238 
2239  THREAD_POOL thread_pool;
2240  VLOG(1) << "Launching " << kernels.size() << " kernels for query.";
2241  size_t kernel_idx = 1;
2242  for (auto& kernel : kernels) {
2243  thread_pool.spawn(
2244  [this, &shared_context, parent_thread_id = logger::thread_id()](
2245  ExecutionKernel* kernel, const size_t crt_kernel_idx) {
2246  CHECK(kernel);
2247  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
2248  const size_t thread_idx = crt_kernel_idx % cpu_threads();
2249  kernel->run(this, thread_idx, shared_context);
2250  },
2251  kernel.get(),
2252  kernel_idx++);
2253  }
2254  thread_pool.join();
2255 }
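// Illustrative sketch: the template above fans kernels out onto a thread pool and joins
// them, tagging each task with thread_idx = kernel_idx % cpu_threads(). A minimal model
// of that fan-out/join pattern, with std::async standing in for the THREAD_POOL
// parameter and a plain callable standing in for ExecutionKernel::run:
#include <functional>
#include <future>
#include <vector>

inline void launch_all(const std::vector<std::function<void(size_t)>>& kernels,
                       const size_t num_cpu_threads) {
  std::vector<std::future<void>> futures;
  size_t kernel_idx = 0;
  for (const auto& kernel : kernels) {
    const size_t thread_idx = kernel_idx++ % num_cpu_threads;
    futures.push_back(std::async(std::launch::async, kernel, thread_idx));
  }
  for (auto& f : futures) {
    f.get();  // join; rethrows any exception raised by a worker
  }
}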
2256 
2258  const RelAlgExecutionUnit& ra_exe_unit,
2259  const ExecutorDeviceType device_type,
2260  const size_t table_idx,
2261  const size_t outer_frag_idx,
2262  std::map<int, const TableFragments*>& selected_tables_fragments,
2263  const std::unordered_map<int, const Analyzer::BinOper*>&
2264  inner_table_id_to_join_condition) {
2265  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2266  auto table_frags_it = selected_tables_fragments.find(table_id);
2267  CHECK(table_frags_it != selected_tables_fragments.end());
2268  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2269  const auto outer_table_fragments_it =
2270  selected_tables_fragments.find(outer_input_desc.getTableId());
2271  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2272  const auto outer_table_fragments = outer_table_fragments_it->second;
2273  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2274  if (!table_idx) {
2275  return {outer_frag_idx};
2276  }
2277  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2278  auto& inner_frags = table_frags_it->second;
2279  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2280  std::vector<size_t> all_frag_ids;
2281  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2282  ++inner_frag_idx) {
2283  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2284  if (skipFragmentPair(outer_fragment_info,
2285  inner_frag_info,
2286  table_idx,
2287  inner_table_id_to_join_condition,
2288  ra_exe_unit,
2289  device_type)) {
2290  continue;
2291  }
2292  all_frag_ids.push_back(inner_frag_idx);
2293  }
2294  return all_frag_ids;
2295 }
2296 
2297 // Returns true iff the join between two fragments cannot yield any results, per
2298 // shard information. The pair can be skipped to avoid full broadcast.
2300  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2301  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2302  const int table_idx,
2303  const std::unordered_map<int, const Analyzer::BinOper*>&
2304  inner_table_id_to_join_condition,
2305  const RelAlgExecutionUnit& ra_exe_unit,
2306  const ExecutorDeviceType device_type) {
2307  if (device_type != ExecutorDeviceType::GPU) {
2308  return false;
2309  }
2310  CHECK(table_idx >= 0 &&
2311  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2312  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2313  // Both tables need to be sharded the same way.
2314  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2315  outer_fragment_info.shard == inner_fragment_info.shard) {
2316  return false;
2317  }
2318  const Analyzer::BinOper* join_condition{nullptr};
2319  if (ra_exe_unit.join_quals.empty()) {
2320  CHECK(!inner_table_id_to_join_condition.empty());
2321  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2322  CHECK(condition_it != inner_table_id_to_join_condition.end());
2323  join_condition = condition_it->second;
2324  CHECK(join_condition);
2325  } else {
2326  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2327  plan_state_->join_info_.join_hash_tables_.size());
2328  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2329  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2330  table_idx) {
2331  CHECK(!join_condition);
2332  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2333  }
2334  }
2335  }
2336  if (!join_condition) {
2337  return false;
2338  }
2339  // TODO(adb): support fragment skipping based on the overlaps operator
2340  if (join_condition->is_overlaps_oper()) {
2341  return false;
2342  }
2343  size_t shard_count{0};
2344  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2345  join_condition->get_left_operand())) {
2346  auto inner_outer_pairs = HashJoin::normalizeColumnPairs(
2347  join_condition, *getCatalog(), getTemporaryTables());
2349  join_condition, this, inner_outer_pairs);
2350  } else {
2351  shard_count = get_shard_count(join_condition, this);
2352  }
2353  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2354  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2355  }
2356  return shard_count;
2357 }
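// Illustrative sketch: the core shard-based skip test above. A fragment pair can only
// be skipped when both fragments carry a known shard (-1 means unsharded or unknown)
// and the shards differ; every other case falls through to a real join.
inline bool can_skip_fragment_pair_by_shard(const int outer_shard,
                                            const int inner_shard) {
  return outer_shard != -1 && inner_shard != -1 && outer_shard != inner_shard;
}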
2358 
2359 namespace {
2360 
2363  const int table_id = col_desc->getScanDesc().getTableId();
2364  const int col_id = col_desc->getColId();
2365  return get_column_descriptor_maybe(col_id, table_id, cat);
2366 }
2367 
2368 } // namespace
2369 
2370 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2371  const std::vector<InputDescriptor>& input_descs,
2372  const std::map<int, const TableFragments*>& all_tables_fragments) {
2373  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2374  for (auto& desc : input_descs) {
2375  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2376  CHECK(fragments_it != all_tables_fragments.end());
2377  const auto& fragments = *fragments_it->second;
2378  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2379  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2380  frag_offsets[i] = off;
2381  off += fragments[i].getNumTuples();
2382  }
2383  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2384  }
2385  return tab_id_to_frag_offsets;
2386 }
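// Illustrative sketch: the helper above builds, per table, an exclusive prefix sum of
// fragment row counts, so fragment i's global row offset equals the total tuple count
// of fragments 0..i-1. The same computation on a plain vector of counts:
#include <cstdint>
#include <vector>

inline std::vector<uint64_t> fragment_offsets(const std::vector<uint64_t>& num_tuples) {
  std::vector<uint64_t> offsets(num_tuples.size(), 0);
  uint64_t off = 0;
  for (size_t i = 0; i < num_tuples.size(); ++i) {
    offsets[i] = off;  // offset of fragment i is the sum of the earlier counts
    off += num_tuples[i];
  }
  return offsets;
}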
2387 
2388 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2390  const RelAlgExecutionUnit& ra_exe_unit,
2391  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2392  const std::vector<InputDescriptor>& input_descs,
2393  const std::map<int, const TableFragments*>& all_tables_fragments) {
2394  std::vector<std::vector<int64_t>> all_num_rows;
2395  std::vector<std::vector<uint64_t>> all_frag_offsets;
2396  const auto tab_id_to_frag_offsets =
2397  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2398  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2399  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2400  std::vector<int64_t> num_rows;
2401  std::vector<uint64_t> frag_offsets;
2402  if (!ra_exe_unit.union_all) {
2403  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2404  }
2405  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2406  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2407  const auto fragments_it =
2408  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2409  CHECK(fragments_it != all_tables_fragments.end());
2410  const auto& fragments = *fragments_it->second;
2411  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2412  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2413  const auto& fragment = fragments[frag_id];
2414  num_rows.push_back(fragment.getNumTuples());
2415  } else {
2416  size_t total_row_count{0};
2417  for (const auto& fragment : fragments) {
2418  total_row_count += fragment.getNumTuples();
2419  }
2420  num_rows.push_back(total_row_count);
2421  }
2422  const auto frag_offsets_it =
2423  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2424  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2425  const auto& offsets = frag_offsets_it->second;
2426  CHECK_LT(frag_id, offsets.size());
2427  frag_offsets.push_back(offsets[frag_id]);
2428  }
2429  all_num_rows.push_back(num_rows);
2430  // Fragment offsets of the outer table should ONLY be used by rowid for now.
2431  all_frag_offsets.push_back(frag_offsets);
2432  }
2433  return {all_num_rows, all_frag_offsets};
2434 }
2435 
2436 // Fetch from all table fragments only for columns of a hash-joined inner fact table
2437 // whose fetch is not deferred.
2439  const RelAlgExecutionUnit& ra_exe_unit,
2440  const FragmentsList& selected_fragments) const {
2441  const auto& input_descs = ra_exe_unit.input_descs;
2442  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2443  if (nest_level < 1 ||
2444  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2445  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2446  (ra_exe_unit.join_quals.empty() &&
2447  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2448  return false;
2449  }
2450  const int table_id = inner_col_desc.getScanDesc().getTableId();
2451  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2452  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2453  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2454  return fragments.size() > 1;
2455 }
2456 
2458  const ColumnDescriptor* cd,
2459  const InputColDescriptor& inner_col_desc,
2460  const RelAlgExecutionUnit& ra_exe_unit,
2461  const FragmentsList& selected_fragments,
2462  const Data_Namespace::MemoryLevel memory_level) const {
2463  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2464  const int table_id = inner_col_desc.getScanDesc().getTableId();
2465  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2466  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2467  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2468  auto need_linearize =
2469  cd->columnType.is_array() ||
2471  return table_id > 0 && need_linearize && fragments.size() > 1;
2472 }
2473 
2474 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2475  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2476  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2477  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2478 }
2479 
2481  const ColumnFetcher& column_fetcher,
2482  const RelAlgExecutionUnit& ra_exe_unit,
2483  const int device_id,
2484  const Data_Namespace::MemoryLevel memory_level,
2485  const std::map<int, const TableFragments*>& all_tables_fragments,
2486  const FragmentsList& selected_fragments,
2488  std::list<ChunkIter>& chunk_iterators,
2489  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2490  DeviceAllocator* device_allocator,
2491  const size_t thread_idx,
2492  const bool allow_runtime_interrupt) {
2493  auto timer = DEBUG_TIMER(__func__);
2494  INJECT_TIMER(fetchChunks);
2495  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2496  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2497  std::vector<size_t> local_col_to_frag_pos;
2498  buildSelectedFragsMapping(selected_fragments_crossjoin,
2499  local_col_to_frag_pos,
2500  col_global_ids,
2501  selected_fragments,
2502  ra_exe_unit);
2503 
2505  selected_fragments_crossjoin);
2506  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2507  std::vector<std::vector<int64_t>> all_num_rows;
2508  std::vector<std::vector<uint64_t>> all_frag_offsets;
2509  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2510  std::vector<const int8_t*> frag_col_buffers(
2511  plan_state_->global_to_local_col_ids_.size());
2512  for (const auto& col_id : col_global_ids) {
2513  if (allow_runtime_interrupt) {
2514  bool isInterrupted = false;
2515  {
2516  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2517  const auto query_session = getCurrentQuerySession(session_read_lock);
2518  isInterrupted =
2519  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2520  }
2521  if (isInterrupted) {
2522  throw QueryExecutionError(ERR_INTERRUPTED);
2523  }
2524  }
2525  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2526  throw QueryExecutionError(ERR_INTERRUPTED);
2527  }
2528  CHECK(col_id);
2529  const int table_id = col_id->getScanDesc().getTableId();
2530  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2531  if (cd && cd->isVirtualCol) {
2532  CHECK_EQ("rowid", cd->columnName);
2533  continue;
2534  }
2535  const auto fragments_it = all_tables_fragments.find(table_id);
2536  CHECK(fragments_it != all_tables_fragments.end());
2537  const auto fragments = fragments_it->second;
2538  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2539  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2540  CHECK_LT(static_cast<size_t>(it->second),
2541  plan_state_->global_to_local_col_ids_.size());
2542  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2543  if (!fragments->size()) {
2544  return {};
2545  }
2546  CHECK_LT(frag_id, fragments->size());
2547  auto memory_level_for_column = memory_level;
2548  if (plan_state_->columns_to_fetch_.find(
2549  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2550  plan_state_->columns_to_fetch_.end()) {
2551  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2552  }
2553  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2554  frag_col_buffers[it->second] =
2555  column_fetcher.getResultSetColumn(col_id.get(),
2556  memory_level_for_column,
2557  device_id,
2558  device_allocator,
2559  thread_idx);
2560  } else {
2561  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2562  // determine if we need special treatment to linearize a multi-frag table
2563  // i.e., a column that is classified as a varlen type, i.e., array
2564  // for now, we only support fixed-length arrays that contain
2565  // geo point coordinates, but we can support more types in this way
2566  if (needLinearizeAllFragments(cd,
2567  *col_id,
2568  ra_exe_unit,
2569  selected_fragments,
2570  memory_level_for_column)) {
2571  frag_col_buffers[it->second] =
2572  column_fetcher.linearizeColumnFragments(table_id,
2573  col_id->getColId(),
2574  all_tables_fragments,
2575  chunks,
2576  chunk_iterators,
2577  memory_level_for_column,
2578  device_id,
2579  device_allocator,
2580  thread_idx);
2581  } else {
2582  frag_col_buffers[it->second] =
2583  column_fetcher.getAllTableColumnFragments(table_id,
2584  col_id->getColId(),
2585  all_tables_fragments,
2586  memory_level_for_column,
2587  device_id,
2588  device_allocator,
2589  thread_idx);
2590  }
2591  } else {
2592  frag_col_buffers[it->second] =
2593  column_fetcher.getOneTableColumnFragment(table_id,
2594  frag_id,
2595  col_id->getColId(),
2596  all_tables_fragments,
2597  chunks,
2598  chunk_iterators,
2599  memory_level_for_column,
2600  device_id,
2601  device_allocator);
2602  }
2603  }
2604  }
2605  all_frag_col_buffers.push_back(frag_col_buffers);
2606  }
2607  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2608  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2609  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2610 }
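// Illustrative sketch: fetchChunks() iterates a cross join (CartesianProduct) over the
// per-table fragment-id lists; each combination selects one fragment per input table.
// The same cross join written as an explicit odometer loop; for_each_frag_combination
// is a hypothetical helper, not the CartesianProduct class used above.
#include <cstddef>
#include <functional>
#include <vector>

inline void for_each_frag_combination(
    const std::vector<std::vector<size_t>>& frag_ids_per_table,
    const std::function<void(const std::vector<size_t>&)>& visit) {
  for (const auto& ids : frag_ids_per_table) {
    if (ids.empty()) {
      return;  // an empty table yields an empty cross product
    }
  }
  std::vector<size_t> pos(frag_ids_per_table.size(), 0);
  while (true) {
    std::vector<size_t> combination;
    for (size_t t = 0; t < pos.size(); ++t) {
      combination.push_back(frag_ids_per_table[t][pos[t]]);
    }
    visit(combination);
    size_t t = 0;
    while (t < pos.size() && ++pos[t] == frag_ids_per_table[t].size()) {
      pos[t++] = 0;  // carry into the next "digit"
    }
    if (t == pos.size()) {
      break;  // odometer wrapped: all combinations visited
    }
  }
}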
2611 
2612 // fetchChunks() is written under the assumption that multiple inputs imply a JOIN.
2613 // fetchUnionChunks(), below, is written under the assumption that multiple inputs
2614 // imply a UNION ALL.
2615  const ColumnFetcher& column_fetcher,
2616  const RelAlgExecutionUnit& ra_exe_unit,
2617  const int device_id,
2618  const Data_Namespace::MemoryLevel memory_level,
2619  const std::map<int, const TableFragments*>& all_tables_fragments,
2620  const FragmentsList& selected_fragments,
2622  std::list<ChunkIter>& chunk_iterators,
2623  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2624  DeviceAllocator* device_allocator,
2625  const size_t thread_idx,
2626  const bool allow_runtime_interrupt) {
2627  auto timer = DEBUG_TIMER(__func__);
2628  INJECT_TIMER(fetchUnionChunks);
2629 
2630  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2631  std::vector<std::vector<int64_t>> all_num_rows;
2632  std::vector<std::vector<uint64_t>> all_frag_offsets;
2633 
2634  CHECK(!selected_fragments.empty());
2635  CHECK_LE(2u, ra_exe_unit.input_descs.size());
2636  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
2637  using TableId = int;
2638  TableId const selected_table_id = selected_fragments.front().table_id;
2639  bool const input_descs_index =
2640  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2641  if (!input_descs_index) {
2642  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
2643  }
2644  bool const input_col_descs_index =
2645  selected_table_id ==
2646  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2647  if (!input_col_descs_index) {
2648  CHECK_EQ(selected_table_id,
2649  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
2650  }
2651  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2652  << " selected_table_id=" << selected_table_id
2653  << " input_descs_index=" << int(input_descs_index)
2654  << " input_col_descs_index=" << int(input_col_descs_index)
2655  << " ra_exe_unit.input_descs="
2656  << shared::printContainer(ra_exe_unit.input_descs)
2657  << " ra_exe_unit.input_col_descs="
2658  << shared::printContainer(ra_exe_unit.input_col_descs);
2659 
2660  // Partition col_global_ids by table_id
2661  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2662  table_id_to_input_col_descs;
2663  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
2664  TableId const table_id = input_col_desc->getScanDesc().getTableId();
2665  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2666  }
2667  for (auto const& pair : table_id_to_input_col_descs) {
2668  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2669  std::vector<size_t> local_col_to_frag_pos;
2670 
2671  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
2672  local_col_to_frag_pos,
2673  pair.second,
2674  selected_fragments,
2675  ra_exe_unit);
2676 
2678  selected_fragments_crossjoin);
2679 
2680  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2681  if (allow_runtime_interrupt) {
2682  bool isInterrupted = false;
2683  {
2684  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2685  const auto query_session = getCurrentQuerySession(session_read_lock);
2686  isInterrupted =
2687  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2688  }
2689  if (isInterrupted) {
2690  throw QueryExecutionError(ERR_INTERRUPTED);
2691  }
2692  }
2693  std::vector<const int8_t*> frag_col_buffers(
2694  plan_state_->global_to_local_col_ids_.size());
2695  for (const auto& col_id : pair.second) {
2696  CHECK(col_id);
2697  const int table_id = col_id->getScanDesc().getTableId();
2698  CHECK_EQ(table_id, pair.first);
2699  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2700  if (cd && cd->isVirtualCol) {
2701  CHECK_EQ("rowid", cd->columnName);
2702  continue;
2703  }
2704  const auto fragments_it = all_tables_fragments.find(table_id);
2705  CHECK(fragments_it != all_tables_fragments.end());
2706  const auto fragments = fragments_it->second;
2707  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2708  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2709  CHECK_LT(static_cast<size_t>(it->second),
2710  plan_state_->global_to_local_col_ids_.size());
2711  const size_t frag_id = ra_exe_unit.union_all
2712  ? 0
2713  : selected_frag_ids[local_col_to_frag_pos[it->second]];
2714  if (!fragments->size()) {
2715  return {};
2716  }
2717  CHECK_LT(frag_id, fragments->size());
2718  auto memory_level_for_column = memory_level;
2719  if (plan_state_->columns_to_fetch_.find(
2720  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2721  plan_state_->columns_to_fetch_.end()) {
2722  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2723  }
2724  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2725  frag_col_buffers[it->second] =
2726  column_fetcher.getResultSetColumn(col_id.get(),
2727  memory_level_for_column,
2728  device_id,
2729  device_allocator,
2730  thread_idx);
2731  } else {
2732  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2733  frag_col_buffers[it->second] =
2734  column_fetcher.getAllTableColumnFragments(table_id,
2735  col_id->getColId(),
2736  all_tables_fragments,
2737  memory_level_for_column,
2738  device_id,
2739  device_allocator,
2740  thread_idx);
2741  } else {
2742  frag_col_buffers[it->second] =
2743  column_fetcher.getOneTableColumnFragment(table_id,
2744  frag_id,
2745  col_id->getColId(),
2746  all_tables_fragments,
2747  chunks,
2748  chunk_iterators,
2749  memory_level_for_column,
2750  device_id,
2751  device_allocator);
2752  }
2753  }
2754  }
2755  all_frag_col_buffers.push_back(frag_col_buffers);
2756  }
2757  std::vector<std::vector<int64_t>> num_rows;
2758  std::vector<std::vector<uint64_t>> frag_offsets;
2759  std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags(
2760  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2761  all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
2762  all_frag_offsets.insert(
2763  all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
2764  }
2765  // The hack below assumes a particular table traversal order which is not
2766  // always achieved due to the unordered map in the outermost loop. According
2767  // to the code below, we expect NULLs in even positions of all_frag_col_buffers[0]
2768  // and odd positions of all_frag_col_buffers[1]. As an additional hack, we
2769  // swap these vectors if the NULLs are not in the expected positions.
2770  if (all_frag_col_buffers[0].size() > 1 && all_frag_col_buffers[0][0] &&
2771  !all_frag_col_buffers[0][1]) {
2772  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2773  }
2774  // UNION ALL hacks.
2775  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers);
2776  for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
2777  all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
2778  }
2779  if (input_descs_index == input_col_descs_index) {
2780  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2781  }
2782 
2783  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers)
2784  << " all_num_rows=" << shared::printContainer(all_num_rows)
2785  << " all_frag_offsets=" << shared::printContainer(all_frag_offsets)
2786  << " input_col_descs_index=" << input_col_descs_index;
2787  return {{all_frag_col_buffers[input_descs_index]},
2788  {{all_num_rows[0][input_descs_index]}},
2789  {{all_frag_offsets[0][input_descs_index]}}};
2790 }
2791 
2792 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
2793  const size_t scan_idx,
2794  const RelAlgExecutionUnit& ra_exe_unit) {
2795  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
2796  scan_idx > 0 &&
2797  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2798  !selected_fragments[scan_idx].fragment_ids.empty()) {
2799  // Fetch all fragments
2800  return {size_t(0)};
2801  }
2802 
2803  return selected_fragments[scan_idx].fragment_ids;
2804 }
2805 
2807  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2808  std::vector<size_t>& local_col_to_frag_pos,
2809  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2810  const FragmentsList& selected_fragments,
2811  const RelAlgExecutionUnit& ra_exe_unit) {
2812  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2813  size_t frag_pos{0};
2814  const auto& input_descs = ra_exe_unit.input_descs;
2815  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2816  const int table_id = input_descs[scan_idx].getTableId();
2817  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2818  selected_fragments_crossjoin.push_back(
2819  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2820  for (const auto& col_id : col_global_ids) {
2821  CHECK(col_id);
2822  const auto& input_desc = col_id->getScanDesc();
2823  if (input_desc.getTableId() != table_id ||
2824  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2825  continue;
2826  }
2827  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2828  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2829  CHECK_LT(static_cast<size_t>(it->second),
2830  plan_state_->global_to_local_col_ids_.size());
2831  local_col_to_frag_pos[it->second] = frag_pos;
2832  }
2833  ++frag_pos;
2834  }
2835 }
2836 
2838  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2839  std::vector<size_t>& local_col_to_frag_pos,
2840  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2841  const FragmentsList& selected_fragments,
2842  const RelAlgExecutionUnit& ra_exe_unit) {
2843  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2844  size_t frag_pos{0};
2845  const auto& input_descs = ra_exe_unit.input_descs;
2846  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2847  const int table_id = input_descs[scan_idx].getTableId();
2848  // selected_fragments here is from assignFragsToKernelDispatch
2849  // execution_kernel.fragments
2850  if (selected_fragments[0].table_id != table_id) { // TODO 0
2851  continue;
2852  }
2853  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2854  selected_fragments_crossjoin.push_back(
2855  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2856  {size_t(1)}); // TODO
2857  for (const auto& col_id : col_global_ids) {
2858  CHECK(col_id);
2859  const auto& input_desc = col_id->getScanDesc();
2860  if (input_desc.getTableId() != table_id ||
2861  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2862  continue;
2863  }
2864  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2865  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2866  CHECK_LT(static_cast<size_t>(it->second),
2867  plan_state_->global_to_local_col_ids_.size());
2868  local_col_to_frag_pos[it->second] = frag_pos;
2869  }
2870  ++frag_pos;
2871  }
2872 }
2873 
2874 namespace {
2875 
2877  public:
2878  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2880  for (auto out : out_vec_) {
2881  delete[] out;
2882  }
2883  }
2884 
2885  private:
2886  std::vector<int64_t*> out_vec_;
2887 };
2888 } // namespace
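// Illustrative sketch: OutVecOwner is a small RAII holder that guarantees the raw
// int64_t* output arrays returned by the launch functions are freed on every exit path,
// including exceptions. The same idiom in isolation; RawArrayOwner is a hypothetical
// name, not an engine class.
#include <cstdint>
#include <utility>
#include <vector>

class RawArrayOwner {
 public:
  explicit RawArrayOwner(std::vector<int64_t*> arrays) : arrays_(std::move(arrays)) {}
  RawArrayOwner(const RawArrayOwner&) = delete;
  RawArrayOwner& operator=(const RawArrayOwner&) = delete;
  ~RawArrayOwner() {
    for (auto* ptr : arrays_) {
      delete[] ptr;  // arrays are assumed to have been allocated with new[]
    }
  }

 private:
  std::vector<int64_t*> arrays_;
};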
2889 
2891  const RelAlgExecutionUnit& ra_exe_unit,
2892  const CompilationResult& compilation_result,
2893  const bool hoist_literals,
2894  ResultSetPtr& results,
2895  const std::vector<Analyzer::Expr*>& target_exprs,
2896  const ExecutorDeviceType device_type,
2897  std::vector<std::vector<const int8_t*>>& col_buffers,
2898  QueryExecutionContext* query_exe_context,
2899  const std::vector<std::vector<int64_t>>& num_rows,
2900  const std::vector<std::vector<uint64_t>>& frag_offsets,
2901  Data_Namespace::DataMgr* data_mgr,
2902  const int device_id,
2903  const uint32_t start_rowid,
2904  const uint32_t num_tables,
2905  const bool allow_runtime_interrupt,
2906  RenderInfo* render_info) {
2907  INJECT_TIMER(executePlanWithoutGroupBy);
2908  auto timer = DEBUG_TIMER(__func__);
2909  CHECK(!results);
2910  if (col_buffers.empty()) {
2911  return 0;
2912  }
2913 
2914  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2915  if (render_info) {
2916  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2917  // here, we are in non-insitu mode.
2918  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2919  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2920  "currently unsupported.";
2921  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2922  }
2923 
2924  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2925  std::vector<int64_t*> out_vec;
2926  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2927  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2928  std::unique_ptr<OutVecOwner> output_memory_scope;
2929  if (allow_runtime_interrupt) {
2930  bool isInterrupted = false;
2931  {
2932  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2933  const auto query_session = getCurrentQuerySession(session_read_lock);
2934  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
2935  }
2936  if (isInterrupted) {
2937  throw QueryExecutionError(ERR_INTERRUPTED);
2938  }
2939  }
2940  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2941  throw QueryExecutionError(ERR_INTERRUPTED);
2942  }
2943  if (device_type == ExecutorDeviceType::CPU) {
2944  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2945  compilation_result.generated_code);
2946  CHECK(cpu_generated_code);
2947  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2948  cpu_generated_code.get(),
2949  hoist_literals,
2950  hoist_buf,
2951  col_buffers,
2952  num_rows,
2953  frag_offsets,
2954  0,
2955  &error_code,
2956  num_tables,
2957  join_hash_table_ptrs);
2958  output_memory_scope.reset(new OutVecOwner(out_vec));
2959  } else {
2960  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2961  compilation_result.generated_code);
2962  CHECK(gpu_generated_code);
2963  try {
2964  out_vec = query_exe_context->launchGpuCode(
2965  ra_exe_unit,
2966  gpu_generated_code.get(),
2967  hoist_literals,
2968  hoist_buf,
2969  col_buffers,
2970  num_rows,
2971  frag_offsets,
2972  0,
2973  data_mgr,
2974  blockSize(),
2975  gridSize(),
2976  device_id,
2977  compilation_result.gpu_smem_context.getSharedMemorySize(),
2978  &error_code,
2979  num_tables,
2980  allow_runtime_interrupt,
2981  join_hash_table_ptrs,
2982  render_allocator_map_ptr);
2983  output_memory_scope.reset(new OutVecOwner(out_vec));
2984  } catch (const OutOfMemory&) {
2985  return ERR_OUT_OF_GPU_MEM;
2986  } catch (const std::exception& e) {
2987  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2988  }
2989  }
2990  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2991  error_code == Executor::ERR_DIV_BY_ZERO ||
2992  error_code == Executor::ERR_OUT_OF_TIME ||
2993  error_code == Executor::ERR_INTERRUPTED ||
2995  error_code == Executor::ERR_GEOS) {
2996  return error_code;
2997  }
2998  if (ra_exe_unit.estimator) {
2999  CHECK(!error_code);
3000  results =
3001  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
3002  return 0;
3003  }
3004  std::vector<int64_t> reduced_outs;
3005  const auto num_frags = col_buffers.size();
3006  const size_t entry_count =
3007  device_type == ExecutorDeviceType::GPU
3008  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
3009  ? 1
3010  : blockSize() * gridSize() * num_frags)
3011  : num_frags;
3012  if (size_t(1) == entry_count) {
3013  for (auto out : out_vec) {
3014  CHECK(out);
3015  reduced_outs.push_back(*out);
3016  }
3017  } else {
3018  size_t out_vec_idx = 0;
3019 
3020  for (const auto target_expr : target_exprs) {
3021  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3022  CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3023  << target_expr->toString();
3024 
3025  const int num_iterations = agg_info.sql_type.is_geometry()
3026  ? agg_info.sql_type.get_physical_coord_cols()
3027  : 1;
3028 
3029  for (int i = 0; i < num_iterations; i++) {
3030  int64_t val1;
3031  const bool float_argument_input = takes_float_argument(agg_info);
3032  if (is_distinct_target(agg_info) || agg_info.agg_kind == kAPPROX_QUANTILE) {
3033  CHECK(agg_info.agg_kind == kCOUNT ||
3034  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT ||
3035  agg_info.agg_kind == kAPPROX_QUANTILE);
3036  val1 = out_vec[out_vec_idx][0];
3037  error_code = 0;
3038  } else {
3039  const auto chosen_bytes = static_cast<size_t>(
3040  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
3041  std::tie(val1, error_code) = Executor::reduceResults(
3042  agg_info.agg_kind,
3043  agg_info.sql_type,
3044  query_exe_context->getAggInitValForIndex(out_vec_idx),
3045  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3046  out_vec[out_vec_idx],
3047  entry_count,
3048  false,
3049  float_argument_input);
3050  }
3051  if (error_code) {
3052  break;
3053  }
3054  reduced_outs.push_back(val1);
3055  if (agg_info.agg_kind == kAVG ||
3056  (agg_info.agg_kind == kSAMPLE &&
3057  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3058  const auto chosen_bytes = static_cast<size_t>(
3059  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3060  1));
3061  int64_t val2;
3062  std::tie(val2, error_code) = Executor::reduceResults(
3063  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3064  agg_info.sql_type,
3065  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3066  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3067  out_vec[out_vec_idx + 1],
3068  entry_count,
3069  false,
3070  false);
3071  if (error_code) {
3072  break;
3073  }
3074  reduced_outs.push_back(val2);
3075  ++out_vec_idx;
3076  }
3077  ++out_vec_idx;
3078  }
3079  }
3080  }
3081 
3082  if (error_code) {
3083  return error_code;
3084  }
3085 
3086  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3087  auto rows_ptr = std::shared_ptr<ResultSet>(
3088  query_exe_context->query_buffers_->result_sets_[0].release());
3089  rows_ptr->fillOneEntry(reduced_outs);
3090  results = std::move(rows_ptr);
3091  return error_code;
3092 }
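// Illustrative sketch: when entry_count > 1, each target's output column holds one
// partial value per entry (block * grid * fragment on GPU), and reduceResults() folds
// those partials with the aggregate's combine function, seeded by the init value. A toy
// reduction for SUM/COUNT-style targets; reduce_partial_sums is a hypothetical helper,
// not the real reducer.
#include <cstdint>
#include <vector>

inline int64_t reduce_partial_sums(const std::vector<int64_t>& partials,
                                   const int64_t init_val) {
  int64_t acc = init_val;
  for (const auto v : partials) {
    acc += v;  // SUM/COUNT combine; MIN/MAX would use std::min/std::max instead
  }
  return acc;
}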
3093 
3094 namespace {
3095 
3096 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
3097  CHECK(scan_limit);
3098  return results && results->rowCount() < scan_limit;
3099 }
3100 
3101 } // namespace
3102 
3104  const RelAlgExecutionUnit& ra_exe_unit,
3105  const CompilationResult& compilation_result,
3106  const bool hoist_literals,
3107  ResultSetPtr& results,
3108  const ExecutorDeviceType device_type,
3109  std::vector<std::vector<const int8_t*>>& col_buffers,
3110  const std::vector<size_t> outer_tab_frag_ids,
3111  QueryExecutionContext* query_exe_context,
3112  const std::vector<std::vector<int64_t>>& num_rows,
3113  const std::vector<std::vector<uint64_t>>& frag_offsets,
3114  Data_Namespace::DataMgr* data_mgr,
3115  const int device_id,
3116  const int outer_table_id,
3117  const int64_t scan_limit,
3118  const uint32_t start_rowid,
3119  const uint32_t num_tables,
3120  const bool allow_runtime_interrupt,
3121  RenderInfo* render_info) {
3122  auto timer = DEBUG_TIMER(__func__);
3123  INJECT_TIMER(executePlanWithGroupBy);
3124  CHECK(!results);
3125  if (col_buffers.empty()) {
3126  return 0;
3127  }
3128  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3129  // TODO(alex):
3130  // 1. Optimize size (make keys more compact).
3131  // 2. Resize on overflow.
3132  // 3. Optimize runtime.
3133  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3134  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3135  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3136  if (allow_runtime_interrupt) {
3137  bool isInterrupted = false;
3138  {
3139  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3140  const auto query_session = getCurrentQuerySession(session_read_lock);
3141  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3142  }
3143  if (isInterrupted) {
3144  throw QueryExecutionError(ERR_INTERRUPTED);
3145  }
3146  }
3147  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3148  return ERR_INTERRUPTED;
3149  }
3150 
3151  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3152  if (render_info && render_info->useCudaBuffers()) {
3153  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3154  }
3155 
3156  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3157  << " ra_exe_unit.input_descs="
3158  << shared::printContainer(ra_exe_unit.input_descs)
3159  << " ra_exe_unit.input_col_descs="
3160  << shared::printContainer(ra_exe_unit.input_col_descs)
3161  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3162  << " num_rows=" << shared::printContainer(num_rows)
3163  << " frag_offsets=" << shared::printContainer(frag_offsets)
3164  << " query_exe_context->query_buffers_->num_rows_="
3165  << query_exe_context->query_buffers_->num_rows_
3166  << " query_exe_context->query_mem_desc_.getEntryCount()="
3167  << query_exe_context->query_mem_desc_.getEntryCount()
3168  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3169  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3170  << " num_tables=" << num_tables;
3171 
3172  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3173  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3174  // with outer_table_id.
3175  if (ra_exe_unit_copy.union_all) {
3176  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
3177  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3178  ra_exe_unit_copy.input_descs.end(),
3179  [outer_table_id](auto const& a, auto const& b) {
3180  return a.getTableId() == outer_table_id &&
3181  b.getTableId() != outer_table_id;
3182  });
3183  while (!ra_exe_unit_copy.input_descs.empty() &&
3184  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3185  ra_exe_unit_copy.input_descs.pop_back();
3186  }
3187  // Filter ra_exe_unit_copy.input_col_descs.
3188  ra_exe_unit_copy.input_col_descs.remove_if(
3189  [outer_table_id](auto const& input_col_desc) {
3190  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3191  });
3192  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3193  }
3194 
3195  if (device_type == ExecutorDeviceType::CPU) {
3196  const int32_t scan_limit_for_query =
3197  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3198  const int32_t max_matched = scan_limit_for_query == 0
3199  ? query_exe_context->query_mem_desc_.getEntryCount()
3200  : scan_limit_for_query;
3201 
3202  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
3203  compilation_result.generated_code);
3204  CHECK(cpu_generated_code);
3205  query_exe_context->launchCpuCode(ra_exe_unit_copy,
3206  cpu_generated_code.get(),
3207  hoist_literals,
3208  hoist_buf,
3209  col_buffers,
3210  num_rows,
3211  frag_offsets,
3212  max_matched,
3213  &error_code,
3214  num_tables,
3215  join_hash_table_ptrs);
3216  } else {
3217  try {
3218  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
3219  compilation_result.generated_code);
3220  CHECK(gpu_generated_code);
3221  query_exe_context->launchGpuCode(
3222  ra_exe_unit_copy,
3223  gpu_generated_code.get(),
3224  hoist_literals,
3225  hoist_buf,
3226  col_buffers,
3227  num_rows,
3228  frag_offsets,
3229  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3230  data_mgr,
3231  blockSize(),
3232  gridSize(),
3233  device_id,
3234  compilation_result.gpu_smem_context.getSharedMemorySize(),
3235  &error_code,
3236  num_tables,
3237  allow_runtime_interrupt,
3238  join_hash_table_ptrs,
3239  render_allocator_map_ptr);
3240  } catch (const OutOfMemory&) {
3241  return ERR_OUT_OF_GPU_MEM;
3242  } catch (const OutOfRenderMemory&) {
3243  return ERR_OUT_OF_RENDER_MEM;
3244  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3245  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
3246  } catch (const std::exception& e) {
3247  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3248  }
3249  }
3250 
3251  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3252  error_code == Executor::ERR_DIV_BY_ZERO ||
3253  error_code == Executor::ERR_OUT_OF_TIME ||
3254  error_code == Executor::ERR_INTERRUPTED ||
3255  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
3256  error_code == Executor::ERR_GEOS) {
3257  return error_code;
3258  }
3259 
3260  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3261  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3262  results = query_exe_context->getRowSet(ra_exe_unit_copy,
3263  query_exe_context->query_mem_desc_);
3264  CHECK(results);
3265  VLOG(2) << "results->rowCount()=" << results->rowCount();
3266  results->holdLiterals(hoist_buf);
3267  }
3268  if (error_code < 0 && render_allocator_map_ptr) {
3269  auto const adjusted_scan_limit =
3270  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3271  // More rows passed the filter than available slots. We don't have a count to check,
3272  // so assume we met the limit if a scan limit is set
3273  if (adjusted_scan_limit != 0) {
3274  return 0;
3275  } else {
3276  return error_code;
3277  }
3278  }
3279  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
3280  return error_code; // unlucky, not enough results and we ran out of slots
3281  }
3282 
3283  return 0;
3284 }
3285 
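// [editor note] getJoinHashTablePtrs collects the per-device join hash table buffer
// pointers for this query; if any join level is missing its hash table, an empty
// vector is returned.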
3286 std::vector<int64_t> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3287  const int device_id) {
3288  std::vector<int64_t> table_ptrs;
3289  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3290  for (auto hash_table : join_hash_tables) {
3291  if (!hash_table) {
3292  CHECK(table_ptrs.empty());
3293  return {};
3294  }
3295  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3296  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3297  }
3298  return table_ptrs;
3299 }
3300 
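// [editor note] nukeOldState drops the per-query codegen and plan state before a new
// compilation; lazy column fetch is disabled whenever the unit contains a LEFT (outer)
// join condition.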
3301 void Executor::nukeOldState(const bool allow_lazy_fetch,
3302  const std::vector<InputTableInfo>& query_infos,
3303  const PlanState::DeletedColumnsMap& deleted_cols_map,
3304  const RelAlgExecutionUnit* ra_exe_unit) {
3305  kernel_queue_time_ms_ = 0;
3306  compilation_queue_time_ms_ = 0;
3307  const bool contains_left_deep_outer_join =
3308  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3309  ra_exe_unit->join_quals.end(),
3310  [](const JoinCondition& join_condition) {
3311  return join_condition.type == JoinType::LEFT;
3312  }) != ra_exe_unit->join_quals.end();
3313  cgen_state_.reset(new CgenState(query_infos.size(), contains_left_deep_outer_join));
3314  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3315  query_infos,
3316  deleted_cols_map,
3317  this));
3318 }
3319 
3320 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3321  const std::vector<InputTableInfo>& query_infos) {
3322  AUTOMATIC_IR_METADATA(cgen_state_.get());
3323  const auto ld_count = input_descs.size();
3324  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3325  for (size_t i = 0; i < ld_count; ++i) {
3326  CHECK_LT(i, query_infos.size());
3327  const auto frag_count = query_infos[i].info.fragments.size();
3328  if (i > 0) {
3329  cgen_state_->frag_offsets_.push_back(nullptr);
3330  } else {
3331  if (frag_count > 1) {
3332  cgen_state_->frag_offsets_.push_back(
3333  cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
3334  } else {
3335  cgen_state_->frag_offsets_.push_back(nullptr);
3336  }
3337  }
3338  }
3339 }
3340 
3341 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
3342  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3343  const std::vector<InputTableInfo>& query_infos,
3344  const MemoryLevel memory_level,
3345  const JoinType join_type,
3346  const HashType preferred_hash_type,
3347  ColumnCacheMap& column_cache,
3348  const RegisteredQueryHint& query_hint) {
3349  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3350  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3351  }
3352  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3353  throw QueryExecutionError(ERR_INTERRUPTED);
3354  }
3355  try {
3356  auto tbl = HashJoin::getInstance(qual_bin_oper,
3357  query_infos,
3358  memory_level,
3359  join_type,
3360  preferred_hash_type,
3361  deviceCountForMemoryLevel(memory_level),
3362  column_cache,
3363  this,
3364  query_hint);
3365  return {tbl, ""};
3366  } catch (const HashJoinFail& e) {
3367  return {nullptr, e.what()};
3368  }
3369 }
3370 
3371 int8_t Executor::warpSize() const {
3372  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3373  CHECK(!dev_props.empty());
3374  return dev_props.front().warpSize;
3375 }
3376 
3377 // TODO(adb): should these three functions have consistent semantics if cuda mgr does not
3378 // exist?
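// [editor note] gridSize() and blockSize() return 0 when no CUDA manager is available;
// otherwise they fall back to 2x the minimum multiprocessor count and the device's
// maxThreadsPerBlock, respectively, when no explicit sizes were configured.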
3379 unsigned Executor::gridSize() const {
3380  CHECK(data_mgr_);
3381  const auto cuda_mgr = data_mgr_->getCudaMgr();
3382  if (!cuda_mgr) {
3383  return 0;
3384  }
3385  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3386 }
3387 
3388 unsigned Executor::numBlocksPerMP() const {
3389  return grid_size_x_ ? std::ceil(grid_size_x_ / cudaMgr()->getMinNumMPsForAllDevices())
3390  : 2;
3391 }
3392 
3393 unsigned Executor::blockSize() const {
3394  CHECK(data_mgr_);
3395  const auto cuda_mgr = data_mgr_->getCudaMgr();
3396  if (!cuda_mgr) {
3397  return 0;
3398  }
3399  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3400  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3401 }
3402 
3403 size_t Executor::maxGpuSlabSize() const {
3404  return max_gpu_slab_size_;
3405 }
3406 
3407 int64_t Executor::deviceCycles(int milliseconds) const {
3408  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3409  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3410 }
3411 
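// [editor note] castToFP converts an integer (possibly scaled DECIMAL) value to the
// requested float/double type, dividing by 10^scale when the source type carries a
// non-zero scale.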
3412 llvm::Value* Executor::castToFP(llvm::Value* value,
3413  SQLTypeInfo const& from_ti,
3414  SQLTypeInfo const& to_ti) {
3415  AUTOMATIC_IR_METADATA(cgen_state_.get());
3416  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3417  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3418  llvm::Type* fp_type{nullptr};
3419  switch (to_ti.get_size()) {
3420  case 4:
3421  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3422  break;
3423  case 8:
3424  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3425  break;
3426  default:
3427  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3428  }
3429  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3430  if (from_ti.get_scale()) {
3431  value = cgen_state_->ir_builder_.CreateFDiv(
3432  value,
3433  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3434  }
3435  }
3436  return value;
3437 }
3438 
3439 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3440  AUTOMATIC_IR_METADATA(cgen_state_.get());
3441  CHECK(val->getType()->isPointerTy());
3442 
3443  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3444  const auto val_type = val_ptr_type->getElementType();
3445  size_t val_width = 0;
3446  if (val_type->isIntegerTy()) {
3447  val_width = val_type->getIntegerBitWidth();
3448  } else {
3449  if (val_type->isFloatTy()) {
3450  val_width = 32;
3451  } else {
3452  CHECK(val_type->isDoubleTy());
3453  val_width = 64;
3454  }
3455  }
3456  CHECK_LT(size_t(0), val_width);
3457  if (bitWidth == val_width) {
3458  return val;
3459  }
3460  return cgen_state_->ir_builder_.CreateBitCast(
3461  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3462 }
3463 
3464 #define EXECUTE_INCLUDE
3465 #include "ArrayOps.cpp"
3466 #include "DateAdd.cpp"
3467 #include "GeoOps.cpp"
3468 #include "StringFunctions.cpp"
3470 #undef EXECUTE_INCLUDE
3471 
3472 namespace {
3473 void add_deleted_col_to_map(PlanState::DeletedColumnsMap& deleted_cols_map,
3474  const ColumnDescriptor* deleted_cd) {
3475  auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3476  if (deleted_cols_it == deleted_cols_map.end()) {
3477  CHECK(
3478  deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd)).second);
3479  } else {
3480  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3481  }
3482 }
3483 } // namespace
3484 
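// [editor note] addDeletedColumn ensures that, when filter_on_deleted_column is set,
// each physical input table's delete column is present in input_col_descs and is
// recorded in the returned deleted-columns map.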
3485 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3486  const RelAlgExecutionUnit& ra_exe_unit,
3487  const CompilationOptions& co) {
3488  if (!co.filter_on_deleted_column) {
3489  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3490  }
3491  auto ra_exe_unit_with_deleted = ra_exe_unit;
3492  PlanState::DeletedColumnsMap deleted_cols_map;
3493  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3494  if (input_table.getSourceType() != InputSourceType::TABLE) {
3495  continue;
3496  }
3497  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3498  CHECK(td);
3499  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3500  if (!deleted_cd) {
3501  continue;
3502  }
3503  CHECK(deleted_cd->columnType.is_boolean());
3504  // check deleted column is not already present
3505  bool found = false;
3506  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3507  if (input_col.get()->getColId() == deleted_cd->columnId &&
3508  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3509  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3510  found = true;
3511  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3512  break;
3513  }
3514  }
3515  if (!found) {
3516  // add deleted column
3517  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3518  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3519  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3520  }
3521  }
3522  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3523 }
3524 
3525 namespace {
3526 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` returns `true` for a safe
3527 // scaled epoch value and `false` on overflow/underflow, as the first element of the
3528 // returned tuple.
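// [editor note] Illustrative example (not from the original source): comparing a
// TIMESTAMP(0) column against a TIMESTAMP(6) literal gives scale = 10^6; the chunk
// min/max must be multiplied by 10^6, which can overflow int64_t for large epoch
// values, hence the checked multiplication below.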
3529 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3530  const int64_t chunk_min,
3531  const int64_t chunk_max,
3532  const SQLTypeInfo& lhs_type,
3533  const SQLTypeInfo& rhs_type) {
3534  const int32_t ldim = lhs_type.get_dimension();
3535  const int32_t rdim = rhs_type.get_dimension();
3536  CHECK(ldim != rdim);
3537  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
3538  if (ldim > rdim) {
3539  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
3540  return {true, chunk_min / scale, chunk_max / scale};
3541  }
3542 
3543  using checked_int64_t = boost::multiprecision::number<
3544  boost::multiprecision::cpp_int_backend<64,
3545  64,
3546  boost::multiprecision::signed_magnitude,
3547  boost::multiprecision::checked,
3548  void>>;
3549 
3550  try {
3551  auto ret =
3552  std::make_tuple(true,
3553  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
3554  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
3555  return ret;
3556  } catch (const std::overflow_error& e) {
3557  // noop
3558  }
3559  return std::make_tuple(false, chunk_min, chunk_max);
3560 }
3561 
3562 } // namespace
3563 
3564 bool Executor::isFragmentFullyDeleted(
3565  const int table_id,
3566  const Fragmenter_Namespace::FragmentInfo& fragment) {
3567  // Skip temporary tables
3568  if (table_id < 0) {
3569  return false;
3570  }
3571 
3572  const auto td = catalog_->getMetadataForTable(fragment.physicalTableId);
3573  CHECK(td);
3574  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3575  if (!deleted_cd) {
3576  return false;
3577  }
3578 
3579  const auto& chunk_type = deleted_cd->columnType;
3580  CHECK(chunk_type.is_boolean());
3581 
3582  const auto deleted_col_id = deleted_cd->columnId;
3583  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
3584  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
3585  const int64_t chunk_min =
3586  extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3587  const int64_t chunk_max =
3588  extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3589  if (chunk_min == 1 && chunk_max == 1) { // skip the fragment if metadata says the
3590  // full delete bytemap is true (signifying all rows are deleted)
3591  return true;
3592  }
3593  }
3594  return false;
3595 }
3596 
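// [editor note] skipFragment prunes a fragment using per-chunk min/max metadata for
// simple comparison predicates on integer/time columns. Illustrative example (not from
// the original source): for a qual `ts >= 100`, a fragment whose chunk max for `ts` is
// 99 can be skipped entirely (see the kGE case below).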
3597 std::pair<bool, int64_t> Executor::skipFragment(
3598  const InputDescriptor& table_desc,
3599  const Fragmenter_Namespace::FragmentInfo& fragment,
3600  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3601  const std::vector<uint64_t>& frag_offsets,
3602  const size_t frag_idx) {
3603  const int table_id = table_desc.getTableId();
3604 
3605  // First check to see if all of fragment is deleted, in which case we know we can skip
3606  if (isFragmentFullyDeleted(table_id, fragment)) {
3607  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
3608  << ", fragment id: " << frag_idx;
3609  return {true, -1};
3610  }
3611 
3612  for (const auto& simple_qual : simple_quals) {
3613  const auto comp_expr =
3614  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3615  if (!comp_expr) {
3616  // is this possible?
3617  return {false, -1};
3618  }
3619  const auto lhs = comp_expr->get_left_operand();
3620  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3621  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3622  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
3623  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
3624  if (lhs_uexpr) {
3625  CHECK(lhs_uexpr->get_optype() ==
3626  kCAST); // We should have only been passed a cast expression
3627  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
3628  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3629  continue;
3630  }
3631  } else {
3632  continue;
3633  }
3634  }
3635  const auto rhs = comp_expr->get_right_operand();
3636  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3637  if (!rhs_const) {
3638  // is this possible?
3639  return {false, -1};
3640  }
3641  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3642  continue;
3643  }
3644  const int col_id = lhs_col->get_column_id();
3645  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
3646  int64_t chunk_min{0};
3647  int64_t chunk_max{0};
3648  bool is_rowid{false};
3649  size_t start_rowid{0};
3650  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
3651  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
3652  if (cd->isVirtualCol) {
3653  CHECK(cd->columnName == "rowid");
3654  const auto& table_generation = getTableGeneration(table_id);
3655  start_rowid = table_generation.start_rowid;
3656  chunk_min = frag_offsets[frag_idx] + start_rowid;
3657  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3658  is_rowid = true;
3659  }
3660  } else {
3661  const auto& chunk_type = lhs_col->get_type_info();
3662  chunk_min = extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3663  chunk_max = extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3664  }
3665  if (chunk_min > chunk_max) {
3666  // invalid metadata range, do not skip fragment
3667  return {false, -1};
3668  }
3669  if (lhs->get_type_info().is_timestamp() &&
3670  (lhs_col->get_type_info().get_dimension() !=
3671  rhs_const->get_type_info().get_dimension()) &&
3672  (lhs_col->get_type_info().is_high_precision_timestamp() ||
3673  rhs_const->get_type_info().is_high_precision_timestamp())) {
3674  // If the original timestamp lhs col has a different precision,
3675  // the column metadata holds values in the original precision;
3676  // therefore adjust the rhs value to match the lhs precision.
3677 
3678  // Note(Wamsi): We adjust rhs const value instead of lhs value to not
3679  // artificially limit the lhs column range. RHS overflow/underflow has already
3680  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
3681  bool is_valid;
3682  std::tie(is_valid, chunk_min, chunk_max) =
3683  get_hpt_overflow_underflow_safe_scaled_values(
3684  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
3685  if (!is_valid) {
3686  VLOG(4) << "Overflow/Underflow detected in fragment skipping logic.\nChunk min "
3687  "value: "
3688  << std::to_string(chunk_min)
3689  << "\nChunk max value: " << std::to_string(chunk_max)
3690  << "\nLHS col precision is: "
3691  << std::to_string(lhs_col->get_type_info().get_dimension())
3692  << "\nRHS precision is: "
3693  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
3694  return {false, -1};
3695  }
3696  }
3697  llvm::LLVMContext local_context;
3698  CgenState local_cgen_state(local_context);
3699  CodeGenerator code_generator(&local_cgen_state, nullptr);
3700 
3701  const auto rhs_val =
3702  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
3703 
3704  switch (comp_expr->get_optype()) {
3705  case kGE:
3706  if (chunk_max < rhs_val) {
3707  return {true, -1};
3708  }
3709  break;
3710  case kGT:
3711  if (chunk_max <= rhs_val) {
3712  return {true, -1};
3713  }
3714  break;
3715  case kLE:
3716  if (chunk_min > rhs_val) {
3717  return {true, -1};
3718  }
3719  break;
3720  case kLT:
3721  if (chunk_min >= rhs_val) {
3722  return {true, -1};
3723  }
3724  break;
3725  case kEQ:
3726  if (chunk_min > rhs_val || chunk_max < rhs_val) {
3727  return {true, -1};
3728  } else if (is_rowid) {
3729  return {false, rhs_val - start_rowid};
3730  }
3731  break;
3732  default:
3733  break;
3734  }
3735  }
3736  return {false, -1};
3737 }
3738 
3739 /*
3740  * The skipFragmentInnerJoins function processes all quals stored in the execution unit's
3741  * join_quals and gather all the ones that meet the "simple_qual" characteristics
3742  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
3743  * to decide whether the fragment should be skipped or not. The fragment will be skipped
3744  * if at least one of these skipFragment calls returns true as its first value.
3745  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
3746  * second value only if its first value is "false".
3747  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
3748  * i.e., we only skip if none of the quals trigger the code to update the
3749  * rowid_lookup_key
3750  * - Only AND operations are valid and considered:
3751  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
3752  * the skip
3753  * - `select * from t1,t2 where (A or B) and C`: only C is considered
3754  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
3755  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
3756  * implemented differently, which causes the first one to skip correctly, but the second
3757  * one will not skip.
3758  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
3759  * possible
3760  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
3761  * projection, no skipping
3762  */
3763 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
3764  const InputDescriptor& table_desc,
3765  const RelAlgExecutionUnit& ra_exe_unit,
3766  const Fragmenter_Namespace::FragmentInfo& fragment,
3767  const std::vector<uint64_t>& frag_offsets,
3768  const size_t frag_idx) {
3769  std::pair<bool, int64_t> skip_frag{false, -1};
3770  for (auto& inner_join : ra_exe_unit.join_quals) {
3771  if (inner_join.type != JoinType::INNER) {
3772  continue;
3773  }
3774 
3775  // extracting all the conjunctive simple_quals from the quals stored for the inner
3776  // join
3777  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
3778  for (auto& qual : inner_join.quals) {
3779  auto temp_qual = qual_to_conjunctive_form(qual);
3780  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
3781  temp_qual.simple_quals.begin(),
3782  temp_qual.simple_quals.end());
3783  }
3784  auto temp_skip_frag = skipFragment(
3785  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
3786  if (temp_skip_frag.second != -1) {
3787  skip_frag.second = temp_skip_frag.second;
3788  return skip_frag;
3789  } else {
3790  skip_frag.first = skip_frag.first || temp_skip_frag.first;
3791  }
3792  }
3793  return skip_frag;
3794 }
3795 
3796 AggregatedColRange Executor::computeColRangesCache(
3797  const std::unordered_set<PhysicalInput>& phys_inputs) {
3798  AggregatedColRange agg_col_range_cache;
3799  CHECK(catalog_);
3800  std::unordered_set<int> phys_table_ids;
3801  for (const auto& phys_input : phys_inputs) {
3802  phys_table_ids.insert(phys_input.table_id);
3803  }
3804  std::vector<InputTableInfo> query_infos;
3805  for (const int table_id : phys_table_ids) {
3806  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3807  }
3808  for (const auto& phys_input : phys_inputs) {
3809  const auto cd =
3810  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3811  CHECK(cd);
3812  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3813  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3814  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3815  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3816  agg_col_range_cache.setColRange(phys_input, col_range);
3817  }
3818  }
3819  return agg_col_range_cache;
3820 }
3821 
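// [editor note] For every dictionary-encoded string (or string-array) column among the
// physical inputs, the current storageEntryCount() of its dictionary is recorded as
// that dictionary's generation.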
3822 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
3823  const std::unordered_set<PhysicalInput>& phys_inputs) {
3824  StringDictionaryGenerations string_dictionary_generations;
3825  CHECK(catalog_);
3826  for (const auto& phys_input : phys_inputs) {
3827  const auto cd =
3828  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3829  CHECK(cd);
3830  const auto& col_ti =
3831  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3832  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3833  const int dict_id = col_ti.get_comp_param();
3834  const auto dd = catalog_->getMetadataForDict(dict_id);
3835  CHECK(dd && dd->stringDict);
3836  string_dictionary_generations.setGeneration(dict_id,
3837  dd->stringDict->storageEntryCount());
3838  }
3839  }
3840  return string_dictionary_generations;
3841 }
3842 
3843 TableGenerations Executor::computeTableGenerations(
3844  std::unordered_set<int> phys_table_ids) {
3845  TableGenerations table_generations;
3846  for (const int table_id : phys_table_ids) {
3847  const auto table_info = getTableInfo(table_id);
3848  table_generations.setGeneration(
3849  table_id,
3850  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
3851  }
3852  return table_generations;
3853 }
3854 
3855 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
3856  const std::unordered_set<int>& phys_table_ids) {
3857  CHECK(catalog_);
3858  row_set_mem_owner_ =
3859  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
3860  row_set_mem_owner_->setDictionaryGenerations(
3861  computeStringDictionaryGenerations(phys_inputs));
3862  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
3863  table_generations_ = computeTableGenerations(phys_table_ids);
3864 }
3865 
3867  return recycler_mutex_;
3868 }
3869 
3871  return query_plan_dag_cache_;
3872 }
3873 
3875  JoinColumnSide target_side,
3876  bool extract_only_col_id) {
3877  return query_plan_dag_cache_.getJoinColumnsInfoString(
3878  join_expr, target_side, extract_only_col_id);
3879 }
3880 
3881 mapd_shared_mutex& Executor::getSessionLock() {
3882  return executor_session_mutex_;
3883 }
3884 
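// [editor note] The query-session accessors and mutators below take a shared or unique
// lock reference as a parameter; the caller is expected to already hold
// executor_session_mutex_ via that lock.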
3885 QuerySessionId& Executor::getCurrentQuerySession(
3886  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3887  return current_query_session_;
3888 }
3889 
3890 void Executor::setCurrentQuerySession(const QuerySessionId& query_session,
3891  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3892  if (!query_session.empty()) {
3893  current_query_session_ = query_session;
3894  }
3895 }
3896 
3897 void Executor::setRunningExecutorId(const size_t id,
3898  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3899  running_query_executor_id_ = id;
3900 }
3901 
3902 size_t Executor::getRunningExecutorId(mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3903  return running_query_executor_id_;
3904 }
3905 
3906 bool Executor::checkCurrentQuerySession(const QuerySessionId& candidate_query_session,
3907  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
3908  // the candidate session is considered current only if it is non-empty and
3909  // equal to current_query_session_
3910  return !candidate_query_session.empty() &&
3911  (current_query_session_ == candidate_query_session);
3912 }
3913 
3914 void Executor::invalidateRunningQuerySession(
3915  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
3916  current_query_session_ = "";
3917  running_query_executor_id_ = Executor::UNITARY_EXECUTOR_ID;
3918 }
3919 
3920 CurrentQueryStatus Executor::attachExecutorToQuerySession(
3921  const QuerySessionId& query_session_id,
3922  const std::string& query_str,
3923  const std::string& query_submitted_time) {
3924  if (!query_session_id.empty()) {
3925  // if the session is valid, update 1) the exact executor id and 2) the query status
3926  mapd_unique_lock<mapd_shared_mutex> write_lock(executor_session_mutex_);
3927  updateQuerySessionExecutorAssignment(
3928  query_session_id, query_submitted_time, executor_id_, write_lock);
3929  updateQuerySessionStatusWithLock(query_session_id,
3930  query_submitted_time,
3931  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
3932  write_lock);
3933  }
3934  return {query_session_id, query_str};
3935 }
3936 
3937 void Executor::checkPendingQueryStatus(const QuerySessionId& query_session) {
3938  // check whether we are okay to execute the "pending" query
3939  // i.e., before running the query check if this query session is "ALREADY" interrupted
3940  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3941  if (query_session.empty()) {
3942  return;
3943  }
3944  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
3945  // something went wrong, since we assume this is the caller's responsibility
3946  // (call this function only for an enrolled query session)
3947  if (!queries_session_map_.count(query_session)) {
3948  VLOG(1) << "Interrupting pending query is not available since the query session is "
3949  "not enrolled";
3950  } else {
3951  // here the query session is enrolled but the interrupt flag is not registered
3952  VLOG(1)
3953  << "Interrupting pending query is not available since its interrupt flag is "
3954  "not registered";
3955  }
3956  return;
3957  }
3958  if (queries_interrupt_flag_[query_session]) {
3959  throw QueryExecutionError(ERR_INTERRUPTED);
3960  }
3961 }
3962 
3963 void Executor::clearQuerySessionStatus(const QuerySessionId& query_session,
3964  const std::string& submitted_time_str,
3965  const bool acquire_spin_lock) {
3966  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3967  // clear the interrupt-related info for a finished query
3968  if (query_session.empty()) {
3969  return;
3970  }
3971  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
3972  if (query_session.compare(current_query_session_) == 0 &&
3973  running_query_executor_id_ == executor_id_) {
3974  invalidateRunningQuerySession(session_write_lock);
3975  if (acquire_spin_lock) {
3976  // try to unlock the executor's internal spin lock (call it "L") iff it is acquired;
3977  // otherwise we do not need to care about the "L" lock
3978  // i.e., import table does not have a code path towards the Executor,
3979  // so we just exploit the executor's session management code and the global interrupt
3980  // flag, except for this "L" lock
3981  execute_spin_lock_.clear(std::memory_order_release);
3982  }
3983  resetInterrupt();
3984  }
3985 }
3986 
3987 void Executor::updateQuerySessionStatus(
3988  std::shared_ptr<const query_state::QueryState>& query_state,
3989  const QuerySessionStatus::QueryStatus new_query_status) {
3990  // update the running query session's current status
3991  if (query_state) {
3992  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3993  auto query_session = query_state->getConstSessionInfo()->get_session_id();
3994  if (query_session.empty()) {
3995  return;
3996  }
3997  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING) {
3998  current_query_session_ = query_session;
3999  running_query_executor_id_ = executor_id_;
4000  }
4001  updateQuerySessionStatusWithLock(query_session,
4002  query_state->getQuerySubmittedTime(),
4003  new_query_status,
4004  session_write_lock);
4005  }
4006 }
4007 
4008 void Executor::updateQuerySessionStatus(
4009  const QuerySessionId& query_session,
4010  const std::string& submitted_time_str,
4011  const QuerySessionStatus::QueryStatus new_query_status) {
4012  // update the running query session's current status
4013  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
4014  if (query_session.empty()) {
4015  return;
4016  }
4017  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING) {
4018  current_query_session_ = query_session;
4019  running_query_executor_id_ = executor_id_;
4020  }
4021  updateQuerySessionStatusWithLock(
4022  query_session, submitted_time_str, new_query_status, session_write_lock);
4023 }
4024 
4025 void Executor::enrollQuerySession(
4026  const QuerySessionId& query_session,
4027  const std::string& query_str,
4028  const std::string& submitted_time_str,
4029  const size_t executor_id,
4030  const QuerySessionStatus::QueryStatus query_session_status) {
4031  // enroll the query session into the Executor's session map
4032  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
4033  if (query_session.empty()) {
4034  return;
4035  }
4036 
4037  addToQuerySessionList(query_session,
4038  query_str,
4039  submitted_time_str,
4040  executor_id,
4041  query_session_status,
4042  session_write_lock);
4043 
4044  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING) {
4045  current_query_session_ = query_session;
4046  running_query_executor_id_ = executor_id_;
4047  }
4048 }
4049 
4050 bool Executor::addToQuerySessionList(const QuerySessionId& query_session,
4051  const std::string& query_str,
4052  const std::string& submitted_time_str,
4053  const size_t executor_id,
4054  const QuerySessionStatus::QueryStatus query_status,
4055  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4056  // an internal API that enrolls the query session into the Executor's session map
4057  if (queries_session_map_.count(query_session)) {
4058  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
4059  queries_session_map_.at(query_session).erase(submitted_time_str);
4060  queries_session_map_.at(query_session)
4061  .emplace(submitted_time_str,
4062  QuerySessionStatus(query_session,
4063  executor_id,
4064  query_str,
4065  submitted_time_str,
4066  query_status));
4067  } else {
4068  queries_session_map_.at(query_session)
4069  .emplace(submitted_time_str,
4070  QuerySessionStatus(query_session,
4071  executor_id,
4072  query_str,
4073  submitted_time_str,
4074  query_status));
4075  }
4076  } else {
4077  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4078  executor_per_query_map.emplace(
4079  submitted_time_str,
4080  QuerySessionStatus(
4081  query_session, executor_id, query_str, submitted_time_str, query_status));
4082  queries_session_map_.emplace(query_session, executor_per_query_map);
4083  }
4084  return queries_interrupt_flag_.emplace(query_session, false).second;
4085 }
4086 
4087 bool Executor::updateQuerySessionStatusWithLock(
4088  const QuerySessionId& query_session,
4089  const std::string& submitted_time_str,
4090  const QuerySessionStatus::QueryStatus updated_query_status,
4091  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4092  // an internal API that updates query session status
4093  if (query_session.empty()) {
4094  return false;
4095  }
4096  if (queries_session_map_.count(query_session)) {
4097  for (auto& query_status : queries_session_map_.at(query_session)) {
4098  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4099  // no time difference --> found the target query status
4100  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4101  auto prev_status = query_status.second.getQueryStatus();
4102  if (prev_status == updated_query_status) {
4103  return false;
4104  }
4105  query_status.second.setQueryStatus(updated_query_status);
4106  return true;
4107  }
4108  }
4109  }
4110  return false;
4111 }
4112 
4113 bool Executor::updateQuerySessionExecutorAssignment(
4114  const QuerySessionId& query_session,
4115  const std::string& submitted_time_str,
4116  const size_t executor_id,
4117  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4118  // update the executor id of the query session
4119  if (query_session.empty()) {
4120  return false;
4121  }
4122  if (queries_session_map_.count(query_session)) {
4123  auto storage = queries_session_map_.at(query_session);
4124  for (auto it = storage.begin(); it != storage.end(); it++) {
4125  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4126  // no time difference --> found the target query status
4127  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4128  queries_session_map_.at(query_session)
4129  .at(submitted_time_str)
4130  .setExecutorId(executor_id);
4131  return true;
4132  }
4133  }
4134  }
4135  return false;
4136 }
4137 
4138 bool Executor::removeFromQuerySessionList(
4139  const QuerySessionId& query_session,
4140  const std::string& submitted_time_str,
4141  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4142  if (query_session.empty()) {
4143  return false;
4144  }
4145  if (queries_session_map_.count(query_session)) {
4146  auto& storage = queries_session_map_.at(query_session);
4147  if (storage.size() > 1) {
4148  // in this case we only remove query executor info
4149  for (auto it = storage.begin(); it != storage.end(); it++) {
4150  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4151  // no time difference && the same executor id --> found the target query
4152  if (it->second.getExecutorId() == executor_id_ &&
4153  submitted_time_str.compare(target_submitted_t_str) == 0) {
4154  storage.erase(it);
4155  return true;
4156  }
4157  }
4158  } else if (storage.size() == 1) {
4159  // here this session only has a single query executor
4160  // so we clear both executor info and its interrupt flag
4161  queries_session_map_.erase(query_session);
4162  queries_interrupt_flag_.erase(query_session);
4163  if (interrupted_.load()) {
4164  interrupted_.store(false);
4165  VLOG(1) << "RESET Executor " << this << " that had previously been interrupted";
4166  }
4167  return true;
4168  }
4169  }
4170  return false;
4171 }
4172 
4173 void Executor::setQuerySessionAsInterrupted(
4174  const QuerySessionId& query_session,
4175  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4176  if (query_session.empty()) {
4177  return;
4178  }
4179  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
4180  queries_interrupt_flag_[query_session] = true;
4181  }
4182 }
4183 
4184 void Executor::resetQuerySessionInterruptFlag(
4185  const std::string& query_session,
4186  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4187  if (query_session.empty()) {
4188  return;
4189  }
4190  queries_interrupt_flag_[query_session] = false;
4191 }
4192 
4193 bool Executor::checkIsQuerySessionInterrupted(
4194  const QuerySessionId& query_session,
4195  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4196  if (query_session.empty()) {
4197  return false;
4198  }
4199  auto flag_it = queries_interrupt_flag_.find(query_session);
4200  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4201  flag_it->second;
4202 }
4203 
4204 bool Executor::checkIsRunningQuerySessionInterrupted() {
4205  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4206  return checkIsQuerySessionInterrupted(current_query_session_, session_read_lock);
4207 }
4208 
4209 bool Executor::checkIsQuerySessionEnrolled(
4210  const QuerySessionId& query_session,
4211  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4212  if (query_session.empty()) {
4213  return false;
4214  }
4215  return !query_session.empty() && queries_session_map_.count(query_session);
4216 }
4217 
4218 void Executor::enableRuntimeQueryInterrupt(
4219  const double runtime_query_check_freq,
4220  const unsigned pending_query_check_freq) const {
4221  // The only scenario in which we intentionally call this function is
4222  // to allow runtime query interrupt in QueryRunner for test cases.
4223  // Because a test machine's default setting does not allow runtime query interrupt,
4224  // we have to turn it on within test code if necessary.
4226  g_pending_query_interrupt_freq = pending_query_check_freq;
4227  g_running_query_interrupt_freq = runtime_query_check_freq;
4230  }
4231 }
4232 
4233 void Executor::addToCardinalityCache(const std::string& cache_key,
4234  const size_t cache_value) {
4235  if (g_use_estimator_result_cache) {
4236  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
4237  cardinality_cache_[cache_key] = cache_value;
4238  VLOG(1) << "Put estimated cardinality to the cache";
4239  }
4240 }
4241 
4242 Executor::CachedCardinality Executor::getCachedCardinality(const std::string& cache_key) {
4243  mapd_shared_lock<mapd_shared_mutex> lock(recycler_mutex_);
4244  if (g_use_estimator_result_cache &&
4245  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
4246  VLOG(1) << "Reuse cached cardinality";
4247  return {true, cardinality_cache_[cache_key]};
4248  }
4249  return {false, -1};
4250 }
4251 
4252 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
4253  const QuerySessionId& query_session,
4254  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4255  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
4256  auto& query_infos = queries_session_map_.at(query_session);
4257  std::vector<QuerySessionStatus> ret;
4258  for (auto& info : query_infos) {
4259  ret.push_back(QuerySessionStatus(query_session,
4260  info.second.getExecutorId(),
4261  info.second.getQueryStr(),
4262  info.second.getQuerySubmittedTime(),
4263  info.second.getQueryStatus()));
4264  }
4265  return ret;
4266  }
4267  return {};
4268 }
4269 
4270 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
4271 
4272 std::atomic_flag Executor::execute_spin_lock_ = ATOMIC_FLAG_INIT;
4273 // current running query's session ID
4274 std::string Executor::current_query_session_{""};
4275 // running executor's id
4276 size_t Executor::running_query_executor_id_{Executor::UNITARY_EXECUTOR_ID};
4277 // contain the interrupt flag's status per query session
4278 InterruptFlagMap Executor::queries_interrupt_flag_;
4279 // contain a list of queries per query session
4280 QuerySessionMap Executor::queries_session_map_;
4281 // session lock
4282 mapd_shared_mutex Executor::executor_session_mutex_;
4283 
4286 
4289 void* Executor::gpu_active_modules_[max_gpu_count];
4290 std::atomic<bool> Executor::interrupted_{false};
4291 
4292 std::mutex Executor::compilation_mutex_;
4293 std::mutex Executor::kernel_mutex_;
4294 
4297 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;
bool updateQuerySessionStatusWithLock(const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus updated_query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4087
int get_table_id() const
Definition: Analyzer.h:194
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:208
QuerySessionId & getCurrentQuerySession(mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3885
void executeWorkUnitPerFragment(const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb, const std::set< size_t > &fragment_indexes_param)
Compiles and dispatches a work unit per fragment processing results with the per fragment callback...
Definition: Execute.cpp:1601
bool is_agg(const Analyzer::Expr *expr)
catalog_(nullptr)
ALWAYS_INLINE int64_t agg_sum_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
std::vector< Analyzer::Expr * > target_exprs
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1094
AggregatedColRange computeColRangesCache(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:3796
void enableRuntimeQueryInterrupt(const double runtime_query_check_freq, const unsigned pending_query_check_freq) const
Definition: Execute.cpp:4218
constexpr size_t kArenaBlockOverhead
SQLAgg
Definition: sqldefs.h:71
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::vector< std::unique_ptr< ExecutionKernel > > createKernels(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
Definition: Execute.cpp:2098
bool g_enable_left_join_filter_hoisting
Definition: Execute.cpp:93
void resetQuerySessionInterruptFlag(const std::string &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4184
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const RegisteredQueryHint &query_hint)
Definition: Execute.cpp:3341
double g_running_query_interrupt_freq
Definition: Execute.cpp:118
std::string JoinColumnsInfo
bool g_enable_smem_group_by
void reduce(SpeculativeTopNMap &that)
float g_filter_push_down_low_frac
Definition: Execute.cpp:89
const ColumnDescriptor * try_get_column_descriptor(const InputColDescriptor *col_desc, const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:2361
static QuerySessionMap queries_session_map_
Definition: Execute.h:1102
static mapd_shared_mutex execute_mutex_
Definition: Execute.h:1109
void agg_max_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
JoinType
Definition: sqldefs.h:108
HOST DEVICE int get_size() const
Definition: sqltypes.h:333
size_t maxGpuSlabSize() const
Definition: Execute.cpp:3403
size_t g_constrained_by_in_threshold
Definition: Execute.cpp:100
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int outer_table_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
Definition: Execute.cpp:3103
bool useCudaBuffers() const
Definition: RenderInfo.cpp:69
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool g_enable_watchdog
ExecutorDeviceType getDeviceType() const
double g_bump_allocator_step_reduction
Definition: Execute.cpp:110
std::string cat(Ts &&...args)
std::string toString(const ExtArgumentType &sig_type)
void invalidateRunningQuerySession(mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3914
block_size_x_(block_size_x)
void checkPendingQueryStatus(const QuerySessionId &query_session)
Definition: Execute.cpp:3937
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1223
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1142
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:102
void setEntryCount(const size_t val)
input_table_info_cache_(this)
Definition: Execute.cpp:165
bool is_trivial_loop_join(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1161
grid_size_x_(grid_size_x)
bool g_strip_join_covered_quals
Definition: Execute.cpp:99
std::unique_ptr< llvm::Module > udf_gpu_module
const std::vector< uint64_t > & getFragOffsets()
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 64, 64, boost::multiprecision::signed_magnitude, boost::multiprecision::checked, void >> checked_int64_t
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner() const
Definition: Execute.cpp:293
std::vector< size_t > getTableFragmentIndices(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< int, const TableFragments * > &selected_tables_fragments, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition)
Definition: Execute.cpp:2257
bool g_enable_direct_columnarization
Definition: Execute.cpp:111
std::tuple< bool, int64_t, int64_t > get_hpt_overflow_underflow_safe_scaled_values(const int64_t chunk_min, const int64_t chunk_max, const SQLTypeInfo &lhs_type, const SQLTypeInfo &rhs_type)
Definition: Execute.cpp:3529
ExecutorDeviceType
void agg_max_double_skip_val(int64_t *agg, const double val, const double skip_val)
std::string get_table_name(const InputDescriptor &input_desc, const Catalog_Namespace::Catalog &cat)
Definition: Execute.cpp:1095
ResultSetPtr executeTableFunction(const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat)
Compiles and dispatches a table function; that is, a function that takes as input one or more columns...
Definition: Execute.cpp:1684
std::string toString() const
bool g_enable_lazy_fetch
Definition: Execute.cpp:113
static const int max_gpu_count
Definition: Execute.h:1052
GpuSharedMemoryContext gpu_smem_context
OutVecOwner(const std::vector< int64_t * > &out_vec)
Definition: Execute.cpp:2878
const std::optional< bool > union_all
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:117
int64_t float_to_double_bin(int32_t val, bool nullable=false)
std::map< const QuerySessionId, std::map< std::string, QuerySessionStatus >> QuerySessionMap
Definition: Execute.h:151
#define LOG(tag)
Definition: Logger.h:200
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:92
void freeLinearizedBuf()
std::vector< size_t > outer_fragment_indices
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:53
static std::atomic_flag execute_spin_lock_
Definition: Execute.h:1105
void agg_min_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
bool is_fp() const
Definition: sqltypes.h:502
HOST DEVICE int get_scale() const
Definition: sqltypes.h:328
std::pair< QuerySessionId, std::string > CurrentQueryStatus
Definition: Execute.h:81
static mapd_shared_mutex executors_cache_mutex_
Definition: Execute.h:1126
Definition: sqldefs.h:35
int64_t getGeneration(const uint32_t id) const
const std::list< Analyzer::OrderEntry > order_entries
size_t getSharedMemorySize() const
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr * > &target_exprs) const
Definition: Execute.cpp:343
JoinColumnSide
void clearMemory(const MemoryLevel memLevel)
Definition: DataMgr.cpp:384
Definition: sqldefs.h:36
std::unordered_set< int > get_available_gpus(const Data_Namespace::DataMgr *data_mgr)
Definition: Execute.cpp:1039
tuple r
Definition: test_fsi.py:16
std::string join(T const &container, std::string const &delim)
std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > addDeletedColumn(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
Definition: Execute.cpp:3485
TableGenerations computeTableGenerations(std::unordered_set< int > phys_table_ids)
Definition: Execute.cpp:3843
std::vector< InputDescriptor > input_descs
#define UNREACHABLE()
Definition: Logger.h:250
void setOutputColumnar(const bool val)
const SortAlgorithm algorithm
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
debug_dir_(debug_dir)
#define CHECK_GE(x, y)
Definition: Logger.h:219
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
static std::pair< int64_t, int32_t > reduceResults(const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
Definition: Execute.cpp:667
Definition: sqldefs.h:49
Definition: sqldefs.h:30
Functions to support geospatial operations used by the executor.
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:327
bool checkCurrentQuerySession(const std::string &candidate_query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3906
void setRunningExecutorId(const size_t id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3897
size_t g_filter_push_down_passing_row_ubound
Definition: Execute.cpp:91
static const int32_t ERR_GEOS
Definition: Execute.h:1148
const int8_t * linearizeColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
std::shared_ptr< ResultSet > ResultSetPtr
static void * gpu_active_modules_[max_gpu_count]
Definition: Execute.h:1057
std::vector< FragmentInfo > fragments
Definition: Fragmenter.h:162
void fill_entries_for_empty_input(std::vector< TargetInfo > &target_infos, std::vector< int64_t > &entry, const std::vector< Analyzer::Expr * > &target_exprs, const QueryMemoryDescriptor &query_mem_desc)
Definition: Execute.cpp:1810
ExecutorOptLevel opt_level
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:76
void enrollQuerySession(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted_time_str, const size_t executor_id, const QuerySessionStatus::QueryStatus query_session_status)
Definition: Execute.cpp:4025
size_t compute_buffer_entry_guess(const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:1065
T visit(const Analyzer::Expr *expr) const
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
bool takes_float_argument(const TargetInfo &target_info)
Definition: TargetInfo.h:157
unsigned g_trivial_loop_join_threshold
Definition: Execute.cpp:82
static uint32_t gpu_active_modules_device_mask_
Definition: Execute.h:1056
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:323
static void invalidateCaches()
void buildSelectedFragsMappingForUnion(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2837
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:652
quantile::TDigest * nullTDigest(double const q)
Definition: Execute.cpp:255
bool g_is_test_env
Definition: Execute.cpp:130
llvm::Value * castToIntPtrTyIn(llvm::Value *val, const size_t bit_width)
Definition: Execute.cpp:3439
size_t getNumBytesForFetchedRow(const std::set< int > &table_ids_to_fetch) const
Definition: Execute.cpp:313
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
static std::mutex kernel_mutex_
Definition: Execute.h:1151
unsigned numBlocksPerMP() const
Definition: Execute.cpp:3388
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
bool is_number() const
Definition: sqltypes.h:503
#define CHECK_GT(x, y)
Definition: Logger.h:218
Container for compilation results and assorted options for a single execution unit.
bool isCPUOnly() const
Definition: Execute.cpp:263
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::vector< QuerySessionStatus > getQuerySessionInfo(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4252
void addTransientStringLiterals(const RelAlgExecutionUnit &ra_exe_unit, const std::shared_ptr< RowSetMemoryOwner > &row_set_mem_owner)
Definition: Execute.cpp:1726
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:2001
std::vector< FragmentsPerTable > FragmentsList
bool is_time() const
Definition: sqltypes.h:504
bool needFetchAllFragments(const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
Definition: Execute.cpp:2438
std::string to_string(char const *&&v)
static size_t literalBytes(const CgenState::LiteralValue &lit)
Definition: CgenState.h:371
bool checkIsQuerySessionInterrupted(const std::string &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4193
mapd_shared_mutex & getSessionLock()
Definition: Execute.cpp:3881
bool g_enable_overlaps_hashjoin
Definition: Execute.cpp:95
gpu_code_cache_(code_cache_size)
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:192
int8_t * getUnderlyingBuffer() const
bool g_inner_join_fragment_skipping
Definition: Execute.cpp:84
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:301
cpu_code_cache_(code_cache_size)
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const TableFunctionCompilationContext *compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:461
bool g_enable_smem_non_grouped_agg
Definition: Execute.cpp:127
ResultSetPtr get_merged_result(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Definition: Execute.cpp:863
Definition: sqldefs.h:73
std::unique_ptr< QueryMemoryInitializer > query_buffers_
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:167
std::vector< int64_t > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
Definition: Execute.cpp:3286
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:3320
bool isFragmentFullyDeleted(const int table_id, const Fragmenter_Namespace::FragmentInfo &fragment)
Definition: Execute.cpp:3564
bool g_null_div_by_zero
Definition: Execute.cpp:81
bool checkIsRunningQuerySessionInterrupted()
Definition: Execute.cpp:4204
bool checkIsQuerySessionEnrolled(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4209
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
FetchResult fetchUnionChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:2614
This file contains the class specification and related data structures for Catalog.
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:194
int8_t warpSize() const
Definition: Execute.cpp:3371
std::map< QuerySessionId, bool > InterruptFlagMap
Definition: Execute.h:82
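InterruptFlagMap is declared as std::map<QuerySessionId, bool>, and the session helpers listed elsewhere on this page (setQuerySessionAsInterrupted, checkIsQuerySessionInterrupted) read and write it under a mapd_shared_mutex. A rough sketch of that flag-map pattern, making no claim about the real locking granularity:

// Illustrative interrupt-flag map guarded by a reader/writer lock; these
// identifiers are hypothetical and only mirror the declared types above.
#include <map>
#include <shared_mutex>
#include <string>

using QuerySessionId = std::string;
using InterruptFlagMap = std::map<QuerySessionId, bool>;

static std::shared_timed_mutex session_mutex;
static InterruptFlagMap interrupt_flags;

void mark_interrupted(const QuerySessionId& session) {
  std::unique_lock<std::shared_timed_mutex> write_lock(session_mutex);
  interrupt_flags[session] = true;
}

bool is_interrupted(const QuerySessionId& session) {
  std::shared_lock<std::shared_timed_mutex> read_lock(session_mutex);
  auto it = interrupt_flags.find(session);
  return it != interrupt_flags.end() && it->second;
}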
const size_t limit
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1012
bool g_enable_columnar_output
Definition: Execute.cpp:92
ResultSetPtr collectAllDeviceResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
Definition: Execute.cpp:1907
const ColumnDescriptor * getPhysicalColumnDescriptor(const Analyzer::ColumnVar *, int) const
Definition: Execute.cpp:274
int8_t groupColWidth(const size_t key_idx) const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1134
static SysCatalog & instance()
Definition: SysCatalog.h:293
max_gpu_slab_size_(max_gpu_slab_size)
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const int64_t generation)
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4050
bool g_from_table_reordering
Definition: Execute.cpp:83
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:166
Classes representing a parse tree.
void setGeneration(const uint32_t id, const uint64_t generation)
bool g_enable_hashjoin_many_to_many
Definition: Execute.cpp:96
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
int getDeviceCount() const
Definition: CudaMgr.h:86
size_t get_context_count(const ExecutorDeviceType device_type, const size_t cpu_count, const size_t gpu_count)
Definition: Execute.cpp:1053
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:220
int64_t deviceCycles(int milliseconds) const
Definition: Execute.cpp:3407
ExecutorType executor_type
void init(LogOptions const &log_opts)
Definition: Logger.cpp:282
bool is_integer() const
Definition: sqltypes.h:500
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static size_t addAligned(const size_t off_in, const size_t alignment)
Definition: CgenState.h:402
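CgenState::addAligned() and literalBytes() feed the literal-buffer layout consumed by serializeLiterals() further down this page. The usual round-up-to-alignment arithmetic looks roughly like the sketch below; it is illustrative only, not the actual CgenState code.

// Round an offset up to the next multiple of `alignment`; a generic sketch of
// how aligned literal offsets can be computed.
#include <cassert>
#include <cstddef>

size_t round_up_aligned(const size_t off_in, const size_t alignment) {
  assert(alignment != 0);
  const size_t rem = off_in % alignment;
  return rem == 0 ? off_in : off_in + (alignment - rem);
}

// Example: a 4-byte literal placed after 6 bytes of payload starts at offset 8,
// since round_up_aligned(6, 4) == 8.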
#define CHECK_NE(x, y)
Definition: Logger.h:215
void setQuerySessionAsInterrupted(const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4173
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:232
std::unordered_map< TableId, const ColumnDescriptor * > DeletedColumnsMap
Definition: PlanState.h:44
const JoinQualsPerNestingLevel join_quals
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:905
std::vector< std::string > expr_container_to_string(const T &expr_container)
Definition: Execute.cpp:1186
std::shared_timed_mutex mapd_shared_mutex
SortAlgorithm
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
Definition: Execute.cpp:2022
CachedCardinality getCachedCardinality(const std::string &cache_key)
Definition: Execute.cpp:4242
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:77
void agg_min_double_skip_val(int64_t *agg, const double val, const double skip_val)
void setCatalog(const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:289
std::map< size_t, std::vector< uint64_t > > get_table_id_to_frag_offsets(const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:2370
ExecutorExplainType explain_type
StringDictionaryProxy * getStringDictionaryProxy(const int dict_id, const bool with_generation) const
Definition: Execute.h:404
int getColId() const
int64_t inline_null_val(const SQLTypeInfo &ti, const bool float_argument_input)
Definition: Execute.cpp:1795
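inline_null_val() pairs with inline_int_null_val() and inline_fp_null_val() listed elsewhere on this page: null sentinels for every supported type end up in a single int64_t slot. A hedged sketch of how a floating-point sentinel can be carried bit-for-bit in an int64_t (illustrative only; the real function's float_argument_input handling is not reproduced here):

// Pack a double null sentinel into an int64_t slot without changing its bits.
// Purely illustrative; not the actual inline_null_val() implementation.
#include <cstdint>
#include <cstring>

int64_t pack_fp_null_sentinel(const double null_val) {
  static_assert(sizeof(double) == sizeof(int64_t), "bit-for-bit copy");
  int64_t slot = 0;
  std::memcpy(&slot, &null_val, sizeof(slot));  // type-pun safely via memcpy
  return slot;
}

// Usage: pack_fp_null_sentinel(some_null_double) lets integer and fp null
// sentinels share one int64_t-based initializer array.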
size_t g_big_group_threshold
Definition: Execute.cpp:102
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1140
static std::unordered_map< std::string, size_t > cardinality_cache_
Definition: Execute.h:1131
float g_filter_push_down_high_frac
Definition: Execute.cpp:90
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1100
int getTableId() const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1141
bool g_bigint_count
void agg_min_float_skip_val(int32_t *agg, const float val, const float skip_val)
Definition: sqldefs.h:75
const TableGeneration & getTableGeneration(const int table_id) const
Definition: Execute.cpp:305
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
llvm::Value * castToFP(llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
Definition: Execute.cpp:3412
std::pair< bool, size_t > CachedCardinality
Definition: Execute.h:1008
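CachedCardinality is a std::pair<bool, size_t>; together with getCachedCardinality(), addToCardinalityCache(), cardinality_cache_, and recycler_mutex_ elsewhere on this page, it outlines a small string-keyed cache where the bool reports whether the key was found. A minimal sketch of that pattern with hypothetical names, guarded by a reader/writer lock as the declarations suggest:

// Illustrative cardinality cache: a cache key maps to a previously computed
// row-count estimate; not the actual Executor implementation.
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

using CachedCardinality = std::pair<bool, size_t>;

static std::shared_timed_mutex cache_mutex;
static std::unordered_map<std::string, size_t> cardinality_cache;

void add_to_cache(const std::string& cache_key, const size_t cache_value) {
  std::unique_lock<std::shared_timed_mutex> write_lock(cache_mutex);
  cardinality_cache[cache_key] = cache_value;
}

CachedCardinality get_cached(const std::string& cache_key) {
  std::shared_lock<std::shared_timed_mutex> read_lock(cache_mutex);
  auto it = cardinality_cache.find(cache_key);
  return it == cardinality_cache.end() ? CachedCardinality{false, 0}
                                       : CachedCardinality{true, it->second};
}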
size_t g_max_memory_allocation_size
Definition: Execute.cpp:105
bool is_dict_encoded_type() const
Definition: sqltypes.h:543
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:153
double g_overlaps_target_entries_per_bin
Definition: Execute.cpp:98
size_t g_approx_quantile_buffer
Definition: Execute.cpp:135
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1115
void agg_max_float_skip_val(int32_t *agg, const float val, const float skip_val)
const ColumnDescriptor * getColumnDescriptor(const Analyzer::ColumnVar *) const
Definition: Execute.cpp:268
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1522
specifies the in-memory content of a row in the column metadata table
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
int64_t getAggInitValForIndex(const size_t index) const
const ChunkMetadataMap & getChunkMetadataMap() const
bool is_boolean() const
Definition: sqltypes.h:505
size_t g_overlaps_max_table_size_bytes
Definition: Execute.cpp:97
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1147
StringDictionaryGenerations string_dictionary_generations_
const std::shared_ptr< Analyzer::Estimator > estimator
static std::map< int, std::shared_ptr< Executor > > executors_
Definition: Execute.h:1104
#define AUTOMATIC_IR_METADATA(CGENSTATE)
const TemporaryTables * getTemporaryTables()
Definition: Execute.h:399
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
Definition: Execute.cpp:1326
QueryDescriptionType getQueryDescriptionType() const
void freeTemporaryCpuLinearizedIdxBuf()
std::shared_ptr< CompilationContext > generated_code
QueryMemoryDescriptor query_mem_desc_
const Catalog_Namespace::Catalog * getCatalog() const
Definition: Execute.cpp:285
ExecutorDeviceType device_type
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
executor_id_(executor_id)
bool g_enable_window_functions
Definition: Execute.cpp:103
bool updateQuerySessionExecutorAssignment(const QuerySessionId &query_session, const std::string &submitted_time_str, const size_t executor_id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4113
virtual ReductionCode codegen() const
Executor(const ExecutorId id, Data_Namespace::DataMgr *data_mgr, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:146
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
Definition: sqldefs.h:34
std::string sort_algorithm_to_string(const SortAlgorithm algorithm)
Definition: Execute.cpp:1208
InputSourceType getSourceType() const
size_t g_min_memory_allocation_size
Definition: Execute.cpp:106
#define CHECK_LT(x, y)
Definition: Logger.h:216
Definition: sqltypes.h:51
ResultSetPtr resultsUnion(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:877
#define REGULAR_DICT(TRANSIENTID)
Definition: sqltypes.h:255
static QuerySessionId current_query_session_
Definition: Execute.h:1096
static llvm::ConstantInt * codegenIntConst(const Analyzer::Constant *constant, CgenState *cgen_state)
Definition: ConstantIR.cpp:89
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2792
size_t ExecutorId
Definition: Execute.h:366
#define CHECK_LE(x, y)
Definition: Logger.h:217
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
Definition: Execute.cpp:390
mapd_shared_mutex & getDataRecyclerLock()
Definition: Execute.cpp:3866
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const RegisteredQueryHint &query_hint)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: HashJoin.cpp:238
Speculative top N algorithm.
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
void setGeneration(const uint32_t id, const TableGeneration &generation)
void launchKernels(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
bool g_enable_smem_grouped_non_count_agg
Definition: Execute.cpp:124
std::pair< bool, int64_t > skipFragment(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3597
bool g_enable_experimental_string_functions
bool g_enable_automatic_ir_metadata
Definition: Execute.cpp:138
Definition: sqldefs.h:76
unsigned gridSize() const
Definition: Execute.cpp:3379
std::unordered_map< int, CgenState::LiteralValues > literal_values
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:325
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags(const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:2389
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:32
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1951
StringDictionaryGenerations computeStringDictionaryGenerations(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:3822
std::unique_ptr< llvm::Module > udf_cpu_module
bool g_enable_filter_function
Definition: Execute.cpp:78
bool needLinearizeAllFragments(const ColumnDescriptor *cd, const InputColDescriptor &inner_col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments, const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:2457
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
Definition: ResultSet.cpp:491
void updateQuerySessionStatus(std::shared_ptr< const query_state::QueryState > &query_state, const QuerySessionStatus::QueryStatus new_query_status)
Definition: Execute.cpp:3987
int32_t executePlanWithoutGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr * > &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
Definition: Execute.cpp:2890
bool g_cache_string_hash
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:3301
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
Definition: Execute.cpp:3096
std::vector< std::vector< const int8_t * > > col_buffers
Definition: ColumnFetcher.h:42
std::pair< bool, int64_t > skipFragmentInnerJoins(const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:3763
void buildSelectedFragsMapping(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2806
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:101
data_mgr_(data_mgr)
int64_t extract_min_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr * > &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
Definition: Execute.cpp:1865
std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> PerFragmentCallBack
Definition: Execute.h:555
ThreadId thread_id()
Definition: Logger.cpp:741
mapd_shared_lock< mapd_shared_mutex > read_lock
size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1107
bool g_enable_filter_push_down
Definition: Execute.cpp:88
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool g_use_estimator_result_cache
Definition: Execute.cpp:116
bool g_enable_bump_allocator
Definition: Execute.cpp:109
size_t getRunningExecutorId(mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:3902
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::string &submitted_time_str, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4138
static std::vector< InnerOuter > normalizeColumnPairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:682
std::string QuerySessionId
Definition: Execute.h:80
ResultSetPtr reduceMultiDeviceResultSets(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:952
RegisteredQueryHint query_hint
void set_notnull(bool n)
Definition: sqltypes.h:420
void addToCardinalityCache(const std::string &cache_key, const size_t cache_value)
Definition: Execute.cpp:4233
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
constexpr char const * EMPTY_QUERY_PLAN
static std::atomic< bool > interrupted_
Definition: Execute.h:1058
#define CHECK(condition)
Definition: Logger.h:206
QueryPlanDagCache & getQueryPlanDagCache()
Definition: Execute.cpp:3870
#define DEBUG_TIMER(name)
Definition: Logger.h:322
void agg_sum_double_skip_val(int64_t *agg, const double val, const double skip_val)
constexpr int64_t get_timestamp_precision_scale(const int32_t dimen)
Definition: DateTimeUtils.h:51
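get_timestamp_precision_scale() and exp_to_scale() (listed just below) both turn a decimal exponent into a multiplier. Assuming the conventional 10^n mapping, e.g. TIMESTAMP(3) carrying milliseconds and therefore scaling by 1000, a worked sketch:

// Power-of-ten scale for a sub-second timestamp dimension; a sketch assuming
// the conventional mapping, not the library's exact implementation.
#include <cstdint>

constexpr int64_t precision_scale(const int32_t dimen) {
  int64_t scale = 1;
  for (int32_t i = 0; i < dimen; ++i) {
    scale *= 10;
  }
  return scale;
}

static_assert(precision_scale(3) == 1000, "TIMESTAMP(3): milliseconds");
static_assert(precision_scale(6) == 1000000, "TIMESTAMP(6): microseconds");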
bool gpusPresent() const
Definition: DataMgr.h:203
uint64_t exp_to_scale(const unsigned exp)
double gpu_input_mem_limit_percent
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
void clearQuerySessionStatus(const QuerySessionId &query_session, const std::string &submitted_time_str, const bool acquire_spin_lock)
Definition: Execute.cpp:3963
bool g_cluster
static std::mutex compilation_mutex_
Definition: Execute.h:1150
bool g_allow_cpu_retry
Definition: Execute.cpp:80
std::vector< std::vector< int64_t > > num_rows
Definition: ColumnFetcher.h:43
Definition: sqldefs.h:33
size_t g_approx_quantile_centroids
Definition: Execute.cpp:136
constexpr int8_t MAX_BYTE_WIDTH_SUPPORTED
void setColRange(const PhysicalInput &, const ExpressionRange &)
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables)
const Expr * get_left_operand() const
Definition: Analyzer.h:442
FetchResult fetchChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:2480
static size_t running_query_executor_id_
Definition: Execute.h:1098
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1415
JoinColumnsInfo getJoinColumnsInfo(const Analyzer::Expr *join_expr, JoinColumnSide target_side, bool extract_only_col_id)
Definition: Execute.cpp:3874
static bool typeSupportsRange(const SQLTypeInfo &ti)
std::unordered_map< int, const Analyzer::BinOper * > getInnerTabIdToJoinCond() const
Definition: Execute.cpp:2072
ExecutorDeviceType getDeviceTypeForTargets(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
Definition: Execute.cpp:1774
std::shared_ptr< const query_state::QueryState > query_state
ReductionCode get_reduction_code(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, int64_t *compilation_queue_time)
Definition: Execute.cpp:937
ResultSetPtr reduce_estimator_results(const RelAlgExecutionUnit &ra_exe_unit, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
ExpressionRange getColRange(const PhysicalInput &) const
Definition: Execute.cpp:309
CurrentQueryStatus attachExecutorToQuerySession(const QuerySessionId &query_session_id, const std::string &query_str, const std::string &query_submitted_time)
Definition: Execute.cpp:3920
mapd_unique_lock< mapd_shared_mutex > write_lock
std::vector< Analyzer::Expr * > target_exprs
SQLTypeInfo columnType
bool g_optimize_row_initialization
Definition: Execute.cpp:94
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:96
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
debug_file_(debug_file)
int get_column_id() const
Definition: Analyzer.h:195
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const GpuCompilationContext *cu_functions, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const bool allow_runtime_interrupt, const std::vector< int64_t > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
bool is_string() const
Definition: sqltypes.h:498
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
unsigned blockSize() const
Definition: Execute.cpp:3393
std::vector< std::vector< uint64_t > > frag_offsets
Definition: ColumnFetcher.h:44
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:330
std::unique_ptr< ResultSet > estimator_result_set_
const size_t offset
unsigned g_dynamic_watchdog_time_limit
Definition: Execute.cpp:79
Definition: sqldefs.h:74
int cpu_threads()
Definition: thread_count.h:24
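cpu_threads() (thread_count.h) supplies the cpu_count fed into helpers such as get_context_count() above. A plausible stand-in, assuming it simply reports hardware concurrency with a floor of one thread; the actual definition may differ:

// Hypothetical stand-in for cpu_threads(): hardware concurrency, never zero.
#include <algorithm>
#include <thread>

int cpu_threads_sketch() {
  return std::max(1u, std::thread::hardware_concurrency());
}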
bool g_use_tbb_pool
Definition: Execute.cpp:77
bool is_decimal() const
Definition: sqltypes.h:501
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:728
void add_deleted_col_to_map(PlanState::DeletedColumnsMap &deleted_cols_map, const ColumnDescriptor *deleted_cd)
Definition: Execute.cpp:3473
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:660
void setCurrentQuerySession(const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3890
temporary_tables_(nullptr)
Descriptor for the fragments required for an execution kernel.
Definition: sqldefs.h:72
size_t getColOffInBytes(const size_t col_idx) const
void agg_sum_float_skip_val(int32_t *agg, const float val, const float skip_val)
static size_t getArenaBlockSize()
Definition: Execute.cpp:217
ResultSetPtr executeWorkUnit(size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1348
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:2087
int getNestLevel() const
HashType
Definition: HashTable.h:19
const InputDescriptor & getScanDesc() const
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:245
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:114
size_t g_enable_parallel_linearization
Definition: Execute.cpp:132
static mapd_shared_mutex recycler_mutex_
Definition: Execute.h:1130
bool is_array() const
Definition: sqltypes.h:506
#define VLOG(n)
Definition: Logger.h:300
Type timer_start()
Definition: measure.h:42
static QueryPlanDagCache query_plan_dag_cache_
Definition: Execute.h:1128
Functions to support array operations used by the executor.
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
int64_t extract_max_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
const Executor * getExecutor() const
static std::mutex gpu_active_modules_mutex_
Definition: Execute.h:1055
void setupCaching(const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< int > &phys_table_ids)
Definition: Execute.cpp:3855
void clearMetaInfoCache()
Definition: Execute.cpp:384
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:191
bool g_enable_table_functions
Definition: Execute.cpp:104
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
HashTableBuildDagMap hash_table_build_plan_dag
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:367
size_t g_gpu_smem_threshold
Definition: Execute.cpp:119
bool skipFragmentPair(const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
Definition: Execute.cpp:2299
ResultSetPtr executeExplain(const QueryCompilationDescriptor &)
Definition: Execute.cpp:1722
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const
void compile(const TableFunctionExecutionUnit &exe_unit, const CompilationOptions &co, Executor *executor)