OmniSciDB 085a039ca4
Execute.cpp
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 
23 #ifdef HAVE_CUDA
24 #include <cuda.h>
25 #endif // HAVE_CUDA
26 #include <chrono>
27 #include <ctime>
28 #include <future>
29 #include <iostream>
30 #include <memory>
31 #include <mutex>
32 #include <numeric>
33 #include <set>
34 #include <thread>
35 
36 #include "Catalog/Catalog.h"
37 #include "CudaMgr/CudaMgr.h"
42 #include "Parser/ParserNode.h"
72 #include "Shared/checked_alloc.h"
73 #include "Shared/measure.h"
74 #include "Shared/misc.h"
75 #include "Shared/scope.h"
76 #include "Shared/shard_key.h"
77 #include "Shared/threading.h"
78 
79 bool g_enable_watchdog{false};
83 size_t g_cpu_sub_task_size{500'000};
84 bool g_enable_filter_function{true};
85 unsigned g_dynamic_watchdog_time_limit{10000};
86 bool g_allow_cpu_retry{true};
87 bool g_allow_query_step_cpu_retry{true};
88 bool g_null_div_by_zero{false};
89 unsigned g_trivial_loop_join_threshold{1000};
90 bool g_from_table_reordering{true};
91 bool g_inner_join_fragment_skipping{true};
92 extern bool g_enable_smem_group_by;
93 extern std::unique_ptr<llvm::Module> udf_gpu_module;
94 extern std::unique_ptr<llvm::Module> udf_cpu_module;
95 bool g_enable_filter_push_down{false};
96 float g_filter_push_down_low_frac{-1.0f};
97 float g_filter_push_down_high_frac{-1.0f};
98 size_t g_filter_push_down_passing_row_ubound{0};
99 bool g_enable_columnar_output{false};
100 bool g_enable_left_join_filter_hoisting{true};
101 bool g_optimize_row_initialization{true};
102 bool g_enable_overlaps_hashjoin{true};
103 bool g_enable_distance_rangejoin{true};
104 bool g_enable_hashjoin_many_to_many{false};
105 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
106 double g_overlaps_target_entries_per_bin{1.3};
107 bool g_strip_join_covered_quals{false};
108 size_t g_constrained_by_in_threshold{10};
109 size_t g_default_max_groups_buffer_entry_guess{16384};
110 size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
111 bool g_enable_window_functions{true};
112 bool g_enable_table_functions{true};
113 bool g_enable_dev_table_functions{false};
114 bool g_enable_geo_ops_on_uncompressed_coords{true};
115 bool g_enable_rf_prop_table_functions{true};
116 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
117 size_t g_min_memory_allocation_size{
118  256}; // minimum memory allocation required for projection query output buffer
119  // without pre-flight count
120 bool g_enable_bump_allocator{false};
121 double g_bump_allocator_step_reduction{0.75};
122 bool g_enable_direct_columnarization{true};
123 extern bool g_enable_string_functions;
124 bool g_enable_lazy_fetch{true};
125 bool g_enable_runtime_query_interrupt{true};
126 bool g_enable_non_kernel_time_query_interrupt{true};
127 bool g_use_estimator_result_cache{true};
128 unsigned g_pending_query_interrupt_freq{1000};
129 double g_running_query_interrupt_freq{0.1};
130 size_t g_gpu_smem_threshold{
131  4096}; // GPU shared memory threshold (in bytes), if larger
132  // buffer sizes are required we do not use GPU shared
 133  // memory optimizations. Setting this to 0 means unlimited
134  // (subject to other dynamically calculated caps)
135 bool g_enable_smem_grouped_non_count_agg{
136  true}; // enable use of shared memory when performing group-by with select non-count
137  // aggregates
138 bool g_enable_smem_non_grouped_agg{
139  true}; // enable optimizations for using GPU shared memory in implementation of
140  // non-grouped aggregates
141 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
142  // limits the allocation for the output buffer arena
143  // and data recycler test
 144 size_t g_enable_parallel_linearization{
 145  10000}; // # rows above which we try to linearize a varlen col in parallel
146 bool g_enable_data_recycler{true};
147 bool g_use_hashtable_cache{true};
148 bool g_use_query_resultset_cache{true};
149 bool g_use_chunk_metadata_cache{true};
150 bool g_allow_auto_resultset_caching{false};
151 bool g_allow_query_step_skipping{true};
152 size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
153 size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
154 size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
155 size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
156 size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
157 
158 size_t g_approx_quantile_buffer{1000};
159 size_t g_approx_quantile_centroids{300};
160 
161 bool g_enable_automatic_ir_metadata{true};
162 
163 size_t g_max_log_length{500};
164 
165 extern bool g_cache_string_hash;
166 
167 int const Executor::max_gpu_count;
168 
169 const int32_t Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES;
170 
171 std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;
172 
173 extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
174  const std::string& udf_ir_filename,
175  llvm::LLVMContext& ctx);
176 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
177  const std::string& udf_ir_filename,
178  llvm::LLVMContext& ctx,
179  bool is_gpu = false);
180 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
181  const std::string& udf_ir_string,
182  llvm::LLVMContext& ctx,
183  bool is_gpu = false);
184 
185 CodeCacheAccessor<CpuCompilationContext> Executor::s_stubs_accessor(
186  Executor::code_cache_size,
187  "s_stubs_cache");
188 CodeCacheAccessor<CpuCompilationContext> Executor::s_code_accessor(
189  Executor::code_cache_size,
190  "s_code_cache");
191 CodeCacheAccessor<CpuCompilationContext> Executor::cpu_code_accessor(
192  Executor::code_cache_size,
193  "cpu_code_cache");
194 CodeCacheAccessor<GpuCompilationContext> Executor::gpu_code_accessor(
195  Executor::code_cache_size,
196  "gpu_code_cache");
197 CodeCacheAccessor<CompilationContext> Executor::tf_code_accessor(
198  Executor::code_cache_size,
199  "tf_code_cache");
200 
201 namespace {
202 // This function is notably different from that in RelAlgExecutor because it already
203 // expects SPI values and therefore needs to avoid that transformation.
204 void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs,
205  const Catalog_Namespace::Catalog& catalog) {
206  for (const auto [col_id, table_id] : phys_inputs) {
207  foreign_storage::populate_string_dictionary(table_id, col_id, catalog);
208  }
209 }
210 
211 bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
212  const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
213  // The fragmenter always returns at least one fragment, even when the table is empty.
214  return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
215 }
216 } // namespace
217 
218 namespace foreign_storage {
219 // Foreign tables skip the population of dictionaries during metadata scan. This function
220 // will populate a dictionary's missing entries by fetching any unpopulated chunks.
221 void populate_string_dictionary(const int32_t table_id,
222  const int32_t col_id,
223  const Catalog_Namespace::Catalog& catalog) {
224  if (const auto foreign_table = dynamic_cast<const ForeignTable*>(
225  catalog.getMetadataForTable(table_id, false))) {
226  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
227  if (col_desc->columnType.is_dict_encoded_type()) {
228  auto& fragmenter = foreign_table->fragmenter;
229  CHECK(fragmenter != nullptr);
230  if (is_empty_table(fragmenter.get())) {
231  return;
232  }
233  for (const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
234  ChunkKey chunk_key = {catalog.getDatabaseId(), table_id, col_id, frag.fragmentId};
235  // If the key is sharded across leaves, only populate fragments that are sharded
236  // to this leaf.
237  if (key_does_not_shard_to_leaf(chunk_key)) {
238  continue;
239  }
240 
241  const ChunkMetadataMap& metadata_map = frag.getChunkMetadataMap();
242  CHECK(metadata_map.find(col_id) != metadata_map.end());
243  if (is_metadata_placeholder(*(metadata_map.at(col_id)))) {
244  // When this goes out of scope it will stay in CPU cache but become
245  // evictable
246  auto chunk = Chunk_NS::Chunk::getChunk(col_desc,
247  &(catalog.getDataMgr()),
 248  chunk_key,
 249  Data_Namespace::CPU_LEVEL,
250  0,
251  0,
252  0);
253  }
254  }
255  }
256  }
257 }
258 } // namespace foreign_storage
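 // Note: populate_string_dictionary() is driven per physical input by
 // prepare_string_dictionaries() above. It is a no-op for non-foreign tables
 // (the ForeignTable cast fails), for columns that are not dictionary encoded,
 // and for empty tables; otherwise it materializes every chunk whose metadata
 // is still a placeholder so the corresponding dictionary entries get loaded.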
259 
260 Executor::Executor(const ExecutorId executor_id,
261  Data_Namespace::DataMgr* data_mgr,
262  const size_t block_size_x,
263  const size_t grid_size_x,
264  const size_t max_gpu_slab_size,
265  const std::string& debug_dir,
266  const std::string& debug_file)
267  : executor_id_(executor_id)
268  , context_(new llvm::LLVMContext())
269  , cgen_state_(new CgenState({}, false, this))
270  , block_size_x_(block_size_x)
271  , grid_size_x_(grid_size_x)
272  , max_gpu_slab_size_(max_gpu_slab_size)
273  , debug_dir_(debug_dir)
274  , debug_file_(debug_file)
275  , catalog_(nullptr)
276  , data_mgr_(data_mgr)
277  , temporary_tables_(nullptr)
  , input_table_info_cache_(this) {
  Executor::initialize_extension_module_sources();
 281  update_extension_modules();
282 }
283 
 void Executor::initialize_extension_module_sources() {
  if (Executor::extension_module_sources.find(
          Executor::ExtModuleKinds::template_module) ==
      Executor::extension_module_sources.end()) {
 288  auto root_path = heavyai::get_root_abs_path();
289  auto template_path = root_path + "/QueryEngine/RuntimeFunctions.bc";
290  CHECK(boost::filesystem::exists(template_path));
 291  Executor::extension_module_sources[Executor::ExtModuleKinds::template_module] =
 292  template_path;
293 #ifdef ENABLE_GEOS
294  auto rt_geos_path = root_path + "/QueryEngine/GeosRuntime.bc";
295  CHECK(boost::filesystem::exists(rt_geos_path));
 296  Executor::extension_module_sources[Executor::ExtModuleKinds::rt_geos_module] =
 297  rt_geos_path;
298 #endif
299 #ifdef HAVE_CUDA
300  auto rt_libdevice_path = get_cuda_home() + "/nvvm/libdevice/libdevice.10.bc";
301  if (boost::filesystem::exists(rt_libdevice_path)) {
 302  Executor::extension_module_sources[Executor::ExtModuleKinds::rt_libdevice_module] =
 303  rt_libdevice_path;
304  } else {
305  LOG(WARNING) << "File " << rt_libdevice_path
306  << " does not exist; support for some UDF "
307  "functions might not be available.";
308  }
309 #endif
310  }
311 }
312 
313 void Executor::reset(bool discard_runtime_modules_only) {
314  // TODO: keep cached results that do not depend on runtime UDF/UDTFs
315  s_code_accessor.clear();
316  s_stubs_accessor.clear();
317  cpu_code_accessor.clear();
318  gpu_code_accessor.clear();
319  tf_code_accessor.clear();
320 
321  if (discard_runtime_modules_only) {
322  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_cpu_module);
323 #ifdef HAVE_CUDA
324  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_gpu_module);
325 #endif
326  cgen_state_->module_ = nullptr;
327  } else {
328  extension_modules_.clear();
329  cgen_state_.reset();
330  context_.reset(new llvm::LLVMContext());
331  cgen_state_.reset(new CgenState({}, false, this));
332  }
333 }
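 // Summary of reset(): all five code caches are always cleared. With
 // discard_runtime_modules_only set, only the rt_udf_* extension modules are
 // dropped and cgen_state_->module_ is nulled; otherwise every extension module
 // is discarded and the LLVMContext plus CgenState are rebuilt from scratch.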
334 
335 void Executor::update_extension_modules(bool update_runtime_modules_only) {
336  auto read_module = [&](Executor::ExtModuleKinds module_kind,
337  const std::string& source) {
338  /*
 339  source can be either the filename of an LLVM IR
 340  or LLVM BC file, or a string containing
 341  LLVM IR code.
342  */
343  CHECK(!source.empty());
344  switch (module_kind) {
 345  case Executor::ExtModuleKinds::template_module:
 346  case Executor::ExtModuleKinds::rt_geos_module:
 347  case Executor::ExtModuleKinds::rt_libdevice_module: {
 348  return read_llvm_module_from_bc_file(source, getContext());
 349  }
 350  case Executor::ExtModuleKinds::udf_cpu_module: {
 351  return read_llvm_module_from_ir_file(source, getContext(), false);
 352  }
 353  case Executor::ExtModuleKinds::udf_gpu_module: {
 354  return read_llvm_module_from_ir_file(source, getContext(), true);
 355  }
 356  case Executor::ExtModuleKinds::rt_udf_cpu_module: {
 357  return read_llvm_module_from_ir_string(source, getContext(), false);
 358  }
 359  case Executor::ExtModuleKinds::rt_udf_gpu_module: {
 360  return read_llvm_module_from_ir_string(source, getContext(), true);
361  }
362  default: {
363  UNREACHABLE();
364  return std::unique_ptr<llvm::Module>();
365  }
366  }
367  };
368  auto update_module = [&](Executor::ExtModuleKinds module_kind,
369  bool erase_not_found = false) {
370  auto it = Executor::extension_module_sources.find(module_kind);
371  if (it != Executor::extension_module_sources.end()) {
372  auto llvm_module = read_module(module_kind, it->second);
373  if (llvm_module) {
374  extension_modules_[module_kind] = std::move(llvm_module);
375  } else if (erase_not_found) {
376  extension_modules_.erase(module_kind);
377  } else {
378  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
379  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
380  << " LLVM module. The module will be unavailable.";
381  } else {
382  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
383  << " LLVM module. Using the existing module.";
384  }
385  }
386  } else {
387  if (erase_not_found) {
388  extension_modules_.erase(module_kind);
389  } else {
390  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
391  LOG(WARNING) << "Source of " << ::toString(module_kind)
392  << " LLVM module is unavailable. The module will be unavailable.";
393  } else {
394  LOG(WARNING) << "Source of " << ::toString(module_kind)
395  << " LLVM module is unavailable. Using the existing module.";
396  }
397  }
398  }
399  };
400 
401  if (!update_runtime_modules_only) {
402  // required compile-time modules, their requirements are enforced
 403  // by Executor::initialize_extension_module_sources():
 404  update_module(Executor::ExtModuleKinds::template_module);
 405 #ifdef ENABLE_GEOS
 406  update_module(Executor::ExtModuleKinds::rt_geos_module);
 407 #endif
408  // load-time modules, these are optional:
409  update_module(Executor::ExtModuleKinds::udf_cpu_module, true);
410 #ifdef HAVE_CUDA
 411  update_module(Executor::ExtModuleKinds::udf_gpu_module, true);
 412  update_module(Executor::ExtModuleKinds::rt_libdevice_module);
 413 #endif
414  }
415  // run-time modules, these are optional and erasable:
416  update_module(Executor::ExtModuleKinds::rt_udf_cpu_module, true);
417 #ifdef HAVE_CUDA
418  update_module(Executor::ExtModuleKinds::rt_udf_gpu_module, true);
419 #endif
420 }
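 // The module kinds handled above fall into three groups: required compile-time
 // modules, optional load-time UDF modules, and erasable run-time UDF modules.
 // When a reload fails, the existing module is kept and a warning is logged; for
 // erasable kinds a missing source simply removes the module.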
421 
422 // Used by StubGenerator::generateStub
424  : executor_(executor)
425  , lock_queue_clock_(timer_start())
426  , lock_(executor_.compilation_mutex_)
427  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
428 {
429  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
430  executor_.cgen_state_.reset(new CgenState(0, false, &executor));
431 }
432 
434  Executor& executor,
435  const bool allow_lazy_fetch,
436  const std::vector<InputTableInfo>& query_infos,
437  const PlanState::DeletedColumnsMap& deleted_cols_map,
438  const RelAlgExecutionUnit* ra_exe_unit)
439  : executor_(executor)
440  , lock_queue_clock_(timer_start())
441  , lock_(executor_.compilation_mutex_)
442  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
443 {
444  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
445  // nukeOldState creates new CgenState and PlanState instances for
446  // the subsequent code generation. It also resets
447  // kernel_queue_time_ms_ and compilation_queue_time_ms_ that we do
 448  // not currently restore. Should we accumulate these timings?
449  executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
450 }
451 
453  // prevent memory leak from hoisted literals
454  for (auto& p : executor_.cgen_state_->row_func_hoisted_literals_) {
455  auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
456  if (inst && inst->getNumUses() == 0 && inst->getParent() == nullptr) {
457  // The llvm::Value instance stored in p.first is created by the
458  // CodeGenerator::codegenHoistedConstantsPlaceholders method.
459  p.first->deleteValue();
460  }
461  }
462  executor_.cgen_state_->row_func_hoisted_literals_.clear();
463 
464  // move generated StringDictionaryTranslationMgrs and InValueBitmaps
465  // to the old CgenState instance as the execution of the generated
466  // code uses these bitmaps
467 
468  for (auto& str_dict_translation_mgr :
469  executor_.cgen_state_->str_dict_translation_mgrs_) {
470  cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
471  }
472  executor_.cgen_state_->str_dict_translation_mgrs_.clear();
473 
474  for (auto& bm : executor_.cgen_state_->in_values_bitmaps_) {
475  cgen_state_->moveInValuesBitmap(bm);
476  }
477  executor_.cgen_state_->in_values_bitmaps_.clear();
478 
479  // restore the old CgenState instance
480  executor_.cgen_state_.reset(cgen_state_.release());
481 }
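 // CgenStateManager is an RAII helper: construction takes compilation_mutex_,
 // records the time spent waiting on it, stashes the executor's current
 // CgenState and installs a fresh one. Destruction releases unused
 // hoisted-literal placeholders, moves the generated
 // StringDictionaryTranslationMgrs and InValuesBitmaps back into the stashed
 // state (the generated code still references them), and restores that state.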
482 
483 std::shared_ptr<Executor> Executor::getExecutor(
484  const ExecutorId executor_id,
485  const std::string& debug_dir,
486  const std::string& debug_file,
487  const SystemParameters& system_parameters) {
489 
490  mapd_unique_lock<mapd_shared_mutex> write_lock(executors_cache_mutex_);
491  auto it = executors_.find(executor_id);
492  if (it != executors_.end()) {
493  return it->second;
494  }
496  auto executor = std::make_shared<Executor>(executor_id,
497  &data_mgr,
498  system_parameters.cuda_block_size,
499  system_parameters.cuda_grid_size,
500  system_parameters.max_gpu_slab_size,
501  debug_dir,
502  debug_file);
503  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
504  return executor;
505 }
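 // Usage sketch (hedged: the executor id constant and default-constructed
 // SystemParameters below are illustrative assumptions, not taken from this
 // file):
 //   auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID,
 //                                         /*debug_dir=*/"",
 //                                         /*debug_file=*/"",
 //                                         SystemParameters());
 // Executors are cached by id under executors_cache_mutex_, so repeated calls
 // with the same id return the same instance.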
506 
508  switch (memory_level) {
511  mapd_unique_lock<mapd_shared_mutex> flush_lock(
512  execute_mutex_); // Don't flush memory while queries are running
513 
514  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
515  // The hash table cache uses CPU memory not managed by the buffer manager. In the
516  // future, we should manage these allocations with the buffer manager directly.
517  // For now, assume the user wants to purge the hash table cache when they clear
518  // CPU memory (currently used in ExecuteTest to lower memory pressure)
520  }
523  break;
524  }
525  default: {
526  throw std::runtime_error(
527  "Clearing memory levels other than the CPU level or GPU level is not "
528  "supported.");
529  }
530  }
531 }
532 
534  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
535 }
536 
538  const int dict_id_in,
539  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
540  const bool with_generation) const {
541  CHECK(row_set_mem_owner);
542  std::lock_guard<std::mutex> lock(
543  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
544  return row_set_mem_owner->getOrAddStringDictProxy(
545  dict_id_in, with_generation, catalog_);
546 }
547 
549  const int dict_id_in,
550  const bool with_generation,
551  const Catalog_Namespace::Catalog* catalog) {
552  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
553  CHECK(catalog);
554  const auto dd = catalog->getMetadataForDict(dict_id);
555  if (dd) {
556  CHECK(dd->stringDict);
557  CHECK_LE(dd->dictNBits, 32);
558  const int64_t generation =
559  with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
560  return addStringDict(dd->stringDict, dict_id, generation);
561  }
563  if (!lit_str_dict_proxy_) {
564  DictRef literal_dict_ref(catalog->getDatabaseId(), DictRef::literalsDictId);
565  std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
566  literal_dict_ref, "", false, true, g_cache_string_hash);
567  lit_str_dict_proxy_ =
568  std::make_shared<StringDictionaryProxy>(tsd, literal_dict_ref.dictId, 0);
569  }
570  return lit_str_dict_proxy_.get();
571 }
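 // A negative dict_id_in denotes a transient dictionary and is remapped via
 // REGULAR_DICT(); when the catalog has no metadata for the id, a per-owner
 // literal StringDictionaryProxy (DictRef::literalsDictId) is lazily created
 // and reused on subsequent calls.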
572 
574  const int source_dict_id,
575  const int dest_dict_id,
576  const RowSetMemoryOwner::StringTranslationType translation_type,
577  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
578  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
579  const bool with_generation) const {
580  CHECK(row_set_mem_owner);
581  std::lock_guard<std::mutex> lock(
582  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
583  return row_set_mem_owner->getOrAddStringProxyTranslationMap(source_dict_id,
584  dest_dict_id,
585  with_generation,
586  translation_type,
587  string_op_infos,
588  catalog_);
589 }
590 
593  const StringDictionaryProxy* source_proxy,
594  StringDictionaryProxy* dest_proxy,
595  const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
596  const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
597  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) const {
598  CHECK(row_set_mem_owner);
599  std::lock_guard<std::mutex> lock(
600  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
601  // First translate lhs onto itself if there are string ops
602  if (!dest_string_op_infos.empty()) {
603  row_set_mem_owner->addStringProxyUnionTranslationMap(
604  dest_proxy, dest_proxy, dest_string_op_infos);
605  }
606  return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
607  source_proxy, dest_proxy, source_string_op_infos);
608 }
609 
611  const int source_dict_id_in,
612  const int dest_dict_id_in,
613  const bool with_generation,
614  const RowSetMemoryOwner::StringTranslationType translation_type,
615  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
616  const Catalog_Namespace::Catalog* catalog) {
617  const auto source_proxy =
618  getOrAddStringDictProxy(source_dict_id_in, with_generation, catalog);
619  auto dest_proxy = getOrAddStringDictProxy(dest_dict_id_in, with_generation, catalog);
621  return addStringProxyIntersectionTranslationMap(
622  source_proxy, dest_proxy, string_op_infos);
623  } else {
624  return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
625  }
626 }
627 
629  std::lock_guard<std::mutex> lock(state_mutex_);
630  return t_digests_
631  .emplace_back(std::make_unique<quantile::TDigest>(
633  .get();
634 }
635 
636 bool Executor::isCPUOnly() const {
637  CHECK(data_mgr_);
638  return !data_mgr_->getCudaMgr();
639 }
640 
642  const Analyzer::ColumnVar* col_var) const {
644  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
645 }
646 
648  const Analyzer::ColumnVar* col_var,
649  int n) const {
650  const auto cd = getColumnDescriptor(col_var);
651  if (!cd || n > cd->columnType.get_physical_cols()) {
652  return nullptr;
653  }
655  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
656 }
657 
659  return catalog_;
660 }
661 
663  catalog_ = catalog;
664 }
665 
666 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
667  return row_set_mem_owner_;
668 }
669 
671  return temporary_tables_;
672 }
673 
675  return input_table_info_cache_.getTableInfo(table_id);
676 }
677 
678 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
679  return table_generations_.getGeneration(table_id);
680 }
681 
683  return agg_col_range_cache_.getColRange(phys_input);
684 }
685 
686 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
687  size_t num_bytes = 0;
688  if (!plan_state_) {
689  return 0;
690  }
691  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
692  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
693  continue;
694  }
695 
696  if (fetched_col_pair.first < 0) {
697  num_bytes += 8;
698  } else {
699  const auto cd =
700  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
701  const auto& ti = cd->columnType;
702  const auto sz = ti.get_size();
703  if (sz < 0) {
704  // for varlen types, only account for the pointer/size for each row, for now
705  if (!ti.is_logical_geo_type()) {
706  // Don't count size for logical geo types, as they are
707  // backed by physical columns
708  num_bytes += 16;
709  }
710  } else {
711  num_bytes += sz;
712  }
713  }
714  }
715  return num_bytes;
716 }
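 // Per-row accounting example for the function above: a fetched INTEGER (4
 // bytes) plus a TIMESTAMP (8 bytes) plus a none-encoded TEXT column (varlen,
 // counted as a 16-byte pointer/length pair) adds up to 28 bytes per row.
 // Columns from negative (temporary) table ids count 8 bytes each, and logical
 // geo columns count 0 because their physical columns are counted instead.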
717 
719  const std::vector<Analyzer::Expr*>& target_exprs) const {
721  for (const auto target_expr : target_exprs) {
722  if (plan_state_->isLazyFetchColumn(target_expr)) {
723  return true;
724  }
725  }
726  return false;
727 }
728 
729 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
730  const std::vector<Analyzer::Expr*>& target_exprs) const {
732  CHECK(catalog_);
733  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
734  for (const auto target_expr : target_exprs) {
735  if (!plan_state_->isLazyFetchColumn(target_expr)) {
736  col_lazy_fetch_info.emplace_back(
737  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
738  } else {
739  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
740  CHECK(col_var);
741  auto col_id = col_var->get_column_id();
742  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
743  auto cd = (col_var->get_table_id() > 0)
744  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
745  : nullptr;
746  if (cd && IS_GEO(cd->columnType.get_type())) {
747  // Geo coords cols will be processed in sequence. So we only need to track the
748  // first coords col in lazy fetch info.
749  {
750  auto cd0 =
751  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
752  auto col0_ti = cd0->columnType;
753  CHECK(!cd0->isVirtualCol);
754  auto col0_var = makeExpr<Analyzer::ColumnVar>(
755  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
756  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
757  col_lazy_fetch_info.emplace_back(
758  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
759  }
760  } else {
761  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
762  const auto& col_ti = col_var->get_type_info();
763  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
764  }
765  }
766  }
767  return col_lazy_fetch_info;
768 }
769 
774 }
775 
776 std::vector<int8_t> Executor::serializeLiterals(
777  const std::unordered_map<int, CgenState::LiteralValues>& literals,
778  const int device_id) {
779  if (literals.empty()) {
780  return {};
781  }
782  const auto dev_literals_it = literals.find(device_id);
783  CHECK(dev_literals_it != literals.end());
784  const auto& dev_literals = dev_literals_it->second;
785  size_t lit_buf_size{0};
786  std::vector<std::string> real_strings;
787  std::vector<std::vector<double>> double_array_literals;
788  std::vector<std::vector<int8_t>> align64_int8_array_literals;
789  std::vector<std::vector<int32_t>> int32_array_literals;
790  std::vector<std::vector<int8_t>> align32_int8_array_literals;
791  std::vector<std::vector<int8_t>> int8_array_literals;
792  for (const auto& lit : dev_literals) {
793  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
794  if (lit.which() == 7) {
795  const auto p = boost::get<std::string>(&lit);
796  CHECK(p);
797  real_strings.push_back(*p);
798  } else if (lit.which() == 8) {
799  const auto p = boost::get<std::vector<double>>(&lit);
800  CHECK(p);
801  double_array_literals.push_back(*p);
802  } else if (lit.which() == 9) {
803  const auto p = boost::get<std::vector<int32_t>>(&lit);
804  CHECK(p);
805  int32_array_literals.push_back(*p);
806  } else if (lit.which() == 10) {
807  const auto p = boost::get<std::vector<int8_t>>(&lit);
808  CHECK(p);
809  int8_array_literals.push_back(*p);
810  } else if (lit.which() == 11) {
811  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
812  CHECK(p);
813  if (p->second == 64) {
814  align64_int8_array_literals.push_back(p->first);
815  } else if (p->second == 32) {
816  align32_int8_array_literals.push_back(p->first);
817  } else {
818  CHECK(false);
819  }
820  }
821  }
822  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
823  throw TooManyLiterals();
824  }
825  int16_t crt_real_str_off = lit_buf_size;
826  for (const auto& real_str : real_strings) {
827  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
828  lit_buf_size += real_str.size();
829  }
830  if (double_array_literals.size() > 0) {
831  lit_buf_size = align(lit_buf_size, sizeof(double));
832  }
833  int16_t crt_double_arr_lit_off = lit_buf_size;
834  for (const auto& double_array_literal : double_array_literals) {
835  CHECK_LE(double_array_literal.size(),
836  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
837  lit_buf_size += double_array_literal.size() * sizeof(double);
838  }
839  if (align64_int8_array_literals.size() > 0) {
840  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
841  }
842  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
843  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
844  CHECK_LE(align64_int8_array_literals.size(),
845  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
846  lit_buf_size += align64_int8_array_literal.size();
847  }
848  if (int32_array_literals.size() > 0) {
849  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
850  }
851  int16_t crt_int32_arr_lit_off = lit_buf_size;
852  for (const auto& int32_array_literal : int32_array_literals) {
853  CHECK_LE(int32_array_literal.size(),
854  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
855  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
856  }
857  if (align32_int8_array_literals.size() > 0) {
858  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
859  }
860  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
861  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
862  CHECK_LE(align32_int8_array_literals.size(),
863  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
864  lit_buf_size += align32_int8_array_literal.size();
865  }
866  int16_t crt_int8_arr_lit_off = lit_buf_size;
867  for (const auto& int8_array_literal : int8_array_literals) {
868  CHECK_LE(int8_array_literal.size(),
869  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
870  lit_buf_size += int8_array_literal.size();
871  }
872  unsigned crt_real_str_idx = 0;
873  unsigned crt_double_arr_lit_idx = 0;
874  unsigned crt_align64_int8_arr_lit_idx = 0;
875  unsigned crt_int32_arr_lit_idx = 0;
876  unsigned crt_align32_int8_arr_lit_idx = 0;
877  unsigned crt_int8_arr_lit_idx = 0;
878  std::vector<int8_t> serialized(lit_buf_size);
879  size_t off{0};
880  for (const auto& lit : dev_literals) {
881  const auto lit_bytes = CgenState::literalBytes(lit);
882  off = CgenState::addAligned(off, lit_bytes);
883  switch (lit.which()) {
884  case 0: {
885  const auto p = boost::get<int8_t>(&lit);
886  CHECK(p);
887  serialized[off - lit_bytes] = *p;
888  break;
889  }
890  case 1: {
891  const auto p = boost::get<int16_t>(&lit);
892  CHECK(p);
893  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
894  break;
895  }
896  case 2: {
897  const auto p = boost::get<int32_t>(&lit);
898  CHECK(p);
899  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
900  break;
901  }
902  case 3: {
903  const auto p = boost::get<int64_t>(&lit);
904  CHECK(p);
905  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
906  break;
907  }
908  case 4: {
909  const auto p = boost::get<float>(&lit);
910  CHECK(p);
911  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
912  break;
913  }
914  case 5: {
915  const auto p = boost::get<double>(&lit);
916  CHECK(p);
917  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
918  break;
919  }
920  case 6: {
921  const auto p = boost::get<std::pair<std::string, int>>(&lit);
922  CHECK(p);
923  const auto str_id =
925  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
926  ->getOrAddTransient(p->first)
927  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
928  ->getIdOfString(p->first);
929  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
930  break;
931  }
932  case 7: {
933  const auto p = boost::get<std::string>(&lit);
934  CHECK(p);
935  int32_t off_and_len = crt_real_str_off << 16;
936  const auto& crt_real_str = real_strings[crt_real_str_idx];
937  off_and_len |= static_cast<int16_t>(crt_real_str.size());
938  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
939  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
940  ++crt_real_str_idx;
941  crt_real_str_off += crt_real_str.size();
942  break;
943  }
944  case 8: {
945  const auto p = boost::get<std::vector<double>>(&lit);
946  CHECK(p);
947  int32_t off_and_len = crt_double_arr_lit_off << 16;
948  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
949  int32_t len = crt_double_arr_lit.size();
950  CHECK_EQ((len >> 16), 0);
951  off_and_len |= static_cast<int16_t>(len);
952  int32_t double_array_bytesize = len * sizeof(double);
953  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
954  memcpy(&serialized[crt_double_arr_lit_off],
955  crt_double_arr_lit.data(),
956  double_array_bytesize);
957  ++crt_double_arr_lit_idx;
958  crt_double_arr_lit_off += double_array_bytesize;
959  break;
960  }
961  case 9: {
962  const auto p = boost::get<std::vector<int32_t>>(&lit);
963  CHECK(p);
964  int32_t off_and_len = crt_int32_arr_lit_off << 16;
965  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
966  int32_t len = crt_int32_arr_lit.size();
967  CHECK_EQ((len >> 16), 0);
968  off_and_len |= static_cast<int16_t>(len);
969  int32_t int32_array_bytesize = len * sizeof(int32_t);
970  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
971  memcpy(&serialized[crt_int32_arr_lit_off],
972  crt_int32_arr_lit.data(),
973  int32_array_bytesize);
974  ++crt_int32_arr_lit_idx;
975  crt_int32_arr_lit_off += int32_array_bytesize;
976  break;
977  }
978  case 10: {
979  const auto p = boost::get<std::vector<int8_t>>(&lit);
980  CHECK(p);
981  int32_t off_and_len = crt_int8_arr_lit_off << 16;
982  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
983  int32_t len = crt_int8_arr_lit.size();
984  CHECK_EQ((len >> 16), 0);
985  off_and_len |= static_cast<int16_t>(len);
986  int32_t int8_array_bytesize = len;
987  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
988  memcpy(&serialized[crt_int8_arr_lit_off],
989  crt_int8_arr_lit.data(),
990  int8_array_bytesize);
991  ++crt_int8_arr_lit_idx;
992  crt_int8_arr_lit_off += int8_array_bytesize;
993  break;
994  }
995  case 11: {
996  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
997  CHECK(p);
998  if (p->second == 64) {
999  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
1000  const auto& crt_align64_int8_arr_lit =
1001  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
1002  int32_t len = crt_align64_int8_arr_lit.size();
1003  CHECK_EQ((len >> 16), 0);
1004  off_and_len |= static_cast<int16_t>(len);
1005  int32_t align64_int8_array_bytesize = len;
1006  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1007  memcpy(&serialized[crt_align64_int8_arr_lit_off],
1008  crt_align64_int8_arr_lit.data(),
1009  align64_int8_array_bytesize);
1010  ++crt_align64_int8_arr_lit_idx;
1011  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
1012  } else if (p->second == 32) {
1013  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
1014  const auto& crt_align32_int8_arr_lit =
1015  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
1016  int32_t len = crt_align32_int8_arr_lit.size();
1017  CHECK_EQ((len >> 16), 0);
1018  off_and_len |= static_cast<int16_t>(len);
1019  int32_t align32_int8_array_bytesize = len;
1020  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1021  memcpy(&serialized[crt_align32_int8_arr_lit_off],
1022  crt_align32_int8_arr_lit.data(),
1023  align32_int8_array_bytesize);
1024  ++crt_align32_int8_arr_lit_idx;
1025  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
1026  } else {
1027  CHECK(false);
1028  }
1029  break;
1030  }
1031  default:
1032  CHECK(false);
1033  }
1034  }
1035  return serialized;
1036 }
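 // Layout note for serializeLiterals(): variable-length literals are referenced
 // from their fixed-size slot by a packed int32 "off_and_len" word, offset in
 // the upper 16 bits and length in the lower 16 bits. For example, a 7-byte
 // string whose payload starts at buffer offset 100 is encoded as
 // (100 << 16) | 7 == 0x00640007, which is why both the total buffer size and
 // each payload length are checked against the int16 maximum.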
1037 
1038 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
1039  if (device_type == ExecutorDeviceType::GPU) {
1040  return cudaMgr()->getDeviceCount();
1041  } else {
1042  return 1;
1043  }
1044 }
1045 
1047  const Data_Namespace::MemoryLevel memory_level) const {
 1048  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
 1049  : deviceCount(ExecutorDeviceType::CPU);
1050 }
1051 
1052 // TODO(alex): remove or split
1053 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
1054  const SQLTypeInfo& ti,
1055  const int64_t agg_init_val,
1056  const int8_t out_byte_width,
1057  const int64_t* out_vec,
1058  const size_t out_vec_sz,
1059  const bool is_group_by,
1060  const bool float_argument_input) {
1061  switch (agg) {
1062  case kAVG:
1063  case kSUM:
1064  if (0 != agg_init_val) {
1065  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1066  int64_t agg_result = agg_init_val;
1067  for (size_t i = 0; i < out_vec_sz; ++i) {
1068  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
1069  }
1070  return {agg_result, 0};
1071  } else {
1072  CHECK(ti.is_fp());
1073  switch (out_byte_width) {
1074  case 4: {
1075  int agg_result = static_cast<int32_t>(agg_init_val);
1076  for (size_t i = 0; i < out_vec_sz; ++i) {
1078  &agg_result,
1079  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1080  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1081  }
1082  const int64_t converted_bin =
1083  float_argument_input
1084  ? static_cast<int64_t>(agg_result)
1085  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1086  return {converted_bin, 0};
1087  break;
1088  }
1089  case 8: {
1090  int64_t agg_result = agg_init_val;
1091  for (size_t i = 0; i < out_vec_sz; ++i) {
1093  &agg_result,
1094  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1095  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1096  }
1097  return {agg_result, 0};
1098  break;
1099  }
1100  default:
1101  CHECK(false);
1102  }
1103  }
1104  }
1105  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
1106  int64_t agg_result = 0;
1107  for (size_t i = 0; i < out_vec_sz; ++i) {
1108  agg_result += out_vec[i];
1109  }
1110  return {agg_result, 0};
1111  } else {
1112  CHECK(ti.is_fp());
1113  switch (out_byte_width) {
1114  case 4: {
1115  float r = 0.;
1116  for (size_t i = 0; i < out_vec_sz; ++i) {
1117  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
1118  }
1119  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
1120  const int64_t converted_bin =
1121  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
1122  return {converted_bin, 0};
1123  }
1124  case 8: {
1125  double r = 0.;
1126  for (size_t i = 0; i < out_vec_sz; ++i) {
1127  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
1128  }
1129  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
1130  }
1131  default:
1132  CHECK(false);
1133  }
1134  }
1135  break;
1136  case kCOUNT: {
1137  uint64_t agg_result = 0;
1138  for (size_t i = 0; i < out_vec_sz; ++i) {
1139  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
1140  agg_result += out;
1141  }
1142  return {static_cast<int64_t>(agg_result), 0};
1143  }
1144  case kMIN: {
1145  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1146  int64_t agg_result = agg_init_val;
1147  for (size_t i = 0; i < out_vec_sz; ++i) {
1148  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
1149  }
1150  return {agg_result, 0};
1151  } else {
1152  switch (out_byte_width) {
1153  case 4: {
1154  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1155  for (size_t i = 0; i < out_vec_sz; ++i) {
1157  &agg_result,
1158  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1159  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1160  }
1161  const int64_t converted_bin =
1162  float_argument_input
1163  ? static_cast<int64_t>(agg_result)
1164  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1165  return {converted_bin, 0};
1166  }
1167  case 8: {
1168  int64_t agg_result = agg_init_val;
1169  for (size_t i = 0; i < out_vec_sz; ++i) {
1171  &agg_result,
1172  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1173  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1174  }
1175  return {agg_result, 0};
1176  }
1177  default:
1178  CHECK(false);
1179  }
1180  }
1181  }
1182  case kMAX:
1183  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1184  int64_t agg_result = agg_init_val;
1185  for (size_t i = 0; i < out_vec_sz; ++i) {
1186  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
1187  }
1188  return {agg_result, 0};
1189  } else {
1190  switch (out_byte_width) {
1191  case 4: {
1192  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1193  for (size_t i = 0; i < out_vec_sz; ++i) {
1195  &agg_result,
1196  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1197  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1198  }
1199  const int64_t converted_bin =
1200  float_argument_input ? static_cast<int64_t>(agg_result)
1201  : float_to_double_bin(agg_result, !ti.get_notnull());
1202  return {converted_bin, 0};
1203  }
1204  case 8: {
1205  int64_t agg_result = agg_init_val;
1206  for (size_t i = 0; i < out_vec_sz; ++i) {
1208  &agg_result,
1209  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1210  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1211  }
1212  return {agg_result, 0};
1213  }
1214  default:
1215  CHECK(false);
1216  }
1217  }
1218  case kSINGLE_VALUE: {
1219  int64_t agg_result = agg_init_val;
1220  for (size_t i = 0; i < out_vec_sz; ++i) {
1221  if (out_vec[i] != agg_init_val) {
1222  if (agg_result == agg_init_val) {
1223  agg_result = out_vec[i];
1224  } else if (out_vec[i] != agg_result) {
 1225  return {agg_result, Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES};
 1226  }
1227  }
1228  }
1229  return {agg_result, 0};
1230  }
1231  case kSAMPLE: {
1232  int64_t agg_result = agg_init_val;
1233  for (size_t i = 0; i < out_vec_sz; ++i) {
1234  if (out_vec[i] != agg_init_val) {
1235  agg_result = out_vec[i];
1236  break;
1237  }
1238  }
1239  return {agg_result, 0};
1240  }
1241  default:
1242  CHECK(false);
1243  }
1244  abort();
1245 }
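 // reduceResults() returns a {value, error_code} pair; the error code is 0 on
 // success. Float sums/mins/maxes reduced at 4-byte width are widened to double
 // bins unless float_argument_input is set, and a SINGLE_VALUE aggregate that
 // observes more than one distinct value reports
 // ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES instead of a value.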
1246 
1247 namespace {
1248 
1250  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1251  std::vector<TargetInfo> const& targets) {
1252  auto& first = results_per_device.front().first;
1253  CHECK(first);
1254  auto const first_target_idx = result_set::first_dict_encoded_idx(targets);
1255  if (first_target_idx) {
1256  first->translateDictEncodedColumns(targets, *first_target_idx);
1257  }
1258  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
1259  const auto& next = results_per_device[dev_idx].first;
1260  CHECK(next);
1261  if (first_target_idx) {
1262  next->translateDictEncodedColumns(targets, *first_target_idx);
1263  }
1264  first->append(*next);
1265  }
1266  return std::move(first);
1267 }
1268 
1270  TargetInfo operator()(Analyzer::Expr const* const target_expr) const {
1271  return get_target_info(target_expr, g_bigint_count);
1272  }
1273 };
1274 
1275 } // namespace
1276 
1278  const RelAlgExecutionUnit& ra_exe_unit) {
1279  auto timer = DEBUG_TIMER(__func__);
1280  auto& results_per_device = shared_context.getFragmentResults();
1281  auto const targets = shared::transform<std::vector<TargetInfo>>(
1282  ra_exe_unit.target_exprs, GetTargetInfo{});
1283  if (results_per_device.empty()) {
1284  return std::make_shared<ResultSet>(targets,
1288  catalog_,
1289  blockSize(),
1290  gridSize());
1291  }
1292  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
1293  std::sort(results_per_device.begin(),
1294  results_per_device.end(),
1295  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
1296  CHECK_GE(lhs.second.size(), size_t(1));
1297  CHECK_GE(rhs.second.size(), size_t(1));
1298  return lhs.second.front() < rhs.second.front();
1299  });
1300 
1301  return get_merged_result(results_per_device, targets);
1302 }
1303 
1305  const RelAlgExecutionUnit& ra_exe_unit,
1306  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1307  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1308  const QueryMemoryDescriptor& query_mem_desc) const {
1309  auto timer = DEBUG_TIMER(__func__);
1310  if (ra_exe_unit.estimator) {
1311  return reduce_estimator_results(ra_exe_unit, results_per_device);
1312  }
1313 
1314  if (results_per_device.empty()) {
1315  auto const targets = shared::transform<std::vector<TargetInfo>>(
1316  ra_exe_unit.target_exprs, GetTargetInfo{});
1317  return std::make_shared<ResultSet>(targets,
1320  nullptr,
1321  catalog_,
1322  blockSize(),
1323  gridSize());
1324  }
1325 
1327  results_per_device,
1328  row_set_mem_owner,
1329  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
1330 }
1331 
1332 namespace {
1333 
1335  const size_t executor_id,
1336  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1337  int64_t* compilation_queue_time) {
1338  auto clock_begin = timer_start();
1339  // ResultSetReductionJIT::codegen compilation-locks if new code will be generated
1340  *compilation_queue_time = timer_stop(clock_begin);
1341  const auto& this_result_set = results_per_device[0].first;
1342  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
1343  this_result_set->getTargetInfos(),
1344  this_result_set->getTargetInitVals(),
1345  executor_id);
1346  return reduction_jit.codegen();
1347 };
1348 
1349 } // namespace
1350 
1352  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1353  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1354  const QueryMemoryDescriptor& query_mem_desc) const {
1355  auto timer = DEBUG_TIMER(__func__);
1356  std::shared_ptr<ResultSet> reduced_results;
1357 
1358  const auto& first = results_per_device.front().first;
1359 
1360  if (query_mem_desc.getQueryDescriptionType() ==
1362  results_per_device.size() > 1) {
1363  const auto total_entry_count = std::accumulate(
1364  results_per_device.begin(),
1365  results_per_device.end(),
1366  size_t(0),
1367  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
1368  const auto& r = rs.first;
1369  return init + r->getQueryMemDesc().getEntryCount();
1370  });
1371  CHECK(total_entry_count);
1372  auto query_mem_desc = first->getQueryMemDesc();
1373  query_mem_desc.setEntryCount(total_entry_count);
1374  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
1377  row_set_mem_owner,
1378  catalog_,
1379  blockSize(),
1380  gridSize());
1381  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
1382  reduced_results->initializeStorage();
1383  switch (query_mem_desc.getEffectiveKeyWidth()) {
1384  case 4:
1385  first->getStorage()->moveEntriesToBuffer<int32_t>(
1386  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1387  break;
1388  case 8:
1389  first->getStorage()->moveEntriesToBuffer<int64_t>(
1390  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1391  break;
1392  default:
1393  CHECK(false);
1394  }
1395  } else {
1396  reduced_results = first;
1397  }
1398 
1399  int64_t compilation_queue_time = 0;
1400  const auto reduction_code =
1401  get_reduction_code(executor_id_, results_per_device, &compilation_queue_time);
1402 
1403  for (size_t i = 1; i < results_per_device.size(); ++i) {
1404  reduced_results->getStorage()->reduce(
1405  *(results_per_device[i].first->getStorage()), {}, reduction_code, executor_id_);
1406  }
1407  reduced_results->addCompilationQueueTime(compilation_queue_time);
1408  return reduced_results;
1409 }
1410 
1412  const RelAlgExecutionUnit& ra_exe_unit,
1413  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1414  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1415  const QueryMemoryDescriptor& query_mem_desc) const {
1416  if (results_per_device.size() == 1) {
1417  return std::move(results_per_device.front().first);
1418  }
1419  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
 1420  SpeculativeTopNMap m;
 1421  for (const auto& result : results_per_device) {
1422  auto rows = result.first;
1423  CHECK(rows);
1424  if (!rows) {
1425  continue;
1426  }
1427  SpeculativeTopNMap that(
1428  *rows,
1429  ra_exe_unit.target_exprs,
1430  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1431  m.reduce(that);
1432  }
1433  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1434  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1435  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1436 }
1437 
1438 std::unordered_set<int> get_available_gpus(const Data_Namespace::DataMgr* data_mgr) {
1439  CHECK(data_mgr);
1440  std::unordered_set<int> available_gpus;
1441  if (data_mgr->gpusPresent()) {
1442  CHECK(data_mgr->getCudaMgr());
1443  const int gpu_count = data_mgr->getCudaMgr()->getDeviceCount();
1444  CHECK_GT(gpu_count, 0);
1445  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1446  available_gpus.insert(gpu_id);
1447  }
1448  }
1449  return available_gpus;
1450 }
1451 
1452 size_t get_context_count(const ExecutorDeviceType device_type,
1453  const size_t cpu_count,
1454  const size_t gpu_count) {
1455  return device_type == ExecutorDeviceType::GPU ? gpu_count
1456  : static_cast<size_t>(cpu_count);
1457 }
1458 
1459 namespace {
1460 
 1461 // Compute a very conservative guess for the output buffer entry count, using no
 1462 // information other than the number of tuples in each table and multiplying them
 1463 // together.
1464 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1466  // Check for overflows since we're multiplying potentially big table sizes.
1467  using checked_size_t = boost::multiprecision::number<
1468  boost::multiprecision::cpp_int_backend<64,
1469  64,
1470  boost::multiprecision::unsigned_magnitude,
1471  boost::multiprecision::checked,
1472  void>>;
1473  checked_size_t max_groups_buffer_entry_guess = 1;
1474  for (const auto& query_info : query_infos) {
1475  CHECK(!query_info.info.fragments.empty());
1476  auto it = std::max_element(query_info.info.fragments.begin(),
1477  query_info.info.fragments.end(),
1478  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1479  return f1.getNumTuples() < f2.getNumTuples();
1480  });
1481  max_groups_buffer_entry_guess *= it->getNumTuples();
1482  }
1483  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1484  // baseline group layout with that many entries anyway.
1485  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1486  try {
1487  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1488  max_groups_buffer_entry_guess_cap);
1489  } catch (...) {
1490  return max_groups_buffer_entry_guess_cap;
1491  }
1492 }
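 // Example for the guess above: inputs of 5,000,000 and 40,000 rows multiply to
 // 2e11, which is clamped to the 100M cap; a product that overflows the checked
 // 64-bit multiprecision type throws and is likewise returned as the cap.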
1493 
1494 std::string get_table_name(const InputDescriptor& input_desc,
1496  const auto source_type = input_desc.getSourceType();
1497  if (source_type == InputSourceType::TABLE) {
1498  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1499  CHECK(td);
1500  return td->tableName;
1501  } else {
1502  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1503  }
1504 }
1505 
1506 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1507  const int device_count) {
1508  if (device_type == ExecutorDeviceType::GPU) {
1509  return device_count * Executor::high_scan_limit;
1510  }
  return Executor::high_scan_limit;
 1512 }
1513 
1515  const std::vector<InputTableInfo>& table_infos,
1517  const ExecutorDeviceType device_type,
1518  const int device_count) {
1519  for (const auto target_expr : ra_exe_unit.target_exprs) {
1520  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1521  return;
1522  }
1523  }
1524  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1525  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1526  // Allow a query with no scan limit to run on small tables
1527  return;
1528  }
1529  if (ra_exe_unit.use_bump_allocator) {
1530  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1531  // relative to the size of the input), so we bypass this check for now
1532  return;
1533  }
1534  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1535  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1536  (!ra_exe_unit.scan_limit ||
1537  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1538  std::vector<std::string> table_names;
1539  const auto& input_descs = ra_exe_unit.input_descs;
1540  for (const auto& input_desc : input_descs) {
1541  table_names.push_back(get_table_name(input_desc, cat));
1542  }
1543  if (!ra_exe_unit.scan_limit) {
1544  throw WatchdogException(
1545  "Projection query would require a scan without a limit on table(s): " +
1546  boost::algorithm::join(table_names, ", "));
1547  } else {
1548  throw WatchdogException(
1549  "Projection query output result set on table(s): " +
1550  boost::algorithm::join(table_names, ", ") + " would contain " +
1551  std::to_string(ra_exe_unit.scan_limit) +
1552  " rows, which is more than the current system limit of " +
1553  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1554  }
1555  }
1556 }
1557 
1558 } // namespace
1559 
1560 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1561  const RelAlgExecutionUnit& ra_exe_unit) {
1562  if (ra_exe_unit.input_descs.size() < 2) {
1563  return false;
1564  }
1565 
1566  // We only support loop join at the end of folded joins
1567  // where ra_exe_unit.input_descs.size() > 2 for now.
1568  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1569 
1570  std::optional<size_t> inner_table_idx;
1571  for (size_t i = 0; i < query_infos.size(); ++i) {
1572  if (query_infos[i].table_id == inner_table_id) {
1573  inner_table_idx = i;
1574  break;
1575  }
1576  }
1577  CHECK(inner_table_idx);
 1578  return query_infos[*inner_table_idx].info.getNumTuples() <=
 1579  g_trivial_loop_join_threshold;
1580 }
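 // The inner (right-most) input qualifies as a trivial loop join when its tuple
 // count does not exceed g_trivial_loop_join_threshold (1000 by default, see
 // the flag near the top of this file).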
1581 
1582 namespace {
1583 
1584 template <typename T>
1585 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1586  std::vector<std::string> expr_strs;
1587  for (const auto& expr : expr_container) {
1588  if (!expr) {
1589  expr_strs.emplace_back("NULL");
1590  } else {
1591  expr_strs.emplace_back(expr->toString());
1592  }
1593  }
1594  return expr_strs;
1595 }
1596 
1597 template <>
1598 std::vector<std::string> expr_container_to_string(
1599  const std::list<Analyzer::OrderEntry>& expr_container) {
1600  std::vector<std::string> expr_strs;
1601  for (const auto& expr : expr_container) {
1602  expr_strs.emplace_back(expr.toString());
1603  }
1604  return expr_strs;
1605 }
1606 
1607 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1608  switch (algorithm) {
1610  return "ResultSet";
1612  return "Speculative Top N";
1614  return "Streaming Top N";
1615  }
1616  UNREACHABLE();
1617  return "";
1618 }
1619 
1620 } // namespace
1621 
1622 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
 1623  // todo(yoonmin): replace the cache key with a DAG representation of the query plan
 1624  // instead of the ra_exec_unit description, if possible
1625  std::ostringstream os;
1626  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1627  const auto& scan_desc = input_col_desc->getScanDesc();
1628  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1629  << scan_desc.getNestLevel();
1630  }
1631  if (!ra_exe_unit.simple_quals.empty()) {
1632  for (const auto& qual : ra_exe_unit.simple_quals) {
1633  if (qual) {
1634  os << qual->toString() << ",";
1635  }
1636  }
1637  }
1638  if (!ra_exe_unit.quals.empty()) {
1639  for (const auto& qual : ra_exe_unit.quals) {
1640  if (qual) {
1641  os << qual->toString() << ",";
1642  }
1643  }
1644  }
1645  if (!ra_exe_unit.join_quals.empty()) {
1646  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1647  const auto& join_condition = ra_exe_unit.join_quals[i];
1648  os << std::to_string(i) << ::toString(join_condition.type);
1649  for (const auto& qual : join_condition.quals) {
1650  if (qual) {
1651  os << qual->toString() << ",";
1652  }
1653  }
1654  }
1655  }
1656  if (!ra_exe_unit.groupby_exprs.empty()) {
1657  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1658  if (qual) {
1659  os << qual->toString() << ",";
1660  }
1661  }
1662  }
1663  for (const auto& expr : ra_exe_unit.target_exprs) {
1664  if (expr) {
1665  os << expr->toString() << ",";
1666  }
1667  }
1668  os << ::toString(ra_exe_unit.estimator == nullptr);
1669  os << std::to_string(ra_exe_unit.scan_limit);
1670  return os.str();
1671 }
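// Example (illustrative): for a unit scanning table 5, column 3 at nest level 0, the
// key starts with "5,3,0" and is followed by the toString() form of each qual,
// join qual, group-by and target expression, whether an estimator is absent, and the
// scan limit, all concatenated as streamed into `os` above.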
1672 
1673 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1674  os << "\n\tExtracted Query Plan Dag Hash: " << ra_exe_unit.query_plan_dag_hash;
1675  os << "\n\tTable/Col/Levels: ";
1676  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1677  const auto& scan_desc = input_col_desc->getScanDesc();
1678  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1679  << scan_desc.getNestLevel() << ") ";
1680  }
1681  if (!ra_exe_unit.simple_quals.empty()) {
1682  os << "\n\tSimple Quals: "
1683     << boost::algorithm::join(expr_container_to_string(ra_exe_unit.simple_quals),
1684                               ", ");
1685  }
1686  if (!ra_exe_unit.quals.empty()) {
1687  os << "\n\tQuals: "
1688  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1689  }
1690  if (!ra_exe_unit.join_quals.empty()) {
1691  os << "\n\tJoin Quals: ";
1692  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1693  const auto& join_condition = ra_exe_unit.join_quals[i];
1694  os << "\t\t" << std::to_string(i) << " " << ::toString(join_condition.type);
1695  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1696  }
1697  }
1698  if (!ra_exe_unit.groupby_exprs.empty()) {
1699  os << "\n\tGroup By: "
1700     << boost::algorithm::join(expr_container_to_string(ra_exe_unit.groupby_exprs),
1701                               ", ");
1702  }
1703  os << "\n\tProjected targets: "
1704     << boost::algorithm::join(expr_container_to_string(ra_exe_unit.target_exprs), ", ");
1705  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
1706  os << "\n\tSort Info: ";
1707  const auto& sort_info = ra_exe_unit.sort_info;
1708  os << "\n\t Order Entries: "
1709  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1710  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1711  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1712  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1713  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1714  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
1715  if (ra_exe_unit.union_all) {
1716  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1717  }
1718  return os;
1719 }
1720 
1721 namespace {
1722 
1723 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
1724                                        const size_t new_scan_limit) {
1725  return {ra_exe_unit_in.input_descs,
1726  ra_exe_unit_in.input_col_descs,
1727  ra_exe_unit_in.simple_quals,
1728  ra_exe_unit_in.quals,
1729  ra_exe_unit_in.join_quals,
1730  ra_exe_unit_in.groupby_exprs,
1731  ra_exe_unit_in.target_exprs,
1732  ra_exe_unit_in.estimator,
1733  ra_exe_unit_in.sort_info,
1734  new_scan_limit,
1735  ra_exe_unit_in.query_hint,
1736  ra_exe_unit_in.query_plan_dag_hash,
1737  ra_exe_unit_in.hash_table_build_plan_dag,
1738  ra_exe_unit_in.table_id_to_node_map,
1739  ra_exe_unit_in.use_bump_allocator,
1740  ra_exe_unit_in.union_all,
1741  ra_exe_unit_in.query_state};
1742 }
1743 
1744 } // namespace
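// replace_scan_limit() above rebuilds the RelAlgExecutionUnit field by field, changing
// only scan_limit; it is used by the CompilationRetryNewScanLimit path in
// executeWorkUnit() below to re-run the work unit with a tighter limit.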
1745 
1746 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1747  const bool is_agg,
1748  const std::vector<InputTableInfo>& query_infos,
1749  const RelAlgExecutionUnit& ra_exe_unit_in,
1750  const CompilationOptions& co,
1751  const ExecutionOptions& eo,
1752  const Catalog_Namespace::Catalog& cat,
1753  RenderInfo* render_info,
1754  const bool has_cardinality_estimation,
1755  ColumnCacheMap& column_cache) {
1756  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1757 
1758  ScopeGuard cleanup_post_execution = [this] {
1759  // cleanup/unpin GPU buffer allocations
1760  // TODO: separate out this state into a single object
1761  plan_state_.reset(nullptr);
1762  if (cgen_state_) {
1763  cgen_state_->in_values_bitmaps_.clear();
1764  cgen_state_->str_dict_translation_mgrs_.clear();
1765  }
1766  };
1767 
1768  try {
1769  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1770  is_agg,
1771  true,
1772  query_infos,
1773  ra_exe_unit_in,
1774  co,
1775  eo,
1776  cat,
1777  row_set_mem_owner_,
1778  render_info,
1779  has_cardinality_estimation,
1780  column_cache);
1781  if (result) {
1782  result->setKernelQueueTime(kernel_queue_time_ms_);
1783  result->addCompilationQueueTime(compilation_queue_time_ms_);
1784  if (eo.just_validate) {
1785  result->setValidationOnlyRes();
1786  }
1787  }
1788  return result;
1789  } catch (const CompilationRetryNewScanLimit& e) {
1790  auto result =
1791  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1792  is_agg,
1793  false,
1794  query_infos,
1795  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1796  co,
1797  eo,
1798  cat,
1799  row_set_mem_owner_,
1800  render_info,
1801  has_cardinality_estimation,
1802  column_cache);
1803  if (result) {
1804  result->setKernelQueueTime(kernel_queue_time_ms_);
1805  result->addCompilationQueueTime(compilation_queue_time_ms_);
1806  if (eo.just_validate) {
1807  result->setValidationOnlyRes();
1808  }
1809  }
1810  return result;
1811  }
1812 }
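// Illustrative call shape (hypothetical caller, not part of this file):
//   ColumnCacheMap column_cache;
//   auto rs = executor->executeWorkUnit(max_groups_buffer_entry_guess, /*is_agg=*/true,
//                                       query_infos, ra_exe_unit, co, eo, cat,
//                                       /*render_info=*/nullptr,
//                                       /*has_cardinality_estimation=*/false,
//                                       column_cache);
// max_groups_buffer_entry_guess is an in/out hint; a zero value signals a retry after
// running out of group-by slots (see executeWorkUnitImpl below).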
1813 
1814 ResultSetPtr Executor::executeWorkUnitImpl(
1815     size_t& max_groups_buffer_entry_guess,
1816  const bool is_agg,
1817  const bool allow_single_frag_table_opt,
1818  const std::vector<InputTableInfo>& query_infos,
1819  const RelAlgExecutionUnit& ra_exe_unit_in,
1820  const CompilationOptions& co,
1821  const ExecutionOptions& eo,
1822  const Catalog_Namespace::Catalog& cat,
1823  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1824  RenderInfo* render_info,
1825  const bool has_cardinality_estimation,
1826  ColumnCacheMap& column_cache) {
1827  INJECT_TIMER(Exec_executeWorkUnit);
1828  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1829  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1830  CHECK(!query_infos.empty());
1831  if (!max_groups_buffer_entry_guess) {
1832  // The query has failed the first execution attempt because of running out
1833  // of group by slots. Make the conservative choice: allocate fragment size
1834  // slots and run on the CPU.
1835  CHECK(device_type == ExecutorDeviceType::CPU);
1836  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1837  }
1838 
1839  int8_t crt_min_byte_width{MAX_BYTE_WIDTH_SUPPORTED};
1840  do {
1841  SharedKernelContext shared_context(query_infos);
1842  ColumnFetcher column_fetcher(this, column_cache);
1843  ScopeGuard scope_guard = [&column_fetcher] {
1844  column_fetcher.freeLinearizedBuf();
1845  column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
1846  };
1847  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1848  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1849 
1850  if (eo.executor_type == ExecutorType::Native) {
1851  try {
1852  INJECT_TIMER(query_step_compilation);
1853  query_mem_desc_owned =
1854  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1855  crt_min_byte_width,
1856  has_cardinality_estimation,
1857  ra_exe_unit,
1858  query_infos,
1859  deleted_cols_map,
1860  column_fetcher,
1861  {device_type,
1862  co.hoist_literals,
1863  co.opt_level,
1864  co.with_dynamic_watchdog,
1865  co.allow_lazy_fetch,
1866  co.filter_on_deleted_column,
1867  co.explain_type,
1868  co.register_intel_jit_listener},
1869  eo,
1870  render_info,
1871  this);
1872  CHECK(query_mem_desc_owned);
1873  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1874  } catch (CompilationRetryNoCompaction&) {
1875  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1876  continue;
1877  }
1878  } else {
1879  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1880  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1881  CHECK(!query_mem_desc_owned);
1882  query_mem_desc_owned.reset(
1883      new QueryMemoryDescriptor(this, 0, QueryDescriptionType::Projection, false));
1884  }
1885  if (eo.just_explain) {
1886  return executeExplain(*query_comp_desc_owned);
1887  }
1888 
1889  for (const auto target_expr : ra_exe_unit.target_exprs) {
1890  plan_state_->target_exprs_.push_back(target_expr);
1891  }
1892 
1893  if (!eo.just_validate) {
1894  int available_cpus = cpu_threads();
1895  auto available_gpus = get_available_gpus(data_mgr_);
1896 
1897  const auto context_count =
1898  get_context_count(device_type, available_cpus, available_gpus.size());
1899  try {
1900  auto kernels = createKernels(shared_context,
1901  ra_exe_unit,
1902  column_fetcher,
1903  query_infos,
1904  eo,
1905  is_agg,
1906  allow_single_frag_table_opt,
1907  context_count,
1908  *query_comp_desc_owned,
1909  *query_mem_desc_owned,
1910  render_info,
1911  available_gpus,
1912  available_cpus);
1913  launchKernels(
1914  shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
1915  } catch (QueryExecutionError& e) {
1916  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1917      e.getErrorCode() == ERR_OUT_OF_TIME) {
1918    throw QueryExecutionError(ERR_INTERRUPTED);
1919  }
1920  if (e.getErrorCode() == ERR_INTERRUPTED) {
1921    throw QueryExecutionError(ERR_INTERRUPTED);
1922  }
1923  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1924      static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1925  crt_min_byte_width <<= 1;
1926  continue;
1927  }
1928  throw;
1929  }
1930  }
1931  if (is_agg) {
1932  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
1933  // update query status to let user know we are now in the reduction phase
1934  std::string curRunningSession{""};
1935  std::string curRunningQuerySubmittedTime{""};
1936  bool sessionEnrolled = false;
1937  {
1938  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
1939  curRunningSession = getCurrentQuerySession(session_read_lock);
1940  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
1941  sessionEnrolled =
1942  checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
1943  }
1944  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
1945  sessionEnrolled) {
1946  updateQuerySessionStatus(curRunningSession,
1947  curRunningQuerySubmittedTime,
1948  QuerySessionStatus::QueryStatus::RUNNING_REDUCTION);
1949  }
1950  }
1951  try {
1952  return collectAllDeviceResults(shared_context,
1953  ra_exe_unit,
1954  *query_mem_desc_owned,
1955  query_comp_desc_owned->getDeviceType(),
1956  row_set_mem_owner);
1957  } catch (ReductionRanOutOfSlots&) {
1958    throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1959  } catch (OverflowOrUnderflow&) {
1960  crt_min_byte_width <<= 1;
1961  continue;
1962  } catch (QueryExecutionError& e) {
1963  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
1964  << ", what(): " << e.what();
1965  throw QueryExecutionError(e.getErrorCode());
1966  }
1967  }
1968  return resultsUnion(shared_context, ra_exe_unit);
1969 
1970  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1971 
1972  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1973                                     ExecutorDeviceType::CPU,
1974                                     QueryMemoryDescriptor(),
1975                                     nullptr,
1976  catalog_,
1977  blockSize(),
1978  gridSize());
1979 }
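// The do/while above is the retry loop for this work unit: CompilationRetryNoCompaction
// resets crt_min_byte_width and recompiles, and an overflow/underflow during execution
// or reduction doubles crt_min_byte_width and retries, until the width would exceed
// sizeof(int64_t); the empty ResultSet below is only reached if every attempt bails out.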
1980 
1981 void Executor::executeWorkUnitPerFragment(
1982     const RelAlgExecutionUnit& ra_exe_unit_in,
1983  const InputTableInfo& table_info,
1984  const CompilationOptions& co,
1985  const ExecutionOptions& eo,
1986  const Catalog_Namespace::Catalog& cat,
1987  PerFragmentCallBack& cb,
1988  const std::set<size_t>& fragment_indexes_param) {
1989  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1990  ColumnCacheMap column_cache;
1991 
1992  std::vector<InputTableInfo> table_infos{table_info};
1993  SharedKernelContext kernel_context(table_infos);
1994 
1995  ColumnFetcher column_fetcher(this, column_cache);
1996  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1997  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1998  {
1999  query_mem_desc_owned =
2000  query_comp_desc_owned->compile(0,
2001  8,
2002  /*has_cardinality_estimation=*/false,
2003  ra_exe_unit,
2004  table_infos,
2005  deleted_cols_map,
2006  column_fetcher,
2007  co,
2008  eo,
2009  nullptr,
2010  this);
2011  }
2012  CHECK(query_mem_desc_owned);
2013  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
2014  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
2015  const auto& outer_fragments = table_info.info.fragments;
2016 
2017  std::set<size_t> fragment_indexes;
2018  if (fragment_indexes_param.empty()) {
2019  // An empty `fragment_indexes_param` set implies executing
2020  // the query for all fragments in the table. In this
2021  // case, populate `fragment_indexes` with all fragment indexes.
2022  for (size_t i = 0; i < outer_fragments.size(); i++) {
2023  fragment_indexes.emplace(i);
2024  }
2025  } else {
2026  fragment_indexes = fragment_indexes_param;
2027  }
2028 
2029  {
2030  auto clock_begin = timer_start();
2031  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2032  kernel_queue_time_ms_ += timer_stop(clock_begin);
2033 
2034  for (auto fragment_index : fragment_indexes) {
2035  // We may want to consider in the future allowing this to execute on devices other
2036  // than CPU
2037  FragmentsList fragments_list{{table_id, {fragment_index}}};
2038  ExecutionKernel kernel(ra_exe_unit,
2039  co.device_type,
2040  /*device_id=*/0,
2041  eo,
2042  column_fetcher,
2043  *query_comp_desc_owned,
2044  *query_mem_desc_owned,
2045  fragments_list,
2046  ExecutorDispatchMode::KernelPerFragment,
2047  /*render_info=*/nullptr,
2048  /*rowid_lookup_key=*/-1);
2049  kernel.run(this, 0, kernel_context);
2050  }
2051  }
2052 
2053  const auto& all_fragment_results = kernel_context.getFragmentResults();
2054 
2055  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
2056  CHECK_EQ(result_fragment_indexes.size(), 1);
2057  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
2058  }
2059 }
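// Illustrative callback shape (hypothetical, not part of this file):
//   Executor::PerFragmentCallBack cb =
//       [](ResultSetPtr rs, const Fragmenter_Namespace::FragmentInfo& frag_info) {
//         // inspect per-fragment results, e.g. for metadata recomputation
//       };
// Each selected fragment is executed serially under kernel_mutex_ and reported to cb
// together with its FragmentInfo.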
2060 
2061 ResultSetPtr Executor::executeTableFunction(
2062     const TableFunctionExecutionUnit exe_unit,
2063  const std::vector<InputTableInfo>& table_infos,
2064  const CompilationOptions& co,
2065  const ExecutionOptions& eo,
2066  const Catalog_Namespace::Catalog& cat) {
2067  INJECT_TIMER(Exec_executeTableFunction);
2068  if (eo.just_validate) {
2069    QueryMemoryDescriptor query_mem_desc(this,
2070                                         /*entry_count=*/0,
2071                                         QueryDescriptionType::TableFunction,
2072                                         /*is_table_function=*/true);
2073  query_mem_desc.setOutputColumnar(true);
2074  return std::make_shared<ResultSet>(
2075  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
2076  co.device_type,
2077  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
2078  this->getRowSetMemoryOwner(),
2079  this->getCatalog(),
2080  this->blockSize(),
2081  this->gridSize());
2082  }
2083 
2084  // Avoid compile functions that set the sizer at runtime if the device is GPU
2085  // This should be fixed in the python script as well to minimize the number of
2086  // QueryMustRunOnCpu exceptions
2089  throw QueryMustRunOnCpu();
2090  }
2091 
2092  ColumnCacheMap column_cache; // Note: if we add retries to the table function
2093  // framework, we may want to move this up a level
2094 
2095  ColumnFetcher column_fetcher(this, column_cache);
2096  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
2097 
2098  if (exe_unit.table_func.containsPreFlightFn()) {
2099  std::shared_ptr<CompilationContext> compilation_context;
2100  {
2101  Executor::CgenStateManager cgenstate_manager(*this,
2102  false,
2103  table_infos,
2105  nullptr); // locks compilation_mutex
2107  TableFunctionCompilationContext tf_compilation_context(this, pre_flight_co);
2108  compilation_context =
2109  tf_compilation_context.compile(exe_unit, true /* emit_only_preflight_fn*/);
2110  }
2111  exe_context.execute(exe_unit,
2112  table_infos,
2113  compilation_context,
2114  column_fetcher,
2116  this,
2117  true /* is_pre_launch_udtf */);
2118  }
2119  std::shared_ptr<CompilationContext> compilation_context;
2120  {
2121  Executor::CgenStateManager cgenstate_manager(*this,
2122  false,
2123  table_infos,
2125  nullptr); // locks compilation_mutex
2126  TableFunctionCompilationContext tf_compilation_context(this, co);
2127  compilation_context =
2128  tf_compilation_context.compile(exe_unit, false /* emit_only_preflight_fn */);
2129  }
2130  return exe_context.execute(exe_unit,
2131  table_infos,
2132  compilation_context,
2133  column_fetcher,
2134  co.device_type,
2135  this,
2136  false /* is_pre_launch_udtf */);
2137 }
2138 
2139 ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
2140   return std::make_shared<ResultSet>(query_comp_desc.getIR());
2141 }
2142 
2143 void Executor::addTransientStringLiterals(
2144     const RelAlgExecutionUnit& ra_exe_unit,
2145  const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
2146  TransientDictIdVisitor dict_id_visitor;
2147 
2148  auto visit_expr =
2149  [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
2150  if (!expr) {
2151  return;
2152  }
2153  const auto dict_id = dict_id_visitor.visit(expr);
2154  if (dict_id >= 0) {
2155  auto sdp = getStringDictionaryProxy(dict_id, row_set_mem_owner, true);
2156  CHECK(sdp);
2157  TransientStringLiteralsVisitor visitor(sdp, this);
2158  visitor.visit(expr);
2159  }
2160  };
2161 
2162  for (const auto& group_expr : ra_exe_unit.groupby_exprs) {
2163  visit_expr(group_expr.get());
2164  }
2165 
2166  for (const auto& group_expr : ra_exe_unit.quals) {
2167  visit_expr(group_expr.get());
2168  }
2169 
2170  for (const auto& group_expr : ra_exe_unit.simple_quals) {
2171  visit_expr(group_expr.get());
2172  }
2173 
2174  const auto visit_target_expr = [&](const Analyzer::Expr* target_expr) {
2175  const auto& target_type = target_expr->get_type_info();
2176  if (!target_type.is_string() || target_type.get_compression() == kENCODING_DICT) {
2177  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
2178  if (agg_expr) {
2179  if (agg_expr->get_aggtype() == kSINGLE_VALUE ||
2180  agg_expr->get_aggtype() == kSAMPLE) {
2181  visit_expr(agg_expr->get_arg());
2182  }
2183  } else {
2184  visit_expr(target_expr);
2185  }
2186  }
2187  };
2188  const auto& target_exprs = ra_exe_unit.target_exprs;
2189  std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
2190  const auto& target_exprs_union = ra_exe_unit.target_exprs_union;
2191  std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
2192 }
2193 
2194 ExecutorDeviceType Executor::getDeviceTypeForTargets(
2195     const RelAlgExecutionUnit& ra_exe_unit,
2196  const ExecutorDeviceType requested_device_type) {
2197  for (const auto target_expr : ra_exe_unit.target_exprs) {
2198  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2199  if (!ra_exe_unit.groupby_exprs.empty() &&
2200  !isArchPascalOrLater(requested_device_type)) {
2201  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
2202  agg_info.agg_arg_type.get_type() == kDOUBLE) {
2203  return ExecutorDeviceType::CPU;
2204  }
2205  }
2206  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
2207  return ExecutorDeviceType::CPU;
2208  }
2209  }
2210  return requested_device_type;
2211 }
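// In short: a grouped query falls back to CPU on pre-Pascal GPUs when it aggregates
// DOUBLE values with AVG or SUM, and any REGEXP target expression always forces CPU;
// otherwise the requested device type is kept.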
2212 
2213 namespace {
2214 
2215 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
2216  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
2217  if (ti.is_fp()) {
2218  if (float_argument_input && ti.get_type() == kFLOAT) {
2219  int64_t float_null_val = 0;
2220  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
2221  static_cast<float>(inline_fp_null_val(ti));
2222  return float_null_val;
2223  }
2224  const auto double_null_val = inline_fp_null_val(ti);
2225  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
2226  }
2227  return inline_int_null_val(ti);
2228 }
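// The NULL sentinel is returned as an int64_t holding the bit pattern of the
// float/double NULL value, matching how floating-point aggregate slots are stored in
// the result buffers; for a FLOAT target with float_argument_input, the 32-bit float
// pattern occupies the lower half of the 64-bit slot.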
2229 
2230 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
2231  std::vector<int64_t>& entry,
2232  const std::vector<Analyzer::Expr*>& target_exprs,
2233  const QueryMemoryDescriptor& query_mem_desc) {
2234  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
2235  const auto target_expr = target_exprs[target_idx];
2236  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2237  CHECK(agg_info.is_agg);
2238  target_infos.push_back(agg_info);
2239  if (g_cluster) {
2240  const auto executor = query_mem_desc.getExecutor();
2241  CHECK(executor);
2242  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2243  CHECK(row_set_mem_owner);
2244  const auto& count_distinct_desc =
2245  query_mem_desc.getCountDistinctDescriptor(target_idx);
2246  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
2247  CHECK(row_set_mem_owner);
2248  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
2249  count_distinct_desc.bitmapPaddedSizeBytes(),
2250  /*thread_idx=*/0); // TODO: can we detect thread idx here?
2251  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
2252  continue;
2253  }
2254  if (count_distinct_desc.impl_type_ == CountDistinctImplType::UnorderedSet) {
2255  auto count_distinct_set = new CountDistinctSet();
2256  CHECK(row_set_mem_owner);
2257  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
2258  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
2259  continue;
2260  }
2261  }
2262  const bool float_argument_input = takes_float_argument(agg_info);
2263  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
2264  entry.push_back(0);
2265  } else if (agg_info.agg_kind == kAVG) {
2266  entry.push_back(0);
2267  entry.push_back(0);
2268  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
2269  if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
2270  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2271  entry.push_back(0);
2272  }
2273  } else if (agg_info.sql_type.is_varlen()) {
2274  entry.push_back(0);
2275  entry.push_back(0);
2276  } else {
2277  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2278  }
2279  } else {
2280  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2281  }
2282  }
2283 }
2284 
2285 ResultSetPtr build_row_for_empty_input(
2286     const std::vector<Analyzer::Expr*>& target_exprs_in,
2287     const QueryMemoryDescriptor& query_mem_desc,
2288     const ExecutorDeviceType device_type) {
2289  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
2290  std::vector<Analyzer::Expr*> target_exprs;
2291  for (const auto target_expr : target_exprs_in) {
2292  const auto target_expr_copy =
2293  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
2294  CHECK(target_expr_copy);
2295  auto ti = target_expr->get_type_info();
2296  ti.set_notnull(false);
2297  target_expr_copy->set_type_info(ti);
2298  if (target_expr_copy->get_arg()) {
2299  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
2300  arg_ti.set_notnull(false);
2301  target_expr_copy->get_arg()->set_type_info(arg_ti);
2302  }
2303  target_exprs_owned_copies.push_back(target_expr_copy);
2304  target_exprs.push_back(target_expr_copy.get());
2305  }
2306  std::vector<TargetInfo> target_infos;
2307  std::vector<int64_t> entry;
2308  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
2309  const auto executor = query_mem_desc.getExecutor();
2310  CHECK(executor);
2311  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2312  CHECK(row_set_mem_owner);
2313  auto rs = std::make_shared<ResultSet>(target_infos,
2314  device_type,
2315  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
2316  row_set_mem_owner,
2317  executor->getCatalog(),
2318  executor->blockSize(),
2319  executor->gridSize());
2320  rs->allocateStorage();
2321  rs->fillOneEntry(entry);
2322  return rs;
2323 }
2324 
2325 } // namespace
2326 
2327 ResultSetPtr Executor::collectAllDeviceResults(
2328     SharedKernelContext& shared_context,
2329     const RelAlgExecutionUnit& ra_exe_unit,
2330     const QueryMemoryDescriptor& query_mem_desc,
2331     const ExecutorDeviceType device_type,
2332  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2333  auto timer = DEBUG_TIMER(__func__);
2334  auto& result_per_device = shared_context.getFragmentResults();
2335  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
2336                                       QueryDescriptionType::NonGroupedAggregate) {
2337    return build_row_for_empty_input(
2338        ra_exe_unit.target_exprs, query_mem_desc, device_type);
2339  }
2340  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
2341  try {
2342  return reduceSpeculativeTopN(
2343  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2344  } catch (const std::bad_alloc&) {
2345  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
2346  }
2347  }
2348  const auto shard_count =
2349  device_type == ExecutorDeviceType::GPU
2350      ? shard_count_for_top_groups(ra_exe_unit, *catalog_)
2351      : 0;
2352 
2353  if (shard_count && !result_per_device.empty()) {
2354  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
2355  }
2356  return reduceMultiDeviceResults(
2357  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2358 }
2359 
2360 namespace {
2371 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
2372  const QueryMemoryDescriptor& input_query_mem_desc,
2373  const ResultSetStorage* output_storage,
2374  size_t output_row_index,
2375  const QueryMemoryDescriptor& output_query_mem_desc,
2376  const std::vector<uint32_t>& top_permutation) {
2377  const auto output_buffer = output_storage->getUnderlyingBuffer();
2378  const auto input_buffer = input_storage->getUnderlyingBuffer();
2379  for (const auto sorted_idx : top_permutation) {
2380  // permuting all group-columns in this result set into the final buffer:
2381  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
2382  group_idx++) {
2383  const auto input_column_ptr =
2384  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2385  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
2386  const auto output_column_ptr =
2387  output_buffer +
2388  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2389  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
2390  memcpy(output_column_ptr,
2391  input_column_ptr,
2392  output_query_mem_desc.groupColWidth(group_idx));
2393  }
2394  // permuting all agg-columns in this result set into the final buffer:
2395  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
2396  slot_idx++) {
2397  const auto input_column_ptr =
2398  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
2399  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2400  const auto output_column_ptr =
2401  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
2402  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2403  memcpy(output_column_ptr,
2404  input_column_ptr,
2405  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
2406  }
2407  ++output_row_index;
2408  }
2409  return output_row_index;
2410 }
2411 
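// permute_storage_columnar() above and permute_storage_row_wise() below copy rows from
// a sorted per-device result into the stitched output buffer in the order given by
// top_permutation, returning the next free output row index so successive devices
// append after one another.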
2421 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
2422  const ResultSetStorage* output_storage,
2423  size_t output_row_index,
2424  const QueryMemoryDescriptor& output_query_mem_desc,
2425  const std::vector<uint32_t>& top_permutation) {
2426  const auto output_buffer = output_storage->getUnderlyingBuffer();
2427  const auto input_buffer = input_storage->getUnderlyingBuffer();
2428  for (const auto sorted_idx : top_permutation) {
2429  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
2430  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
2431  row_ptr,
2432  output_query_mem_desc.getRowSize());
2433  ++output_row_index;
2434  }
2435  return output_row_index;
2436 }
2437 } // namespace
2438 
2439 // Collect top results from each device, stitch them together and sort. Partial
2440 // results from each device are guaranteed to be disjoint because we only go on
2441 // this path when one of the columns involved is a shard key.
2442 ResultSetPtr Executor::collectAllDeviceShardedTopResults(
2443     SharedKernelContext& shared_context,
2444  const RelAlgExecutionUnit& ra_exe_unit) const {
2445  auto& result_per_device = shared_context.getFragmentResults();
2446  const auto first_result_set = result_per_device.front().first;
2447  CHECK(first_result_set);
2448  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2449  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2450  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
2451  top_query_mem_desc.setEntryCount(0);
2452  for (auto& result : result_per_device) {
2453  const auto result_set = result.first;
2454  CHECK(result_set);
2455  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
2456  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2457  top_query_mem_desc.setEntryCount(new_entry_cnt);
2458  }
2459  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2460  first_result_set->getDeviceType(),
2461  top_query_mem_desc,
2462  first_result_set->getRowSetMemOwner(),
2463  catalog_,
2464  blockSize(),
2465  gridSize());
2466  auto top_storage = top_result_set->allocateStorage();
2467  size_t top_output_row_idx{0};
2468  for (auto& result : result_per_device) {
2469  const auto result_set = result.first;
2470  CHECK(result_set);
2471  const auto& top_permutation = result_set->getPermutationBuffer();
2472  CHECK_LE(top_permutation.size(), top_n);
2473  if (top_query_mem_desc.didOutputColumnar()) {
2474  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2475  result_set->getQueryMemDesc(),
2476  top_storage,
2477  top_output_row_idx,
2478  top_query_mem_desc,
2479  top_permutation);
2480  } else {
2481  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2482  top_storage,
2483  top_output_row_idx,
2484  top_query_mem_desc,
2485  top_permutation);
2486  }
2487  }
2488  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2489  return top_result_set;
2490 }
2491 
2492 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
2493  const {
2494  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
2495  const auto& join_info = plan_state_->join_info_;
2496  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2497  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2498  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
2499  id_to_cond.insert(
2500  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
2501  }
2502  return id_to_cond;
2503 }
2504 
2505 namespace {
2506 
2507 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2508  for (const auto& col : fetched_cols) {
2509  if (col.is_lazily_fetched) {
2510  return true;
2511  }
2512  }
2513  return false;
2514 }
2515 
2516 } // namespace
2517 
2518 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2519  SharedKernelContext& shared_context,
2520  const RelAlgExecutionUnit& ra_exe_unit,
2521  ColumnFetcher& column_fetcher,
2522  const std::vector<InputTableInfo>& table_infos,
2523  const ExecutionOptions& eo,
2524  const bool is_agg,
2525  const bool allow_single_frag_table_opt,
2526  const size_t context_count,
2527  const QueryCompilationDescriptor& query_comp_desc,
2528  const QueryMemoryDescriptor& query_mem_desc,
2529  RenderInfo* render_info,
2530  std::unordered_set<int>& available_gpus,
2531  int& available_cpus) {
2532  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2533 
2534  QueryFragmentDescriptor fragment_descriptor(
2535  ra_exe_unit,
2536  table_infos,
2537  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2538      ? data_mgr_->getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
2539      : std::vector<Data_Namespace::MemoryInfo>{},
2540  eo.gpu_input_mem_limit_percent,
2541  eo.outer_fragment_indices);
2542  CHECK(!ra_exe_unit.input_descs.empty());
2543 
2544  const auto device_type = query_comp_desc.getDeviceType();
2545  const bool uses_lazy_fetch =
2546  plan_state_->allow_lazy_fetch_ &&
2547  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2548  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2549  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2550  const auto device_count = deviceCount(device_type);
2551  CHECK_GT(device_count, 0);
2552 
2553  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2554  shared_context.getFragOffsets(),
2555  device_count,
2556  device_type,
2557  use_multifrag_kernel,
2558  g_inner_join_fragment_skipping,
2559  this);
2560  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2561  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2562  }
2563 
2564  if (use_multifrag_kernel) {
2565  VLOG(1) << "Creating multifrag execution kernels";
2566  VLOG(1) << query_mem_desc.toString();
2567 
2568  // NB: We should never be on this path when the query is retried because of running
2569  // out of group by slots; also, for scan only queries on CPU we want the
2570  // high-granularity, fragment by fragment execution instead. For scan only queries on
2571  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2572  // buffer per fragment.
2573  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2574  &execution_kernels,
2575  &column_fetcher,
2576  &eo,
2577  &query_comp_desc,
2578  &query_mem_desc,
2579  render_info](const int device_id,
2580  const FragmentsList& frag_list,
2581  const int64_t rowid_lookup_key) {
2582  execution_kernels.emplace_back(
2583  std::make_unique<ExecutionKernel>(ra_exe_unit,
2584                                    ExecutorDeviceType::GPU,
2585  device_id,
2586  eo,
2587  column_fetcher,
2588  query_comp_desc,
2589  query_mem_desc,
2590  frag_list,
2591  ExecutorDispatchMode::MultifragmentKernel,
2592  render_info,
2593  rowid_lookup_key));
2594  };
2595  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2596  } else {
2597  VLOG(1) << "Creating one execution kernel per fragment";
2598  VLOG(1) << query_mem_desc.toString();
2599 
2600  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2601      (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
2602      table_infos.size() == 1 && table_infos.front().table_id > 0) {
2603  const auto max_frag_size =
2604  table_infos.front().info.getFragmentNumTuplesUpperBound();
2605  if (max_frag_size < query_mem_desc.getEntryCount()) {
2606  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2607  << " to match max fragment size " << max_frag_size
2608  << " for kernel per fragment execution path.";
2609  throw CompilationRetryNewScanLimit(max_frag_size);
2610  }
2611  }
2612 
2613  size_t frag_list_idx{0};
2614  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2615  &execution_kernels,
2616  &column_fetcher,
2617  &eo,
2618  &frag_list_idx,
2619  &device_type,
2620  &query_comp_desc,
2621  &query_mem_desc,
2622  render_info](const int device_id,
2623  const FragmentsList& frag_list,
2624  const int64_t rowid_lookup_key) {
2625  if (!frag_list.size()) {
2626  return;
2627  }
2628  CHECK_GE(device_id, 0);
2629 
2630  execution_kernels.emplace_back(
2631  std::make_unique<ExecutionKernel>(ra_exe_unit,
2632  device_type,
2633  device_id,
2634  eo,
2635  column_fetcher,
2636  query_comp_desc,
2637  query_mem_desc,
2638  frag_list,
2639  ExecutorDispatchMode::KernelPerFragment,
2640  render_info,
2641  rowid_lookup_key));
2642  ++frag_list_idx;
2643  };
2644 
2645  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2646  ra_exe_unit);
2647  }
2648 
2649  return execution_kernels;
2650 }
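// Kernel creation policy, in brief: on GPU with allow_multifrag (and no lazy fetch
// unless aggregating), all fragments assigned to a device share one multifrag kernel;
// otherwise one kernel is created per fragment list, which also enables the scan-limit
// lowering retry for single-fragment projection tables above.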
2651 
2652 void Executor::launchKernels(SharedKernelContext& shared_context,
2653                              std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
2654  const ExecutorDeviceType device_type) {
2655  auto clock_begin = timer_start();
2656  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2657  kernel_queue_time_ms_ += timer_stop(clock_begin);
2658 
2659  threading::task_group tg;
2660  // A hack to have an unused unit for results collection.
2661  const RelAlgExecutionUnit* ra_exe_unit =
2662  kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
2663 
2664 #ifdef HAVE_TBB
2665  if (g_enable_cpu_sub_tasks && device_type == ExecutorDeviceType::CPU) {
2666  shared_context.setThreadPool(&tg);
2667  }
2668  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
2669 #endif // HAVE_TBB
2670 
2671  VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
2672  << (device_type == ExecutorDeviceType::CPU ? "CPU"s : "GPU"s) << ".";
2673  size_t kernel_idx = 1;
2674  for (auto& kernel : kernels) {
2675  CHECK(kernel.get());
2676  tg.run([this,
2677  &kernel,
2678  &shared_context,
2679  parent_thread_id = logger::thread_id(),
2680  crt_kernel_idx = kernel_idx++] {
2681  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
2682  const size_t thread_i = crt_kernel_idx % cpu_threads();
2683  kernel->run(this, thread_i, shared_context);
2684  });
2685  }
2686  tg.wait();
2687 
2688  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
2689  // The first arg is used for GPU only, it's not our case.
2690  // TODO: add QueryExecutionContext::getRowSet() interface
2691  // for our case.
2692  if (exec_ctx) {
2693  ResultSetPtr results;
2694  if (ra_exe_unit->estimator) {
2695  results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
2696  } else {
2697  results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
2698  }
2699  shared_context.addDeviceResults(std::move(results), {});
2700  }
2701  }
2702 }
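// Kernels are scheduled on the task group and joined with tg.wait(); any per-thread
// execution contexts (populated when CPU sub-tasks are enabled) are then drained into
// shared_context so reduction sees one result set per context.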
2703 
2704 std::vector<size_t> Executor::getTableFragmentIndices(
2705     const RelAlgExecutionUnit& ra_exe_unit,
2706  const ExecutorDeviceType device_type,
2707  const size_t table_idx,
2708  const size_t outer_frag_idx,
2709  std::map<int, const TableFragments*>& selected_tables_fragments,
2710  const std::unordered_map<int, const Analyzer::BinOper*>&
2711  inner_table_id_to_join_condition) {
2712  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2713  auto table_frags_it = selected_tables_fragments.find(table_id);
2714  CHECK(table_frags_it != selected_tables_fragments.end());
2715  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2716  const auto outer_table_fragments_it =
2717  selected_tables_fragments.find(outer_input_desc.getTableId());
2718  const auto outer_table_fragments = outer_table_fragments_it->second;
2719  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2720  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2721  if (!table_idx) {
2722  return {outer_frag_idx};
2723  }
2724  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2725  auto& inner_frags = table_frags_it->second;
2726  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2727  std::vector<size_t> all_frag_ids;
2728  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2729  ++inner_frag_idx) {
2730  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2731  if (skipFragmentPair(outer_fragment_info,
2732  inner_frag_info,
2733  table_idx,
2734  inner_table_id_to_join_condition,
2735  ra_exe_unit,
2736  device_type)) {
2737  continue;
2738  }
2739  all_frag_ids.push_back(inner_frag_idx);
2740  }
2741  return all_frag_ids;
2742 }
2743 
2744 // Returns true iff the join between two fragments cannot yield any results, per
2745 // shard information. The pair can be skipped to avoid full broadcast.
2746 bool Executor::skipFragmentPair(
2747     const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2748  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2749  const int table_idx,
2750  const std::unordered_map<int, const Analyzer::BinOper*>&
2751  inner_table_id_to_join_condition,
2752  const RelAlgExecutionUnit& ra_exe_unit,
2753  const ExecutorDeviceType device_type) {
2754  if (device_type != ExecutorDeviceType::GPU) {
2755  return false;
2756  }
2757  CHECK(table_idx >= 0 &&
2758  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2759  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2760  // Both tables need to be sharded the same way.
2761  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2762  outer_fragment_info.shard == inner_fragment_info.shard) {
2763  return false;
2764  }
2765  const Analyzer::BinOper* join_condition{nullptr};
2766  if (ra_exe_unit.join_quals.empty()) {
2767  CHECK(!inner_table_id_to_join_condition.empty());
2768  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2769  CHECK(condition_it != inner_table_id_to_join_condition.end());
2770  join_condition = condition_it->second;
2771  CHECK(join_condition);
2772  } else {
2773  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2774  plan_state_->join_info_.join_hash_tables_.size());
2775  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2776  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2777  table_idx) {
2778  CHECK(!join_condition);
2779  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2780  }
2781  }
2782  }
2783  if (!join_condition) {
2784  return false;
2785  }
2786  // TODO(adb): support fragment skipping based on the overlaps operator
2787  if (join_condition->is_overlaps_oper()) {
2788  return false;
2789  }
2790  size_t shard_count{0};
2791  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2792  join_condition->get_left_operand())) {
2793  auto inner_outer_pairs = HashJoin::normalizeColumnPairs(
2794  join_condition, *getCatalog(), getTemporaryTables())
2795                                 .first;
2796    shard_count = BaselineJoinHashTable::getShardCountForCondition(
2797        join_condition, this, inner_outer_pairs);
2798  } else {
2799  shard_count = get_shard_count(join_condition, this);
2800  }
2801  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2802  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2803  }
2804  return shard_count;
2805 }
2806 
2807 namespace {
2808 
2809 const ColumnDescriptor* try_get_column_descriptor(
2810     const InputColDescriptor* col_desc, const Catalog_Namespace::Catalog& cat) {
2811   const int table_id = col_desc->getScanDesc().getTableId();
2812  const int col_id = col_desc->getColId();
2813  return get_column_descriptor_maybe(col_id, table_id, cat);
2814 }
2815 
2816 } // namespace
2817 
2818 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2819  const std::vector<InputDescriptor>& input_descs,
2820  const std::map<int, const TableFragments*>& all_tables_fragments) {
2821  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2822  for (auto& desc : input_descs) {
2823  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2824  CHECK(fragments_it != all_tables_fragments.end());
2825  const auto& fragments = *fragments_it->second;
2826  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2827  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2828  frag_offsets[i] = off;
2829  off += fragments[i].getNumTuples();
2830  }
2831  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2832  }
2833  return tab_id_to_frag_offsets;
2834 }
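// Worked example (illustrative): a table whose three fragments hold 100, 250 and 80
// rows maps to the offset vector {0, 100, 350}; frag_offsets[i] is the global row
// offset of fragment i within its table.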
2835 
2836 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2837 Executor::getRowCountAndOffsetForAllFrags(
2838  const RelAlgExecutionUnit& ra_exe_unit,
2839  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2840  const std::vector<InputDescriptor>& input_descs,
2841  const std::map<int, const TableFragments*>& all_tables_fragments) {
2842  std::vector<std::vector<int64_t>> all_num_rows;
2843  std::vector<std::vector<uint64_t>> all_frag_offsets;
2844  const auto tab_id_to_frag_offsets =
2845  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2846  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2847  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2848  std::vector<int64_t> num_rows;
2849  std::vector<uint64_t> frag_offsets;
2850  if (!ra_exe_unit.union_all) {
2851  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2852  }
2853  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2854  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2855  const auto fragments_it =
2856  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2857  CHECK(fragments_it != all_tables_fragments.end());
2858  const auto& fragments = *fragments_it->second;
2859  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2860  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2861  const auto& fragment = fragments[frag_id];
2862  num_rows.push_back(fragment.getNumTuples());
2863  } else {
2864  size_t total_row_count{0};
2865  for (const auto& fragment : fragments) {
2866  total_row_count += fragment.getNumTuples();
2867  }
2868  num_rows.push_back(total_row_count);
2869  }
2870  const auto frag_offsets_it =
2871  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2872  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2873  const auto& offsets = frag_offsets_it->second;
2874  CHECK_LT(frag_id, offsets.size());
2875  frag_offsets.push_back(offsets[frag_id]);
2876  }
2877  all_num_rows.push_back(num_rows);
2878  // Fragment offsets of outer table should be ONLY used by rowid for now.
2879  all_frag_offsets.push_back(frag_offsets);
2880  }
2881  return {all_num_rows, all_frag_offsets};
2882 }
2883 
2884 // Only fetch columns of hash-joined inner fact table whose fetch are not deferred from
2885 // all the table fragments.
2887  const RelAlgExecutionUnit& ra_exe_unit,
2888  const FragmentsList& selected_fragments) const {
2889  const auto& input_descs = ra_exe_unit.input_descs;
2890  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2891  if (nest_level < 1 ||
2892  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2893  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2894  (ra_exe_unit.join_quals.empty() &&
2895  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2896  return false;
2897  }
2898  const int table_id = inner_col_desc.getScanDesc().getTableId();
2899  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2900  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2901  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2902  return fragments.size() > 1;
2903 }
2904 
2905 bool Executor::needLinearizeAllFragments(
2906     const ColumnDescriptor* cd,
2907  const InputColDescriptor& inner_col_desc,
2908  const RelAlgExecutionUnit& ra_exe_unit,
2909  const FragmentsList& selected_fragments,
2910  const Data_Namespace::MemoryLevel memory_level) const {
2911  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2912  const int table_id = inner_col_desc.getScanDesc().getTableId();
2913  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2914  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2915  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2916  auto need_linearize =
2917  cd->columnType.is_array() ||
2919  return table_id > 0 && need_linearize && fragments.size() > 1;
2920 }
2921 
2922 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2923  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2924  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2925  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2926 }
2927 
2928 FetchResult Executor::fetchChunks(
2929     const ColumnFetcher& column_fetcher,
2930  const RelAlgExecutionUnit& ra_exe_unit,
2931  const int device_id,
2932  const Data_Namespace::MemoryLevel memory_level,
2933  const std::map<int, const TableFragments*>& all_tables_fragments,
2934  const FragmentsList& selected_fragments,
2935  const Catalog_Namespace::Catalog& cat,
2936  std::list<ChunkIter>& chunk_iterators,
2937  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2938  DeviceAllocator* device_allocator,
2939  const size_t thread_idx,
2940  const bool allow_runtime_interrupt) {
2941  auto timer = DEBUG_TIMER(__func__);
2943  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2944  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2945  std::vector<size_t> local_col_to_frag_pos;
2946  buildSelectedFragsMapping(selected_fragments_crossjoin,
2947  local_col_to_frag_pos,
2948  col_global_ids,
2949  selected_fragments,
2950  ra_exe_unit);
2951 
2952  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2953      selected_fragments_crossjoin);
2954  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2955  std::vector<std::vector<int64_t>> all_num_rows;
2956  std::vector<std::vector<uint64_t>> all_frag_offsets;
2957  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2958  std::vector<const int8_t*> frag_col_buffers(
2959  plan_state_->global_to_local_col_ids_.size());
2960  for (const auto& col_id : col_global_ids) {
2961  if (allow_runtime_interrupt) {
2962  bool isInterrupted = false;
2963  {
2964  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2965  const auto query_session = getCurrentQuerySession(session_read_lock);
2966  isInterrupted =
2967  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2968  }
2969  if (isInterrupted) {
2970    throw QueryExecutionError(ERR_INTERRUPTED);
2971  }
2972  }
2973  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2974    throw QueryExecutionError(ERR_INTERRUPTED);
2975  }
2976  CHECK(col_id);
2977  const int table_id = col_id->getScanDesc().getTableId();
2978  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2979  if (cd && cd->isVirtualCol) {
2980  CHECK_EQ("rowid", cd->columnName);
2981  continue;
2982  }
2983  const auto fragments_it = all_tables_fragments.find(table_id);
2984  CHECK(fragments_it != all_tables_fragments.end());
2985  const auto fragments = fragments_it->second;
2986  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2987  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2988  CHECK_LT(static_cast<size_t>(it->second),
2989  plan_state_->global_to_local_col_ids_.size());
2990  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2991  if (!fragments->size()) {
2992  return {};
2993  }
2994  CHECK_LT(frag_id, fragments->size());
2995  auto memory_level_for_column = memory_level;
2996  auto tbl_col_ids =
2997  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId());
2998  if (plan_state_->columns_to_fetch_.find(tbl_col_ids) ==
2999  plan_state_->columns_to_fetch_.end()) {
3000  memory_level_for_column = Data_Namespace::CPU_LEVEL;
3001  }
3002  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3003  frag_col_buffers[it->second] =
3004  column_fetcher.getResultSetColumn(col_id.get(),
3005  memory_level_for_column,
3006  device_id,
3007  device_allocator,
3008  thread_idx);
3009  } else {
3010  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3011  // determine if we need special treatment to linearize a multi-frag table,
3012  // i.e., a column that is classified as a varlen type such as an array;
3013  // for now, we only support fixed-length arrays that contain
3014  // geo point coordinates, but we can support more types in this way
3015  if (needLinearizeAllFragments(
3016          cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3017  bool for_lazy_fetch = false;
3018  if (plan_state_->columns_to_not_fetch_.find(tbl_col_ids) !=
3019  plan_state_->columns_to_not_fetch_.end()) {
3020  for_lazy_fetch = true;
3021  VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
3022  << ", col_name: " << cd->columnName << ")";
3023  }
3024  frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
3025  table_id,
3026  col_id->getColId(),
3027  all_tables_fragments,
3028  chunks,
3029  chunk_iterators,
3030  for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3031  for_lazy_fetch ? 0 : device_id,
3032  device_allocator,
3033  thread_idx);
3034  } else {
3035  frag_col_buffers[it->second] =
3036  column_fetcher.getAllTableColumnFragments(table_id,
3037  col_id->getColId(),
3038  all_tables_fragments,
3039  memory_level_for_column,
3040  device_id,
3041  device_allocator,
3042  thread_idx);
3043  }
3044  } else {
3045  frag_col_buffers[it->second] =
3046  column_fetcher.getOneTableColumnFragment(table_id,
3047  frag_id,
3048  col_id->getColId(),
3049  all_tables_fragments,
3050  chunks,
3051  chunk_iterators,
3052  memory_level_for_column,
3053  device_id,
3054  device_allocator);
3055  }
3056  }
3057  }
3058  all_frag_col_buffers.push_back(frag_col_buffers);
3059  }
3060  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
3061  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
3062  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3063 }
3064 
3065 // fetchChunks() is written under the assumption that multiple inputs imply a JOIN.
3066 // fetchUnionChunks() is written under the assumption that multiple inputs imply a UNION ALL.
3067 FetchResult Executor::fetchUnionChunks(const ColumnFetcher& column_fetcher,
3069  const RelAlgExecutionUnit& ra_exe_unit,
3070  const int device_id,
3071  const Data_Namespace::MemoryLevel memory_level,
3072  const std::map<int, const TableFragments*>& all_tables_fragments,
3073  const FragmentsList& selected_fragments,
3074  const Catalog_Namespace::Catalog& cat,
3075  std::list<ChunkIter>& chunk_iterators,
3076  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3077  DeviceAllocator* device_allocator,
3078  const size_t thread_idx,
3079  const bool allow_runtime_interrupt) {
3080  auto timer = DEBUG_TIMER(__func__);
3082 
3083  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
3084  std::vector<std::vector<int64_t>> all_num_rows;
3085  std::vector<std::vector<uint64_t>> all_frag_offsets;
3086 
3087  CHECK(!selected_fragments.empty());
3088  CHECK_LE(2u, ra_exe_unit.input_descs.size());
3089  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
3090  using TableId = int;
3091  TableId const selected_table_id = selected_fragments.front().table_id;
3092  bool const input_descs_index =
3093  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
3094  if (!input_descs_index) {
3095  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
3096  }
3097  bool const input_col_descs_index =
3098  selected_table_id ==
3099  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
3100  if (!input_col_descs_index) {
3101  CHECK_EQ(selected_table_id,
3102  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
3103  }
3104  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
3105  << " selected_table_id=" << selected_table_id
3106  << " input_descs_index=" << int(input_descs_index)
3107  << " input_col_descs_index=" << int(input_col_descs_index)
3108  << " ra_exe_unit.input_descs="
3109  << shared::printContainer(ra_exe_unit.input_descs)
3110  << " ra_exe_unit.input_col_descs="
3111  << shared::printContainer(ra_exe_unit.input_col_descs);
3112 
3113  // Partition col_global_ids by table_id
3114  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
3115  table_id_to_input_col_descs;
3116  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
3117  TableId const table_id = input_col_desc->getScanDesc().getTableId();
3118  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
3119  }
3120  for (auto const& pair : table_id_to_input_col_descs) {
3121  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3122  std::vector<size_t> local_col_to_frag_pos;
3123 
3124  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
3125  local_col_to_frag_pos,
3126  pair.second,
3127  selected_fragments,
3128  ra_exe_unit);
3129 
3130  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
3131      selected_fragments_crossjoin);
3132 
3133  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
3134  if (allow_runtime_interrupt) {
3135  bool isInterrupted = false;
3136  {
3137  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3138  const auto query_session = getCurrentQuerySession(session_read_lock);
3139  isInterrupted =
3140  checkIsQuerySessionInterrupted(query_session, session_read_lock);
3141  }
3142  if (isInterrupted) {
3143    throw QueryExecutionError(ERR_INTERRUPTED);
3144  }
3145  }
3146  std::vector<const int8_t*> frag_col_buffers(
3147  plan_state_->global_to_local_col_ids_.size());
3148  for (const auto& col_id : pair.second) {
3149  CHECK(col_id);
3150  const int table_id = col_id->getScanDesc().getTableId();
3151  CHECK_EQ(table_id, pair.first);
3152  const auto cd = try_get_column_descriptor(col_id.get(), cat);
3153  if (cd && cd->isVirtualCol) {
3154  CHECK_EQ("rowid", cd->columnName);
3155  continue;
3156  }
3157  const auto fragments_it = all_tables_fragments.find(table_id);
3158  CHECK(fragments_it != all_tables_fragments.end());
3159  const auto fragments = fragments_it->second;
3160  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3161  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3162  CHECK_LT(static_cast<size_t>(it->second),
3163  plan_state_->global_to_local_col_ids_.size());
3164  const size_t frag_id = ra_exe_unit.union_all
3165  ? 0
3166  : selected_frag_ids[local_col_to_frag_pos[it->second]];
3167  if (!fragments->size()) {
3168  return {};
3169  }
3170  CHECK_LT(frag_id, fragments->size());
3171  auto memory_level_for_column = memory_level;
3172  if (plan_state_->columns_to_fetch_.find(
3173  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
3174  plan_state_->columns_to_fetch_.end()) {
3175  memory_level_for_column = Data_Namespace::CPU_LEVEL;
3176  }
3177  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3178  frag_col_buffers[it->second] =
3179  column_fetcher.getResultSetColumn(col_id.get(),
3180  memory_level_for_column,
3181  device_id,
3182  device_allocator,
3183  thread_idx);
3184  } else {
3185  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3186  frag_col_buffers[it->second] =
3187  column_fetcher.getAllTableColumnFragments(table_id,
3188  col_id->getColId(),
3189  all_tables_fragments,
3190  memory_level_for_column,
3191  device_id,
3192  device_allocator,
3193  thread_idx);
3194  } else {
3195  frag_col_buffers[it->second] =
3196  column_fetcher.getOneTableColumnFragment(table_id,
3197  frag_id,
3198  col_id->getColId(),
3199  all_tables_fragments,
3200  chunks,
3201  chunk_iterators,
3202  memory_level_for_column,
3203  device_id,
3204  device_allocator);
3205  }
3206  }
3207  }
3208  all_frag_col_buffers.push_back(frag_col_buffers);
3209  }
3210  std::vector<std::vector<int64_t>> num_rows;
3211  std::vector<std::vector<uint64_t>> frag_offsets;
3212  std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags(
3213  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
3214  all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
3215  all_frag_offsets.insert(
3216  all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
3217  }
3218  // The hack below assumes a particular table traversal order which is not
3219  // always achieved due to the unordered map in the outermost loop. According
3220  // to the code below we expect NULLs in the even positions of all_frag_col_buffers[0]
3221  // and the odd positions of all_frag_col_buffers[1]. As an additional hack we
3222  // swap these vectors if the NULLs are not in the expected positions.
3223  if (all_frag_col_buffers[0].size() > 1 && all_frag_col_buffers[0][0] &&
3224  !all_frag_col_buffers[0][1]) {
3225  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
3226  }
3227  // UNION ALL hacks.
3228  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers);
3229  for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
3230  all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
3231  }
3232  if (input_descs_index == input_col_descs_index) {
3233  std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
3234  }
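// Illustrative walk-through of the UNION ALL interleave fix-up above (a sketch with
// hypothetical buffers, not taken from the original source): assume a two-way UNION ALL
// where each input contributes one column, so that
//   all_frag_col_buffers[0] == {nullptr, ptr_a}   // NULLs expected in even slots
//   all_frag_col_buffers[1] == {ptr_b, nullptr}   // NULLs expected in odd slots
// The loop `buf[i & 1][i] = buf[i & 1][i ^ 1]` copies each buffer's neighboring
// non-null pointer into its null slot:
//   i = 0: all_frag_col_buffers[0][0] = all_frag_col_buffers[0][1]  -> {ptr_a, ptr_a}
//   i = 1: all_frag_col_buffers[1][1] = all_frag_col_buffers[1][0]  -> {ptr_b, ptr_b}
// so whichever half is selected by input_descs_index below has a usable pointer in
// every position.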
3235 
3236  VLOG(2) << "all_frag_col_buffers=" << shared::printContainer(all_frag_col_buffers)
3237  << " all_num_rows=" << shared::printContainer(all_num_rows)
3238  << " all_frag_offsets=" << shared::printContainer(all_frag_offsets)
3239  << " input_col_descs_index=" << input_col_descs_index;
3240  return {{all_frag_col_buffers[input_descs_index]},
3241  {{all_num_rows[0][input_descs_index]}},
3242  {{all_frag_offsets[0][input_descs_index]}}};
3243 }
3244 
3245 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
3246  const size_t scan_idx,
3247  const RelAlgExecutionUnit& ra_exe_unit) {
3248  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
3249  scan_idx > 0 &&
3250  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
3251  !selected_fragments[scan_idx].fragment_ids.empty()) {
3252  // Fetch all fragments
3253  return {size_t(0)};
3254  }
3255 
3256  return selected_fragments[scan_idx].fragment_ids;
3257 }
3258 
3259 void Executor::buildSelectedFragsMapping(
3260  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3261  std::vector<size_t>& local_col_to_frag_pos,
3262  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3263  const FragmentsList& selected_fragments,
3264  const RelAlgExecutionUnit& ra_exe_unit) {
3265  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3266  size_t frag_pos{0};
3267  const auto& input_descs = ra_exe_unit.input_descs;
3268  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3269  const int table_id = input_descs[scan_idx].getTableId();
3270  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
3271  selected_fragments_crossjoin.push_back(
3272  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
3273  for (const auto& col_id : col_global_ids) {
3274  CHECK(col_id);
3275  const auto& input_desc = col_id->getScanDesc();
3276  if (input_desc.getTableId() != table_id ||
3277  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3278  continue;
3279  }
3280  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3281  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3282  CHECK_LT(static_cast<size_t>(it->second),
3283  plan_state_->global_to_local_col_ids_.size());
3284  local_col_to_frag_pos[it->second] = frag_pos;
3285  }
3286  ++frag_pos;
3287  }
3288 }
3289 
3290 void Executor::buildSelectedFragsMappingForUnion(
3291  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3292  std::vector<size_t>& local_col_to_frag_pos,
3293  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3294  const FragmentsList& selected_fragments,
3295  const RelAlgExecutionUnit& ra_exe_unit) {
3296  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3297  size_t frag_pos{0};
3298  const auto& input_descs = ra_exe_unit.input_descs;
3299  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3300  const int table_id = input_descs[scan_idx].getTableId();
3301  // selected_fragments here is from assignFragsToKernelDispatch
3302  // execution_kernel.fragments
3303  if (selected_fragments[0].table_id != table_id) { // TODO 0
3304  continue;
3305  }
3306  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
3307  selected_fragments_crossjoin.push_back(
3308  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
3309  {size_t(1)}); // TODO
3310  for (const auto& col_id : col_global_ids) {
3311  CHECK(col_id);
3312  const auto& input_desc = col_id->getScanDesc();
3313  if (input_desc.getTableId() != table_id ||
3314  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3315  continue;
3316  }
3317  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3318  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3319  CHECK_LT(static_cast<size_t>(it->second),
3320  plan_state_->global_to_local_col_ids_.size());
3321  local_col_to_frag_pos[it->second] = frag_pos;
3322  }
3323  ++frag_pos;
3324  }
3325 }
3326 
3327 namespace {
3328 
3329 class OutVecOwner {
3330  public:
3331  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
3332  ~OutVecOwner() {
3333  for (auto out : out_vec_) {
3334  delete[] out;
3335  }
3336  }
3337 
3338  private:
3339  std::vector<int64_t*> out_vec_;
3340 };
3341 } // namespace
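// OutVecOwner above is a minimal RAII guard for the raw int64_t* buffers returned by
// launchCpuCode/launchGpuCode. A sketch of the intended usage (mirroring
// output_memory_scope below):
//   std::vector<int64_t*> out_vec = /* launch kernel */;
//   std::unique_ptr<OutVecOwner> scope(new OutVecOwner(out_vec));
//   // ... read the outputs; each buffer is delete[]'d when `scope` goes out of scope.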
3342 
3343 int32_t Executor::executePlanWithoutGroupBy(
3344  const RelAlgExecutionUnit& ra_exe_unit,
3345  const CompilationResult& compilation_result,
3346  const bool hoist_literals,
3347  ResultSetPtr* results,
3348  const std::vector<Analyzer::Expr*>& target_exprs,
3349  const ExecutorDeviceType device_type,
3350  std::vector<std::vector<const int8_t*>>& col_buffers,
3351  QueryExecutionContext* query_exe_context,
3352  const std::vector<std::vector<int64_t>>& num_rows,
3353  const std::vector<std::vector<uint64_t>>& frag_offsets,
3354  Data_Namespace::DataMgr* data_mgr,
3355  const int device_id,
3356  const uint32_t start_rowid,
3357  const uint32_t num_tables,
3358  const bool allow_runtime_interrupt,
3359  RenderInfo* render_info,
3360  const int64_t rows_to_process) {
3362  auto timer = DEBUG_TIMER(__func__);
3363  CHECK(!results || !(*results));
3364  if (col_buffers.empty()) {
3365  return 0;
3366  }
3367 
3368  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3369  if (render_info) {
3370  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
3371  // here, we are in non-in-situ mode.
3372  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
3373  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
3374  "currently unsupported.";
3375  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3376  }
3377 
3378  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3379  std::vector<int64_t*> out_vec;
3380  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3381  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3382  std::unique_ptr<OutVecOwner> output_memory_scope;
3383  if (allow_runtime_interrupt) {
3384  bool isInterrupted = false;
3385  {
3386  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3387  const auto query_session = getCurrentQuerySession(session_read_lock);
3388  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3389  }
3390  if (isInterrupted) {
3392  }
3393  }
3394  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3395  return ERR_INTERRUPTED;
3396  }
3397  if (device_type == ExecutorDeviceType::CPU) {
3398  CpuCompilationContext* cpu_generated_code =
3399  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3400  CHECK(cpu_generated_code);
3401  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
3402  cpu_generated_code,
3403  hoist_literals,
3404  hoist_buf,
3405  col_buffers,
3406  num_rows,
3407  frag_offsets,
3408  0,
3409  &error_code,
3410  num_tables,
3411  join_hash_table_ptrs,
3412  rows_to_process);
3413  output_memory_scope.reset(new OutVecOwner(out_vec));
3414  } else {
3415  GpuCompilationContext* gpu_generated_code =
3416  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3417  CHECK(gpu_generated_code);
3418  try {
3419  out_vec = query_exe_context->launchGpuCode(
3420  ra_exe_unit,
3421  gpu_generated_code,
3422  hoist_literals,
3423  hoist_buf,
3424  col_buffers,
3425  num_rows,
3426  frag_offsets,
3427  0,
3428  data_mgr,
3429  blockSize(),
3430  gridSize(),
3431  device_id,
3432  compilation_result.gpu_smem_context.getSharedMemorySize(),
3433  &error_code,
3434  num_tables,
3435  allow_runtime_interrupt,
3436  join_hash_table_ptrs,
3437  render_allocator_map_ptr);
3438  output_memory_scope.reset(new OutVecOwner(out_vec));
3439  } catch (const OutOfMemory&) {
3440  return ERR_OUT_OF_GPU_MEM;
3441  } catch (const std::exception& e) {
3442  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3443  }
3444  }
3445  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3446  error_code == Executor::ERR_DIV_BY_ZERO ||
3447  error_code == Executor::ERR_OUT_OF_TIME ||
3448  error_code == Executor::ERR_INTERRUPTED ||
3450  error_code == Executor::ERR_GEOS ||
3452  return error_code;
3453  }
3454  if (ra_exe_unit.estimator) {
3455  CHECK(!error_code);
3456  if (results) {
3457  *results =
3458  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
3459  }
3460  return 0;
3461  }
3462  // Delayed results extraction (used for sub-fragments) is expected for estimator queries only.
3463  CHECK(results);
3464  std::vector<int64_t> reduced_outs;
3465  const auto num_frags = col_buffers.size();
3466  const size_t entry_count =
3467  device_type == ExecutorDeviceType::GPU
3468  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
3469  ? 1
3470  : blockSize() * gridSize() * num_frags)
3471  : num_frags;
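// A worked example of entry_count with hypothetical launch dimensions: on GPU without
// the shared-memory reduction, every thread of every block emits a partial value per
// fragment, so gridSize() = 16, blockSize() = 256 and num_frags = 2 give
// 16 * 256 * 2 = 8192 partial outputs per target slot; with shared-memory reduction
// the kernel has already collapsed them to one value, and on CPU there is one partial
// output per fragment.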
3472  if (size_t(1) == entry_count) {
3473  for (auto out : out_vec) {
3474  CHECK(out);
3475  reduced_outs.push_back(*out);
3476  }
3477  } else {
3478  size_t out_vec_idx = 0;
3479 
3480  for (const auto target_expr : target_exprs) {
3481  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3482  CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3483  << target_expr->toString();
3484 
3485  const int num_iterations = agg_info.sql_type.is_geometry()
3486  ? agg_info.sql_type.get_physical_coord_cols()
3487  : 1;
3488 
3489  for (int i = 0; i < num_iterations; i++) {
3490  int64_t val1;
3491  const bool float_argument_input = takes_float_argument(agg_info);
3492  if (is_distinct_target(agg_info) || agg_info.agg_kind == kAPPROX_QUANTILE) {
3493  CHECK(agg_info.agg_kind == kCOUNT ||
3494  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT ||
3495  agg_info.agg_kind == kAPPROX_QUANTILE);
3496  val1 = out_vec[out_vec_idx][0];
3497  error_code = 0;
3498  } else {
3499  const auto chosen_bytes = static_cast<size_t>(
3500  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
3501  std::tie(val1, error_code) = Executor::reduceResults(
3502  agg_info.agg_kind,
3503  agg_info.sql_type,
3504  query_exe_context->getAggInitValForIndex(out_vec_idx),
3505  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3506  out_vec[out_vec_idx],
3507  entry_count,
3508  false,
3509  float_argument_input);
3510  }
3511  if (error_code) {
3512  break;
3513  }
3514  reduced_outs.push_back(val1);
3515  if (agg_info.agg_kind == kAVG ||
3516  (agg_info.agg_kind == kSAMPLE &&
3517  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3518  const auto chosen_bytes = static_cast<size_t>(
3519  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3520  1));
3521  int64_t val2;
3522  std::tie(val2, error_code) = Executor::reduceResults(
3523  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3524  agg_info.sql_type,
3525  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3526  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3527  out_vec[out_vec_idx + 1],
3528  entry_count,
3529  false,
3530  false);
3531  if (error_code) {
3532  break;
3533  }
3534  reduced_outs.push_back(val2);
3535  ++out_vec_idx;
3536  }
3537  ++out_vec_idx;
3538  }
3539  }
3540  }
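// Illustrative reduction for a hypothetical `SELECT AVG(x) FROM t` (not from the
// original source): AVG occupies two consecutive output slots, so the loop above first
// reduces the running sum at out_vec_idx, then reduces the row count at out_vec_idx + 1
// using kCOUNT, pushes both values onto reduced_outs, and advances out_vec_idx past the
// extra slot, so the average can be formed downstream from the sum/count pair.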
3541 
3542  if (error_code) {
3543  return error_code;
3544  }
3545 
3546  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3547  auto rows_ptr = std::shared_ptr<ResultSet>(
3548  query_exe_context->query_buffers_->result_sets_[0].release());
3549  rows_ptr->fillOneEntry(reduced_outs);
3550  *results = std::move(rows_ptr);
3551  return error_code;
3552 }
3553 
3554 namespace {
3555 
3556 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
3557  CHECK(scan_limit);
3558  return results && results->rowCount() < scan_limit;
3559 }
3560 
3561 } // namespace
3562 
3563 int32_t Executor::executePlanWithGroupBy(
3564  const RelAlgExecutionUnit& ra_exe_unit,
3565  const CompilationResult& compilation_result,
3566  const bool hoist_literals,
3567  ResultSetPtr* results,
3568  const ExecutorDeviceType device_type,
3569  std::vector<std::vector<const int8_t*>>& col_buffers,
3570  const std::vector<size_t> outer_tab_frag_ids,
3571  QueryExecutionContext* query_exe_context,
3572  const std::vector<std::vector<int64_t>>& num_rows,
3573  const std::vector<std::vector<uint64_t>>& frag_offsets,
3574  Data_Namespace::DataMgr* data_mgr,
3575  const int device_id,
3576  const int outer_table_id,
3577  const int64_t scan_limit,
3578  const uint32_t start_rowid,
3579  const uint32_t num_tables,
3580  const bool allow_runtime_interrupt,
3581  RenderInfo* render_info,
3582  const int64_t rows_to_process) {
3583  auto timer = DEBUG_TIMER(__func__);
3585  // TODO: get results via a separate method, but need to do something with literals.
3586  CHECK(!results || !(*results));
3587  if (col_buffers.empty()) {
3588  return 0;
3589  }
3590  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3591  // TODO(alex):
3592  // 1. Optimize size (make keys more compact).
3593  // 2. Resize on overflow.
3594  // 3. Optimize runtime.
3595  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3596  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3597  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3598  if (allow_runtime_interrupt) {
3599  bool isInterrupted = false;
3600  {
3601  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3602  const auto query_session = getCurrentQuerySession(session_read_lock);
3603  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3604  }
3605  if (isInterrupted) {
3607  }
3608  }
3609  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3610  return ERR_INTERRUPTED;
3611  }
3612 
3613  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3614  if (render_info && render_info->useCudaBuffers()) {
3615  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3616  }
3617 
3618  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3619  << " ra_exe_unit.input_descs="
3620  << shared::printContainer(ra_exe_unit.input_descs)
3621  << " ra_exe_unit.input_col_descs="
3622  << shared::printContainer(ra_exe_unit.input_col_descs)
3623  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3624  << " num_rows=" << shared::printContainer(num_rows)
3625  << " frag_offsets=" << shared::printContainer(frag_offsets)
3626  << " query_exe_context->query_buffers_->num_rows_="
3627  << query_exe_context->query_buffers_->num_rows_
3628  << " query_exe_context->query_mem_desc_.getEntryCount()="
3629  << query_exe_context->query_mem_desc_.getEntryCount()
3630  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3631  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3632  << " num_tables=" << num_tables;
3633 
3634  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3635  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3636  // with outer_table_id.
3637  if (ra_exe_unit_copy.union_all) {
3638  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
3639  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3640  ra_exe_unit_copy.input_descs.end(),
3641  [outer_table_id](auto const& a, auto const& b) {
3642  return a.getTableId() == outer_table_id &&
3643  b.getTableId() != outer_table_id;
3644  });
3645  while (!ra_exe_unit_copy.input_descs.empty() &&
3646  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3647  ra_exe_unit_copy.input_descs.pop_back();
3648  }
3649  // Filter ra_exe_unit_copy.input_col_descs.
3650  ra_exe_unit_copy.input_col_descs.remove_if(
3651  [outer_table_id](auto const& input_col_desc) {
3652  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3653  });
3654  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3655  }
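// A hedged example of the UNION ALL filtering above, with hypothetical table ids: for
// a UNION ALL over tables 10 and 11 with outer_table_id = 11, the stable sort moves the
// descriptor for table 11 to the front, the pop_back loop drops the trailing descriptor
// for table 10, and remove_if keeps only the column descriptors that scan table 11, so
// this kernel invocation sees just the outer input.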
3656 
3657  if (device_type == ExecutorDeviceType::CPU) {
3658  const int32_t scan_limit_for_query =
3659  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3660  const int32_t max_matched = scan_limit_for_query == 0
3661  ? query_exe_context->query_mem_desc_.getEntryCount()
3662  : scan_limit_for_query;
3663  CpuCompilationContext* cpu_generated_code =
3664  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3665  CHECK(cpu_generated_code);
3666  query_exe_context->launchCpuCode(ra_exe_unit_copy,
3667  cpu_generated_code,
3668  hoist_literals,
3669  hoist_buf,
3670  col_buffers,
3671  num_rows,
3672  frag_offsets,
3673  max_matched,
3674  &error_code,
3675  num_tables,
3676  join_hash_table_ptrs,
3677  rows_to_process);
3678  } else {
3679  try {
3680  GpuCompilationContext* gpu_generated_code =
3681  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3682  CHECK(gpu_generated_code);
3683  query_exe_context->launchGpuCode(
3684  ra_exe_unit_copy,
3685  gpu_generated_code,
3686  hoist_literals,
3687  hoist_buf,
3688  col_buffers,
3689  num_rows,
3690  frag_offsets,
3691  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3692  data_mgr,
3693  blockSize(),
3694  gridSize(),
3695  device_id,
3696  compilation_result.gpu_smem_context.getSharedMemorySize(),
3697  &error_code,
3698  num_tables,
3699  allow_runtime_interrupt,
3700  join_hash_table_ptrs,
3701  render_allocator_map_ptr);
3702  } catch (const OutOfMemory&) {
3703  return ERR_OUT_OF_GPU_MEM;
3704  } catch (const OutOfRenderMemory&) {
3705  return ERR_OUT_OF_RENDER_MEM;
3706  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3708  } catch (const std::exception& e) {
3709  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3710  }
3711  }
3712 
3713  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3714  error_code == Executor::ERR_DIV_BY_ZERO ||
3715  error_code == Executor::ERR_OUT_OF_TIME ||
3716  error_code == Executor::ERR_INTERRUPTED ||
3718  error_code == Executor::ERR_GEOS ||
3720  return error_code;
3721  }
3722 
3723  if (results && error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3724  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3725  *results = query_exe_context->getRowSet(ra_exe_unit_copy,
3726  query_exe_context->query_mem_desc_);
3727  CHECK(*results);
3728  VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
3729  (*results)->holdLiterals(hoist_buf);
3730  }
3731  if (error_code < 0 && render_allocator_map_ptr) {
3732  auto const adjusted_scan_limit =
3733  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3734  // More rows passed the filter than available slots. We don't have a count to check,
3735  // so assume we met the limit if a scan limit is set
3736  if (adjusted_scan_limit != 0) {
3737  return 0;
3738  } else {
3739  return error_code;
3740  }
3741  }
3742  if (results && error_code &&
3743  (!scan_limit || check_rows_less_than_needed(*results, scan_limit))) {
3744  return error_code; // unlucky, not enough results and we ran out of slots
3745  }
3746 
3747  return 0;
3748 }
3749 
3750 std::vector<int8_t*> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3751  const int device_id) {
3752  std::vector<int8_t*> table_ptrs;
3753  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3754  for (auto hash_table : join_hash_tables) {
3755  if (!hash_table) {
3756  CHECK(table_ptrs.empty());
3757  return {};
3758  }
3759  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3760  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3761  }
3762  return table_ptrs;
3763 }
3764 
3765 void Executor::nukeOldState(const bool allow_lazy_fetch,
3766  const std::vector<InputTableInfo>& query_infos,
3767  const PlanState::DeletedColumnsMap& deleted_cols_map,
3768  const RelAlgExecutionUnit* ra_exe_unit) {
3771  const bool contains_left_deep_outer_join =
3772  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3773  ra_exe_unit->join_quals.end(),
3774  [](const JoinCondition& join_condition) {
3775  return join_condition.type == JoinType::LEFT;
3776  }) != ra_exe_unit->join_quals.end();
3777  cgen_state_.reset(
3778  new CgenState(query_infos.size(), contains_left_deep_outer_join, this));
3779  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3780  query_infos,
3781  deleted_cols_map,
3782  this));
3783 }
3784 
3785 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3786  const std::vector<InputTableInfo>& query_infos) {
3788  const auto ld_count = input_descs.size();
3789  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3790  for (size_t i = 0; i < ld_count; ++i) {
3791  CHECK_LT(i, query_infos.size());
3792  const auto frag_count = query_infos[i].info.fragments.size();
3793  if (i > 0) {
3794  cgen_state_->frag_offsets_.push_back(nullptr);
3795  } else {
3796  if (frag_count > 1) {
3797  cgen_state_->frag_offsets_.push_back(cgen_state_->ir_builder_.CreateLoad(
3798  frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));
3799  } else {
3800  cgen_state_->frag_offsets_.push_back(nullptr);
3801  }
3802  }
3803  }
3804 }
3805 
3807  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3808  const std::vector<InputTableInfo>& query_infos,
3809  const MemoryLevel memory_level,
3810  const JoinType join_type,
3811  const HashType preferred_hash_type,
3812  ColumnCacheMap& column_cache,
3813  const HashTableBuildDagMap& hashtable_build_dag_map,
3814  const RegisteredQueryHint& query_hint,
3815  const TableIdToNodeMap& table_id_to_node_map) {
3816  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3817  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3818  }
3819  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3821  }
3822  try {
3823  auto tbl = HashJoin::getInstance(qual_bin_oper,
3824  query_infos,
3825  memory_level,
3826  join_type,
3827  preferred_hash_type,
3828  deviceCountForMemoryLevel(memory_level),
3829  column_cache,
3830  this,
3831  hashtable_build_dag_map,
3832  query_hint,
3833  table_id_to_node_map);
3834  return {tbl, ""};
3835  } catch (const HashJoinFail& e) {
3836  return {nullptr, e.what()};
3837  }
3838 }
3839 
3840 int8_t Executor::warpSize() const {
3841  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3842  CHECK(!dev_props.empty());
3843  return dev_props.front().warpSize;
3844 }
3845 
3846 // TODO(adb): should these three functions have consistent semantics if cuda mgr does not
3847 // exist?
3848 unsigned Executor::gridSize() const {
3849  CHECK(data_mgr_);
3850  const auto cuda_mgr = data_mgr_->getCudaMgr();
3851  if (!cuda_mgr) {
3852  return 0;
3853  }
3854  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3855 }
3856 
3857 unsigned Executor::numBlocksPerMP() const {
3858  return grid_size_x_ ? std::ceil(grid_size_x_ / cudaMgr()->getMinNumMPsForAllDevices())
3859  : 2;
3860 }
3861 
3862 unsigned Executor::blockSize() const {
3863  CHECK(data_mgr_);
3864  const auto cuda_mgr = data_mgr_->getCudaMgr();
3865  if (!cuda_mgr) {
3866  return 0;
3867  }
3868  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3869  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3870 }
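// Illustrative defaults for the launch dimensions above, using hypothetical device
// properties: with grid_size_x_ and block_size_x_ unset, a GPU reporting 80 SMs and
// maxThreadsPerBlock = 1024 yields gridSize() = 2 * 80 = 160 blocks and
// blockSize() = 1024 threads per block, while numBlocksPerMP() falls back to 2.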
3871 
3872 size_t Executor::maxGpuSlabSize() const {
3873  return max_gpu_slab_size_;
3874 }
3875 
3876 int64_t Executor::deviceCycles(int milliseconds) const {
3877  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3878  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3879 }
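// deviceCycles() is unit arithmetic: clockKhz is cycles per millisecond, so a
// hypothetical 1'440'000 kHz (1.44 GHz) clock and a 10'000 ms budget translate to
// 1'440'000 * 10'000 = 14'400'000'000 cycles.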
3880 
3881 llvm::Value* Executor::castToFP(llvm::Value* value,
3882  SQLTypeInfo const& from_ti,
3883  SQLTypeInfo const& to_ti) {
3885  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3886  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3887  llvm::Type* fp_type{nullptr};
3888  switch (to_ti.get_size()) {
3889  case 4:
3890  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3891  break;
3892  case 8:
3893  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3894  break;
3895  default:
3896  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3897  }
3898  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3899  if (from_ti.get_scale()) {
3900  value = cgen_state_->ir_builder_.CreateFDiv(
3901  value,
3902  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3903  }
3904  }
3905  return value;
3906 }
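// A worked example of the scaled-integer path in castToFP above: a DECIMAL(10,2) value
// stored as the integer 12345 that is cast to DOUBLE emits CreateSIToFP (-> 12345.0)
// followed by CreateFDiv by exp_to_scale(2) == 100, yielding 123.45.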
3907 
3908 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3910  CHECK(val->getType()->isPointerTy());
3911 
3912  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3913  const auto val_type = val_ptr_type->getElementType();
3914  size_t val_width = 0;
3915  if (val_type->isIntegerTy()) {
3916  val_width = val_type->getIntegerBitWidth();
3917  } else {
3918  if (val_type->isFloatTy()) {
3919  val_width = 32;
3920  } else {
3921  CHECK(val_type->isDoubleTy());
3922  val_width = 64;
3923  }
3924  }
3925  CHECK_LT(size_t(0), val_width);
3926  if (bitWidth == val_width) {
3927  return val;
3928  }
3929  return cgen_state_->ir_builder_.CreateBitCast(
3930  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3931 }
3932 
3933 #define EXECUTE_INCLUDE
3934 #include "ArrayOps.cpp"
3935 #include "DateAdd.cpp"
3936 #include "GeoOps.cpp"
3937 #include "StringFunctions.cpp"
3939 #undef EXECUTE_INCLUDE
3940 
3941 namespace {
3942 void add_deleted_col_to_map(PlanState::DeletedColumnsMap& deleted_cols_map,
3943  const ColumnDescriptor* deleted_cd) {
3944  auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3945  if (deleted_cols_it == deleted_cols_map.end()) {
3946  CHECK(
3947  deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd)).second);
3948  } else {
3949  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3950  }
3951 }
3952 } // namespace
3953 
3954 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3955  const RelAlgExecutionUnit& ra_exe_unit,
3956  const CompilationOptions& co) {
3957  if (!co.filter_on_deleted_column) {
3958  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3959  }
3960  auto ra_exe_unit_with_deleted = ra_exe_unit;
3961  PlanState::DeletedColumnsMap deleted_cols_map;
3962  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3963  if (input_table.getSourceType() != InputSourceType::TABLE) {
3964  continue;
3965  }
3966  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3967  CHECK(td);
3968  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3969  if (!deleted_cd) {
3970  continue;
3971  }
3972  CHECK(deleted_cd->columnType.is_boolean());
3973  // check deleted column is not already present
3974  bool found = false;
3975  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3976  if (input_col.get()->getColId() == deleted_cd->columnId &&
3977  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3978  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3979  found = true;
3980  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3981  break;
3982  }
3983  }
3984  if (!found) {
3985  // add deleted column
3986  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3987  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3988  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3989  }
3990  }
3991  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3992 }
3993 
3994 namespace {
3995 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` will return `true` for a safe
3996 // scaled epoch value and `false` for overflow/underflow values as the first element of the
3997 // returned tuple.
3998 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3999  const int64_t chunk_min,
4000  const int64_t chunk_max,
4001  const SQLTypeInfo& lhs_type,
4002  const SQLTypeInfo& rhs_type) {
4003  const int32_t ldim = lhs_type.get_dimension();
4004  const int32_t rdim = rhs_type.get_dimension();
4005  CHECK(ldim != rdim);
4006  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
4007  if (ldim > rdim) {
4008  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
4009  return {true, chunk_min / scale, chunk_max / scale};
4010  }
4011 
4012  using checked_int64_t = boost::multiprecision::number<
4013  boost::multiprecision::cpp_int_backend<64,
4014  64,
4015  boost::multiprecision::signed_magnitude,
4016  boost::multiprecision::checked,
4017  void>>;
4018 
4019  try {
4020  auto ret =
4021  std::make_tuple(true,
4022  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
4023  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
4024  return ret;
4025  } catch (const std::overflow_error& e) {
4026  // noop
4027  }
4028  return std::make_tuple(false, chunk_min, chunk_max);
4029 }
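// A worked example of the helper above with hypothetical chunk statistics: comparing a
// TIMESTAMP(0) column whose metadata reports chunk_min = 1'600'000'000 and
// chunk_max = 1'700'000'000 against a TIMESTAMP(3) constant gives ldim = 0, rdim = 3 and
// scale = 1000, so both bounds are scaled up to 1'600'000'000'000 / 1'700'000'000'000
// via checked 64-bit multiplication; had either product overflowed, the function would
// return {false, chunk_min, chunk_max} and the caller would give up on pruning the
// fragment for that qual.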
4030 
4031 } // namespace
4032 
4033 bool Executor::isFragmentFullyDeleted(
4034  const int table_id,
4035  const Fragmenter_Namespace::FragmentInfo& fragment) {
4036  // Skip temporary tables
4037  if (table_id < 0) {
4038  return false;
4039  }
4040 
4041  const auto td = catalog_->getMetadataForTable(fragment.physicalTableId);
4042  CHECK(td);
4043  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
4044  if (!deleted_cd) {
4045  return false;
4046  }
4047 
4048  const auto& chunk_type = deleted_cd->columnType;
4049  CHECK(chunk_type.is_boolean());
4050 
4051  const auto deleted_col_id = deleted_cd->columnId;
4052  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
4053  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
4054  const int64_t chunk_min =
4055  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4056  const int64_t chunk_max =
4057  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4058  if (chunk_min == 1 && chunk_max == 1) { // Delete chunk if metadata says full bytemap
4059  // is true (signifying all rows deleted)
4060  return true;
4061  }
4062  }
4063  return false;
4064 }
4065 
4067  const Analyzer::BinOper* comp_expr,
4068  const Analyzer::ColumnVar* lhs_col,
4069  const Fragmenter_Namespace::FragmentInfo& fragment,
4070  const Analyzer::Constant* rhs_const) const {
4071  const int col_id = lhs_col->get_column_id();
4072  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4073  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4075  }
4076  double chunk_min{0.};
4077  double chunk_max{0.};
4078  const auto& chunk_type = lhs_col->get_type_info();
4079  chunk_min = extract_min_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4080  chunk_max = extract_max_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4081  if (chunk_min > chunk_max) {
4083  }
4084 
4085  const auto datum_fp = rhs_const->get_constval();
4086  const auto rhs_type = rhs_const->get_type_info().get_type();
4087  CHECK(rhs_type == kFLOAT || rhs_type == kDOUBLE);
4088 
4089  // Do we need to codegen the constant like the integer path does?
4090  const auto rhs_val = rhs_type == kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
4091 
4092  // Todo: dedup the following comparison code with the integer/timestamp path; it is
4093  // slightly tricky to do cleanly as we do not have rowid on this path
4094  switch (comp_expr->get_optype()) {
4095  case kGE:
4096  if (chunk_max < rhs_val) {
4098  }
4099  break;
4100  case kGT:
4101  if (chunk_max <= rhs_val) {
4103  }
4104  break;
4105  case kLE:
4106  if (chunk_min > rhs_val) {
4108  }
4109  break;
4110  case kLT:
4111  if (chunk_min >= rhs_val) {
4113  }
4114  break;
4115  case kEQ:
4116  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4118  }
4119  break;
4120  default:
4121  break;
4122  }
4124 }
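// An illustrative case for the FP min/max pruning above, with hypothetical statistics:
// for a qual `x > 10.0` on a fragment whose float column metadata reports
// chunk_min = 0.5 and chunk_max = 9.5, the kGT branch sees chunk_max <= rhs_val, so the
// fragment can be skipped without ever being fetched.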
4125 
4126 std::pair<bool, int64_t> Executor::skipFragment(
4127  const InputDescriptor& table_desc,
4128  const Fragmenter_Namespace::FragmentInfo& fragment,
4129  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
4130  const std::vector<uint64_t>& frag_offsets,
4131  const size_t frag_idx) {
4132  const int table_id = table_desc.getTableId();
4133 
4134  // First check to see if all of the fragment is deleted, in which case we know we can skip it
4135  if (isFragmentFullyDeleted(table_id, fragment)) {
4136  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
4137  << ", fragment id: " << frag_idx;
4138  return {true, -1};
4139  }
4140 
4141  for (const auto& simple_qual : simple_quals) {
4142  const auto comp_expr =
4143  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4144  if (!comp_expr) {
4145  // is this possible?
4146  return {false, -1};
4147  }
4148  const auto lhs = comp_expr->get_left_operand();
4149  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4150  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
4151  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
4152  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
4153  if (lhs_uexpr) {
4154  CHECK(lhs_uexpr->get_optype() ==
4155  kCAST); // We should have only been passed a cast expression
4156  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
4157  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
4158  continue;
4159  }
4160  } else {
4161  continue;
4162  }
4163  }
4164  const auto rhs = comp_expr->get_right_operand();
4165  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4166  if (!rhs_const) {
4167  // is this possible?
4168  return {false, -1};
4169  }
4170  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
4171  !lhs->get_type_info().is_fp()) {
4172  continue;
4173  }
4174 
4175  if (lhs->get_type_info().is_fp()) {
4176  const auto fragment_skip_status =
4177  canSkipFragmentForFpQual(comp_expr.get(), lhs_col, fragment, rhs_const);
4178  switch (fragment_skip_status) {
4180  return {true, -1};
4182  return {false, -1};
4184  continue;
4185  default:
4186  UNREACHABLE();
4187  }
4188  }
4189 
4190  // Everything below is logic for integer and integer-backed timestamps
4191  // TODO: Factor out into separate function per canSkipFragmentForFpQual above
4192 
4193  const int col_id = lhs_col->get_column_id();
4194  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4195  int64_t chunk_min{0};
4196  int64_t chunk_max{0};
4197  bool is_rowid{false};
4198  size_t start_rowid{0};
4199  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4200  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
4201  if (cd->isVirtualCol) {
4202  CHECK(cd->columnName == "rowid");
4203  const auto& table_generation = getTableGeneration(table_id);
4204  start_rowid = table_generation.start_rowid;
4205  chunk_min = frag_offsets[frag_idx] + start_rowid;
4206  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
4207  is_rowid = true;
4208  }
4209  } else {
4210  const auto& chunk_type = lhs_col->get_type_info();
4211  chunk_min =
4212  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4213  chunk_max =
4214  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4215  }
4216  if (chunk_min > chunk_max) {
4217  // invalid metadata range, do not skip fragment
4218  return {false, -1};
4219  }
4220  if (lhs->get_type_info().is_timestamp() &&
4221  (lhs_col->get_type_info().get_dimension() !=
4222  rhs_const->get_type_info().get_dimension()) &&
4223  (lhs_col->get_type_info().is_high_precision_timestamp() ||
4224  rhs_const->get_type_info().is_high_precision_timestamp())) {
4225  // If the original timestamp lhs col has a different precision,
4226  // the column metadata holds the value in the original precision,
4227  // therefore adjust the rhs value to match the lhs precision.
4228 
4229  // Note(Wamsi): We adjust the rhs const value instead of the lhs value so as not to
4230  // artificially limit the lhs column range. RHS overflow/underflow has already
4231  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
4232  bool is_valid;
4233  std::tie(is_valid, chunk_min, chunk_max) =
4234  get_hpt_overflow_underflow_safe_scaled_values(
4235  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
4236  if (!is_valid) {
4237  VLOG(4) << "Overflow/underflow detected in fragment skipping logic.\nChunk min "
4238  "value: "
4239  << std::to_string(chunk_min)
4240  << "\nChunk max value: " << std::to_string(chunk_max)
4241  << "\nLHS col precision is: "
4242  << std::to_string(lhs_col->get_type_info().get_dimension())
4243  << "\nRHS precision is: "
4244  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
4245  return {false, -1};
4246  }
4247  }
4248  if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
4249  // It is obvious that a cast from timestamp to date is happening here,
4250  // so we have to correct the chunk min and max values to lower the precision as of
4251  // the date
4252  chunk_min = DateTruncateHighPrecisionToDate(
4253  chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
4254  chunk_max = DateTruncateHighPrecisionToDate(
4255  chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
4256  }
4257  llvm::LLVMContext local_context;
4258  CgenState local_cgen_state(local_context);
4259  CodeGenerator code_generator(&local_cgen_state, nullptr);
4260 
4261  const auto rhs_val =
4262  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
4263 
4264  switch (comp_expr->get_optype()) {
4265  case kGE:
4266  if (chunk_max < rhs_val) {
4267  return {true, -1};
4268  }
4269  break;
4270  case kGT:
4271  if (chunk_max <= rhs_val) {
4272  return {true, -1};
4273  }
4274  break;
4275  case kLE:
4276  if (chunk_min > rhs_val) {
4277  return {true, -1};
4278  }
4279  break;
4280  case kLT:
4281  if (chunk_min >= rhs_val) {
4282  return {true, -1};
4283  }
4284  break;
4285  case kEQ:
4286  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4287  return {true, -1};
4288  } else if (is_rowid) {
4289  return {false, rhs_val - start_rowid};
4290  }
4291  break;
4292  default:
4293  break;
4294  }
4295  }
4296  return {false, -1};
4297 }
4298 
4299 /*
4300  * The skipFragmentInnerJoins processes all quals stored in the execution unit's
4301  * join_quals and gathers all the ones that meet the "simple_qual" characteristics
4302  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
4303  * to decide whether the fragment should be skipped or not. The fragment will be skipped
4304  * if at least one of these skipFragment calls returns true in its first value.
4305  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
4306  * second value only if its first value is "false".
4307  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
4308  * i.e., we only skip if none of the quals trigger the code to update the
4309  * rowid_lookup_key
4310  * - Only AND operations are valid and considered:
4311  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
4312  * the skip
4313  * - `select * from t1,t2 where (A or B) and C`: only C is considered
4314  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
4315  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
4316  * implemented differently, which causes the first one to skip correctly, but the second
4317  * one will not skip.
4318  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
4319  * possible
4320  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
4321  * projection, no skipping
4322  */
4323 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
4324  const InputDescriptor& table_desc,
4325  const RelAlgExecutionUnit& ra_exe_unit,
4326  const Fragmenter_Namespace::FragmentInfo& fragment,
4327  const std::vector<uint64_t>& frag_offsets,
4328  const size_t frag_idx) {
4329  std::pair<bool, int64_t> skip_frag{false, -1};
4330  for (auto& inner_join : ra_exe_unit.join_quals) {
4331  if (inner_join.type != JoinType::INNER) {
4332  continue;
4333  }
4334 
4335  // extracting all the conjunctive simple_quals from the quals stored for the inner
4336  // join
4337  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
4338  for (auto& qual : inner_join.quals) {
4339  auto temp_qual = qual_to_conjunctive_form(qual);
4340  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
4341  temp_qual.simple_quals.begin(),
4342  temp_qual.simple_quals.end());
4343  }
4344  auto temp_skip_frag = skipFragment(
4345  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
4346  if (temp_skip_frag.second != -1) {
4347  skip_frag.second = temp_skip_frag.second;
4348  return skip_frag;
4349  } else {
4350  skip_frag.first = skip_frag.first || temp_skip_frag.first;
4351  }
4352  }
4353  return skip_frag;
4354 }
4355 
4357  const std::unordered_set<PhysicalInput>& phys_inputs) {
4358  AggregatedColRange agg_col_range_cache;
4359  CHECK(catalog_);
4360  std::unordered_set<int> phys_table_ids;
4361  for (const auto& phys_input : phys_inputs) {
4362  phys_table_ids.insert(phys_input.table_id);
4363  }
4364  std::vector<InputTableInfo> query_infos;
4365  for (const int table_id : phys_table_ids) {
4366  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
4367  }
4368  for (const auto& phys_input : phys_inputs) {
4369  const auto cd =
4370  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4371  CHECK(cd);
4372  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
4373  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
4374  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
4375  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
4376  agg_col_range_cache.setColRange(phys_input, col_range);
4377  }
4378  }
4379  return agg_col_range_cache;
4380 }
4381 
4382 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
4383  const std::unordered_set<PhysicalInput>& phys_inputs) {
4384  StringDictionaryGenerations string_dictionary_generations;
4385  CHECK(catalog_);
4386  // Foreign tables may not have populated dictionaries for encoded columns. If this is
4387  // the case, then we need to populate them here to make sure that the generations are set
4388  // correctly.
4389  prepare_string_dictionaries(phys_inputs, *catalog_);
4390  for (const auto& phys_input : phys_inputs) {
4391  const auto cd =
4392  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4393  CHECK(cd);
4394  const auto& col_ti =
4395  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
4396  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
4397  const int dict_id = col_ti.get_comp_param();
4398  const auto dd = catalog_->getMetadataForDict(dict_id);
4399  CHECK(dd && dd->stringDict);
4400  string_dictionary_generations.setGeneration(dict_id,
4401  dd->stringDict->storageEntryCount());
4402  }
4403  }
4404  return string_dictionary_generations;
4405 }
4406 
4407 TableGenerations Executor::computeTableGenerations(
4408  std::unordered_set<int> phys_table_ids) {
4409  TableGenerations table_generations;
4410  for (const int table_id : phys_table_ids) {
4411  const auto table_info = getTableInfo(table_id);
4412  table_generations.setGeneration(
4413  table_id,
4414  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
4415  }
4416  return table_generations;
4417 }
4418 
4419 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
4420  const std::unordered_set<int>& phys_table_ids) {
4421  CHECK(catalog_);
4422  row_set_mem_owner_ =
4423  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
4424  row_set_mem_owner_->setDictionaryGenerations(
4425  computeStringDictionaryGenerations(phys_inputs));
4427  table_generations_ = computeTableGenerations(phys_table_ids);
4428 }
4429 
4431  return recycler_mutex_;
4432 }
4433 
4435  return query_plan_dag_cache_;
4436 }
4437 
4440 }
4441 
4443  return executor_session_mutex_;
4444 }
4445 
4446 QuerySessionId& Executor::getCurrentQuerySession(
4447  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4448  return current_query_session_;
4449 }
4450 
4451 bool Executor::checkCurrentQuerySession(const QuerySessionId& candidate_query_session,
4452  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4453  // the candidate_query_session is considered the current query session only if it is
4454  // a non-empty session equal to current_query_session_
4455  return !candidate_query_session.empty() &&
4456  (current_query_session_ == candidate_query_session);
4457 }
4458 
4459 // used only for testing
4461  const QuerySessionId& candidate_query_session,
4462  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4463  if (queries_session_map_.count(candidate_query_session) &&
4464  !queries_session_map_.at(candidate_query_session).empty()) {
4465  return queries_session_map_.at(candidate_query_session)
4466  .begin()
4467  ->second.getQueryStatus();
4468  }
4469  return QuerySessionStatus::QueryStatus::UNDEFINED;
4470 }
4471 
4473  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4475 }
4476 
4478  const QuerySessionId& query_session_id,
4479  const std::string& query_str,
4480  const std::string& query_submitted_time) {
4481  if (!query_session_id.empty()) {
4482  // if session is valid, do update 1) the exact executor id and 2) query status
4483  mapd_unique_lock<mapd_shared_mutex> write_lock(executor_session_mutex_);
4485  query_session_id, query_submitted_time, executor_id_, write_lock);
4486  updateQuerySessionStatusWithLock(query_session_id,
4487  query_submitted_time,
4488  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
4489  write_lock);
4490  }
4491  return {query_session_id, query_str};
4492 }
4493 
4495  // check whether we are okay to execute the "pending" query
4496  // i.e., before running the query check if this query session is "ALREADY" interrupted
4497  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4498  if (query_session.empty()) {
4499  return;
4500  }
4501  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
4502  // something went wrong, since we assume this is the caller's responsibility
4503  // (call this function only for an enrolled query session)
4504  if (!queries_session_map_.count(query_session)) {
4505  VLOG(1) << "Interrupting pending query is not available since the query session is "
4506  "not enrolled";
4507  } else {
4508  // here the query session is enrolled but the interrupt flag is not registered
4509  VLOG(1)
4510  << "Interrupting pending query is not available since its interrupt flag is "
4511  "not registered";
4512  }
4513  return;
4514  }
4515  if (queries_interrupt_flag_[query_session]) {
4517  }
4518 }
4519 
4521  const std::string& submitted_time_str) {
4522  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
4523  // clear the interrupt-related info for a finished query
4524  if (query_session.empty()) {
4525  return;
4526  }
4527  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
4528  if (query_session.compare(current_query_session_) == 0) {
4529  invalidateRunningQuerySession(session_write_lock);
4530  resetInterrupt();
4531  }
4532 }
4533 
4535  const QuerySessionId& query_session,
4536  const std::string& submitted_time_str,
4537  const QuerySessionStatus::QueryStatus new_query_status) {
4538  // update the running query session's the current status
4539  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
4540  if (query_session.empty()) {
4541  return;
4542  }
4543  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4544  current_query_session_ = query_session;
4545  }
4546  updateQuerySessionStatusWithLock(
4547  query_session, submitted_time_str, new_query_status, session_write_lock);
4548 }
4549 
4551  const QuerySessionId& query_session,
4552  const std::string& query_str,
4553  const std::string& submitted_time_str,
4554  const size_t executor_id,
4555  const QuerySessionStatus::QueryStatus query_session_status) {
4556  // enroll the query session into the Executor's session map
4557  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
4558  if (query_session.empty()) {
4559  return;
4560  }
4561 
4562  addToQuerySessionList(query_session,
4563  query_str,
4564  submitted_time_str,
4565  executor_id,
4566  query_session_status,
4567  session_write_lock);
4568 
4569  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4570  current_query_session_ = query_session;
4571  }
4572 }
4573 
4575  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4576  return queries_session_map_.size();
4577 }
4578 
4579 bool Executor::addToQuerySessionList(const QuerySessionId& query_session,
4580  const std::string& query_str,
4581  const std::string& submitted_time_str,
4582  const size_t executor_id,
4583  const QuerySessionStatus::QueryStatus query_status,
4584  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4585  // an internal API that enrolls the query session into the Executor's session map
4586  if (queries_session_map_.count(query_session)) {
4587  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
4588  queries_session_map_.at(query_session).erase(submitted_time_str);
4589  queries_session_map_.at(query_session)
4590  .emplace(submitted_time_str,
4591  QuerySessionStatus(query_session,
4592  executor_id,
4593  query_str,
4594  submitted_time_str,
4595  query_status));
4596  } else {
4597  queries_session_map_.at(query_session)
4598  .emplace(submitted_time_str,
4599  QuerySessionStatus(query_session,
4600  executor_id,
4601  query_str,
4602  submitted_time_str,
4603  query_status));
4604  }
4605  } else {
4606  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4607  executor_per_query_map.emplace(
4608  submitted_time_str,
4609  QuerySessionStatus(
4610  query_session, executor_id, query_str, submitted_time_str, query_status));
4611  queries_session_map_.emplace(query_session, executor_per_query_map);
4612  }
4613  return queries_interrupt_flag_.emplace(query_session, false).second;
4614 }
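// A sketch of the bookkeeping shape maintained above, assuming one session that has
// submitted two queries (values are hypothetical):
//   queries_session_map_["session-A"] = {
//       {"2021-01-01 00:00:00", QuerySessionStatus{... PENDING_EXECUTOR ...}},
//       {"2021-01-01 00:00:05", QuerySessionStatus{... RUNNING_QUERY_KERNEL ...}}};
//   queries_interrupt_flag_["session-A"] = false;  // flipped to true on interrupt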
4615 
4616 bool Executor::updateQuerySessionStatusWithLock(
4617  const QuerySessionId& query_session,
4618  const std::string& submitted_time_str,
4619  const QuerySessionStatus::QueryStatus updated_query_status,
4620  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4621  // an internal API that updates query session status
4622  if (query_session.empty()) {
4623  return false;
4624  }
4625  if (queries_session_map_.count(query_session)) {
4626  for (auto& query_status : queries_session_map_.at(query_session)) {
4627  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4628  // no time difference --> found the target query status
4629  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4630  auto prev_status = query_status.second.getQueryStatus();
4631  if (prev_status == updated_query_status) {
4632  return false;
4633  }
4634  query_status.second.setQueryStatus(updated_query_status);
4635  return true;
4636  }
4637  }
4638  }
4639  return false;
4640 }
4641 
4643  const QuerySessionId& query_session,
4644  const std::string& submitted_time_str,
4645  const size_t executor_id,
4646  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4647  // update the executor id of the query session
4648  if (query_session.empty()) {
4649  return false;
4650  }
4651  if (queries_session_map_.count(query_session)) {
4652  auto storage = queries_session_map_.at(query_session);
4653  for (auto it = storage.begin(); it != storage.end(); it++) {
4654  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4655  // no time difference --> found the target query status
4656  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4657  queries_session_map_.at(query_session)
4658  .at(submitted_time_str)
4659  .setExecutorId(executor_id);
4660  return true;
4661  }
4662  }
4663  }
4664  return false;
4665 }
4666 
4667 bool Executor::removeFromQuerySessionList(
4668  const QuerySessionId& query_session,
4669  const std::string& submitted_time_str,
4670  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4671  if (query_session.empty()) {
4672  return false;
4673  }
4674  if (queries_session_map_.count(query_session)) {
4675  auto& storage = queries_session_map_.at(query_session);
4676  if (storage.size() > 1) {
4677  // in this case we only remove query executor info
4678  for (auto it = storage.begin(); it != storage.end(); it++) {
4679  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4680  // no time difference && have the same executor id--> found the target query
4681  if (it->second.getExecutorId() == executor_id_ &&
4682  submitted_time_str.compare(target_submitted_t_str) == 0) {
4683  storage.erase(it);
4684  return true;
4685  }
4686  }
4687  } else if (storage.size() == 1) {
4688  // here this session only has a single query executor
4689  // so we clear both executor info and its interrupt flag
4690  queries_session_map_.erase(query_session);
4691  queries_interrupt_flag_.erase(query_session);
4692  if (interrupted_.load()) {
4693  interrupted_.store(false);
4694  }
4695  return true;
4696  }
4697  }
4698  return false;
4699 }
4700 
4702  const QuerySessionId& query_session,
4703  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4704  if (query_session.empty()) {
4705  return;
4706  }
4707  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
4708  queries_interrupt_flag_[query_session] = true;
4709  }
4710 }
4711 
4712 bool Executor::checkIsQuerySessionInterrupted(
4713  const QuerySessionId& query_session,
4714  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4715  if (query_session.empty()) {
4716  return false;
4717  }
4718  auto flag_it = queries_interrupt_flag_.find(query_session);
4719  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4720  flag_it->second;
4721 }
4722 
4724  const QuerySessionId& query_session,
4725  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4726  if (query_session.empty()) {
4727  return false;
4728  }
4729  return !query_session.empty() && queries_session_map_.count(query_session);
4730 }
4731 
4733  const double runtime_query_check_freq,
4734  const unsigned pending_query_check_freq) const {
4735  // The only scenario in which we intentionally call this function is
4736  // to allow runtime query interrupt in QueryRunner for test cases.
4737  // Because the test machine's default setting does not allow runtime query interrupt,
4738  // we have to turn it on within test code if necessary.
4740  g_pending_query_interrupt_freq = pending_query_check_freq;
4741  g_running_query_interrupt_freq = runtime_query_check_freq;
4744  }
4745 }
4746 
4747 void Executor::addToCardinalityCache(const std::string& cache_key,
4748  const size_t cache_value) {
4750  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
4751  cardinality_cache_[cache_key] = cache_value;
4752  VLOG(1) << "Put estimated cardinality to the cache";
4753  }
4754 }
4755 
4756 std::pair<bool, size_t> Executor::getCachedCardinality(const std::string& cache_key) {
4757  mapd_shared_lock<mapd_shared_mutex> lock(recycler_mutex_);
4758  if (g_use_estimator_result_cache &&
4759  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
4760  VLOG(1) << "Reuse cached cardinality";
4761  return {true, cardinality_cache_[cache_key]};
4762  }
4763  return {false, -1};
4764 }
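
A sketch of the put/get round trip, assuming the estimator result cache is enabled. Real callers derive the key from the execution unit description, so the literal key and the helper name below are purely illustrative; VLOG comes from the project's logger, assumed to be pulled in via Execute.h:

#include "QueryEngine/Execute.h"

void cardinality_cache_round_trip(Executor& executor) {
  const std::string cache_key{"sketch-key"};  // hypothetical key
  executor.addToCardinalityCache(cache_key, 42'000);
  const auto [cached, cardinality] = executor.getCachedCardinality(cache_key);
  if (cached) {
    VLOG(1) << "estimated cardinality reused: " << cardinality;
  }
}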
4765 
4766 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
4767  const QuerySessionId& query_session,
4768  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4769  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
4770  auto& query_infos = queries_session_map_.at(query_session);
4771  std::vector<QuerySessionStatus> ret;
4772  for (auto& info : query_infos) {
4773  ret.push_back(QuerySessionStatus(query_session,
4774  info.second.getExecutorId(),
4775  info.second.getQueryStr(),
4776  info.second.getQuerySubmittedTime(),
4777  info.second.getQueryStatus()));
4778  }
4779  return ret;
4780  }
4781  return {};
4782 }
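
A sketch of the locking discipline the accessor expects: the caller holds a shared lock on the session mutex (obtained via getSessionLock()) for the duration of the call. The helper name is hypothetical:

#include "QueryEngine/Execute.h"

void log_session_queries(Executor& executor, const QuerySessionId& session) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(executor.getSessionLock());
  const auto statuses = executor.getQuerySessionInfo(session, read_lock);
  VLOG(1) << statuses.size() << " queries registered for this session";
}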
4783 
4784 const std::vector<size_t> Executor::getExecutorIdsRunningQuery(
4785  const QuerySessionId& interrupt_session) const {
4786  std::vector<size_t> res;
4787  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4788  auto it = queries_session_map_.find(interrupt_session);
4789  if (it != queries_session_map_.end()) {
4790  for (auto& kv : it->second) {
4791  if (kv.second.getQueryStatus() ==
4792  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4793  res.push_back(kv.second.getExecutorId());
4794  }
4795  }
4796  }
4797  return res;
4798 }
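
A sketch of how a caller might consume the returned ids, e.g. to report which executors still have kernels in flight for a session; the helper name is hypothetical and any follow-up interruption logic is omitted:

#include "QueryEngine/Execute.h"

void report_running_executors(const Executor& executor, const QuerySessionId& session) {
  const auto executor_ids = executor.getExecutorIdsRunningQuery(session);
  for (const size_t executor_id : executor_ids) {
    VLOG(1) << "executor " << executor_id << " is running a kernel for this session";
  }
}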
4799 
4800 bool Executor::checkNonKernelTimeInterrupted() const {
4801  // this function should be called within an executor that is assigned
4802  // to a specific query thread (i.e., the session has already been enrolled);
4803  // first check whether it is called from a non-unitary executor
4804  if (executor_id_ == UNITARY_EXECUTOR_ID) {
4805  return false;
4806  };
4807  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4808  auto flag_it = queries_interrupt_flag_.find(current_query_session_);
4809  return !current_query_session_.empty() && flag_it != queries_interrupt_flag_.end() &&
4810  flag_it->second;
4811 }
4812 
4813 void Executor::registerExtractedQueryPlanDag(const QueryPlanDAG& query_plan_dag) {
4814  // this function is called under the recycler lock
4815  // e.g., QueryPlanDagExtractor::extractQueryPlanDagImpl()
4816  latest_query_plan_extracted_ = query_plan_dag;
4817 }
4818 
4819 const QueryPlanDAG Executor::getLatestQueryPlanDagExtracted() const {
4820  mapd_shared_lock<mapd_shared_mutex> lock(recycler_mutex_);
4821  return latest_query_plan_extracted_;
4822 }
4823 
4824 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
4825 
4826 // contains the interrupt flag's status per query session
4827 InterruptFlagMap Executor::queries_interrupt_flag_;
4828 // contains a list of queries per query session
4829 QuerySessionMap Executor::queries_session_map_;
4830 // session lock
4831 mapd_shared_mutex Executor::executor_session_mutex_;
4832 
4835 
4839 
4841 std::mutex Executor::kernel_mutex_;
4842 
4845 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;
4846 // Executor has a single global result set recycler holder
4847 // which contains two recyclers related to query result sets
4848 ResultSetRecyclerHolder Executor::resultset_recycler_holder_;
4850 
4851 // Useful for debugging.
4852 std::string Executor::dumpCache() const {
4853  std::stringstream ss;
4854  ss << "colRangeCache: ";
4855  for (auto& [phys_input, exp_range] : agg_col_range_cache_.asMap()) {
4856  ss << "{" << phys_input.col_id << ", " << phys_input.table_id
4857  << "} = " << exp_range.toString() << ", ";
4858  }
4859  ss << "stringDictGenerations: ";
4860  for (auto& [key, val] : row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
4861  ss << "{" << key << "} = " << val << ", ";
4862  }
4863  ss << "tableGenerations: ";
4864  for (auto& [key, val] : table_generations_.asMap()) {
4865  ss << "{" << key << "} = {" << val.tuple_count << ", " << val.start_rowid << "}, ";
4866  }
4867  ss << "\n";
4868  return ss.str();
4869 }
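
A sketch of the intended use: dumpCache() returns a single formatted line, so it can be dropped into ad-hoc logging while debugging stale column ranges or generations (the helper name is hypothetical; VLOG is the project's logging macro, assumed visible at the call site):

#include "QueryEngine/Execute.h"

void log_executor_caches(const Executor& executor) {
  VLOG(1) << executor.dumpCache();
}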