OmniSciDB  c1a53651b2
Execute.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 
23 #ifdef HAVE_CUDA
24 #include <cuda.h>
25 #endif // HAVE_CUDA
26 #include <chrono>
27 #include <ctime>
28 #include <future>
29 #include <iostream>
30 #include <memory>
31 #include <mutex>
32 #include <numeric>
33 #include <set>
34 #include <thread>
35 
36 #include "Catalog/Catalog.h"
37 #include "CudaMgr/CudaMgr.h"
41 #include "Parser/ParserNode.h"
72 #include "Shared/checked_alloc.h"
73 #include "Shared/measure.h"
74 #include "Shared/misc.h"
75 #include "Shared/scope.h"
76 #include "Shared/shard_key.h"
77 #include "Shared/threading.h"
78 
79 bool g_enable_watchdog{false};
83 size_t g_cpu_sub_task_size{500'000};
84 bool g_enable_filter_function{true};
85 unsigned g_dynamic_watchdog_time_limit{10000};
86 bool g_allow_cpu_retry{true};
87 bool g_allow_query_step_cpu_retry{true};
88 bool g_null_div_by_zero{false};
89 unsigned g_trivial_loop_join_threshold{1000};
90 bool g_from_table_reordering{true};
91 bool g_inner_join_fragment_skipping{true};
92 extern bool g_enable_smem_group_by;
93 extern std::unique_ptr<llvm::Module> udf_gpu_module;
94 extern std::unique_ptr<llvm::Module> udf_cpu_module;
95 bool g_enable_filter_push_down{false};
96 float g_filter_push_down_low_frac{-1.0f};
97 float g_filter_push_down_high_frac{-1.0f};
98 size_t g_filter_push_down_passing_row_ubound{0};
99 bool g_enable_columnar_output{false};
100 bool g_enable_left_join_filter_hoisting{true};
101 bool g_optimize_row_initialization{true};
102 bool g_enable_overlaps_hashjoin{true};
103 bool g_enable_distance_rangejoin{true};
104 bool g_enable_hashjoin_many_to_many{false};
105 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
106 double g_overlaps_target_entries_per_bin{1.3};
107 bool g_strip_join_covered_quals{false};
108 size_t g_constrained_by_in_threshold{10};
109 size_t g_default_max_groups_buffer_entry_guess{16384};
110 size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
111 bool g_enable_window_functions{true};
112 bool g_enable_table_functions{true};
113 bool g_enable_dev_table_functions{false};
114 bool g_enable_geo_ops_on_uncompressed_coords{true};
115 bool g_enable_rf_prop_table_functions{true};
116 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
117 size_t g_min_memory_allocation_size{
118  256}; // minimum memory allocation required for projection query output buffer
119  // without pre-flight count
120 bool g_enable_bump_allocator{false};
121 double g_bump_allocator_step_reduction{0.75};
122 bool g_enable_direct_columnarization{true};
123 extern bool g_enable_string_functions;
124 bool g_enable_lazy_fetch{true};
125 bool g_enable_runtime_query_interrupt{true};
126 bool g_enable_non_kernel_time_query_interrupt{true};
127 bool g_use_estimator_result_cache{true};
128 unsigned g_pending_query_interrupt_freq{1000};
129 double g_running_query_interrupt_freq{0.1};
130 size_t g_gpu_smem_threshold{
131  4096}; // GPU shared memory threshold (in bytes), if larger
132  // buffer sizes are required we do not use GPU shared
133  // memory optimizations Setting this to 0 means unlimited
134  // (subject to other dynamically calculated caps)
135 bool g_enable_smem_grouped_non_count_agg{
136  true}; // enable use of shared memory when performing group-by with select non-count
137  // aggregates
138 bool g_enable_smem_non_grouped_agg{
139  true}; // enable optimizations for using GPU shared memory in implementation of
140  // non-grouped aggregates
141 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
142  // limits the allocation for the output buffer arena
143  // and data recycler test
144 size_t g_enable_parallel_linearization{
145  10000}; // # rows that we are trying to linearize varlen col in parallel
146 bool g_enable_data_recycler{true};
147 bool g_use_hashtable_cache{true};
148 bool g_use_query_resultset_cache{true};
149 bool g_use_chunk_metadata_cache{true};
150 bool g_allow_auto_resultset_caching{false};
151 bool g_allow_query_step_skipping{true};
152 size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
153 size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
154 size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
155 size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
156 size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
157 bool g_optimize_cuda_block_and_grid_sizes{false};
158 
159 size_t g_approx_quantile_buffer{1000};
160 size_t g_approx_quantile_centroids{300};
161 
162 bool g_enable_automatic_ir_metadata{true};
163 
164 size_t g_max_log_length{500};
165 
166 extern bool g_cache_string_hash;
167 
168 int const Executor::max_gpu_count;
169 
170 const int32_t Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES;
171 
172 std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;
173 
174 extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
175  const std::string& udf_ir_filename,
176  llvm::LLVMContext& ctx);
177 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
178  const std::string& udf_ir_filename,
179  llvm::LLVMContext& ctx,
180  bool is_gpu = false);
181 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
182  const std::string& udf_ir_string,
183  llvm::LLVMContext& ctx,
184  bool is_gpu = false);
185 
186 namespace {
187 // This function is notably different from that in RelAlgExecutor because it already
188 // expects SPI values and therefore needs to avoid that transformation.
189 void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs) {
190  for (const auto [col_id, table_id, db_id] : phys_inputs) {
191  foreign_storage::populate_string_dictionary(table_id, col_id, db_id);
192  }
193 }
194 
195 bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
196  const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
197  // The fragmenter always returns at least one fragment, even when the table is empty.
198  return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
199 }
200 } // namespace
201 
202 namespace foreign_storage {
203 // Foreign tables skip the population of dictionaries during metadata scan. This function
204 // will populate a dictionary's missing entries by fetching any unpopulated chunks.
205 void populate_string_dictionary(int32_t table_id, int32_t col_id, int32_t db_id) {
206  const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
207  CHECK(catalog);
208  if (const auto foreign_table = dynamic_cast<const ForeignTable*>(
209  catalog->getMetadataForTable(table_id, false))) {
210  const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
211  if (col_desc->columnType.is_dict_encoded_type()) {
212  auto& fragmenter = foreign_table->fragmenter;
213  CHECK(fragmenter != nullptr);
214  if (is_empty_table(fragmenter.get())) {
215  return;
216  }
217  for (const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
218  ChunkKey chunk_key = {db_id, table_id, col_id, frag.fragmentId};
219  // If the key is sharded across leaves, only populate fragments that are sharded
220  // to this leaf.
221  if (key_does_not_shard_to_leaf(chunk_key)) {
222  continue;
223  }
224 
225  const ChunkMetadataMap& metadata_map = frag.getChunkMetadataMap();
226  CHECK(metadata_map.find(col_id) != metadata_map.end());
227  if (auto& meta = metadata_map.at(col_id); meta->isPlaceholder()) {
228  // When this goes out of scope it will stay in CPU cache but become
229  // evictable
230  auto chunk = Chunk_NS::Chunk::getChunk(col_desc,
231  &(catalog->getDataMgr()),
232  chunk_key,
 233  Data_Namespace::CPU_LEVEL,
 234  0,
235  0,
236  0);
237  }
238  }
239  }
240  }
241 }
242 } // namespace foreign_storage
243 
244 Executor::Executor(const ExecutorId executor_id,
245  Data_Namespace::DataMgr* data_mgr,
246  const size_t block_size_x,
247  const size_t grid_size_x,
248  const size_t max_gpu_slab_size,
249  const std::string& debug_dir,
250  const std::string& debug_file)
251  : executor_id_(executor_id)
252  , context_(new llvm::LLVMContext())
253  , cgen_state_(new CgenState({}, false, this))
254  , block_size_x_(block_size_x)
255  , grid_size_x_(grid_size_x)
256  , max_gpu_slab_size_(max_gpu_slab_size)
257  , debug_dir_(debug_dir)
258  , debug_file_(debug_file)
259  , data_mgr_(data_mgr)
 260  , temporary_tables_(nullptr)
 261  , input_table_info_cache_(this) {
 262  initialize_extension_module_sources();
 263  update_extension_modules();
264 }
265 
 265 
 266 void Executor::initialize_extension_module_sources() {
 267  if (Executor::extension_module_sources.find(
 268  Executor::ExtModuleKinds::template_module) ==
 269  Executor::extension_module_sources.end()) {
 270  auto root_path = heavyai::get_root_abs_path();
 271  auto template_path = root_path + "/QueryEngine/RuntimeFunctions.bc";
 272  CHECK(boost::filesystem::exists(template_path));
 273  extension_module_sources[Executor::ExtModuleKinds::template_module] =
 274  template_path;
 275 #ifdef ENABLE_GEOS
 276  auto rt_geos_path = root_path + "/QueryEngine/GeosRuntime.bc";
 277  CHECK(boost::filesystem::exists(rt_geos_path));
 278  extension_module_sources[Executor::ExtModuleKinds::rt_geos_module] =
 279  rt_geos_path;
 280 #endif
 281 #ifdef HAVE_CUDA
 282  auto rt_libdevice_path = get_cuda_libdevice_dir() + "/libdevice.10.bc";
 283  if (boost::filesystem::exists(rt_libdevice_path)) {
 284  extension_module_sources[Executor::ExtModuleKinds::rt_libdevice_module] =
 285  rt_libdevice_path;
286  } else {
287  LOG(WARNING) << "File " << rt_libdevice_path
288  << " does not exist; support for some UDF "
289  "functions might not be available.";
290  }
291 #endif
292  }
293 }
294 
295 void Executor::reset(bool discard_runtime_modules_only) {
296  // TODO: keep cached results that do not depend on runtime UDF/UDTFs
297  auto qe = QueryEngine::getInstance();
298  qe->s_code_accessor->clear();
299  qe->s_stubs_accessor->clear();
300  qe->cpu_code_accessor->clear();
301  qe->gpu_code_accessor->clear();
302  qe->tf_code_accessor->clear();
303 
304  if (discard_runtime_modules_only) {
305  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_cpu_module);
306 #ifdef HAVE_CUDA
307  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_gpu_module);
308 #endif
309  cgen_state_->module_ = nullptr;
310  } else {
311  extension_modules_.clear();
312  cgen_state_.reset();
313  context_.reset(new llvm::LLVMContext());
314  cgen_state_.reset(new CgenState({}, false, this));
315  }
316 }
317 
318 void Executor::update_extension_modules(bool update_runtime_modules_only) {
319  auto read_module = [&](Executor::ExtModuleKinds module_kind,
320  const std::string& source) {
321  /*
322  source can be either a filename of a LLVM IR
323  or LLVM BC source, or a string containing
324  LLVM IR code.
325  */
326  CHECK(!source.empty());
 327  switch (module_kind) {
 328  case Executor::ExtModuleKinds::template_module:
 329  case Executor::ExtModuleKinds::rt_geos_module:
 330  case Executor::ExtModuleKinds::rt_libdevice_module: {
 331  return read_llvm_module_from_bc_file(source, getContext());
 332  }
 333  case Executor::ExtModuleKinds::udf_cpu_module: {
 334  return read_llvm_module_from_ir_file(source, getContext(), false);
 335  }
 336  case Executor::ExtModuleKinds::udf_gpu_module: {
 337  return read_llvm_module_from_ir_file(source, getContext(), true);
 338  }
 339  case Executor::ExtModuleKinds::rt_udf_cpu_module: {
 340  return read_llvm_module_from_ir_string(source, getContext(), false);
 341  }
 342  case Executor::ExtModuleKinds::rt_udf_gpu_module: {
 343  return read_llvm_module_from_ir_string(source, getContext(), true);
 344  }
345  default: {
346  UNREACHABLE();
347  return std::unique_ptr<llvm::Module>();
348  }
349  }
350  };
351  auto update_module = [&](Executor::ExtModuleKinds module_kind,
352  bool erase_not_found = false) {
353  auto it = Executor::extension_module_sources.find(module_kind);
354  if (it != Executor::extension_module_sources.end()) {
355  auto llvm_module = read_module(module_kind, it->second);
356  if (llvm_module) {
357  extension_modules_[module_kind] = std::move(llvm_module);
358  } else if (erase_not_found) {
359  extension_modules_.erase(module_kind);
360  } else {
361  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
362  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
363  << " LLVM module. The module will be unavailable.";
364  } else {
365  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
366  << " LLVM module. Using the existing module.";
367  }
368  }
369  } else {
370  if (erase_not_found) {
371  extension_modules_.erase(module_kind);
372  } else {
373  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
374  LOG(WARNING) << "Source of " << ::toString(module_kind)
375  << " LLVM module is unavailable. The module will be unavailable.";
376  } else {
377  LOG(WARNING) << "Source of " << ::toString(module_kind)
378  << " LLVM module is unavailable. Using the existing module.";
379  }
380  }
381  }
382  };
383 
384  if (!update_runtime_modules_only) {
385  // required compile-time modules, their requirements are enforced
386  // by Executor::initialize_extension_module_sources():
 387  update_module(Executor::ExtModuleKinds::template_module);
 388 #ifdef ENABLE_GEOS
 389  update_module(Executor::ExtModuleKinds::rt_geos_module);
390 #endif
391  // load-time modules, these are optional:
392  update_module(Executor::ExtModuleKinds::udf_cpu_module, true);
393 #ifdef HAVE_CUDA
394  update_module(Executor::ExtModuleKinds::udf_gpu_module, true);
396 #endif
397  }
398  // run-time modules, these are optional and erasable:
399  update_module(Executor::ExtModuleKinds::rt_udf_cpu_module, true);
400 #ifdef HAVE_CUDA
401  update_module(Executor::ExtModuleKinds::rt_udf_gpu_module, true);
402 #endif
403 }
404 
405 // Used by StubGenerator::generateStub
 406 Executor::CgenStateManager::CgenStateManager(Executor& executor)
 407  : executor_(executor)
408  , lock_queue_clock_(timer_start())
409  , lock_(executor_.compilation_mutex_)
410  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
411 {
412  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
413  executor_.cgen_state_.reset(new CgenState(0, false, &executor));
414 }
415 
 416 Executor::CgenStateManager::CgenStateManager(
 417  Executor& executor,
418  const bool allow_lazy_fetch,
419  const std::vector<InputTableInfo>& query_infos,
420  const PlanState::DeletedColumnsMap& deleted_cols_map,
421  const RelAlgExecutionUnit* ra_exe_unit)
422  : executor_(executor)
423  , lock_queue_clock_(timer_start())
 424  , lock_(executor_.compilation_mutex_)
 425  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
426 {
427  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
428  // nukeOldState creates new CgenState and PlanState instances for
429  // the subsequent code generation. It also resets
430  // kernel_queue_time_ms_ and compilation_queue_time_ms_ that we do
431  // not currently restore.. should we accumulate these timings?
432  executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
433 }
434 
 435 Executor::CgenStateManager::~CgenStateManager() {
 436  // prevent memory leak from hoisted literals
437  for (auto& p : executor_.cgen_state_->row_func_hoisted_literals_) {
438  auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
439  if (inst && inst->getNumUses() == 0 && inst->getParent() == nullptr) {
440  // The llvm::Value instance stored in p.first is created by the
441  // CodeGenerator::codegenHoistedConstantsPlaceholders method.
442  p.first->deleteValue();
443  }
444  }
445  executor_.cgen_state_->row_func_hoisted_literals_.clear();
446 
447  // move generated StringDictionaryTranslationMgrs and InValueBitmaps
448  // to the old CgenState instance as the execution of the generated
449  // code uses these bitmaps
450 
451  for (auto& str_dict_translation_mgr :
452  executor_.cgen_state_->str_dict_translation_mgrs_) {
453  cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
454  }
455  executor_.cgen_state_->str_dict_translation_mgrs_.clear();
456 
457  for (auto& bm : executor_.cgen_state_->in_values_bitmaps_) {
458  cgen_state_->moveInValuesBitmap(bm);
459  }
460  executor_.cgen_state_->in_values_bitmaps_.clear();
461 
462  // Delete worker module that may have been set by
463  // set_module_shallow_copy. If QueryMustRunOnCpu is thrown, the
464  // worker module is not instantiated, so the worker module needs to
465  // be deleted conditionally [see "Managing LLVM modules" comment in
466  // CgenState.h]:
467  if (executor_.cgen_state_->module_) {
468  delete executor_.cgen_state_->module_;
469  }
470 
471  // restore the old CgenState instance
472  executor_.cgen_state_.reset(cgen_state_.release());
473 }
474 
475 std::shared_ptr<Executor> Executor::getExecutor(
476  const ExecutorId executor_id,
477  const std::string& debug_dir,
478  const std::string& debug_file,
479  const SystemParameters& system_parameters) {
481  auto it = executors_.find(executor_id);
482  if (it != executors_.end()) {
483  return it->second;
484  }
 485  auto& data_mgr = Catalog_Namespace::SysCatalog::instance().getDataMgr();
 486  auto executor = std::make_shared<Executor>(executor_id,
487  &data_mgr,
488  system_parameters.cuda_block_size,
489  system_parameters.cuda_grid_size,
490  system_parameters.max_gpu_slab_size,
491  debug_dir,
492  debug_file);
493  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
494  return executor;
495 }
496 
 497 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
 498  switch (memory_level) {
 499  case Data_Namespace::MemoryLevel::CPU_LEVEL:
 500  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
502  execute_mutex_); // Don't flush memory while queries are running
503 
504  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
505  // The hash table cache uses CPU memory not managed by the buffer manager. In the
506  // future, we should manage these allocations with the buffer manager directly.
507  // For now, assume the user wants to purge the hash table cache when they clear
508  // CPU memory (currently used in ExecuteTest to lower memory pressure)
510  }
513  break;
514  }
515  default: {
516  throw std::runtime_error(
517  "Clearing memory levels other than the CPU level or GPU level is not "
518  "supported.");
519  }
520  }
521 }
522 
524  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
525 }
526 
 527 StringDictionaryProxy* Executor::getStringDictionaryProxy(
 528  const shared::StringDictKey& dict_id_in,
529  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
530  const bool with_generation) const {
531  CHECK(row_set_mem_owner);
532  std::lock_guard<std::mutex> lock(
533  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
534  return row_set_mem_owner->getOrAddStringDictProxy(dict_id_in, with_generation);
535 }
536 
 537 StringDictionaryProxy* RowSetMemoryOwner::getOrAddStringDictProxy(
 538  const shared::StringDictKey& dict_key_in,
539  const bool with_generation) {
540  const int dict_id{dict_key_in.dict_id < 0 ? REGULAR_DICT(dict_key_in.dict_id)
541  : dict_key_in.dict_id};
542  const auto catalog =
 543  Catalog_Namespace::SysCatalog::instance().getCatalog(dict_key_in.db_id);
 544  if (catalog) {
545  const auto dd = catalog->getMetadataForDict(dict_id);
546  if (dd) {
547  auto dict_key = dict_key_in;
548  dict_key.dict_id = dict_id;
549  CHECK(dd->stringDict);
550  CHECK_LE(dd->dictNBits, 32);
551  const int64_t generation =
552  with_generation ? string_dictionary_generations_.getGeneration(dict_key) : -1;
553  return addStringDict(dd->stringDict, dict_key, generation);
554  }
555  }
557  if (!lit_str_dict_proxy_) {
558  DictRef literal_dict_ref(dict_key_in.db_id, DictRef::literalsDictId);
559  std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
560  literal_dict_ref, "", false, true, g_cache_string_hash);
561  lit_str_dict_proxy_ = std::make_shared<StringDictionaryProxy>(
562  tsd, shared::StringDictKey{literal_dict_ref.dbId, literal_dict_ref.dictId}, 0);
563  }
564  return lit_str_dict_proxy_.get();
565 }
566 
568  const shared::StringDictKey& source_dict_key,
569  const shared::StringDictKey& dest_dict_key,
570  const RowSetMemoryOwner::StringTranslationType translation_type,
571  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
572  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
573  const bool with_generation) const {
574  CHECK(row_set_mem_owner);
575  std::lock_guard<std::mutex> lock(
576  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
577  return row_set_mem_owner->getOrAddStringProxyTranslationMap(
578  source_dict_key, dest_dict_key, with_generation, translation_type, string_op_infos);
579 }
580 
583  const StringDictionaryProxy* source_proxy,
584  StringDictionaryProxy* dest_proxy,
585  const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
586  const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
587  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) const {
588  CHECK(row_set_mem_owner);
589  std::lock_guard<std::mutex> lock(
590  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
591  // First translate lhs onto itself if there are string ops
592  if (!dest_string_op_infos.empty()) {
593  row_set_mem_owner->addStringProxyUnionTranslationMap(
594  dest_proxy, dest_proxy, dest_string_op_infos);
595  }
596  return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
597  source_proxy, dest_proxy, source_string_op_infos);
598 }
599 
602  const shared::StringDictKey& source_dict_key,
603  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
604  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
605  const bool with_generation) const {
606  CHECK(row_set_mem_owner);
607  std::lock_guard<std::mutex> lock(
608  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
609  return row_set_mem_owner->getOrAddStringProxyNumericTranslationMap(
610  source_dict_key, with_generation, string_op_infos);
611 }
612 
614  const shared::StringDictKey& source_dict_key_in,
615  const shared::StringDictKey& dest_dict_key_in,
616  const bool with_generation,
617  const RowSetMemoryOwner::StringTranslationType translation_type,
618  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
619  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
620  const auto dest_proxy = getOrAddStringDictProxy(dest_dict_key_in, with_generation);
622  return addStringProxyIntersectionTranslationMap(
623  source_proxy, dest_proxy, string_op_infos);
624  } else {
625  return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
626  }
627 }
628 
631  const shared::StringDictKey& source_dict_key_in,
632  const bool with_generation,
633  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
634  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
635  return addStringProxyNumericTranslationMap(source_proxy, string_op_infos);
636 }
637 
639  std::lock_guard<std::mutex> lock(state_mutex_);
640  return t_digests_
641  .emplace_back(std::make_unique<quantile::TDigest>(
643  .get();
644 }
645 
646 bool Executor::isCPUOnly() const {
647  CHECK(data_mgr_);
648  return !data_mgr_->getCudaMgr();
649 }
650 
 651 const ColumnDescriptor* Executor::getColumnDescriptor(
 652  const Analyzer::ColumnVar* col_var) const {
653  return get_column_descriptor_maybe(col_var->getColumnKey());
654 }
655 
657  const Analyzer::ColumnVar* col_var,
658  int n) const {
659  const auto cd = getColumnDescriptor(col_var);
660  if (!cd || n > cd->columnType.get_physical_cols()) {
661  return nullptr;
662  }
663  auto column_key = col_var->getColumnKey();
664  column_key.column_id += n;
665  return get_column_descriptor_maybe(column_key);
666 }
667 
668 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
669  return row_set_mem_owner_;
670 }
671 
673  return temporary_tables_;
674 }
675 
677  const shared::TableKey& table_key) const {
678  return input_table_info_cache_.getTableInfo(table_key);
679 }
680 
682  const shared::TableKey& table_key) const {
683  return table_generations_.getGeneration(table_key);
684 }
685 
687  return agg_col_range_cache_.getColRange(phys_input);
688 }
689 
 690 size_t Executor::getNumBytesForFetchedRow(
 691  const std::set<shared::TableKey>& table_ids_to_fetch) const {
692  size_t num_bytes = 0;
693  if (!plan_state_) {
694  return 0;
695  }
696  for (const auto& fetched_col : plan_state_->columns_to_fetch_) {
697  if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
698  continue;
699  }
700 
701  if (fetched_col.table_id < 0) {
702  num_bytes += 8;
703  } else {
 704  const auto cd = Catalog_Namespace::get_metadata_for_column(
 705  {fetched_col.db_id, fetched_col.table_id, fetched_col.column_id});
706  const auto& ti = cd->columnType;
707  const auto sz = ti.get_size();
708  if (sz < 0) {
709  // for varlen types, only account for the pointer/size for each row, for now
710  if (!ti.is_logical_geo_type()) {
711  // Don't count size for logical geo types, as they are
712  // backed by physical columns
713  num_bytes += 16;
714  }
715  } else {
716  num_bytes += sz;
717  }
718  }
719  }
720  return num_bytes;
721 }
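// --- Editor's illustrative sketch (not part of Execute.cpp) -----------------
// The per-row byte accounting above can be restated as: fixed-width columns
// contribute their physical size, variable-length columns contribute only the
// 16 bytes of pointer + size bookkeeping, and logical geo columns contribute
// nothing because their data lives in the backing physical columns. The helper
// below is a hypothetical restatement of that rule, not an engine API.
#include <cstddef>

size_t bytes_per_row_for_column(const int physical_size_or_negative_if_varlen,
                                const bool is_logical_geo) {
  if (physical_size_or_negative_if_varlen >= 0) {
    return static_cast<size_t>(physical_size_or_negative_if_varlen);
  }
  // varlen: count only the pointer/size pair; logical geo columns are skipped
  return is_logical_geo ? 0 : 16;
}
// ----------------------------------------------------------------------------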
722 
724  const std::vector<Analyzer::Expr*>& target_exprs) const {
726  for (const auto target_expr : target_exprs) {
727  if (plan_state_->isLazyFetchColumn(target_expr)) {
728  return true;
729  }
730  }
731  return false;
732 }
733 
734 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
735  const std::vector<Analyzer::Expr*>& target_exprs) const {
737  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
738  for (const auto target_expr : target_exprs) {
739  if (!plan_state_->isLazyFetchColumn(target_expr)) {
740  col_lazy_fetch_info.emplace_back(
741  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
742  } else {
743  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
744  CHECK(col_var);
745  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
746  const auto cd = get_column_descriptor_maybe(col_var->getColumnKey());
747  if (cd && IS_GEO(cd->columnType.get_type())) {
748  // Geo coords cols will be processed in sequence. So we only need to track the
749  // first coords col in lazy fetch info.
750  {
751  auto col_key = col_var->getColumnKey();
752  col_key.column_id += 1;
753  const auto cd0 = get_column_descriptor(col_key);
754  const auto col0_ti = cd0->columnType;
755  CHECK(!cd0->isVirtualCol);
756  const auto col0_var = makeExpr<Analyzer::ColumnVar>(col0_ti, col_key, rte_idx);
757  const auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
758  col_lazy_fetch_info.emplace_back(
759  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
760  }
761  } else {
762  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
763  const auto& col_ti = col_var->get_type_info();
764  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
765  }
766  }
767  }
768  return col_lazy_fetch_info;
769 }
770 
775 }
776 
777 std::vector<int8_t> Executor::serializeLiterals(
778  const std::unordered_map<int, CgenState::LiteralValues>& literals,
779  const int device_id) {
780  if (literals.empty()) {
781  return {};
782  }
783  const auto dev_literals_it = literals.find(device_id);
784  CHECK(dev_literals_it != literals.end());
785  const auto& dev_literals = dev_literals_it->second;
786  size_t lit_buf_size{0};
787  std::vector<std::string> real_strings;
788  std::vector<std::vector<double>> double_array_literals;
789  std::vector<std::vector<int8_t>> align64_int8_array_literals;
790  std::vector<std::vector<int32_t>> int32_array_literals;
791  std::vector<std::vector<int8_t>> align32_int8_array_literals;
792  std::vector<std::vector<int8_t>> int8_array_literals;
793  for (const auto& lit : dev_literals) {
794  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
795  if (lit.which() == 7) {
796  const auto p = boost::get<std::string>(&lit);
797  CHECK(p);
798  real_strings.push_back(*p);
799  } else if (lit.which() == 8) {
800  const auto p = boost::get<std::vector<double>>(&lit);
801  CHECK(p);
802  double_array_literals.push_back(*p);
803  } else if (lit.which() == 9) {
804  const auto p = boost::get<std::vector<int32_t>>(&lit);
805  CHECK(p);
806  int32_array_literals.push_back(*p);
807  } else if (lit.which() == 10) {
808  const auto p = boost::get<std::vector<int8_t>>(&lit);
809  CHECK(p);
810  int8_array_literals.push_back(*p);
811  } else if (lit.which() == 11) {
812  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
813  CHECK(p);
814  if (p->second == 64) {
815  align64_int8_array_literals.push_back(p->first);
816  } else if (p->second == 32) {
817  align32_int8_array_literals.push_back(p->first);
818  } else {
819  CHECK(false);
820  }
821  }
822  }
823  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
824  throw TooManyLiterals();
825  }
826  int16_t crt_real_str_off = lit_buf_size;
827  for (const auto& real_str : real_strings) {
828  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
829  lit_buf_size += real_str.size();
830  }
831  if (double_array_literals.size() > 0) {
832  lit_buf_size = align(lit_buf_size, sizeof(double));
833  }
834  int16_t crt_double_arr_lit_off = lit_buf_size;
835  for (const auto& double_array_literal : double_array_literals) {
836  CHECK_LE(double_array_literal.size(),
837  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
838  lit_buf_size += double_array_literal.size() * sizeof(double);
839  }
840  if (align64_int8_array_literals.size() > 0) {
841  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
842  }
843  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
844  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
845  CHECK_LE(align64_int8_array_literals.size(),
846  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
847  lit_buf_size += align64_int8_array_literal.size();
848  }
849  if (int32_array_literals.size() > 0) {
850  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
851  }
852  int16_t crt_int32_arr_lit_off = lit_buf_size;
853  for (const auto& int32_array_literal : int32_array_literals) {
854  CHECK_LE(int32_array_literal.size(),
855  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
856  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
857  }
858  if (align32_int8_array_literals.size() > 0) {
859  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
860  }
861  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
862  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
863  CHECK_LE(align32_int8_array_literals.size(),
864  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
865  lit_buf_size += align32_int8_array_literal.size();
866  }
867  int16_t crt_int8_arr_lit_off = lit_buf_size;
868  for (const auto& int8_array_literal : int8_array_literals) {
869  CHECK_LE(int8_array_literal.size(),
870  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
871  lit_buf_size += int8_array_literal.size();
872  }
873  unsigned crt_real_str_idx = 0;
874  unsigned crt_double_arr_lit_idx = 0;
875  unsigned crt_align64_int8_arr_lit_idx = 0;
876  unsigned crt_int32_arr_lit_idx = 0;
877  unsigned crt_align32_int8_arr_lit_idx = 0;
878  unsigned crt_int8_arr_lit_idx = 0;
879  std::vector<int8_t> serialized(lit_buf_size);
880  size_t off{0};
881  for (const auto& lit : dev_literals) {
882  const auto lit_bytes = CgenState::literalBytes(lit);
883  off = CgenState::addAligned(off, lit_bytes);
884  switch (lit.which()) {
885  case 0: {
886  const auto p = boost::get<int8_t>(&lit);
887  CHECK(p);
888  serialized[off - lit_bytes] = *p;
889  break;
890  }
891  case 1: {
892  const auto p = boost::get<int16_t>(&lit);
893  CHECK(p);
894  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
895  break;
896  }
897  case 2: {
898  const auto p = boost::get<int32_t>(&lit);
899  CHECK(p);
900  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
901  break;
902  }
903  case 3: {
904  const auto p = boost::get<int64_t>(&lit);
905  CHECK(p);
906  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
907  break;
908  }
909  case 4: {
910  const auto p = boost::get<float>(&lit);
911  CHECK(p);
912  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
913  break;
914  }
915  case 5: {
916  const auto p = boost::get<double>(&lit);
917  CHECK(p);
918  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
919  break;
920  }
921  case 6: {
922  const auto p = boost::get<std::pair<std::string, shared::StringDictKey>>(&lit);
923  CHECK(p);
924  const auto str_id =
926  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
927  ->getOrAddTransient(p->first)
928  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
929  ->getIdOfString(p->first);
930  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
931  break;
932  }
933  case 7: {
934  const auto p = boost::get<std::string>(&lit);
935  CHECK(p);
936  int32_t off_and_len = crt_real_str_off << 16;
937  const auto& crt_real_str = real_strings[crt_real_str_idx];
938  off_and_len |= static_cast<int16_t>(crt_real_str.size());
939  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
940  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
941  ++crt_real_str_idx;
942  crt_real_str_off += crt_real_str.size();
943  break;
944  }
945  case 8: {
946  const auto p = boost::get<std::vector<double>>(&lit);
947  CHECK(p);
948  int32_t off_and_len = crt_double_arr_lit_off << 16;
949  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
950  int32_t len = crt_double_arr_lit.size();
951  CHECK_EQ((len >> 16), 0);
952  off_and_len |= static_cast<int16_t>(len);
953  int32_t double_array_bytesize = len * sizeof(double);
954  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
955  memcpy(&serialized[crt_double_arr_lit_off],
956  crt_double_arr_lit.data(),
957  double_array_bytesize);
958  ++crt_double_arr_lit_idx;
959  crt_double_arr_lit_off += double_array_bytesize;
960  break;
961  }
962  case 9: {
963  const auto p = boost::get<std::vector<int32_t>>(&lit);
964  CHECK(p);
965  int32_t off_and_len = crt_int32_arr_lit_off << 16;
966  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
967  int32_t len = crt_int32_arr_lit.size();
968  CHECK_EQ((len >> 16), 0);
969  off_and_len |= static_cast<int16_t>(len);
970  int32_t int32_array_bytesize = len * sizeof(int32_t);
971  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
972  memcpy(&serialized[crt_int32_arr_lit_off],
973  crt_int32_arr_lit.data(),
974  int32_array_bytesize);
975  ++crt_int32_arr_lit_idx;
976  crt_int32_arr_lit_off += int32_array_bytesize;
977  break;
978  }
979  case 10: {
980  const auto p = boost::get<std::vector<int8_t>>(&lit);
981  CHECK(p);
982  int32_t off_and_len = crt_int8_arr_lit_off << 16;
983  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
984  int32_t len = crt_int8_arr_lit.size();
985  CHECK_EQ((len >> 16), 0);
986  off_and_len |= static_cast<int16_t>(len);
987  int32_t int8_array_bytesize = len;
988  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
989  memcpy(&serialized[crt_int8_arr_lit_off],
990  crt_int8_arr_lit.data(),
991  int8_array_bytesize);
992  ++crt_int8_arr_lit_idx;
993  crt_int8_arr_lit_off += int8_array_bytesize;
994  break;
995  }
996  case 11: {
997  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
998  CHECK(p);
999  if (p->second == 64) {
1000  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
1001  const auto& crt_align64_int8_arr_lit =
1002  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
1003  int32_t len = crt_align64_int8_arr_lit.size();
1004  CHECK_EQ((len >> 16), 0);
1005  off_and_len |= static_cast<int16_t>(len);
1006  int32_t align64_int8_array_bytesize = len;
1007  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1008  memcpy(&serialized[crt_align64_int8_arr_lit_off],
1009  crt_align64_int8_arr_lit.data(),
1010  align64_int8_array_bytesize);
1011  ++crt_align64_int8_arr_lit_idx;
1012  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
1013  } else if (p->second == 32) {
1014  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
1015  const auto& crt_align32_int8_arr_lit =
1016  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
1017  int32_t len = crt_align32_int8_arr_lit.size();
1018  CHECK_EQ((len >> 16), 0);
1019  off_and_len |= static_cast<int16_t>(len);
1020  int32_t align32_int8_array_bytesize = len;
1021  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1022  memcpy(&serialized[crt_align32_int8_arr_lit_off],
1023  crt_align32_int8_arr_lit.data(),
1024  align32_int8_array_bytesize);
1025  ++crt_align32_int8_arr_lit_idx;
1026  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
1027  } else {
1028  CHECK(false);
1029  }
1030  break;
1031  }
1032  default:
1033  CHECK(false);
1034  }
1035  }
1036  return serialized;
1037 }
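// --- Editor's illustrative sketch (not part of Execute.cpp) -----------------
// How a variable-length literal is addressed in the buffer built by
// serializeLiterals(): the 32-bit descriptor packs the payload's byte offset
// into its upper 16 bits and its length into the lower 16 bits, which is why
// the code above checks lit_buf_size and every payload length against
// std::numeric_limits<int16_t>::max(). The helper names are hypothetical.
#include <cstdint>

int32_t pack_off_and_len(const int16_t offset, const int16_t length) {
  return (static_cast<int32_t>(offset) << 16) |
         static_cast<int32_t>(static_cast<uint16_t>(length));
}

void unpack_off_and_len(const int32_t off_and_len, int16_t& offset, int16_t& length) {
  offset = static_cast<int16_t>(off_and_len >> 16);
  length = static_cast<int16_t>(off_and_len & 0xFFFF);
}
// ----------------------------------------------------------------------------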
1038 
1039 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
1040  if (device_type == ExecutorDeviceType::GPU) {
1041  return cudaMgr()->getDeviceCount();
1042  } else {
1043  return 1;
1044  }
1045 }
1046 
 1047 int Executor::deviceCountForMemoryLevel(
 1048  const Data_Namespace::MemoryLevel memory_level) const {
1049  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
 1050  : deviceCount(ExecutorDeviceType::CPU);
 1051 }
1052 
1053 // TODO(alex): remove or split
1054 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
1055  const SQLTypeInfo& ti,
1056  const int64_t agg_init_val,
1057  const int8_t out_byte_width,
1058  const int64_t* out_vec,
1059  const size_t out_vec_sz,
1060  const bool is_group_by,
1061  const bool float_argument_input) {
1062  switch (agg) {
1063  case kAVG:
1064  case kSUM:
1065  case kSUM_IF:
1066  if (0 != agg_init_val) {
1067  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1068  int64_t agg_result = agg_init_val;
1069  for (size_t i = 0; i < out_vec_sz; ++i) {
1070  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
1071  }
1072  return {agg_result, 0};
1073  } else {
1074  CHECK(ti.is_fp());
1075  switch (out_byte_width) {
1076  case 4: {
1077  int agg_result = static_cast<int32_t>(agg_init_val);
1078  for (size_t i = 0; i < out_vec_sz; ++i) {
 1079  agg_sum_float_skip_val(
 1080  &agg_result,
1081  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1082  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1083  }
1084  const int64_t converted_bin =
1085  float_argument_input
1086  ? static_cast<int64_t>(agg_result)
1087  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1088  return {converted_bin, 0};
1089  break;
1090  }
1091  case 8: {
1092  int64_t agg_result = agg_init_val;
1093  for (size_t i = 0; i < out_vec_sz; ++i) {
 1094  agg_sum_double_skip_val(
 1095  &agg_result,
1096  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1097  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1098  }
1099  return {agg_result, 0};
1100  break;
1101  }
1102  default:
1103  CHECK(false);
1104  }
1105  }
1106  }
1107  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
1108  int64_t agg_result = 0;
1109  for (size_t i = 0; i < out_vec_sz; ++i) {
1110  agg_result += out_vec[i];
1111  }
1112  return {agg_result, 0};
1113  } else {
1114  CHECK(ti.is_fp());
1115  switch (out_byte_width) {
1116  case 4: {
1117  float r = 0.;
1118  for (size_t i = 0; i < out_vec_sz; ++i) {
1119  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
1120  }
1121  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
1122  const int64_t converted_bin =
1123  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
1124  return {converted_bin, 0};
1125  }
1126  case 8: {
1127  double r = 0.;
1128  for (size_t i = 0; i < out_vec_sz; ++i) {
1129  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
1130  }
1131  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
1132  }
1133  default:
1134  CHECK(false);
1135  }
1136  }
1137  break;
1138  case kCOUNT:
1139  case kCOUNT_IF: {
1140  uint64_t agg_result = 0;
1141  for (size_t i = 0; i < out_vec_sz; ++i) {
1142  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
1143  agg_result += out;
1144  }
1145  return {static_cast<int64_t>(agg_result), 0};
1146  }
1147  case kMIN: {
1148  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1149  int64_t agg_result = agg_init_val;
1150  for (size_t i = 0; i < out_vec_sz; ++i) {
1151  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
1152  }
1153  return {agg_result, 0};
1154  } else {
1155  switch (out_byte_width) {
1156  case 4: {
1157  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1158  for (size_t i = 0; i < out_vec_sz; ++i) {
 1159  agg_min_float_skip_val(
 1160  &agg_result,
1161  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1162  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1163  }
1164  const int64_t converted_bin =
1165  float_argument_input
1166  ? static_cast<int64_t>(agg_result)
1167  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1168  return {converted_bin, 0};
1169  }
1170  case 8: {
1171  int64_t agg_result = agg_init_val;
1172  for (size_t i = 0; i < out_vec_sz; ++i) {
 1173  agg_min_double_skip_val(
 1174  &agg_result,
1175  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1176  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1177  }
1178  return {agg_result, 0};
1179  }
1180  default:
1181  CHECK(false);
1182  }
1183  }
1184  }
1185  case kMAX:
1186  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1187  int64_t agg_result = agg_init_val;
1188  for (size_t i = 0; i < out_vec_sz; ++i) {
1189  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
1190  }
1191  return {agg_result, 0};
1192  } else {
1193  switch (out_byte_width) {
1194  case 4: {
1195  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1196  for (size_t i = 0; i < out_vec_sz; ++i) {
 1197  agg_max_float_skip_val(
 1198  &agg_result,
1199  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1200  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1201  }
1202  const int64_t converted_bin =
1203  float_argument_input ? static_cast<int64_t>(agg_result)
1204  : float_to_double_bin(agg_result, !ti.get_notnull());
1205  return {converted_bin, 0};
1206  }
1207  case 8: {
1208  int64_t agg_result = agg_init_val;
1209  for (size_t i = 0; i < out_vec_sz; ++i) {
 1210  agg_max_double_skip_val(
 1211  &agg_result,
1212  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1213  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1214  }
1215  return {agg_result, 0};
1216  }
1217  default:
1218  CHECK(false);
1219  }
1220  }
1221  case kSINGLE_VALUE: {
1222  int64_t agg_result = agg_init_val;
1223  for (size_t i = 0; i < out_vec_sz; ++i) {
1224  if (out_vec[i] != agg_init_val) {
1225  if (agg_result == agg_init_val) {
1226  agg_result = out_vec[i];
1227  } else if (out_vec[i] != agg_result) {
 1228  throw int32_t(Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES);
 1229  }
1230  }
1231  }
1232  return {agg_result, 0};
1233  }
1234  case kSAMPLE: {
1235  int64_t agg_result = agg_init_val;
1236  for (size_t i = 0; i < out_vec_sz; ++i) {
1237  if (out_vec[i] != agg_init_val) {
1238  agg_result = out_vec[i];
1239  break;
1240  }
1241  }
1242  return {agg_result, 0};
1243  }
1244  default:
1245  UNREACHABLE() << "Unsupported SQLAgg: " << agg;
1246  }
1247  abort();
1248 }
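// --- Editor's illustrative sketch (not part of Execute.cpp) -----------------
// reduceResults() folds the per-device (or per-thread) partial values in
// out_vec into a single aggregate on the host. For COUNT the fold is a plain
// sum; as in the code above, the partials are accumulated in an unsigned
// 64-bit value so very large counts do not overflow a signed accumulator.
// Hypothetical stand-alone restatement:
#include <cstdint>
#include <vector>

int64_t reduce_count_partials(const std::vector<int64_t>& out_vec) {
  uint64_t acc = 0;
  for (const int64_t partial : out_vec) {
    acc += static_cast<uint64_t>(partial);  // one partial per device/thread
  }
  return static_cast<int64_t>(acc);
}
// ----------------------------------------------------------------------------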
1249 
1250 namespace {
1251 
 1252 ResultSetPtr get_merged_result(
 1253  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1254  std::vector<TargetInfo> const& targets) {
1255  auto& first = results_per_device.front().first;
1256  CHECK(first);
1257  auto const first_target_idx = result_set::first_dict_encoded_idx(targets);
1258  if (first_target_idx) {
1259  first->translateDictEncodedColumns(targets, *first_target_idx);
1260  }
1261  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
1262  const auto& next = results_per_device[dev_idx].first;
1263  CHECK(next);
1264  if (first_target_idx) {
1265  next->translateDictEncodedColumns(targets, *first_target_idx);
1266  }
1267  first->append(*next);
1268  }
1269  return std::move(first);
1270 }
1271 
 1272 struct GetTargetInfo {
 1273  TargetInfo operator()(Analyzer::Expr const* const target_expr) const {
1274  return get_target_info(target_expr, g_bigint_count);
1275  }
1276 };
1277 
1278 } // namespace
1279 
 1280 ResultSetPtr Executor::resultsUnion(SharedKernelContext& shared_context,
 1281  const RelAlgExecutionUnit& ra_exe_unit) {
1282  auto timer = DEBUG_TIMER(__func__);
1283  auto& results_per_device = shared_context.getFragmentResults();
1284  auto const targets = shared::transform<std::vector<TargetInfo>>(
1285  ra_exe_unit.target_exprs, GetTargetInfo{});
1286  if (results_per_device.empty()) {
1287  return std::make_shared<ResultSet>(targets,
1291  blockSize(),
1292  gridSize());
1293  }
1294  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
1295  std::sort(results_per_device.begin(),
1296  results_per_device.end(),
1297  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
1298  CHECK_GE(lhs.second.size(), size_t(1));
1299  CHECK_GE(rhs.second.size(), size_t(1));
1300  return lhs.second.front() < rhs.second.front();
1301  });
1302 
1303  return get_merged_result(results_per_device, targets);
1304 }
1305 
 1306 ResultSetPtr Executor::reduceMultiDeviceResults(
 1307  const RelAlgExecutionUnit& ra_exe_unit,
1308  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1309  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1310  const QueryMemoryDescriptor& query_mem_desc) const {
1311  auto timer = DEBUG_TIMER(__func__);
1312  if (ra_exe_unit.estimator) {
1313  return reduce_estimator_results(ra_exe_unit, results_per_device);
1314  }
1315 
1316  if (results_per_device.empty()) {
1317  auto const targets = shared::transform<std::vector<TargetInfo>>(
1318  ra_exe_unit.target_exprs, GetTargetInfo{});
1319  return std::make_shared<ResultSet>(targets,
1322  nullptr,
1323  blockSize(),
1324  gridSize());
1325  }
1326 
 1327  return reduceMultiDeviceResultSets(
 1328  results_per_device,
1329  row_set_mem_owner,
1330  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
1331 }
1332 
1333 namespace {
1334 
 1335 ReductionCode get_reduction_code(
 1336  const size_t executor_id,
1337  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1338  int64_t* compilation_queue_time) {
1339  auto clock_begin = timer_start();
1340  // ResultSetReductionJIT::codegen compilation-locks if new code will be generated
1341  *compilation_queue_time = timer_stop(clock_begin);
1342  const auto& this_result_set = results_per_device[0].first;
1343  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
1344  this_result_set->getTargetInfos(),
1345  this_result_set->getTargetInitVals(),
1346  executor_id);
1347  return reduction_jit.codegen();
1348 };
1349 
1350 } // namespace
1351 
 1352 ResultSetPtr Executor::reduceMultiDeviceResultSets(
 1353  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1354  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1355  const QueryMemoryDescriptor& query_mem_desc) const {
1356  auto timer = DEBUG_TIMER(__func__);
1357  std::shared_ptr<ResultSet> reduced_results;
1358 
1359  const auto& first = results_per_device.front().first;
1360 
1361  if (query_mem_desc.getQueryDescriptionType() ==
 1362  QueryDescriptionType::GroupByBaselineHash &&
 1363  results_per_device.size() > 1) {
1364  const auto total_entry_count = std::accumulate(
1365  results_per_device.begin(),
1366  results_per_device.end(),
1367  size_t(0),
1368  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
1369  const auto& r = rs.first;
1370  return init + r->getQueryMemDesc().getEntryCount();
1371  });
1372  CHECK(total_entry_count);
1373  auto query_mem_desc = first->getQueryMemDesc();
1374  query_mem_desc.setEntryCount(total_entry_count);
1375  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
1378  row_set_mem_owner,
1379  blockSize(),
1380  gridSize());
1381  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
1382  reduced_results->initializeStorage();
1383  switch (query_mem_desc.getEffectiveKeyWidth()) {
1384  case 4:
1385  first->getStorage()->moveEntriesToBuffer<int32_t>(
1386  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1387  break;
1388  case 8:
1389  first->getStorage()->moveEntriesToBuffer<int64_t>(
1390  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1391  break;
1392  default:
1393  CHECK(false);
1394  }
1395  } else {
1396  reduced_results = first;
1397  }
1398 
1399  int64_t compilation_queue_time = 0;
1400  const auto reduction_code =
1401  get_reduction_code(executor_id_, results_per_device, &compilation_queue_time);
1402 
1403  for (size_t i = 1; i < results_per_device.size(); ++i) {
1404  reduced_results->getStorage()->reduce(
1405  *(results_per_device[i].first->getStorage()), {}, reduction_code, executor_id_);
1406  }
1407  reduced_results->addCompilationQueueTime(compilation_queue_time);
1408  reduced_results->invalidateCachedRowCount();
1409  return reduced_results;
1410 }
1411 
 1412 ResultSetPtr Executor::reduceSpeculativeTopN(
 1413  const RelAlgExecutionUnit& ra_exe_unit,
1414  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1415  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1416  const QueryMemoryDescriptor& query_mem_desc) const {
1417  if (results_per_device.size() == 1) {
1418  return std::move(results_per_device.front().first);
1419  }
1420  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
 1421  SpeculativeTopNMap m;
 1422  for (const auto& result : results_per_device) {
1423  auto rows = result.first;
1424  CHECK(rows);
1425  if (!rows) {
1426  continue;
1427  }
1428  SpeculativeTopNMap that(
1429  *rows,
1430  ra_exe_unit.target_exprs,
1431  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1432  m.reduce(that);
1433  }
1434  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1435  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1436  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1437 }
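// --- Editor's illustrative sketch (not part of Execute.cpp) -----------------
// Sizing heuristic used above for each per-device SpeculativeTopNMap: roughly
// 10,000 slots per unit of ln(top_n), but never fewer than top_n slots.
// E.g. top_n = 1,000 gives ln(1000) ~ 6.9 -> 60,000 slots. Hypothetical
// restatement, assuming top_n >= 1:
#include <algorithm>
#include <cmath>
#include <cstddef>

size_t speculative_top_n_map_size(const size_t top_n) {
  const int log_factor =
      std::max(1, static_cast<int>(std::log(static_cast<double>(top_n))));
  return std::max(static_cast<size_t>(10000 * log_factor), top_n);
}
// ----------------------------------------------------------------------------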
1438 
1439 std::unordered_set<int> get_available_gpus(const Data_Namespace::DataMgr* data_mgr) {
1440  CHECK(data_mgr);
1441  std::unordered_set<int> available_gpus;
1442  if (data_mgr->gpusPresent()) {
1443  CHECK(data_mgr->getCudaMgr());
1444  const int gpu_count = data_mgr->getCudaMgr()->getDeviceCount();
1445  CHECK_GT(gpu_count, 0);
1446  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1447  available_gpus.insert(gpu_id);
1448  }
1449  }
1450  return available_gpus;
1451 }
1452 
1453 size_t get_context_count(const ExecutorDeviceType device_type,
1454  const size_t cpu_count,
1455  const size_t gpu_count) {
1456  return device_type == ExecutorDeviceType::GPU ? gpu_count
1457  : static_cast<size_t>(cpu_count);
1458 }
1459 
1460 namespace {
1461 
1462 // Compute a very conservative entry count for the output buffer entry count using no
1463 // other information than the number of tuples in each table and multiplying them
1464 // together.
1465 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1467  // Check for overflows since we're multiplying potentially big table sizes.
1468  using checked_size_t = boost::multiprecision::number<
1469  boost::multiprecision::cpp_int_backend<64,
1470  64,
1471  boost::multiprecision::unsigned_magnitude,
1472  boost::multiprecision::checked,
1473  void>>;
1474  checked_size_t max_groups_buffer_entry_guess = 1;
1475  for (const auto& query_info : query_infos) {
1476  CHECK(!query_info.info.fragments.empty());
1477  auto it = std::max_element(query_info.info.fragments.begin(),
1478  query_info.info.fragments.end(),
1479  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1480  return f1.getNumTuples() < f2.getNumTuples();
1481  });
1482  max_groups_buffer_entry_guess *= it->getNumTuples();
1483  }
1484  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1485  // baseline group layout with that many entries anyway.
1486  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1487  try {
1488  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1489  max_groups_buffer_entry_guess_cap);
1490  } catch (...) {
1491  return max_groups_buffer_entry_guess_cap;
1492  }
1493 }
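// --- Editor's illustrative sketch (not part of Execute.cpp) -----------------
// The overflow-checked multiplication pattern used above, in isolation: a
// checked boost::multiprecision integer throws std::overflow_error instead of
// silently wrapping, so the guess can fall back to the fixed cap. Hypothetical
// helper, not an engine API:
#include <boost/multiprecision/cpp_int.hpp>
#include <algorithm>
#include <cstddef>
#include <vector>

size_t checked_product_with_cap(const std::vector<size_t>& row_counts, const size_t cap) {
  using checked_size_t = boost::multiprecision::number<
      boost::multiprecision::cpp_int_backend<64,
                                             64,
                                             boost::multiprecision::unsigned_magnitude,
                                             boost::multiprecision::checked,
                                             void>>;
  try {
    checked_size_t product = 1;
    for (const auto n : row_counts) {
      product *= n;  // throws if the product no longer fits in 64 bits
    }
    return std::min(static_cast<size_t>(product), cap);
  } catch (...) {
    return cap;
  }
}
// ----------------------------------------------------------------------------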
1494 
1495 std::string get_table_name(const InputDescriptor& input_desc) {
1496  const auto source_type = input_desc.getSourceType();
1497  if (source_type == InputSourceType::TABLE) {
1498  const auto& table_key = input_desc.getTableKey();
1499  CHECK_GT(table_key.table_id, 0);
1500  const auto td = Catalog_Namespace::get_metadata_for_table(table_key);
1501  CHECK(td);
1502  return td->tableName;
1503  } else {
1504  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableKey().table_id);
1505  }
1506 }
1507 
1508 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1509  const int device_count) {
1510  if (device_type == ExecutorDeviceType::GPU) {
1511  return device_count * Executor::high_scan_limit;
1512  }
 1513  return Executor::high_scan_limit;
 1514 }
1515 
 1516 void checkWorkUnitWatchdog(const RelAlgExecutionUnit& ra_exe_unit,
 1517  const std::vector<InputTableInfo>& table_infos,
1518  const ExecutorDeviceType device_type,
1519  const int device_count) {
1520  for (const auto target_expr : ra_exe_unit.target_exprs) {
1521  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1522  return;
1523  }
1524  }
1525  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1526  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1527  // Allow a query with no scan limit to run on small tables
1528  return;
1529  }
1530  if (ra_exe_unit.use_bump_allocator) {
1531  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1532  // relative to the size of the input), so we bypass this check for now
1533  return;
1534  }
1535  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1536  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1537  (!ra_exe_unit.scan_limit ||
1538  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1539  std::vector<std::string> table_names;
1540  const auto& input_descs = ra_exe_unit.input_descs;
1541  for (const auto& input_desc : input_descs) {
1542  table_names.push_back(get_table_name(input_desc));
1543  }
1544  if (!ra_exe_unit.scan_limit) {
1545  throw WatchdogException(
1546  "Projection query would require a scan without a limit on table(s): " +
1547  boost::algorithm::join(table_names, ", "));
1548  } else {
1549  throw WatchdogException(
1550  "Projection query output result set on table(s): " +
1551  boost::algorithm::join(table_names, ", ") + " would contain " +
1552  std::to_string(ra_exe_unit.scan_limit) +
1553  " rows, which is more than the current system limit of " +
1554  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1555  }
1556  }
1557 }
1558 
1559 } // namespace
1560 
1561 size_t get_loop_join_size(const std::vector<InputTableInfo>& query_infos,
1562  const RelAlgExecutionUnit& ra_exe_unit) {
1563  const auto inner_table_key = ra_exe_unit.input_descs.back().getTableKey();
1564 
1565  std::optional<size_t> inner_table_idx;
1566  for (size_t i = 0; i < query_infos.size(); ++i) {
1567  if (query_infos[i].table_key == inner_table_key) {
1568  inner_table_idx = i;
1569  break;
1570  }
1571  }
1572  CHECK(inner_table_idx);
1573  return query_infos[*inner_table_idx].info.getNumTuples();
1574 }
1575 
1576 namespace {
1577 
1578 template <typename T>
1579 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1580  std::vector<std::string> expr_strs;
1581  for (const auto& expr : expr_container) {
1582  if (!expr) {
1583  expr_strs.emplace_back("NULL");
1584  } else {
1585  expr_strs.emplace_back(expr->toString());
1586  }
1587  }
1588  return expr_strs;
1589 }
1590 
1591 template <>
1592 std::vector<std::string> expr_container_to_string(
1593  const std::list<Analyzer::OrderEntry>& expr_container) {
1594  std::vector<std::string> expr_strs;
1595  for (const auto& expr : expr_container) {
1596  expr_strs.emplace_back(expr.toString());
1597  }
1598  return expr_strs;
1599 }
1600 
1601 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1602  switch (algorithm) {
 1603  case SortAlgorithm::Default:
 1604  return "ResultSet";
 1605  case SortAlgorithm::SpeculativeTopN:
 1606  return "Speculative Top N";
 1607  case SortAlgorithm::StreamingTopN:
 1608  return "Streaming Top N";
1609  }
1610  UNREACHABLE();
1611  return "";
1612 }
1613 
1614 } // namespace
1615 
1616 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
1617  // todo(yoonmin): replace a cache key as a DAG representation of a query plan
1618  // instead of ra_exec_unit description if possible
1619  std::ostringstream os;
1620  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1621  const auto& scan_desc = input_col_desc->getScanDesc();
1622  os << scan_desc.getTableKey() << "," << input_col_desc->getColId() << ","
1623  << scan_desc.getNestLevel();
1624  }
1625  if (!ra_exe_unit.simple_quals.empty()) {
1626  for (const auto& qual : ra_exe_unit.simple_quals) {
1627  if (qual) {
1628  os << qual->toString() << ",";
1629  }
1630  }
1631  }
1632  if (!ra_exe_unit.quals.empty()) {
1633  for (const auto& qual : ra_exe_unit.quals) {
1634  if (qual) {
1635  os << qual->toString() << ",";
1636  }
1637  }
1638  }
1639  if (!ra_exe_unit.join_quals.empty()) {
1640  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1641  const auto& join_condition = ra_exe_unit.join_quals[i];
1642  os << std::to_string(i) << ::toString(join_condition.type);
1643  for (const auto& qual : join_condition.quals) {
1644  if (qual) {
1645  os << qual->toString() << ",";
1646  }
1647  }
1648  }
1649  }
1650  if (!ra_exe_unit.groupby_exprs.empty()) {
1651  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1652  if (qual) {
1653  os << qual->toString() << ",";
1654  }
1655  }
1656  }
1657  for (const auto& expr : ra_exe_unit.target_exprs) {
1658  if (expr) {
1659  os << expr->toString() << ",";
1660  }
1661  }
1662  os << ::toString(ra_exe_unit.estimator == nullptr);
1663  os << std::to_string(ra_exe_unit.scan_limit);
1664  return os.str();
1665 }
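// --- Editorial example (not part of Execute.cpp) -----------------------------------
// ra_exec_unit_desc_for_caching() above flattens the execution unit into a plain
// string, so a cache can key on it directly or on a hash of it. Minimal sketch
// (std::hash is an assumption; the real cache may hash differently):
[[maybe_unused]] static size_t example_desc_to_cache_key(const std::string& desc) {
  return std::hash<std::string>{}(desc);
}
// -------------------------------------------------------------------------------------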
1666 
1667 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1668  os << "\n\tExtracted Query Plan Dag Hash: " << ra_exe_unit.query_plan_dag_hash;
1669  os << "\n\tTable/Col/Levels: ";
1670  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1671  const auto& scan_desc = input_col_desc->getScanDesc();
1672  os << "(" << scan_desc.getTableKey() << ", " << input_col_desc->getColId() << ", "
1673  << scan_desc.getNestLevel() << ") ";
1674  }
1675  if (!ra_exe_unit.simple_quals.empty()) {
1676  os << "\n\tSimple Quals: "
1678  ", ");
1679  }
1680  if (!ra_exe_unit.quals.empty()) {
1681  os << "\n\tQuals: "
1682  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1683  }
1684  if (!ra_exe_unit.join_quals.empty()) {
1685  os << "\n\tJoin Quals: ";
1686  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1687  const auto& join_condition = ra_exe_unit.join_quals[i];
1688  os << "\t\t" << std::to_string(i) << " " << ::toString(join_condition.type);
1689  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1690  }
1691  }
1692  if (!ra_exe_unit.groupby_exprs.empty()) {
1693  os << "\n\tGroup By: "
1695  ", ");
1696  }
1697  os << "\n\tProjected targets: "
1699  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
1700  os << "\n\tSort Info: ";
1701  const auto& sort_info = ra_exe_unit.sort_info;
1702  os << "\n\t Order Entries: "
1703  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1704  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1705  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1706  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1707  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1708  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
1709  if (ra_exe_unit.union_all) {
1710  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1711  }
1712  return os;
1713 }
1714 
1715 namespace {
1716 
1717 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
1718  const size_t new_scan_limit) {
1719  return {ra_exe_unit_in.input_descs,
1720  ra_exe_unit_in.input_col_descs,
1721  ra_exe_unit_in.simple_quals,
1722  ra_exe_unit_in.quals,
1723  ra_exe_unit_in.join_quals,
1724  ra_exe_unit_in.groupby_exprs,
1725  ra_exe_unit_in.target_exprs,
1726  ra_exe_unit_in.target_exprs_original_type_infos,
1727  ra_exe_unit_in.estimator,
1728  ra_exe_unit_in.sort_info,
1729  new_scan_limit,
1730  ra_exe_unit_in.query_hint,
1731  ra_exe_unit_in.query_plan_dag_hash,
1732  ra_exe_unit_in.hash_table_build_plan_dag,
1733  ra_exe_unit_in.table_id_to_node_map,
1734  ra_exe_unit_in.use_bump_allocator,
1735  ra_exe_unit_in.union_all,
1736  ra_exe_unit_in.query_state};
1737 }
1738 
1739 } // namespace
1740 
1741 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1742  const bool is_agg,
1743  const std::vector<InputTableInfo>& query_infos,
1744  const RelAlgExecutionUnit& ra_exe_unit_in,
1745  const CompilationOptions& co,
1746  const ExecutionOptions& eo,
1747  RenderInfo* render_info,
1748  const bool has_cardinality_estimation,
1749  ColumnCacheMap& column_cache) {
1750  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1751 
1752  ScopeGuard cleanup_post_execution = [this] {
1753  // cleanup/unpin GPU buffer allocations
1754  // TODO: separate out this state into a single object
1755  plan_state_.reset(nullptr);
1756  if (cgen_state_) {
1757  cgen_state_->in_values_bitmaps_.clear();
1758  cgen_state_->str_dict_translation_mgrs_.clear();
1759  }
1760  };
1761 
1762  try {
1763  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1764  is_agg,
1765  true,
1766  query_infos,
1767  ra_exe_unit_in,
1768  co,
1769  eo,
1770  row_set_mem_owner_,
1771  render_info,
1772  has_cardinality_estimation,
1773  column_cache);
1774  if (result) {
1775  result->setKernelQueueTime(kernel_queue_time_ms_);
1776  result->addCompilationQueueTime(compilation_queue_time_ms_);
1777  if (eo.just_validate) {
1778  result->setValidationOnlyRes();
1779  }
1780  }
1781  return result;
1782  } catch (const CompilationRetryNewScanLimit& e) {
1783  auto result =
1784  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1785  is_agg,
1786  false,
1787  query_infos,
1788  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1789  co,
1790  eo,
1791  row_set_mem_owner_,
1792  render_info,
1793  has_cardinality_estimation,
1794  column_cache);
1795  if (result) {
1796  result->setKernelQueueTime(kernel_queue_time_ms_);
1797  result->addCompilationQueueTime(compilation_queue_time_ms_);
1798  if (eo.just_validate) {
1799  result->setValidationOnlyRes();
1800  }
1801  }
1802  return result;
1803  }
1804 }
1805 
1806 ResultSetPtr Executor::executeWorkUnitImpl(
1807  size_t& max_groups_buffer_entry_guess,
1808  const bool is_agg,
1809  const bool allow_single_frag_table_opt,
1810  const std::vector<InputTableInfo>& query_infos,
1811  const RelAlgExecutionUnit& ra_exe_unit_in,
1812  const CompilationOptions& co,
1813  const ExecutionOptions& eo,
1814  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1815  RenderInfo* render_info,
1816  const bool has_cardinality_estimation,
1817  ColumnCacheMap& column_cache) {
1818  INJECT_TIMER(Exec_executeWorkUnit);
1819  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1820  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1821  CHECK(!query_infos.empty());
1822  if (!max_groups_buffer_entry_guess) {
1823  // The query has failed the first execution attempt because of running out
1824  // of group by slots. Make the conservative choice: allocate fragment size
1825  // slots and run on the CPU.
1826  CHECK(device_type == ExecutorDeviceType::CPU);
1827  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1828  }
1829 
1830  int8_t crt_min_byte_width{MAX_BYTE_WIDTH_SUPPORTED};
1831  do {
1832  SharedKernelContext shared_context(query_infos);
1833  ColumnFetcher column_fetcher(this, column_cache);
1834  ScopeGuard scope_guard = [&column_fetcher] {
1835  column_fetcher.freeLinearizedBuf();
1836  column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
1837  };
1838  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1839  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1840 
1841  if (eo.executor_type == ExecutorType::Native) {
1842  try {
1843  INJECT_TIMER(query_step_compilation);
1844  query_mem_desc_owned =
1845  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1846  crt_min_byte_width,
1847  has_cardinality_estimation,
1848  ra_exe_unit,
1849  query_infos,
1850  deleted_cols_map,
1851  column_fetcher,
1852  {device_type,
1853  co.hoist_literals,
1854  co.opt_level,
1855  co.with_dynamic_watchdog,
1856  co.allow_lazy_fetch,
1857  co.filter_on_deleted_column,
1858  co.explain_type,
1859  co.register_intel_jit_listener},
1860  eo,
1861  render_info,
1862  this);
1863  CHECK(query_mem_desc_owned);
1864  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1865  } catch (CompilationRetryNoCompaction&) {
1866  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1867  continue;
1868  }
1869  } else {
1870  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1871  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1872  CHECK(!query_mem_desc_owned);
1873  query_mem_desc_owned.reset(
1874  new QueryMemoryDescriptor(this, 0, QueryDescriptionType::Projection, false));
1875  }
1876  if (eo.just_explain) {
1877  return executeExplain(*query_comp_desc_owned);
1878  }
1879 
1880  for (const auto target_expr : ra_exe_unit.target_exprs) {
1881  plan_state_->target_exprs_.push_back(target_expr);
1882  }
1883 
1884  if (!eo.just_validate) {
1885  int available_cpus = cpu_threads();
1886  auto available_gpus = get_available_gpus(data_mgr_);
1887 
1888  const auto context_count =
1889  get_context_count(device_type, available_cpus, available_gpus.size());
1890  try {
1891  auto kernels = createKernels(shared_context,
1892  ra_exe_unit,
1893  column_fetcher,
1894  query_infos,
1895  eo,
1896  is_agg,
1897  allow_single_frag_table_opt,
1898  context_count,
1899  *query_comp_desc_owned,
1900  *query_mem_desc_owned,
1901  render_info,
1902  available_gpus,
1903  available_cpus);
1904  launchKernels(
1905  shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
1906  } catch (QueryExecutionError& e) {
1907  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1908  e.getErrorCode() == ERR_OUT_OF_TIME) {
1909  throw QueryExecutionError(ERR_INTERRUPTED);
1910  }
1911  if (e.getErrorCode() == ERR_INTERRUPTED) {
1912  throw QueryExecutionError(ERR_INTERRUPTED);
1913  }
1914  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1915  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1916  crt_min_byte_width <<= 1;
1917  continue;
1918  }
1919  throw;
1920  }
1921  }
1922  if (is_agg) {
1923  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
1924  // update query status to let user know we are now in the reduction phase
1925  std::string curRunningSession{""};
1926  std::string curRunningQuerySubmittedTime{""};
1927  bool sessionEnrolled = false;
1928  {
1929  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
1930  executor_session_mutex_);
1931  curRunningSession = getCurrentQuerySession(session_read_lock);
1932  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
1933  sessionEnrolled =
1934  checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
1935  }
1936  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
1937  sessionEnrolled) {
1938  updateQuerySessionStatus(curRunningSession,
1939  curRunningQuerySubmittedTime,
1940  QuerySessionStatus::QueryStatus::RUNNING_REDUCTION);
1941  }
1942  }
1943  try {
1944  return collectAllDeviceResults(shared_context,
1945  ra_exe_unit,
1946  *query_mem_desc_owned,
1947  query_comp_desc_owned->getDeviceType(),
1948  row_set_mem_owner);
1949  } catch (ReductionRanOutOfSlots&) {
1950  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1951  } catch (OverflowOrUnderflow&) {
1952  crt_min_byte_width <<= 1;
1953  continue;
1954  } catch (QueryExecutionError& e) {
1955  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
1956  << ", what(): " << e.what();
1957  throw QueryExecutionError(e.getErrorCode());
1958  }
1959  }
1960  return resultsUnion(shared_context, ra_exe_unit);
1961 
1962  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1963 
1964  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1965  ExecutorDeviceType::CPU,
1966  QueryMemoryDescriptor(),
1967  nullptr,
1968  blockSize(),
1969  gridSize());
1970 }
1971 
1972 void Executor::executeWorkUnitPerFragment(
1973  const RelAlgExecutionUnit& ra_exe_unit_in,
1974  const InputTableInfo& table_info,
1975  const CompilationOptions& co,
1976  const ExecutionOptions& eo,
1978  PerFragmentCallBack& cb,
1979  const std::set<size_t>& fragment_indexes_param) {
1980  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1981  ColumnCacheMap column_cache;
1982 
1983  std::vector<InputTableInfo> table_infos{table_info};
1984  SharedKernelContext kernel_context(table_infos);
1985 
1986  ColumnFetcher column_fetcher(this, column_cache);
1987  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1988  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1989  {
1990  query_mem_desc_owned =
1991  query_comp_desc_owned->compile(0,
1992  8,
1993  /*has_cardinality_estimation=*/false,
1994  ra_exe_unit,
1995  table_infos,
1996  deleted_cols_map,
1997  column_fetcher,
1998  co,
1999  eo,
2000  nullptr,
2001  this);
2002  }
2003  CHECK(query_mem_desc_owned);
2004  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
2005  const auto table_key = ra_exe_unit.input_descs[0].getTableKey();
2006  const auto& outer_fragments = table_info.info.fragments;
2007 
2008  std::set<size_t> fragment_indexes;
2009  if (fragment_indexes_param.empty()) {
2010  // An empty `fragment_indexes_param` set implies executing
2011  // the query for all fragments in the table. In this
2012  // case, populate `fragment_indexes` with all fragment indexes.
2013  for (size_t i = 0; i < outer_fragments.size(); i++) {
2014  fragment_indexes.emplace(i);
2015  }
2016  } else {
2017  fragment_indexes = fragment_indexes_param;
2018  }
2019 
2020  {
2021  auto clock_begin = timer_start();
2022  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2023  kernel_queue_time_ms_ += timer_stop(clock_begin);
2024 
2025  for (auto fragment_index : fragment_indexes) {
2026  // We may want to consider in the future allowing this to execute on devices other
2027  // than CPU
2028  FragmentsList fragments_list{{table_key, {fragment_index}}};
2029  ExecutionKernel kernel(ra_exe_unit,
2030  co.device_type,
2031  /*device_id=*/0,
2032  eo,
2033  column_fetcher,
2034  *query_comp_desc_owned,
2035  *query_mem_desc_owned,
2036  fragments_list,
2037  ExecutorDispatchMode::KernelPerFragment,
2038  /*render_info=*/nullptr,
2039  /*rowid_lookup_key=*/-1);
2040  kernel.run(this, 0, kernel_context);
2041  }
2042  }
2043 
2044  const auto& all_fragment_results = kernel_context.getFragmentResults();
2045 
2046  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
2047  CHECK_EQ(result_fragment_indexes.size(), 1);
2048  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
2049  }
2050 }
2051 
2052 ResultSetPtr Executor::executeTableFunction(
2053  const TableFunctionExecutionUnit exe_unit,
2054  const std::vector<InputTableInfo>& table_infos,
2055  const CompilationOptions& co,
2056  const ExecutionOptions& eo) {
2057  INJECT_TIMER(Exec_executeTableFunction);
2058  if (eo.just_validate) {
2059  QueryMemoryDescriptor query_mem_desc(this,
2060  /*entry_count=*/0,
2061  QueryDescriptionType::TableFunction,
2062  /*is_table_function=*/true);
2063  query_mem_desc.setOutputColumnar(true);
2064  return std::make_shared<ResultSet>(
2065  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
2066  co.device_type,
2067  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
2068  this->getRowSetMemoryOwner(),
2069  this->blockSize(),
2070  this->gridSize());
2071  }
2072 
2073  // Avoid compiling functions that set the sizer at runtime if the device is GPU.
2074  // This should be fixed in the python script as well to minimize the number of
2075  // QueryMustRunOnCpu exceptions
2078  throw QueryMustRunOnCpu();
2079  }
2080 
2081  ColumnCacheMap column_cache; // Note: if we add retries to the table function
2082  // framework, we may want to move this up a level
2083 
2084  ColumnFetcher column_fetcher(this, column_cache);
2085  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
2086 
2087  if (exe_unit.table_func.containsPreFlightFn()) {
2088  std::shared_ptr<CompilationContext> compilation_context;
2089  {
2090  Executor::CgenStateManager cgenstate_manager(*this,
2091  false,
2092  table_infos,
2094  nullptr); // locks compilation_mutex
2095  auto pre_flight_co = CompilationOptions::makeCpuOnly(co);
2096  TableFunctionCompilationContext tf_compilation_context(this, pre_flight_co);
2097  compilation_context =
2098  tf_compilation_context.compile(exe_unit, true /* emit_only_preflight_fn*/);
2099  }
2100  exe_context.execute(exe_unit,
2101  table_infos,
2102  compilation_context,
2103  column_fetcher,
2104  ExecutorDeviceType::CPU,
2105  this,
2106  true /* is_pre_launch_udtf */);
2107  }
2108  std::shared_ptr<CompilationContext> compilation_context;
2109  {
2110  Executor::CgenStateManager cgenstate_manager(*this,
2111  false,
2112  table_infos,
2114  nullptr); // locks compilation_mutex
2115  TableFunctionCompilationContext tf_compilation_context(this, co);
2116  compilation_context =
2117  tf_compilation_context.compile(exe_unit, false /* emit_only_preflight_fn */);
2118  }
2119  return exe_context.execute(exe_unit,
2120  table_infos,
2121  compilation_context,
2122  column_fetcher,
2123  co.device_type,
2124  this,
2125  false /* is_pre_launch_udtf */);
2126 }
2127 
2128 ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
2129  return std::make_shared<ResultSet>(query_comp_desc.getIR());
2130 }
2131 
2132 void Executor::addTransientStringLiterals(
2133  const RelAlgExecutionUnit& ra_exe_unit,
2134  const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
2135  TransientDictIdVisitor dict_id_visitor;
2136 
2137  auto visit_expr =
2138  [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
2139  if (!expr) {
2140  return;
2141  }
2142  const auto& dict_key = dict_id_visitor.visit(expr);
2143  if (dict_key.dict_id >= 0) {
2144  auto sdp = getStringDictionaryProxy(dict_key, row_set_mem_owner, true);
2145  CHECK(sdp);
2146  TransientStringLiteralsVisitor visitor(sdp, this);
2147  visitor.visit(expr);
2148  }
2149  };
2150 
2151  for (const auto& group_expr : ra_exe_unit.groupby_exprs) {
2152  visit_expr(group_expr.get());
2153  }
2154 
2155  for (const auto& group_expr : ra_exe_unit.quals) {
2156  visit_expr(group_expr.get());
2157  }
2158 
2159  for (const auto& group_expr : ra_exe_unit.simple_quals) {
2160  visit_expr(group_expr.get());
2161  }
2162 
2163  const auto visit_target_expr = [&](const Analyzer::Expr* target_expr) {
2164  const auto& target_type = target_expr->get_type_info();
2165  if (!target_type.is_string() || target_type.get_compression() == kENCODING_DICT) {
2166  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
2167  if (agg_expr) {
2168  // The following agg types require taking into account transient string values
2169  if (agg_expr->get_is_distinct() || agg_expr->get_aggtype() == kSINGLE_VALUE ||
2170  agg_expr->get_aggtype() == kSAMPLE || agg_expr->get_aggtype() == kMODE) {
2171  visit_expr(agg_expr->get_arg());
2172  }
2173  } else {
2174  visit_expr(target_expr);
2175  }
2176  }
2177  };
2178  const auto& target_exprs = ra_exe_unit.target_exprs;
2179  std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
2180  const auto& target_exprs_union = ra_exe_unit.target_exprs_union;
2181  std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
2182 }
2183 
2184 ExecutorDeviceType Executor::getDeviceTypeForTargets(
2185  const RelAlgExecutionUnit& ra_exe_unit,
2186  const ExecutorDeviceType requested_device_type) {
2187  for (const auto target_expr : ra_exe_unit.target_exprs) {
2188  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2189  if (!ra_exe_unit.groupby_exprs.empty() &&
2190  !isArchPascalOrLater(requested_device_type)) {
2191  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM ||
2192  agg_info.agg_kind == kSUM_IF) &&
2193  agg_info.agg_arg_type.get_type() == kDOUBLE) {
2194  return ExecutorDeviceType::CPU;
2195  }
2196  }
2197  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
2198  return ExecutorDeviceType::CPU;
2199  }
2200  }
2201  return requested_device_type;
2202 }
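// --- Editorial example (not part of Execute.cpp) -----------------------------------
// getDeviceTypeForTargets() above demotes a grouped SUM/AVG (or SUM_IF) over DOUBLE to
// CPU on pre-Pascal GPUs. A simplified mirror of that decision, with the booleans as
// hypothetical stand-ins for the checks performed above:
[[maybe_unused]] static bool example_must_fall_back_to_cpu(bool has_group_by,
                                                           bool arch_pascal_or_later,
                                                           bool has_double_sum_or_avg) {
  return has_group_by && !arch_pascal_or_later && has_double_sum_or_avg;
}
// -------------------------------------------------------------------------------------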
2203 
2204 namespace {
2205 
2206 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
2207  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
2208  if (ti.is_fp()) {
2209  if (float_argument_input && ti.get_type() == kFLOAT) {
2210  int64_t float_null_val = 0;
2211  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
2212  static_cast<float>(inline_fp_null_val(ti));
2213  return float_null_val;
2214  }
2215  const auto double_null_val = inline_fp_null_val(ti);
2216  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
2217  }
2218  return inline_int_null_val(ti);
2219 }
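// --- Editorial example (not part of Execute.cpp) -----------------------------------
// inline_null_val() above bit-copies a float NULL sentinel into the low 32 bits of the
// 64-bit slot when the target takes a float argument. A self-contained mirror of that
// packing (assumes <cstring> and <cstdint>):
[[maybe_unused]] static int64_t example_pack_float_null(float float_null_sentinel) {
  int64_t slot = 0;
  std::memcpy(&slot, &float_null_sentinel, sizeof(float));  // only 4 bytes of payload
  return slot;
}
// -------------------------------------------------------------------------------------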
2220 
2221 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
2222  std::vector<int64_t>& entry,
2223  const std::vector<Analyzer::Expr*>& target_exprs,
2224  const QueryMemoryDescriptor& query_mem_desc) {
2225  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
2226  const auto target_expr = target_exprs[target_idx];
2227  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2228  CHECK(agg_info.is_agg);
2229  target_infos.push_back(agg_info);
2230  if (g_cluster) {
2231  const auto executor = query_mem_desc.getExecutor();
2232  CHECK(executor);
2233  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2234  CHECK(row_set_mem_owner);
2235  const auto& count_distinct_desc =
2236  query_mem_desc.getCountDistinctDescriptor(target_idx);
2237  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
2238  CHECK(row_set_mem_owner);
2239  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
2240  count_distinct_desc.bitmapPaddedSizeBytes(),
2241  /*thread_idx=*/0); // TODO: can we detect thread idx here?
2242  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
2243  continue;
2244  }
2245  if (count_distinct_desc.impl_type_ == CountDistinctImplType::UnorderedSet) {
2246  auto count_distinct_set = new CountDistinctSet();
2247  CHECK(row_set_mem_owner);
2248  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
2249  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
2250  continue;
2251  }
2252  }
2253  const bool float_argument_input = takes_float_argument(agg_info);
2254  if (shared::is_any<kCOUNT, kCOUNT_IF, kAPPROX_COUNT_DISTINCT>(agg_info.agg_kind)) {
2255  entry.push_back(0);
2256  } else if (shared::is_any<kAVG>(agg_info.agg_kind)) {
2257  entry.push_back(0);
2258  entry.push_back(0);
2259  } else if (shared::is_any<kSINGLE_VALUE, kSAMPLE>(agg_info.agg_kind)) {
2260  if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
2261  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2262  entry.push_back(0);
2263  }
2264  } else if (agg_info.sql_type.is_varlen()) {
2265  entry.push_back(0);
2266  entry.push_back(0);
2267  } else {
2268  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2269  }
2270  } else {
2271  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2272  }
2273  }
2274 }
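// --- Editorial example (not part of Execute.cpp) -----------------------------------
// fill_entries_for_empty_input() above builds the single synthetic row returned for an
// empty input: 0 for COUNT-like targets, two zero slots (sum, count) for AVG, and a
// NULL sentinel otherwise. For hypothetical targets COUNT(*), AVG(x), MAX(y):
[[maybe_unused]] static std::vector<int64_t> example_empty_input_entry(int64_t null_sentinel) {
  return {0 /*COUNT(*)*/, 0, 0 /*AVG(x): sum, count*/, null_sentinel /*MAX(y) -> NULL*/};
}
// -------------------------------------------------------------------------------------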
2275 
2276 ResultSetPtr build_row_for_empty_input(
2277  const std::vector<Analyzer::Expr*>& target_exprs_in,
2278  const QueryMemoryDescriptor& query_mem_desc,
2279  const ExecutorDeviceType device_type) {
2280  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
2281  std::vector<Analyzer::Expr*> target_exprs;
2282  for (const auto target_expr : target_exprs_in) {
2283  const auto target_expr_copy =
2284  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
2285  CHECK(target_expr_copy);
2286  auto ti = target_expr->get_type_info();
2287  ti.set_notnull(false);
2288  target_expr_copy->set_type_info(ti);
2289  if (target_expr_copy->get_arg()) {
2290  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
2291  arg_ti.set_notnull(false);
2292  target_expr_copy->get_arg()->set_type_info(arg_ti);
2293  }
2294  target_exprs_owned_copies.push_back(target_expr_copy);
2295  target_exprs.push_back(target_expr_copy.get());
2296  }
2297  std::vector<TargetInfo> target_infos;
2298  std::vector<int64_t> entry;
2299  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
2300  const auto executor = query_mem_desc.getExecutor();
2301  CHECK(executor);
2302  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2303  CHECK(row_set_mem_owner);
2304  auto rs = std::make_shared<ResultSet>(target_infos,
2305  device_type,
2306  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
2307  row_set_mem_owner,
2308  executor->blockSize(),
2309  executor->gridSize());
2310  rs->allocateStorage();
2311  rs->fillOneEntry(entry);
2312  return rs;
2313 }
2314 
2315 } // namespace
2316 
2317 ResultSetPtr Executor::collectAllDeviceResults(
2318  SharedKernelContext& shared_context,
2319  const RelAlgExecutionUnit& ra_exe_unit,
2320  const QueryMemoryDescriptor& query_mem_desc,
2321  const ExecutorDeviceType device_type,
2322  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2323  auto timer = DEBUG_TIMER(__func__);
2324  auto& result_per_device = shared_context.getFragmentResults();
2325  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
2326  QueryDescriptionType::NonGroupedAggregate) {
2327  return build_row_for_empty_input(
2328  ra_exe_unit.target_exprs, query_mem_desc, device_type);
2329  }
2330  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
2331  try {
2332  return reduceSpeculativeTopN(
2333  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2334  } catch (const std::bad_alloc&) {
2335  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
2336  }
2337  }
2338  const auto shard_count =
2339  device_type == ExecutorDeviceType::GPU
2340  ? shard_count_for_top_groups(ra_exe_unit)
2341  : 0;
2342 
2343  if (shard_count && !result_per_device.empty()) {
2344  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
2345  }
2346  return reduceMultiDeviceResults(
2347  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2348 }
2349 
2350 namespace {
2361 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
2362  const QueryMemoryDescriptor& input_query_mem_desc,
2363  const ResultSetStorage* output_storage,
2364  size_t output_row_index,
2365  const QueryMemoryDescriptor& output_query_mem_desc,
2366  const std::vector<uint32_t>& top_permutation) {
2367  const auto output_buffer = output_storage->getUnderlyingBuffer();
2368  const auto input_buffer = input_storage->getUnderlyingBuffer();
2369  for (const auto sorted_idx : top_permutation) {
2370  // permuting all group-columns in this result set into the final buffer:
2371  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
2372  group_idx++) {
2373  const auto input_column_ptr =
2374  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2375  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
2376  const auto output_column_ptr =
2377  output_buffer +
2378  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2379  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
2380  memcpy(output_column_ptr,
2381  input_column_ptr,
2382  output_query_mem_desc.groupColWidth(group_idx));
2383  }
2384  // permuting all agg-columns in this result set into the final buffer:
2385  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
2386  slot_idx++) {
2387  const auto input_column_ptr =
2388  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
2389  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2390  const auto output_column_ptr =
2391  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
2392  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2393  memcpy(output_column_ptr,
2394  input_column_ptr,
2395  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
2396  }
2397  ++output_row_index;
2398  }
2399  return output_row_index;
2400 }
2401 
2411 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
2412  const ResultSetStorage* output_storage,
2413  size_t output_row_index,
2414  const QueryMemoryDescriptor& output_query_mem_desc,
2415  const std::vector<uint32_t>& top_permutation) {
2416  const auto output_buffer = output_storage->getUnderlyingBuffer();
2417  const auto input_buffer = input_storage->getUnderlyingBuffer();
2418  for (const auto sorted_idx : top_permutation) {
2419  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
2420  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
2421  row_ptr,
2422  output_query_mem_desc.getRowSize());
2423  ++output_row_index;
2424  }
2425  return output_row_index;
2426 }
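// --- Editorial example (not part of Execute.cpp) -----------------------------------
// Both permute_storage_* helpers above copy entries out of a sorted permutation into a
// contiguous output buffer. A std-only mirror of the row-wise case, treating each row
// as row_size opaque bytes (assumes <cstring>):
[[maybe_unused]] static size_t example_permute_rows(const std::vector<int8_t>& input,
                                                    std::vector<int8_t>& output,
                                                    size_t output_row_index,
                                                    size_t row_size,
                                                    const std::vector<uint32_t>& perm) {
  for (const auto sorted_idx : perm) {
    std::memcpy(output.data() + output_row_index * row_size,
                input.data() + sorted_idx * row_size,
                row_size);
    ++output_row_index;
  }
  return output_row_index;
}
// -------------------------------------------------------------------------------------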
2427 } // namespace
2428 
2429 // Collect top results from each device, stitch them together and sort. Partial
2430 // results from each device are guaranteed to be disjoint because we only go on
2431 // this path when one of the columns involved is a shard key.
2432 ResultSetPtr Executor::collectAllDeviceShardedTopResults(
2433  SharedKernelContext& shared_context,
2434  const RelAlgExecutionUnit& ra_exe_unit) const {
2435  auto& result_per_device = shared_context.getFragmentResults();
2436  const auto first_result_set = result_per_device.front().first;
2437  CHECK(first_result_set);
2438  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2439  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2440  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
2441  top_query_mem_desc.setEntryCount(0);
2442  for (auto& result : result_per_device) {
2443  const auto result_set = result.first;
2444  CHECK(result_set);
2445  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
2446  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2447  top_query_mem_desc.setEntryCount(new_entry_cnt);
2448  }
2449  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2450  first_result_set->getDeviceType(),
2451  top_query_mem_desc,
2452  first_result_set->getRowSetMemOwner(),
2453  blockSize(),
2454  gridSize());
2455  auto top_storage = top_result_set->allocateStorage();
2456  size_t top_output_row_idx{0};
2457  for (auto& result : result_per_device) {
2458  const auto result_set = result.first;
2459  CHECK(result_set);
2460  const auto& top_permutation = result_set->getPermutationBuffer();
2461  CHECK_LE(top_permutation.size(), top_n);
2462  if (top_query_mem_desc.didOutputColumnar()) {
2463  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2464  result_set->getQueryMemDesc(),
2465  top_storage,
2466  top_output_row_idx,
2467  top_query_mem_desc,
2468  top_permutation);
2469  } else {
2470  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2471  top_storage,
2472  top_output_row_idx,
2473  top_query_mem_desc,
2474  top_permutation);
2475  }
2476  }
2477  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2478  return top_result_set;
2479 }
2480 
2481 std::unordered_map<shared::TableKey, const Analyzer::BinOper*>
2482 Executor::getInnerTabIdToJoinCond() const {
2483  std::unordered_map<shared::TableKey, const Analyzer::BinOper*> id_to_cond;
2484  const auto& join_info = plan_state_->join_info_;
2485  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2486  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2487  const auto& inner_table_key = join_info.join_hash_tables_[i]->getInnerTableId();
2488  id_to_cond.insert(
2489  std::make_pair(inner_table_key, join_info.equi_join_tautologies_[i].get()));
2490  }
2491  return id_to_cond;
2492 }
2493 
2494 namespace {
2495 
2496 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2497  for (const auto& col : fetched_cols) {
2498  if (col.is_lazily_fetched) {
2499  return true;
2500  }
2501  }
2502  return false;
2503 }
2504 
2505 } // namespace
2506 
2507 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2508  SharedKernelContext& shared_context,
2509  const RelAlgExecutionUnit& ra_exe_unit,
2510  ColumnFetcher& column_fetcher,
2511  const std::vector<InputTableInfo>& table_infos,
2512  const ExecutionOptions& eo,
2513  const bool is_agg,
2514  const bool allow_single_frag_table_opt,
2515  const size_t context_count,
2516  const QueryCompilationDescriptor& query_comp_desc,
2517  const QueryMemoryDescriptor& query_mem_desc,
2518  RenderInfo* render_info,
2519  std::unordered_set<int>& available_gpus,
2520  int& available_cpus) {
2521  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2522 
2523  QueryFragmentDescriptor fragment_descriptor(
2524  ra_exe_unit,
2525  table_infos,
2526  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2527  ? data_mgr_->getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
2528  : std::vector<Data_Namespace::MemoryInfo>{},
2529  eo.gpu_input_mem_limit_percent,
2530  eo.outer_fragment_indices);
2531  CHECK(!ra_exe_unit.input_descs.empty());
2532 
2533  const auto device_type = query_comp_desc.getDeviceType();
2534  const bool uses_lazy_fetch =
2535  plan_state_->allow_lazy_fetch_ &&
2536  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2537  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2538  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2539  const auto device_count = deviceCount(device_type);
2540  CHECK_GT(device_count, 0);
2541 
2542  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2543  shared_context.getFragOffsets(),
2544  device_count,
2545  device_type,
2546  use_multifrag_kernel,
2547  g_inner_join_fragment_skipping,
2548  this);
2549  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2550  checkWorkUnitWatchdog(ra_exe_unit, table_infos, device_type, device_count);
2551  }
2552 
2553  if (use_multifrag_kernel) {
2554  VLOG(1) << "Creating multifrag execution kernels";
2555  VLOG(1) << query_mem_desc.toString();
2556 
2557  // NB: We should never be on this path when the query is retried because of running
2558  // out of group by slots; also, for scan only queries on CPU we want the
2559  // high-granularity, fragment by fragment execution instead. For scan only queries on
2560  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2561  // buffer per fragment.
2562  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2563  &execution_kernels,
2564  &column_fetcher,
2565  &eo,
2566  &query_comp_desc,
2567  &query_mem_desc,
2568  render_info](const int device_id,
2569  const FragmentsList& frag_list,
2570  const int64_t rowid_lookup_key) {
2571  execution_kernels.emplace_back(
2572  std::make_unique<ExecutionKernel>(ra_exe_unit,
2573  ExecutorDeviceType::GPU,
2574  device_id,
2575  eo,
2576  column_fetcher,
2577  query_comp_desc,
2578  query_mem_desc,
2579  frag_list,
2580  ExecutorDispatchMode::MultifragmentKernel,
2581  render_info,
2582  rowid_lookup_key));
2583  };
2584  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2585  } else {
2586  VLOG(1) << "Creating one execution kernel per fragment";
2587  VLOG(1) << query_mem_desc.toString();
2588 
2589  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2590  (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
2591  table_infos.size() == 1 && table_infos.front().table_key.table_id > 0) {
2592  const auto max_frag_size =
2593  table_infos.front().info.getFragmentNumTuplesUpperBound();
2594  if (max_frag_size < query_mem_desc.getEntryCount()) {
2595  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2596  << " to match max fragment size " << max_frag_size
2597  << " for kernel per fragment execution path.";
2598  throw CompilationRetryNewScanLimit(max_frag_size);
2599  }
2600  }
2601 
2602  size_t frag_list_idx{0};
2603  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2604  &execution_kernels,
2605  &column_fetcher,
2606  &eo,
2607  &frag_list_idx,
2608  &device_type,
2609  &query_comp_desc,
2610  &query_mem_desc,
2611  render_info](const int device_id,
2612  const FragmentsList& frag_list,
2613  const int64_t rowid_lookup_key) {
2614  if (!frag_list.size()) {
2615  return;
2616  }
2617  CHECK_GE(device_id, 0);
2618 
2619  execution_kernels.emplace_back(
2620  std::make_unique<ExecutionKernel>(ra_exe_unit,
2621  device_type,
2622  device_id,
2623  eo,
2624  column_fetcher,
2625  query_comp_desc,
2626  query_mem_desc,
2627  frag_list,
2628  ExecutorDispatchMode::KernelPerFragment,
2629  render_info,
2630  rowid_lookup_key));
2631  ++frag_list_idx;
2632  };
2633 
2634  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2635  ra_exe_unit);
2636  }
2637 
2638  return execution_kernels;
2639 }
2640 
2641 void Executor::launchKernels(SharedKernelContext& shared_context,
2642  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
2643  const ExecutorDeviceType device_type) {
2644  auto clock_begin = timer_start();
2645  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2646  kernel_queue_time_ms_ += timer_stop(clock_begin);
2647 
2648  threading::task_group tg;
2649  // A hack to have unused unit for results collection.
2650  const RelAlgExecutionUnit* ra_exe_unit =
2651  kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
2652 
2653 #ifdef HAVE_TBB
2654  if (g_enable_cpu_sub_tasks && device_type == ExecutorDeviceType::CPU) {
2655  shared_context.setThreadPool(&tg);
2656  }
2657  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
2658 #endif // HAVE_TBB
2659 
2660  VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
2661  << (device_type == ExecutorDeviceType::CPU ? "CPU"s : "GPU"s) << ".";
2662  size_t kernel_idx = 1;
2663  for (auto& kernel : kernels) {
2664  CHECK(kernel.get());
2665  tg.run([this,
2666  &kernel,
2667  &shared_context,
2668  parent_thread_local_ids = logger::thread_local_ids(),
2669  crt_kernel_idx = kernel_idx++] {
2670  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
2671  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
2672  const size_t thread_i = crt_kernel_idx % cpu_threads();
2673  kernel->run(this, thread_i, shared_context);
2674  });
2675  }
2676  tg.wait();
2677 
2678  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
2679  // The first arg is used for GPU only, it's not our case.
2680  // TODO: add QueryExecutionContext::getRowSet() interface
2681  // for our case.
2682  if (exec_ctx) {
2683  ResultSetPtr results;
2684  if (ra_exe_unit->estimator) {
2685  results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
2686  } else {
2687  results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
2688  }
2689  shared_context.addDeviceResults(std::move(results), {});
2690  }
2691  }
2692 }
2693 
2694 std::vector<size_t> Executor::getTableFragmentIndices(
2695  const RelAlgExecutionUnit& ra_exe_unit,
2696  const ExecutorDeviceType device_type,
2697  const size_t table_idx,
2698  const size_t outer_frag_idx,
2699  std::map<shared::TableKey, const TableFragments*>& selected_tables_fragments,
2700  const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
2701  inner_table_id_to_join_condition) {
2702  const auto& table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
2703  auto table_frags_it = selected_tables_fragments.find(table_key);
2704  CHECK(table_frags_it != selected_tables_fragments.end());
2705  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2706  const auto outer_table_fragments_it =
2707  selected_tables_fragments.find(outer_input_desc.getTableKey());
2708  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2709  const auto outer_table_fragments = outer_table_fragments_it->second;
2710  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2711  if (!table_idx) {
2712  return {outer_frag_idx};
2713  }
2714  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2715  auto& inner_frags = table_frags_it->second;
2716  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2717  std::vector<size_t> all_frag_ids;
2718  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2719  ++inner_frag_idx) {
2720  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2721  if (skipFragmentPair(outer_fragment_info,
2722  inner_frag_info,
2723  table_idx,
2724  inner_table_id_to_join_condition,
2725  ra_exe_unit,
2726  device_type)) {
2727  continue;
2728  }
2729  all_frag_ids.push_back(inner_frag_idx);
2730  }
2731  return all_frag_ids;
2732 }
2733 
2734 // Returns true iff the join between two fragments cannot yield any results, per
2735 // shard information. The pair can be skipped to avoid full broadcast.
2736 bool Executor::skipFragmentPair(
2737  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2738  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2739  const int table_idx,
2740  const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
2741  inner_table_id_to_join_condition,
2742  const RelAlgExecutionUnit& ra_exe_unit,
2743  const ExecutorDeviceType device_type) {
2744  if (device_type != ExecutorDeviceType::GPU) {
2745  return false;
2746  }
2747  CHECK(table_idx >= 0 &&
2748  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2749  const auto& inner_table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
2750  // Both tables need to be sharded the same way.
2751  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2752  outer_fragment_info.shard == inner_fragment_info.shard) {
2753  return false;
2754  }
2755  const Analyzer::BinOper* join_condition{nullptr};
2756  if (ra_exe_unit.join_quals.empty()) {
2757  CHECK(!inner_table_id_to_join_condition.empty());
2758  auto condition_it = inner_table_id_to_join_condition.find(inner_table_key);
2759  CHECK(condition_it != inner_table_id_to_join_condition.end());
2760  join_condition = condition_it->second;
2761  CHECK(join_condition);
2762  } else {
2763  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2764  plan_state_->join_info_.join_hash_tables_.size());
2765  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2766  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2767  table_idx) {
2768  CHECK(!join_condition);
2769  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2770  }
2771  }
2772  }
2773  if (!join_condition) {
2774  return false;
2775  }
2776  // TODO(adb): support fragment skipping based on the overlaps operator
2777  if (join_condition->is_overlaps_oper()) {
2778  return false;
2779  }
2780  size_t shard_count{0};
2781  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2782  join_condition->get_left_operand())) {
2783  auto inner_outer_pairs =
2784  HashJoin::normalizeColumnPairs(join_condition, getTemporaryTables()).first;
2785  shard_count = BaselineJoinHashTable::getShardCountForCondition(
2786  join_condition, this, inner_outer_pairs);
2787  } else {
2788  shard_count = get_shard_count(join_condition, this);
2789  }
2790  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2791  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2792  }
2793  return shard_count;
2794 }
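// --- Editorial example (not part of Execute.cpp) -----------------------------------
// The shard check in skipFragmentPair() above only skips a fragment pair when both
// sides carry shard ids and those ids differ. A minimal mirror of that predicate (the
// GPU-only gating and the join-condition lookup are omitted):
[[maybe_unused]] static bool example_shards_cannot_match(int outer_shard, int inner_shard) {
  // -1 means "not sharded"; equal shards may still join, so only a definite mismatch skips.
  return outer_shard != -1 && inner_shard != -1 && outer_shard != inner_shard;
}
// -------------------------------------------------------------------------------------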
2795 
2796 namespace {
2797 
2798 const ColumnDescriptor* try_get_column_descriptor(const InputColDescriptor* col_desc) {
2799  const auto& table_key = col_desc->getScanDesc().getTableKey();
2800  const auto col_id = col_desc->getColId();
2801  return get_column_descriptor_maybe({table_key, col_id});
2802 }
2803 
2804 } // namespace
2805 
2806 std::map<shared::TableKey, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2807  const std::vector<InputDescriptor>& input_descs,
2808  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
2809  std::map<shared::TableKey, std::vector<uint64_t>> tab_id_to_frag_offsets;
2810  for (auto& desc : input_descs) {
2811  const auto fragments_it = all_tables_fragments.find(desc.getTableKey());
2812  CHECK(fragments_it != all_tables_fragments.end());
2813  const auto& fragments = *fragments_it->second;
2814  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2815  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2816  frag_offsets[i] = off;
2817  off += fragments[i].getNumTuples();
2818  }
2819  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableKey(), frag_offsets));
2820  }
2821  return tab_id_to_frag_offsets;
2822 }
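// --- Editorial example (not part of Execute.cpp) -----------------------------------
// The offsets built by get_table_id_to_frag_offsets() above are a running prefix sum
// of fragment tuple counts, so fragment i starts at the global row offset of all
// fragments before it. With hypothetical counts {100, 250, 50}:
[[maybe_unused]] static std::vector<uint64_t> example_frag_offsets() {
  const std::vector<uint64_t> num_tuples{100, 250, 50};
  std::vector<uint64_t> offsets(num_tuples.size(), 0);
  for (size_t i = 0, off = 0; i < num_tuples.size(); ++i) {
    offsets[i] = off;
    off += num_tuples[i];
  }
  return offsets;  // {0, 100, 350}
}
// -------------------------------------------------------------------------------------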
2823 
2824 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2825 Executor::getRowCountAndOffsetForAllFrags(
2826  const RelAlgExecutionUnit& ra_exe_unit,
2827  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2828  const std::vector<InputDescriptor>& input_descs,
2829  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
2830  std::vector<std::vector<int64_t>> all_num_rows;
2831  std::vector<std::vector<uint64_t>> all_frag_offsets;
2832  const auto tab_id_to_frag_offsets =
2833  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2834  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2835  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2836  std::vector<int64_t> num_rows;
2837  std::vector<uint64_t> frag_offsets;
2838  if (!ra_exe_unit.union_all) {
2839  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2840  }
2841  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2842  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2843  const auto fragments_it =
2844  all_tables_fragments.find(input_descs[tab_idx].getTableKey());
2845  CHECK(fragments_it != all_tables_fragments.end());
2846  const auto& fragments = *fragments_it->second;
2847  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2848  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2849  const auto& fragment = fragments[frag_id];
2850  num_rows.push_back(fragment.getNumTuples());
2851  } else {
2852  size_t total_row_count{0};
2853  for (const auto& fragment : fragments) {
2854  total_row_count += fragment.getNumTuples();
2855  }
2856  num_rows.push_back(total_row_count);
2857  }
2858  const auto frag_offsets_it =
2859  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableKey());
2860  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2861  const auto& offsets = frag_offsets_it->second;
2862  CHECK_LT(frag_id, offsets.size());
2863  frag_offsets.push_back(offsets[frag_id]);
2864  }
2865  all_num_rows.push_back(num_rows);
2866  // Fragment offsets of the outer table should only be used by rowid for now.
2867  all_frag_offsets.push_back(frag_offsets);
2868  }
2869  return {all_num_rows, all_frag_offsets};
2870 }
2871 
2872 // Only fetch columns of a hash-joined inner fact table from all the table fragments
2873 // when their fetch is not deferred.
2874 bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
2875  const RelAlgExecutionUnit& ra_exe_unit,
2876  const FragmentsList& selected_fragments) const {
2877  const auto& input_descs = ra_exe_unit.input_descs;
2878  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2879  if (nest_level < 1 ||
2880  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2881  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2882  (ra_exe_unit.join_quals.empty() &&
2883  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2884  return false;
2885  }
2886  const auto& table_key = inner_col_desc.getScanDesc().getTableKey();
2887  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2888  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
2889  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2890  return fragments.size() > 1;
2891 }
2892 
2893 bool Executor::needLinearizeAllFragments(
2894  const ColumnDescriptor* cd,
2895  const InputColDescriptor& inner_col_desc,
2896  const RelAlgExecutionUnit& ra_exe_unit,
2897  const FragmentsList& selected_fragments,
2898  const Data_Namespace::MemoryLevel memory_level) const {
2899  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2900  const auto& table_key = inner_col_desc.getScanDesc().getTableKey();
2901  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2902  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
2903  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2904  auto need_linearize =
2905  cd->columnType.is_array() ||
2907  return table_key.table_id > 0 && need_linearize && fragments.size() > 1;
2908 }
2909 
2910 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2911  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2912  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2913  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2914 }
2915 
2916 FetchResult Executor::fetchChunks(
2917  const ColumnFetcher& column_fetcher,
2918  const RelAlgExecutionUnit& ra_exe_unit,
2919  const int device_id,
2920  const Data_Namespace::MemoryLevel memory_level,
2921  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
2922  const FragmentsList& selected_fragments,
2923  std::list<ChunkIter>& chunk_iterators,
2924  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2925  DeviceAllocator* device_allocator,
2926  const size_t thread_idx,
2927  const bool allow_runtime_interrupt) {
2928  auto timer = DEBUG_TIMER(__func__);
2930  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2931  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2932  std::vector<size_t> local_col_to_frag_pos;
2933  buildSelectedFragsMapping(selected_fragments_crossjoin,
2934  local_col_to_frag_pos,
2935  col_global_ids,
2936  selected_fragments,
2937  ra_exe_unit);
2938 
2939  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2940  selected_fragments_crossjoin);
2941  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2942  std::vector<std::vector<int64_t>> all_num_rows;
2943  std::vector<std::vector<uint64_t>> all_frag_offsets;
2944  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2945  std::vector<const int8_t*> frag_col_buffers(
2946  plan_state_->global_to_local_col_ids_.size());
2947  for (const auto& col_id : col_global_ids) {
2948  if (allow_runtime_interrupt) {
2949  bool isInterrupted = false;
2950  {
2951  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
2952  executor_session_mutex_);
2953  const auto query_session = getCurrentQuerySession(session_read_lock);
2954  isInterrupted =
2955  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2956  }
2957  if (isInterrupted) {
2958  throw QueryExecutionError(ERR_INTERRUPTED);
2959  }
2960  }
2961  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2962  throw QueryExecutionError(ERR_INTERRUPTED);
2963  }
2964  CHECK(col_id);
2965  const auto cd = try_get_column_descriptor(col_id.get());
2966  if (cd && cd->isVirtualCol) {
2967  CHECK_EQ("rowid", cd->columnName);
2968  continue;
2969  }
2970  const auto& table_key = col_id->getScanDesc().getTableKey();
2971  const auto fragments_it = all_tables_fragments.find(table_key);
2972  CHECK(fragments_it != all_tables_fragments.end());
2973  const auto fragments = fragments_it->second;
2974  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2975  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2976  CHECK_LT(static_cast<size_t>(it->second),
2977  plan_state_->global_to_local_col_ids_.size());
2978  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2979  if (!fragments->size()) {
2980  return {};
2981  }
2982  CHECK_LT(frag_id, fragments->size());
2983  auto memory_level_for_column = memory_level;
2984  const shared::ColumnKey tbl_col_key{col_id->getScanDesc().getTableKey(),
2985  col_id->getColId()};
2986  if (plan_state_->columns_to_fetch_.find(tbl_col_key) ==
2987  plan_state_->columns_to_fetch_.end()) {
2988  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2989  }
2990  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2991  frag_col_buffers[it->second] =
2992  column_fetcher.getResultSetColumn(col_id.get(),
2993  memory_level_for_column,
2994  device_id,
2995  device_allocator,
2996  thread_idx);
2997  } else {
2998  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2999  // Determine if we need special treatment to linearize a multi-frag table,
3000  // i.e., a column that is classified as a varlen type such as an array.
3001  // For now, we only support fixed-length arrays that contain geo point
3002  // coordinates, but more types can be supported this way.
3003  if (needLinearizeAllFragments(
3004  cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3005  bool for_lazy_fetch = false;
3006  if (plan_state_->columns_to_not_fetch_.find(tbl_col_key) !=
3007  plan_state_->columns_to_not_fetch_.end()) {
3008  for_lazy_fetch = true;
3009  VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
3010  << ", col_name: " << cd->columnName << ")";
3011  }
3012  frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
3013  col_id->getScanDesc().getTableKey(),
3014  col_id->getColId(),
3015  all_tables_fragments,
3016  chunks,
3017  chunk_iterators,
3018  for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3019  for_lazy_fetch ? 0 : device_id,
3020  device_allocator,
3021  thread_idx);
3022  } else {
3023  frag_col_buffers[it->second] = column_fetcher.getAllTableColumnFragments(
3024  col_id->getScanDesc().getTableKey(),
3025  col_id->getColId(),
3026  all_tables_fragments,
3027  memory_level_for_column,
3028  device_id,
3029  device_allocator,
3030  thread_idx);
3031  }
3032  } else {
3033  frag_col_buffers[it->second] = column_fetcher.getOneTableColumnFragment(
3034  col_id->getScanDesc().getTableKey(),
3035  frag_id,
3036  col_id->getColId(),
3037  all_tables_fragments,
3038  chunks,
3039  chunk_iterators,
3040  memory_level_for_column,
3041  device_id,
3042  device_allocator);
3043  }
3044  }
3045  }
3046  all_frag_col_buffers.push_back(frag_col_buffers);
3047  }
3048  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
3049  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
3050  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3051 }
3052 
3053 namespace {
3054 size_t get_selected_input_descs_index(shared::TableKey const& table_key,
3055  std::vector<InputDescriptor> const& input_descs) {
3056  auto const has_table_key = [&table_key](InputDescriptor const& input_desc) {
3057  return table_key == input_desc.getTableKey();
3058  };
3059  return std::find_if(input_descs.begin(), input_descs.end(), has_table_key) -
3060  input_descs.begin();
3061 }
3062 
3063 size_t get_selected_input_col_descs_index(
3064  const shared::TableKey& table_key,
3065  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3066  auto const has_table_key = [&table_key](auto const& input_desc) {
3067  return table_key == input_desc->getScanDesc().getTableKey();
3068  };
3069  return std::distance(
3070  input_col_descs.begin(),
3071  std::find_if(input_col_descs.begin(), input_col_descs.end(), has_table_key));
3072 }
3073 
3074 std::list<std::shared_ptr<const InputColDescriptor>> get_selected_input_col_descs(
3075  const shared::TableKey& table_key,
3076  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3077  std::list<std::shared_ptr<const InputColDescriptor>> selected;
3078  for (auto const& input_col_desc : input_col_descs) {
3079  if (table_key == input_col_desc->getScanDesc().getTableKey()) {
3080  selected.push_back(input_col_desc);
3081  }
3082  }
3083  return selected;
3084 }
3085 
3086 // Set N consecutive elements of frag_col_buffers to ptr in the range of local_col_id.
3087 void set_mod_range(std::vector<int8_t const*>& frag_col_buffers,
3088  int8_t const* const ptr,
3089  size_t const local_col_id,
3090  size_t const N) {
3091  size_t const begin = local_col_id - local_col_id % N; // N divides begin
3092  size_t const end = begin + N;
3093  CHECK_LE(end, frag_col_buffers.size()) << (void*)ptr << ' ' << local_col_id << ' ' << N;
3094  for (size_t i = begin; i < end; ++i) {
3095  frag_col_buffers[i] = ptr;
3096  }
3097 }
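// --- Editorial example (not part of Execute.cpp) -----------------------------------
// set_mod_range() above writes the same pointer into every slot of the N-wide block
// that contains local_col_id. For example, with N = 3 and local_col_id = 4, slots
// 3, 4 and 5 all receive ptr:
[[maybe_unused]] static void example_set_mod_range(std::vector<const int8_t*>& bufs,
                                                   const int8_t* ptr) {
  constexpr size_t N = 3;
  constexpr size_t local_col_id = 4;
  const size_t begin = local_col_id - local_col_id % N;  // begin == 3
  for (size_t i = begin; i < begin + N; ++i) {
    bufs[i] = ptr;
  }
}
// -------------------------------------------------------------------------------------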
3098 } // namespace
3099 
3100 // fetchChunks() assumes that multiple inputs implies a JOIN.
3101 // fetchUnionChunks() assumes that multiple inputs implies a UNION ALL.
3102 FetchResult Executor::fetchUnionChunks(
3103  const ColumnFetcher& column_fetcher,
3104  const RelAlgExecutionUnit& ra_exe_unit,
3105  const int device_id,
3106  const Data_Namespace::MemoryLevel memory_level,
3107  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
3108  const FragmentsList& selected_fragments,
3109  std::list<ChunkIter>& chunk_iterators,
3110  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3111  DeviceAllocator* device_allocator,
3112  const size_t thread_idx,
3113  const bool allow_runtime_interrupt) {
3114  auto timer = DEBUG_TIMER(__func__);
3116 
3117  CHECK_EQ(1u, selected_fragments.size());
3118  CHECK_LE(2u, ra_exe_unit.input_descs.size());
3119  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
3120  auto const& input_descs = ra_exe_unit.input_descs;
3121  const auto& selected_table_key = selected_fragments.front().table_key;
3122  size_t const input_descs_index =
3123  get_selected_input_descs_index(selected_table_key, input_descs);
3124  CHECK_LT(input_descs_index, input_descs.size());
3125  size_t const input_col_descs_index =
3126  get_selected_input_col_descs_index(selected_table_key, ra_exe_unit.input_col_descs);
3127  CHECK_LT(input_col_descs_index, ra_exe_unit.input_col_descs.size());
3128  VLOG(2) << "selected_table_key=" << selected_table_key
3129  << " input_descs_index=" << input_descs_index
3130  << " input_col_descs_index=" << input_col_descs_index
3131  << " input_descs=" << shared::printContainer(input_descs)
3132  << " ra_exe_unit.input_col_descs="
3133  << shared::printContainer(ra_exe_unit.input_col_descs);
3134 
3135  std::list<std::shared_ptr<const InputColDescriptor>> selected_input_col_descs =
3136  get_selected_input_col_descs(selected_table_key, ra_exe_unit.input_col_descs);
3137  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3138 
3139  buildSelectedFragsMappingForUnion(
3140  selected_fragments_crossjoin, selected_fragments, ra_exe_unit);
3141 
3142  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
3143  selected_fragments_crossjoin);
3144 
3145  if (allow_runtime_interrupt) {
3146  bool isInterrupted = false;
3147  {
3148  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
3149  executor_session_mutex_);
3150  const auto query_session = getCurrentQuerySession(session_read_lock);
3151  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3152  }
3153  if (isInterrupted) {
3154  throw QueryExecutionError(ERR_INTERRUPTED);
3155  }
3156  }
3157  std::vector<const int8_t*> frag_col_buffers(
3158  plan_state_->global_to_local_col_ids_.size());
3159  for (const auto& col_id : selected_input_col_descs) {
3160  CHECK(col_id);
3161  const auto cd = try_get_column_descriptor(col_id.get());
3162  if (cd && cd->isVirtualCol) {
3163  CHECK_EQ("rowid", cd->columnName);
3164  continue;
3165  }
3166  const auto fragments_it = all_tables_fragments.find(selected_table_key);
3167  CHECK(fragments_it != all_tables_fragments.end());
3168  const auto fragments = fragments_it->second;
3169  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3170  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3171  size_t const local_col_id = it->second;
3172  CHECK_LT(local_col_id, plan_state_->global_to_local_col_ids_.size());
3173  constexpr size_t frag_id = 0;
3174  if (fragments->empty()) {
3175  return {};
3176  }
3177  MemoryLevel const memory_level_for_column =
3178  plan_state_->columns_to_fetch_.count({selected_table_key, col_id->getColId()})
3179  ? memory_level
3180  : Data_Namespace::CPU_LEVEL;
3181  int8_t const* ptr;
3182  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3183  ptr = column_fetcher.getResultSetColumn(
3184  col_id.get(), memory_level_for_column, device_id, device_allocator, thread_idx);
3185  } else if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3186  ptr = column_fetcher.getAllTableColumnFragments(selected_table_key,
3187  col_id->getColId(),
3188  all_tables_fragments,
3189  memory_level_for_column,
3190  device_id,
3191  device_allocator,
3192  thread_idx);
3193  } else {
3194  ptr = column_fetcher.getOneTableColumnFragment(selected_table_key,
3195  frag_id,
3196  col_id->getColId(),
3197  all_tables_fragments,
3198  chunks,
3199  chunk_iterators,
3200  memory_level_for_column,
3201  device_id,
3202  device_allocator);
3203  }
3204  // Set frag_col_buffers[i] = ptr for every i in the input_descs.size()-sized block that contains local_col_id.
3205  set_mod_range(frag_col_buffers, ptr, local_col_id, input_descs.size());
3206  }
3207  auto const [num_rows, frag_offsets] = getRowCountAndOffsetForAllFrags(
3208  ra_exe_unit, frag_ids_crossjoin, input_descs, all_tables_fragments);
3209 
3210  VLOG(2) << "frag_col_buffers=" << shared::printContainer(frag_col_buffers)
3211  << " num_rows=" << shared::printContainer(num_rows)
3212  << " frag_offsets=" << shared::printContainer(frag_offsets)
3213  << " input_descs_index=" << input_descs_index
3214  << " input_col_descs_index=" << input_col_descs_index;
3215  return {{std::move(frag_col_buffers)},
3216  {{num_rows[0][input_descs_index]}},
3217  {{frag_offsets[0][input_descs_index]}}};
3218 }
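// Note: this UNION ALL path returns buffers, row counts, and offsets for a single input
// only (the one matching selected_fragments.front().table_key), selected via
// input_descs_index / input_col_descs_index above.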
3219 
3220 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
3221  const size_t scan_idx,
3222  const RelAlgExecutionUnit& ra_exe_unit) {
3223  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
3224  scan_idx > 0 &&
3225  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
3226  !selected_fragments[scan_idx].fragment_ids.empty()) {
3227  // Fetch all fragments
3228  return {size_t(0)};
3229  }
3230 
3231  return selected_fragments[scan_idx].fragment_ids;
3232 }
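// Returning {size_t(0)} above collapses an inner join table's fragment list to a single
// placeholder entry; per the "Fetch all fragments" comment, the whole table is fetched
// for that scan index rather than one selected fragment at a time.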
3233 
3235  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3236  std::vector<size_t>& local_col_to_frag_pos,
3237  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3238  const FragmentsList& selected_fragments,
3239  const RelAlgExecutionUnit& ra_exe_unit) {
3240  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3241  size_t frag_pos{0};
3242  const auto& input_descs = ra_exe_unit.input_descs;
3243  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3244  const auto& table_key = input_descs[scan_idx].getTableKey();
3245  CHECK_EQ(selected_fragments[scan_idx].table_key, table_key);
3246  selected_fragments_crossjoin.push_back(
3247  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
3248  for (const auto& col_id : col_global_ids) {
3249  CHECK(col_id);
3250  const auto& input_desc = col_id->getScanDesc();
3251  if (input_desc.getTableKey() != table_key ||
3252  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3253  continue;
3254  }
3255  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3256  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3257  CHECK_LT(static_cast<size_t>(it->second),
3258  plan_state_->global_to_local_col_ids_.size());
3259  local_col_to_frag_pos[it->second] = frag_pos;
3260  }
3261  ++frag_pos;
3262  }
3263 }
3264 
3266  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3267  const FragmentsList& selected_fragments,
3268  const RelAlgExecutionUnit& ra_exe_unit) {
3269  const auto& input_descs = ra_exe_unit.input_descs;
3270  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3271  // selected_fragments is set from execution_kernel.fragments in assignFragsToKernelDispatch().
3272  if (selected_fragments[0].table_key == input_descs[scan_idx].getTableKey()) {
3273  selected_fragments_crossjoin.push_back({size_t(1)});
3274  }
3275  }
3276 }
3277 
3278 namespace {
3279 
3280 class OutVecOwner {
3281  public:
3282  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
3283  ~OutVecOwner() {
3284  for (auto out : out_vec_) {
3285  delete[] out;
3286  }
3287  }
3288 
3289  private:
3290  std::vector<int64_t*> out_vec_;
3291 };
3292 } // namespace
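// OutVecOwner is an RAII guard: launchCpuCode()/launchGpuCode() hand back raw int64_t*
// buffers, and wrapping them right after the launch guarantees they are freed on every
// exit path below, including the early error-code returns.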
3293 
3294 int32_t Executor::executePlanWithoutGroupBy(
3295  const RelAlgExecutionUnit& ra_exe_unit,
3296  const CompilationResult& compilation_result,
3297  const bool hoist_literals,
3298  ResultSetPtr* results,
3299  const std::vector<Analyzer::Expr*>& target_exprs,
3300  const ExecutorDeviceType device_type,
3301  std::vector<std::vector<const int8_t*>>& col_buffers,
3302  QueryExecutionContext* query_exe_context,
3303  const std::vector<std::vector<int64_t>>& num_rows,
3304  const std::vector<std::vector<uint64_t>>& frag_offsets,
3305  Data_Namespace::DataMgr* data_mgr,
3306  const int device_id,
3307  const uint32_t start_rowid,
3308  const uint32_t num_tables,
3309  const bool allow_runtime_interrupt,
3310  RenderInfo* render_info,
3311  const bool optimize_cuda_block_and_grid_sizes,
3312  const int64_t rows_to_process) {
3314  auto timer = DEBUG_TIMER(__func__);
3315  CHECK(!results || !(*results));
3316  if (col_buffers.empty()) {
3317  return 0;
3318  }
3319 
3320  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3321  if (render_info) {
3322  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
3323  // here, we are in non-insitu mode.
3324  CHECK(render_info->useCudaBuffers() || !render_info->isInSitu())
3325  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
3326  "currently unsupported.";
3327  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3328  }
3329 
3330  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3331  std::vector<int64_t*> out_vec;
3332  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3333  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3334  std::unique_ptr<OutVecOwner> output_memory_scope;
3335  if (allow_runtime_interrupt) {
3336  bool isInterrupted = false;
3337  {
3340  const auto query_session = getCurrentQuerySession(session_read_lock);
3341  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3342  }
3343  if (isInterrupted) {
3345  }
3346  }
3347  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3348  return ERR_INTERRUPTED;
3349  }
3350  if (device_type == ExecutorDeviceType::CPU) {
3351  CpuCompilationContext* cpu_generated_code =
3352  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3353  CHECK(cpu_generated_code);
3354  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
3355  cpu_generated_code,
3356  hoist_literals,
3357  hoist_buf,
3358  col_buffers,
3359  num_rows,
3360  frag_offsets,
3361  0,
3362  &error_code,
3363  num_tables,
3364  join_hash_table_ptrs,
3365  rows_to_process);
3366  output_memory_scope.reset(new OutVecOwner(out_vec));
3367  } else {
3368  GpuCompilationContext* gpu_generated_code =
3369  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3370  CHECK(gpu_generated_code);
3371  try {
3372  out_vec = query_exe_context->launchGpuCode(
3373  ra_exe_unit,
3374  gpu_generated_code,
3375  hoist_literals,
3376  hoist_buf,
3377  col_buffers,
3378  num_rows,
3379  frag_offsets,
3380  0,
3381  data_mgr,
3382  blockSize(),
3383  gridSize(),
3384  device_id,
3385  compilation_result.gpu_smem_context.getSharedMemorySize(),
3386  &error_code,
3387  num_tables,
3388  allow_runtime_interrupt,
3389  join_hash_table_ptrs,
3390  render_allocator_map_ptr,
3391  optimize_cuda_block_and_grid_sizes);
3392  output_memory_scope.reset(new OutVecOwner(out_vec));
3393  } catch (const OutOfMemory&) {
3394  return ERR_OUT_OF_GPU_MEM;
3395  } catch (const std::exception& e) {
3396  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3397  }
3398  }
3399  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3400  error_code == Executor::ERR_DIV_BY_ZERO ||
3401  error_code == Executor::ERR_OUT_OF_TIME ||
3402  error_code == Executor::ERR_INTERRUPTED ||
3404  error_code == Executor::ERR_GEOS ||
3406  return error_code;
3407  }
3408  if (ra_exe_unit.estimator) {
3409  CHECK(!error_code);
3410  if (results) {
3411  *results =
3412  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
3413  }
3414  return 0;
3415  }
3416  // Delayed results extraction (used for sub-fragments) is expected for the estimator case only.
3417  CHECK(results);
3418  std::vector<int64_t> reduced_outs;
3419  const auto num_frags = col_buffers.size();
3420  const size_t entry_count =
3421  device_type == ExecutorDeviceType::GPU
3422  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
3423  ? 1
3424  : blockSize() * gridSize() * num_frags)
3425  : num_frags;
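  // entry_count: one value per fragment on CPU, a single value for a GPU launch that used
  // shared-memory aggregation, and blockSize() * gridSize() values per fragment otherwise.
  // The entry_count == 1 fast path below copies values directly; otherwise reduceResults()
  // folds the partial values for each target expression.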
3426  if (size_t(1) == entry_count) {
3427  for (auto out : out_vec) {
3428  CHECK(out);
3429  reduced_outs.push_back(*out);
3430  }
3431  } else {
3432  size_t out_vec_idx = 0;
3433 
3434  for (const auto target_expr : target_exprs) {
3435  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3436  CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3437  << target_expr->toString();
3438 
3439  const int num_iterations = agg_info.sql_type.is_geometry()
3440  ? agg_info.sql_type.get_physical_coord_cols()
3441  : 1;
3442 
3443  for (int i = 0; i < num_iterations; i++) {
3444  int64_t val1;
3445  const bool float_argument_input = takes_float_argument(agg_info);
3446  if (is_distinct_target(agg_info) ||
3447  shared::is_any<kAPPROX_QUANTILE, kMODE>(agg_info.agg_kind)) {
3448  bool const check = shared::
3449  is_any<kCOUNT, kAPPROX_COUNT_DISTINCT, kAPPROX_QUANTILE, kMODE, kCOUNT_IF>(
3450  agg_info.agg_kind);
3451  CHECK(check) << agg_info.agg_kind;
3452  val1 = out_vec[out_vec_idx][0];
3453  error_code = 0;
3454  } else {
3455  const auto chosen_bytes = static_cast<size_t>(
3456  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
3457  std::tie(val1, error_code) = Executor::reduceResults(
3458  agg_info.agg_kind,
3459  agg_info.sql_type,
3460  query_exe_context->getAggInitValForIndex(out_vec_idx),
3461  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3462  out_vec[out_vec_idx],
3463  entry_count,
3464  false,
3465  float_argument_input);
3466  }
3467  if (error_code) {
3468  break;
3469  }
3470  reduced_outs.push_back(val1);
3471  if (agg_info.agg_kind == kAVG ||
3472  (agg_info.agg_kind == kSAMPLE &&
3473  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3474  const auto chosen_bytes = static_cast<size_t>(
3475  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3476  1));
3477  int64_t val2;
3478  std::tie(val2, error_code) = Executor::reduceResults(
3479  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3480  agg_info.sql_type,
3481  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3482  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3483  out_vec[out_vec_idx + 1],
3484  entry_count,
3485  false,
3486  false);
3487  if (error_code) {
3488  break;
3489  }
3490  reduced_outs.push_back(val2);
3491  ++out_vec_idx;
3492  }
3493  ++out_vec_idx;
3494  }
3495  }
3496  }
3497 
3498  if (error_code) {
3499  return error_code;
3500  }
3501 
3502  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3503  auto rows_ptr = std::shared_ptr<ResultSet>(
3504  query_exe_context->query_buffers_->result_sets_[0].release());
3505  rows_ptr->fillOneEntry(reduced_outs);
3506  *results = std::move(rows_ptr);
3507  return error_code;
3508 }
3509 
3510 namespace {
3511 
3512 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
3513  CHECK(scan_limit);
3514  return results && results->rowCount() < scan_limit;
3515 }
3516 
3517 } // namespace
3518 
3519 int32_t Executor::executePlanWithGroupBy(
3520  const RelAlgExecutionUnit& ra_exe_unit,
3521  const CompilationResult& compilation_result,
3522  const bool hoist_literals,
3523  ResultSetPtr* results,
3524  const ExecutorDeviceType device_type,
3525  std::vector<std::vector<const int8_t*>>& col_buffers,
3526  const std::vector<size_t> outer_tab_frag_ids,
3527  QueryExecutionContext* query_exe_context,
3528  const std::vector<std::vector<int64_t>>& num_rows,
3529  const std::vector<std::vector<uint64_t>>& frag_offsets,
3530  Data_Namespace::DataMgr* data_mgr,
3531  const int device_id,
3532  const shared::TableKey& outer_table_key,
3533  const int64_t scan_limit,
3534  const uint32_t start_rowid,
3535  const uint32_t num_tables,
3536  const bool allow_runtime_interrupt,
3537  RenderInfo* render_info,
3538  const bool optimize_cuda_block_and_grid_sizes,
3539  const int64_t rows_to_process) {
3540  auto timer = DEBUG_TIMER(__func__);
3542  // TODO: get results via a separate method, but need to do something with literals.
3543  CHECK(!results || !(*results));
3544  if (col_buffers.empty()) {
3545  return 0;
3546  }
3547  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3548  // TODO(alex):
3549  // 1. Optimize size (make keys more compact).
3550  // 2. Resize on overflow.
3551  // 3. Optimize runtime.
3552  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3553  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3554  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3555  if (allow_runtime_interrupt) {
3556  bool isInterrupted = false;
3557  {
3560  const auto query_session = getCurrentQuerySession(session_read_lock);
3561  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3562  }
3563  if (isInterrupted) {
3565  }
3566  }
3567  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3568  return ERR_INTERRUPTED;
3569  }
3570 
3571  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3572  if (render_info && render_info->useCudaBuffers()) {
3573  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3574  }
3575 
3576  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3577  << " ra_exe_unit.input_descs="
3578  << shared::printContainer(ra_exe_unit.input_descs)
3579  << " ra_exe_unit.input_col_descs="
3580  << shared::printContainer(ra_exe_unit.input_col_descs)
3581  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3582  << " num_rows=" << shared::printContainer(num_rows)
3583  << " frag_offsets=" << shared::printContainer(frag_offsets)
3584  << " query_exe_context->query_buffers_->num_rows_="
3585  << query_exe_context->query_buffers_->num_rows_
3586  << " query_exe_context->query_mem_desc_.getEntryCount()="
3587  << query_exe_context->query_mem_desc_.getEntryCount()
3588  << " device_id=" << device_id << " outer_table_key=" << outer_table_key
3589  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3590  << " num_tables=" << num_tables;
3591 
3592  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3593  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3594  // with the outer table (outer_table_key).
3595  if (ra_exe_unit_copy.union_all) {
3596  // Sort descriptors for outer_table_key first, then pop the rest off of ra_exe_unit_copy.input_descs.
3597  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3598  ra_exe_unit_copy.input_descs.end(),
3599  [outer_table_key](auto const& a, auto const& b) {
3600  return a.getTableKey() == outer_table_key &&
3601  b.getTableKey() != outer_table_key;
3602  });
3603  while (!ra_exe_unit_copy.input_descs.empty() &&
3604  ra_exe_unit_copy.input_descs.back().getTableKey() != outer_table_key) {
3605  ra_exe_unit_copy.input_descs.pop_back();
3606  }
3607  // Filter ra_exe_unit_copy.input_col_descs.
3608  ra_exe_unit_copy.input_col_descs.remove_if(
3609  [outer_table_key](auto const& input_col_desc) {
3610  return input_col_desc->getScanDesc().getTableKey() != outer_table_key;
3611  });
3612  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3613  }
3614 
3615  if (device_type == ExecutorDeviceType::CPU) {
3616  const int32_t scan_limit_for_query =
3617  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3618  const int32_t max_matched = scan_limit_for_query == 0
3619  ? query_exe_context->query_mem_desc_.getEntryCount()
3620  : scan_limit_for_query;
3621  CpuCompilationContext* cpu_generated_code =
3622  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3623  CHECK(cpu_generated_code);
3624  query_exe_context->launchCpuCode(ra_exe_unit_copy,
3625  cpu_generated_code,
3626  hoist_literals,
3627  hoist_buf,
3628  col_buffers,
3629  num_rows,
3630  frag_offsets,
3631  max_matched,
3632  &error_code,
3633  num_tables,
3634  join_hash_table_ptrs,
3635  rows_to_process);
3636  } else {
3637  try {
3638  GpuCompilationContext* gpu_generated_code =
3639  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3640  CHECK(gpu_generated_code);
3641  query_exe_context->launchGpuCode(
3642  ra_exe_unit_copy,
3643  gpu_generated_code,
3644  hoist_literals,
3645  hoist_buf,
3646  col_buffers,
3647  num_rows,
3648  frag_offsets,
3649  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3650  data_mgr,
3651  blockSize(),
3652  gridSize(),
3653  device_id,
3654  compilation_result.gpu_smem_context.getSharedMemorySize(),
3655  &error_code,
3656  num_tables,
3657  allow_runtime_interrupt,
3658  join_hash_table_ptrs,
3659  render_allocator_map_ptr,
3660  optimize_cuda_block_and_grid_sizes);
3661  } catch (const OutOfMemory&) {
3662  return ERR_OUT_OF_GPU_MEM;
3663  } catch (const OutOfRenderMemory&) {
3664  return ERR_OUT_OF_RENDER_MEM;
3665  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3667  } catch (const std::exception& e) {
3668  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3669  }
3670  }
3671 
3672  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3673  error_code == Executor::ERR_DIV_BY_ZERO ||
3674  error_code == Executor::ERR_OUT_OF_TIME ||
3675  error_code == Executor::ERR_INTERRUPTED ||
3677  error_code == Executor::ERR_GEOS ||
3679  return error_code;
3680  }
3681 
3682  if (results && error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3683  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3684  *results = query_exe_context->getRowSet(ra_exe_unit_copy,
3685  query_exe_context->query_mem_desc_);
3686  CHECK(*results);
3687  VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
3688  (*results)->holdLiterals(hoist_buf);
3689  }
3690  if (error_code < 0 && render_allocator_map_ptr) {
3691  auto const adjusted_scan_limit =
3692  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3693  // More rows passed the filter than available slots. We don't have a count to check,
3694  // so assume we met the limit if a scan limit is set
3695  if (adjusted_scan_limit != 0) {
3696  return 0;
3697  } else {
3698  return error_code;
3699  }
3700  }
3701  if (results && error_code &&
3702  (!scan_limit || check_rows_less_than_needed(*results, scan_limit))) {
3703  return error_code; // unlucky, not enough results and we ran out of slots
3704  }
3705 
3706  return 0;
3707 }
3708 
3709 std::vector<int8_t*> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3710  const int device_id) {
3711  std::vector<int8_t*> table_ptrs;
3712  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3713  for (auto hash_table : join_hash_tables) {
3714  if (!hash_table) {
3715  CHECK(table_ptrs.empty());
3716  return {};
3717  }
3718  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3719  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3720  }
3721  return table_ptrs;
3722 }
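// Note: if the first hash table in join_info_.join_hash_tables_ is missing, an empty
// vector is returned; the CHECK(table_ptrs.empty()) above asserts that a null table never
// follows a non-null one, so hash table pointers are passed along on an all-or-nothing basis.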
3723 
3724 void Executor::nukeOldState(const bool allow_lazy_fetch,
3725  const std::vector<InputTableInfo>& query_infos,
3726  const PlanState::DeletedColumnsMap& deleted_cols_map,
3727  const RelAlgExecutionUnit* ra_exe_unit) {
3730  const bool contains_left_deep_outer_join =
3731  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3732  ra_exe_unit->join_quals.end(),
3733  [](const JoinCondition& join_condition) {
3734  return join_condition.type == JoinType::LEFT;
3735  }) != ra_exe_unit->join_quals.end();
3736  cgen_state_.reset(
3737  new CgenState(query_infos.size(), contains_left_deep_outer_join, this));
3738  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3739  query_infos,
3740  deleted_cols_map,
3741  this));
3742 }
3743 
3744 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3745  const std::vector<InputTableInfo>& query_infos) {
3747  const auto ld_count = input_descs.size();
3748  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3749  for (size_t i = 0; i < ld_count; ++i) {
3750  CHECK_LT(i, query_infos.size());
3751  const auto frag_count = query_infos[i].info.fragments.size();
3752  if (i > 0) {
3753  cgen_state_->frag_offsets_.push_back(nullptr);
3754  } else {
3755  if (frag_count > 1) {
3756  cgen_state_->frag_offsets_.push_back(cgen_state_->ir_builder_.CreateLoad(
3757  frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));
3758  } else {
3759  cgen_state_->frag_offsets_.push_back(nullptr);
3760  }
3761  }
3762  }
3763 }
3764 
3766  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3767  const std::vector<InputTableInfo>& query_infos,
3768  const MemoryLevel memory_level,
3769  const JoinType join_type,
3770  const HashType preferred_hash_type,
3771  ColumnCacheMap& column_cache,
3772  const HashTableBuildDagMap& hashtable_build_dag_map,
3773  const RegisteredQueryHint& query_hint,
3774  const TableIdToNodeMap& table_id_to_node_map) {
3775  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3776  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3777  }
3778  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3780  }
3781  try {
3782  auto tbl = HashJoin::getInstance(qual_bin_oper,
3783  query_infos,
3784  memory_level,
3785  join_type,
3786  preferred_hash_type,
3787  deviceCountForMemoryLevel(memory_level),
3788  column_cache,
3789  this,
3790  hashtable_build_dag_map,
3791  query_hint,
3792  table_id_to_node_map);
3793  return {tbl, ""};
3794  } catch (const HashJoinFail& e) {
3795  return {nullptr, e.what()};
3796  }
3797 }
3798 
3799 int8_t Executor::warpSize() const {
3800  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3801  CHECK(!dev_props.empty());
3802  return dev_props.front().warpSize;
3803 }
3804 
3805 // TODO(adb): should these three functions have consistent semantics if cuda mgr does not
3806 // exist?
3807 unsigned Executor::gridSize() const {
3808  CHECK(data_mgr_);
3809  const auto cuda_mgr = data_mgr_->getCudaMgr();
3810  if (!cuda_mgr) {
3811  return 0;
3812  }
3813  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3814 }
3815 
3816 unsigned Executor::numBlocksPerMP() const {
3817  return std::max((unsigned)2,
3818  shared::ceil_div(grid_size_x_, cudaMgr()->getMinNumMPsForAllDevices()));
3819 }
3820 
3821 unsigned Executor::blockSize() const {
3822  CHECK(data_mgr_);
3823  const auto cuda_mgr = data_mgr_->getCudaMgr();
3824  if (!cuda_mgr) {
3825  return 0;
3826  }
3827  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3828  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3829 }
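// Illustrative defaults (assuming a hypothetical device with 80 SMs and
// maxThreadsPerBlock == 1024): gridSize() == 2 * 80 == 160 and blockSize() == 1024,
// unless grid_size_x_ / block_size_x_ have been overridden via setGridSize()/setBlockSize().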
3830 
3831 void Executor::setGridSize(unsigned grid_size) {
3832  grid_size_x_ = grid_size;
3833 }
3834 
3836  grid_size_x_ = 0;
3837 }
3838 
3839 void Executor::setBlockSize(unsigned block_size) {
3840  block_size_x_ = block_size;
3841 }
3842 
3844  block_size_x_ = 0;
3845 }
3846 
3848  return max_gpu_slab_size_;
3849 }
3850 
3851 int64_t Executor::deviceCycles(int milliseconds) const {
3852  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3853  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3854 }
3855 
3856 llvm::Value* Executor::castToFP(llvm::Value* value,
3857  SQLTypeInfo const& from_ti,
3858  SQLTypeInfo const& to_ti) {
3860  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3861  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3862  llvm::Type* fp_type{nullptr};
3863  switch (to_ti.get_size()) {
3864  case 4:
3865  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3866  break;
3867  case 8:
3868  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3869  break;
3870  default:
3871  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3872  }
3873  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3874  if (from_ti.get_scale()) {
3875  value = cgen_state_->ir_builder_.CreateFDiv(
3876  value,
3877  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3878  }
3879  }
3880  return value;
3881 }
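// Example: casting a DECIMAL value with scale 2 stored as the integer 12345 to DOUBLE
// emits an SIToFP to double followed by an FDiv by exp_to_scale(2) == 100, yielding 123.45.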
3882 
3883 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3885  CHECK(val->getType()->isPointerTy());
3886 
3887  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3888  const auto val_type = val_ptr_type->getPointerElementType();
3889  size_t val_width = 0;
3890  if (val_type->isIntegerTy()) {
3891  val_width = val_type->getIntegerBitWidth();
3892  } else {
3893  if (val_type->isFloatTy()) {
3894  val_width = 32;
3895  } else {
3896  CHECK(val_type->isDoubleTy());
3897  val_width = 64;
3898  }
3899  }
3900  CHECK_LT(size_t(0), val_width);
3901  if (bitWidth == val_width) {
3902  return val;
3903  }
3904  return cgen_state_->ir_builder_.CreateBitCast(
3905  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3906 }
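// In effect: float* maps to a 32-bit width and double* to a 64-bit width; when bitWidth
// differs from the pointee width the pointer is bitcast to the matching iN* type,
// otherwise it is returned unchanged.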
3907 
3908 #define EXECUTE_INCLUDE
3909 #include "ArrayOps.cpp"
3910 #include "DateAdd.cpp"
3911 #include "GeoOps.cpp"
3912 #include "RowFunctionOps.cpp"
3913 #include "StringFunctions.cpp"
3915 #undef EXECUTE_INCLUDE
3916 
3917 namespace {
3918 void add_deleted_col_to_map(PlanState::DeletedColumnsMap& deleted_cols_map,
3919  const ColumnDescriptor* deleted_cd,
3920  const shared::TableKey& table_key) {
3921  auto deleted_cols_it = deleted_cols_map.find(table_key);
3922  if (deleted_cols_it == deleted_cols_map.end()) {
3923  CHECK(deleted_cols_map.insert(std::make_pair(table_key, deleted_cd)).second);
3924  } else {
3925  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3926  }
3927 }
3928 } // namespace
3929 
3930 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3931  const RelAlgExecutionUnit& ra_exe_unit,
3932  const CompilationOptions& co) {
3933  if (!co.filter_on_deleted_column) {
3934  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3935  }
3936  auto ra_exe_unit_with_deleted = ra_exe_unit;
3937  PlanState::DeletedColumnsMap deleted_cols_map;
3938  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3939  if (input_table.getSourceType() != InputSourceType::TABLE) {
3940  continue;
3941  }
3942  const auto& table_key = input_table.getTableKey();
3943  const auto catalog =
3945  CHECK(catalog);
3946  const auto td = catalog->getMetadataForTable(table_key.table_id);
3947  CHECK(td);
3948  const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
3949  if (!deleted_cd) {
3950  continue;
3951  }
3952  CHECK(deleted_cd->columnType.is_boolean());
3953  // check that the deleted column is not already present
3954  bool found = false;
3955  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3956  if (input_col.get()->getColId() == deleted_cd->columnId &&
3957  input_col.get()->getScanDesc().getTableKey() == table_key &&
3958  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3959  found = true;
3960  add_deleted_col_to_map(deleted_cols_map, deleted_cd, table_key);
3961  break;
3962  }
3963  }
3964  if (!found) {
3965  // add deleted column
3966  ra_exe_unit_with_deleted.input_col_descs.emplace_back(
3967  new InputColDescriptor(deleted_cd->columnId,
3968  deleted_cd->tableId,
3969  table_key.db_id,
3970  input_table.getNestLevel()));
3971  add_deleted_col_to_map(deleted_cols_map, deleted_cd, table_key);
3972  }
3973  }
3974  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3975 }
3976 
3977 namespace {
3978 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` returns `true` as the
3979 // first element of the tuple for safely scaled epoch values and `false` when scaling
3980 // would overflow or underflow.
3981 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3982  const int64_t chunk_min,
3983  const int64_t chunk_max,
3984  const SQLTypeInfo& lhs_type,
3985  const SQLTypeInfo& rhs_type) {
3986  const int32_t ldim = lhs_type.get_dimension();
3987  const int32_t rdim = rhs_type.get_dimension();
3988  CHECK(ldim != rdim);
3989  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
3990  if (ldim > rdim) {
3991  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
3992  return {true, chunk_min / scale, chunk_max / scale};
3993  }
3994 
3995  using checked_int64_t = boost::multiprecision::number<
3996  boost::multiprecision::cpp_int_backend<64,
3997  64,
3998  boost::multiprecision::signed_magnitude,
3999  boost::multiprecision::checked,
4000  void>>;
4001 
4002  try {
4003  auto ret =
4004  std::make_tuple(true,
4005  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
4006  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
4007  return ret;
4008  } catch (const std::overflow_error& e) {
4009  // noop
4010  }
4011  return std::make_tuple(false, chunk_min, chunk_max);
4012 }
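// Example: comparing a TIMESTAMP(0) column (dimension 0) against a TIMESTAMP(3) constant
// (dimension 3) gives scale == 1000; the chunk min/max are multiplied by 1000 using
// checked 64-bit arithmetic, and {false, chunk_min, chunk_max} is returned if that
// multiplication would overflow, telling the caller not to skip the fragment.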
4013 
4014 } // namespace
4015 
4016 bool Executor::isFragmentFullyDeleted(
4017  const InputDescriptor& table_desc,
4018  const Fragmenter_Namespace::FragmentInfo& fragment) {
4019  // Skip temporary tables
4020  const auto& table_key = table_desc.getTableKey();
4021  if (table_key.table_id < 0) {
4022  return false;
4023  }
4024 
4025  const auto catalog =
4027  CHECK(catalog);
4028  const auto td = catalog->getMetadataForTable(fragment.physicalTableId);
4029  CHECK(td);
4030  const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
4031  if (!deleted_cd) {
4032  return false;
4033  }
4034 
4035  const auto& chunk_type = deleted_cd->columnType;
4036  CHECK(chunk_type.is_boolean());
4037 
4038  const auto deleted_col_id = deleted_cd->columnId;
4039  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
4040  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
4041  const int64_t chunk_min =
4042  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4043  const int64_t chunk_max =
4044  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4045  if (chunk_min == 1 && chunk_max == 1) { // Fragment is fully deleted if metadata says the
4046  // full bytemap is true (signifying all rows deleted)
4047  return true;
4048  }
4049  }
4050  return false;
4051 }
4052 
4054  const Analyzer::BinOper* comp_expr,
4055  const Analyzer::ColumnVar* lhs_col,
4056  const Fragmenter_Namespace::FragmentInfo& fragment,
4057  const Analyzer::Constant* rhs_const) const {
4058  auto col_id = lhs_col->getColumnKey().column_id;
4059  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4060  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4062  }
4063  double chunk_min{0.};
4064  double chunk_max{0.};
4065  const auto& chunk_type = lhs_col->get_type_info();
4066  chunk_min = extract_min_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4067  chunk_max = extract_max_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4068  if (chunk_min > chunk_max) {
4070  }
4071 
4072  const auto datum_fp = rhs_const->get_constval();
4073  const auto rhs_type = rhs_const->get_type_info().get_type();
4074  CHECK(rhs_type == kFLOAT || rhs_type == kDOUBLE);
4075 
4076  // Do we need to codegen the constant like the integer path does?
4077  const auto rhs_val = rhs_type == kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
4078 
4079  // Todo: dedup the following comparison code with the integer/timestamp path; it is
4080  // slightly tricky to do cleanly as we do not have rowid on this path
4081  switch (comp_expr->get_optype()) {
4082  case kGE:
4083  if (chunk_max < rhs_val) {
4085  }
4086  break;
4087  case kGT:
4088  if (chunk_max <= rhs_val) {
4090  }
4091  break;
4092  case kLE:
4093  if (chunk_min > rhs_val) {
4095  }
4096  break;
4097  case kLT:
4098  if (chunk_min >= rhs_val) {
4100  }
4101  break;
4102  case kEQ:
4103  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4105  }
4106  break;
4107  default:
4108  break;
4109  }
4111 }
4112 
4113 std::pair<bool, int64_t> Executor::skipFragment(
4114  const InputDescriptor& table_desc,
4115  const Fragmenter_Namespace::FragmentInfo& fragment,
4116  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
4117  const std::vector<uint64_t>& frag_offsets,
4118  const size_t frag_idx) {
4119  // First check to see if all of the fragment is deleted, in which case we know we can skip it
4120  if (isFragmentFullyDeleted(table_desc, fragment)) {
4121  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
4122  << ", fragment id: " << frag_idx;
4123  return {true, -1};
4124  }
4125 
4126  for (const auto& simple_qual : simple_quals) {
4127  const auto comp_expr =
4128  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4129  if (!comp_expr) {
4130  // is this possible?
4131  return {false, -1};
4132  }
4133  const auto lhs = comp_expr->get_left_operand();
4134  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4135  if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4136  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
4137  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
4138  if (lhs_uexpr) {
4139  CHECK(lhs_uexpr->get_optype() ==
4140  kCAST); // We should have only been passed a cast expression
4141  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
4142  if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4143  continue;
4144  }
4145  } else {
4146  continue;
4147  }
4148  }
4149  const auto rhs = comp_expr->get_right_operand();
4150  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4151  if (!rhs_const) {
4152  // is this possible?
4153  return {false, -1};
4154  }
4155  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
4156  !lhs->get_type_info().is_fp()) {
4157  continue;
4158  }
4159  if (lhs->get_type_info().is_fp()) {
4160  const auto fragment_skip_status =
4161  canSkipFragmentForFpQual(comp_expr.get(), lhs_col, fragment, rhs_const);
4162  switch (fragment_skip_status) {
4164  return {true, -1};
4166  return {false, -1};
4168  continue;
4169  default:
4170  UNREACHABLE();
4171  }
4172  }
4173 
4174  // Everything below is logic for integer and integer-backed timestamps
4175  // TODO: Factor out into separate function per canSkipFragmentForFpQual above
4176 
4177  if (lhs_col->get_type_info().is_timestamp() &&
4178  rhs_const->get_type_info().is_any<kTIME>()) {
4180  // When casting from a timestamp to a time,
4181  // it is not possible to get a valid range,
4182  // so we can't skip any fragment.
4182  continue;
4183  }
4184 
4185  const int col_id = lhs_col->getColumnKey().column_id;
4186  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4187  int64_t chunk_min{0};
4188  int64_t chunk_max{0};
4189  bool is_rowid{false};
4190  size_t start_rowid{0};
4191  const auto& table_key = table_desc.getTableKey();
4192  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4193  auto cd = get_column_descriptor({table_key, col_id});
4194  if (cd->isVirtualCol) {
4195  CHECK(cd->columnName == "rowid");
4196  const auto& table_generation = getTableGeneration(table_key);
4197  start_rowid = table_generation.start_rowid;
4198  chunk_min = frag_offsets[frag_idx] + start_rowid;
4199  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
4200  is_rowid = true;
4201  }
4202  } else {
4203  const auto& chunk_type = lhs_col->get_type_info();
4204  chunk_min =
4205  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4206  chunk_max =
4207  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4208  }
4209  if (chunk_min > chunk_max) {
4210  // invalid metadata range, do not skip fragment
4211  return {false, -1};
4212  }
4213  if (lhs->get_type_info().is_timestamp() &&
4214  (lhs_col->get_type_info().get_dimension() !=
4215  rhs_const->get_type_info().get_dimension()) &&
4216  (lhs_col->get_type_info().is_high_precision_timestamp() ||
4217  rhs_const->get_type_info().is_high_precision_timestamp())) {
4218  // If original timestamp lhs col has different precision,
4219  // column metadata holds value in original precision
4220  // therefore adjust rhs value to match lhs precision
4221 
4222  // Note(Wamsi): We adjust rhs const value instead of lhs value to not
4223  // artificially limit the lhs column range. RHS overflow/underflow has already
4224  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
4225  bool is_valid;
4226  std::tie(is_valid, chunk_min, chunk_max) =
4227  get_hpt_overflow_underflow_safe_scaled_values(
4228  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
4229  if (!is_valid) {
4230  VLOG(4) << "Overflow/underflow detected in fragment skipping logic.\nChunk min "
4231  "value: "
4232  << std::to_string(chunk_min)
4233  << "\nChunk max value: " << std::to_string(chunk_max)
4234  << "\nLHS col precision is: "
4235  << std::to_string(lhs_col->get_type_info().get_dimension())
4236  << "\nRHS precision is: "
4237  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
4238  return {false, -1};
4239  }
4240  }
4241  if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
4242  // It is obvious that a cast from timestamp to date is happening here,
4243  // so we have to correct the chunk min and max values to lower the precision as of
4244  // the date
4245  chunk_min = DateTruncateHighPrecisionToDate(
4246  chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
4247  chunk_max = DateTruncateHighPrecisionToDate(
4248  chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
4249  }
4250  llvm::LLVMContext local_context;
4251  CgenState local_cgen_state(local_context);
4252  CodeGenerator code_generator(&local_cgen_state, nullptr);
4253 
4254  const auto rhs_val =
4255  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
4256 
4257  switch (comp_expr->get_optype()) {
4258  case kGE:
4259  if (chunk_max < rhs_val) {
4260  return {true, -1};
4261  }
4262  break;
4263  case kGT:
4264  if (chunk_max <= rhs_val) {
4265  return {true, -1};
4266  }
4267  break;
4268  case kLE:
4269  if (chunk_min > rhs_val) {
4270  return {true, -1};
4271  }
4272  break;
4273  case kLT:
4274  if (chunk_min >= rhs_val) {
4275  return {true, -1};
4276  }
4277  break;
4278  case kEQ:
4279  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4280  return {true, -1};
4281  } else if (is_rowid) {
4282  return {false, rhs_val - start_rowid};
4283  }
4284  break;
4285  default:
4286  break;
4287  }
4288  }
4289  return {false, -1};
4290 }
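// Example: for a simple qual `col > 10` where the fragment's chunk metadata reports
// chunk_max == 7, the kGT case above sees chunk_max <= rhs_val and returns {true, -1},
// so the fragment is skipped without being read.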
4291 
4292 /*
4293  * The skipFragmentInnerJoins function processes all quals stored in the execution unit's
4294  * join_quals and gathers all the ones that meet the "simple_qual" characteristics
4295  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
4296  * to decide whether the fragment should be skipped or not. The fragment will be skipped
4297  * if at least one of these skipFragment calls returns true in its first value.
4298  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
4299  * second value only if its first value is "false".
4300  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
4301  * i.e., we only skip if none of the quals trigger the code to update the
4302  * rowid_lookup_key
4303  * - Only AND operations are valid and considered:
4304  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
4305  * the skip
4306  * - `select * from t1,t2 where (A or B) and C`: only C is considered
4307  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
4308  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
4309  * implemented differently, which cause the first one to skip correctly, but the second
4310  * one will not skip.
4311  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
4312  * possible
4313  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
4314  * projection, no skipping
4315  */
4316 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
4317  const InputDescriptor& table_desc,
4318  const RelAlgExecutionUnit& ra_exe_unit,
4319  const Fragmenter_Namespace::FragmentInfo& fragment,
4320  const std::vector<uint64_t>& frag_offsets,
4321  const size_t frag_idx) {
4322  std::pair<bool, int64_t> skip_frag{false, -1};
4323  for (auto& inner_join : ra_exe_unit.join_quals) {
4324  if (inner_join.type != JoinType::INNER) {
4325  continue;
4326  }
4327 
4328  // extracting all the conjunctive simple_quals from the quals stored for the inner
4329  // join
4330  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
4331  for (auto& qual : inner_join.quals) {
4332  auto temp_qual = qual_to_conjunctive_form(qual);
4333  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
4334  temp_qual.simple_quals.begin(),
4335  temp_qual.simple_quals.end());
4336  }
4337  auto temp_skip_frag = skipFragment(
4338  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
4339  if (temp_skip_frag.second != -1) {
4340  skip_frag.second = temp_skip_frag.second;
4341  return skip_frag;
4342  } else {
4343  skip_frag.first = skip_frag.first || temp_skip_frag.first;
4344  }
4345  }
4346  return skip_frag;
4347 }
4348 
4349 AggregatedColRange Executor::computeColRangesCache(
4350  const std::unordered_set<PhysicalInput>& phys_inputs) {
4351  AggregatedColRange agg_col_range_cache;
4352  std::unordered_set<shared::TableKey> phys_table_keys;
4353  for (const auto& phys_input : phys_inputs) {
4354  phys_table_keys.emplace(phys_input.db_id, phys_input.table_id);
4355  }
4356  std::vector<InputTableInfo> query_infos;
4357  for (const auto& table_key : phys_table_keys) {
4358  query_infos.emplace_back(InputTableInfo{table_key, getTableInfo(table_key)});
4359  }
4360  for (const auto& phys_input : phys_inputs) {
4361  auto db_id = phys_input.db_id;
4362  auto table_id = phys_input.table_id;
4363  auto column_id = phys_input.col_id;
4364  const auto cd =
4365  Catalog_Namespace::get_metadata_for_column({db_id, table_id, column_id});
4366  CHECK(cd);
4367  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
4368  const auto col_var = std::make_unique<Analyzer::ColumnVar>(
4369  cd->columnType, shared::ColumnKey{db_id, table_id, column_id}, 0);
4370  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
4371  agg_col_range_cache.setColRange(phys_input, col_range);
4372  }
4373  }
4374  return agg_col_range_cache;
4375 }
4376 
4377 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
4378  const std::unordered_set<PhysicalInput>& phys_inputs) {
4379  StringDictionaryGenerations string_dictionary_generations;
4380  // Foreign tables may not have populated dictionaries for encoded columns. If this is
4381  // the case then we need to populate them here to make sure that the generations are set
4382  // correctly.
4383  prepare_string_dictionaries(phys_inputs);
4384  for (const auto& phys_input : phys_inputs) {
4385  const auto catalog =
4387  CHECK(catalog);
4388  const auto cd = catalog->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4389  CHECK(cd);
4390  const auto& col_ti =
4391  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
4392  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
4393  const auto& dict_key = col_ti.getStringDictKey();
4394  const auto dd = catalog->getMetadataForDict(dict_key.dict_id);
4395  CHECK(dd && dd->stringDict);
4396  string_dictionary_generations.setGeneration(dict_key,
4397  dd->stringDict->storageEntryCount());
4398  }
4399  }
4400  return string_dictionary_generations;
4401 }
4402 
4403 TableGenerations Executor::computeTableGenerations(
4404  const std::unordered_set<shared::TableKey>& phys_table_keys) {
4405  TableGenerations table_generations;
4406  for (const auto& table_key : phys_table_keys) {
4407  const auto table_info = getTableInfo(table_key);
4408  table_generations.setGeneration(
4409  table_key,
4410  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
4411  }
4412  return table_generations;
4413 }
4414 
4415 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
4416  const std::unordered_set<shared::TableKey>& phys_table_ids) {
4417  row_set_mem_owner_ =
4418  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
4419  row_set_mem_owner_->setDictionaryGenerations(
4420  computeStringDictionaryGenerations(phys_inputs));
4421  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
4422  table_generations_ = computeTableGenerations(phys_table_ids);
4423 }
4424 
4426  return recycler_mutex_;
4427 }
4428 
4430  return query_plan_dag_cache_;
4431 }
4432 
4435 }
4436 
4438  return executor_session_mutex_;
4439 }
4440 
4443  return current_query_session_;
4444 }
4445 
4447  const QuerySessionId& candidate_query_session,
4449  // the candidate session matches the current query session only if it is non-empty
4450  // and equal to current_query_session_
4451  return !candidate_query_session.empty() &&
4452  (current_query_session_ == candidate_query_session);
4453 }
4454 
4455 // used only for testing
4457  const QuerySessionId& candidate_query_session,
4459  if (queries_session_map_.count(candidate_query_session) &&
4460  !queries_session_map_.at(candidate_query_session).empty()) {
4461  return queries_session_map_.at(candidate_query_session)
4462  .begin()
4463  ->second.getQueryStatus();
4464  }
4465  return QuerySessionStatus::QueryStatus::UNDEFINED;
4466 }
4467 
4471 }
4472 
4474  const QuerySessionId& query_session_id,
4475  const std::string& query_str,
4476  const std::string& query_submitted_time) {
4477  if (!query_session_id.empty()) {
4478  // if session is valid, do update 1) the exact executor id and 2) query status
4481  query_session_id, query_submitted_time, executor_id_, write_lock);
4482  updateQuerySessionStatusWithLock(query_session_id,
4483  query_submitted_time,
4484  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
4485  write_lock);
4486  }
4487  return {query_session_id, query_str};
4488 }
4489 
4491  // check whether we are okay to execute the "pending" query
4492  // i.e., before running the query check if this query session is "ALREADY" interrupted
4494  if (query_session.empty()) {
4495  return;
4496  }
4497  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
4498  // something went wrong, since we assume this is the caller's responsibility
4499  // (this function should be called only for an enrolled query session)
4500  if (!queries_session_map_.count(query_session)) {
4501  VLOG(1) << "Interrupting pending query is not available since the query session is "
4502  "not enrolled";
4503  } else {
4504  // here the query session is enrolled but the interrupt flag is not registered
4505  VLOG(1)
4506  << "Interrupting pending query is not available since its interrupt flag is "
4507  "not registered";
4508  }
4509  return;
4510  }
4511  if (queries_interrupt_flag_[query_session]) {
4513  }
4514 }
4515 
4517  const std::string& submitted_time_str) {
4519  // clear the interrupt-related info for a finished query
4520  if (query_session.empty()) {
4521  return;
4522  }
4523  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
4524  if (query_session.compare(current_query_session_) == 0) {
4525  invalidateRunningQuerySession(session_write_lock);
4526  resetInterrupt();
4527  }
4528 }
4529 
4531  const QuerySessionId& query_session,
4532  const std::string& submitted_time_str,
4533  const QuerySessionStatus::QueryStatus new_query_status) {
4534  // update the running query session's current status
4536  if (query_session.empty()) {
4537  return;
4538  }
4539  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4540  current_query_session_ = query_session;
4541  }
4543  query_session, submitted_time_str, new_query_status, session_write_lock);
4544 }
4545 
4547  const QuerySessionId& query_session,
4548  const std::string& query_str,
4549  const std::string& submitted_time_str,
4550  const size_t executor_id,
4551  const QuerySessionStatus::QueryStatus query_session_status) {
4552  // enroll the query session into the Executor's session map
4554  if (query_session.empty()) {
4555  return;
4556  }
4557 
4558  addToQuerySessionList(query_session,
4559  query_str,
4560  submitted_time_str,
4561  executor_id,
4562  query_session_status,
4563  session_write_lock);
4564 
4565  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4566  current_query_session_ = query_session;
4567  }
4568 }
4569 
4572  return queries_session_map_.size();
4573 }
4574 
4576  const QuerySessionId& query_session,
4577  const std::string& query_str,
4578  const std::string& submitted_time_str,
4579  const size_t executor_id,
4580  const QuerySessionStatus::QueryStatus query_status,
4582  // an internal API that enrolls the query session into the Executor's session map
4583  if (queries_session_map_.count(query_session)) {
4584  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
4585  queries_session_map_.at(query_session).erase(submitted_time_str);
4586  queries_session_map_.at(query_session)
4587  .emplace(submitted_time_str,
4588  QuerySessionStatus(query_session,
4589  executor_id,
4590  query_str,
4591  submitted_time_str,
4592  query_status));
4593  } else {
4594  queries_session_map_.at(query_session)
4595  .emplace(submitted_time_str,
4596  QuerySessionStatus(query_session,
4597  executor_id,
4598  query_str,
4599  submitted_time_str,
4600  query_status));
4601  }
4602  } else {
4603  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4604  executor_per_query_map.emplace(
4605  submitted_time_str,
4607  query_session, executor_id, query_str, submitted_time_str, query_status));
4608  queries_session_map_.emplace(query_session, executor_per_query_map);
4609  }
4610  return queries_interrupt_flag_.emplace(query_session, false).second;
4611 }
4612 
4614  const QuerySessionId& query_session,
4615  const std::string& submitted_time_str,
4616  const QuerySessionStatus::QueryStatus updated_query_status,
4618  // an internal API that updates query session status
4619  if (query_session.empty()) {
4620  return false;
4621  }
4622  if (queries_session_map_.count(query_session)) {
4623  for (auto& query_status : queries_session_map_.at(query_session)) {
4624  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4625  // no time difference --> found the target query status
4626  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4627  auto prev_status = query_status.second.getQueryStatus();
4628  if (prev_status == updated_query_status) {
4629  return false;
4630  }
4631  query_status.second.setQueryStatus(updated_query_status);
4632  return true;
4633  }
4634  }
4635  }
4636  return false;
4637 }
4638 
4640  const QuerySessionId& query_session,
4641  const std::string& submitted_time_str,
4642  const size_t executor_id,
4644  // update the executor id of the query session
4645  if (query_session.empty()) {
4646  return false;
4647  }
4648  if (queries_session_map_.count(query_session)) {
4649  auto storage = queries_session_map_.at(query_session);
4650  for (auto it = storage.begin(); it != storage.end(); it++) {
4651  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4652  // no time difference --> found the target query status
4653  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4654  queries_session_map_.at(query_session)
4655  .at(submitted_time_str)
4656  .setExecutorId(executor_id);
4657  return true;
4658  }
4659  }
4660  }
4661  return false;
4662 }
4663 
4665  const QuerySessionId& query_session,
4666  const std::string& submitted_time_str,
4668  if (query_session.empty()) {
4669  return false;
4670  }
4671  if (queries_session_map_.count(query_session)) {
4672  auto& storage = queries_session_map_.at(query_session);
4673  if (storage.size() > 1) {
4674  // in this case we only remove query executor info
4675  for (auto it = storage.begin(); it != storage.end(); it++) {
4676  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4677  // no time difference && have the same executor id --> found the target query
4678  if (it->second.getExecutorId() == executor_id_ &&
4679  submitted_time_str.compare(target_submitted_t_str) == 0) {
4680  storage.erase(it);
4681  return true;
4682  }
4683  }
4684  } else if (storage.size() == 1) {
4685  // here this session only has a single query executor
4686  // so we clear both executor info and its interrupt flag
4687  queries_session_map_.erase(query_session);
4688  queries_interrupt_flag_.erase(query_session);
4689  if (interrupted_.load()) {
4690  interrupted_.store(false);
4691  }
4692  return true;
4693  }
4694  }
4695  return false;
4696 }
4697 
4699  const QuerySessionId& query_session,
4701  if (query_session.empty()) {
4702  return;
4703  }
4704  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
4705  queries_interrupt_flag_[query_session] = true;
4706  }
4707 }
4708 
4710  const QuerySessionId& query_session,
4712  if (query_session.empty()) {
4713  return false;
4714  }
4715  auto flag_it = queries_interrupt_flag_.find(query_session);
4716  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4717  flag_it->second;
4718 }
4719 
4721  const QuerySessionId& query_session,
4723  if (query_session.empty()) {
4724  return false;
4725  }
4726  return !query_session.empty() && queries_session_map_.count(query_session);
4727 }
4728 
4729 void Executor::enableRuntimeQueryInterrupt(
4730  const double runtime_query_check_freq,
4731  const unsigned pending_query_check_freq) const {
4732  // The only scenario in which we intentionally call this function is
4733  // to allow runtime query interrupt in QueryRunner for test cases.
4734  // Because a test machine's default setting does not allow runtime query interrupt,
4735  // we have to turn it on within test code if necessary.
4737  g_pending_query_interrupt_freq = pending_query_check_freq;
4738  g_running_query_interrupt_freq = runtime_query_check_freq;
4741  }
4742 }
4743 
4744 void Executor::addToCardinalityCache(const std::string& cache_key,
4745  const size_t cache_value) {
4748  cardinality_cache_[cache_key] = cache_value;
4749  VLOG(1) << "Put estimated cardinality to the cache";
4750  }
4751 }
4752 
4756  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
4757  VLOG(1) << "Reuse cached cardinality";
4758  return {true, cardinality_cache_[cache_key]};
4759  }
4760  return {false, -1};
4761 }
4762 
4763 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
4764  const QuerySessionId& query_session,
4766  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
4767  auto& query_infos = queries_session_map_.at(query_session);
4768  std::vector<QuerySessionStatus> ret;
4769  for (auto& info : query_infos) {
4770  ret.push_back(QuerySessionStatus(query_session,
4771  info.second.getExecutorId(),
4772  info.second.getQueryStr(),
4773  info.second.getQuerySubmittedTime(),
4774  info.second.getQueryStatus()));
4775  }
4776  return ret;
4777  }
4778  return {};
4779 }
4780 
4781 const std::vector<size_t> Executor::getExecutorIdsRunningQuery(
4782  const QuerySessionId& interrupt_session) const {
4783  std::vector<size_t> res;
4785  auto it = queries_session_map_.find(interrupt_session);
4786  if (it != queries_session_map_.end()) {
4787  for (auto& kv : it->second) {
4788  if (kv.second.getQueryStatus() ==
4789  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4790  res.push_back(kv.second.getExecutorId());
4791  }
4792  }
4793  }
4794  return res;
4795 }
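// --- Illustrative sketch (editorial addition, not part of Execute.cpp) ---
// getExecutorIdsRunningQuery keeps only the entries whose status says a query
// kernel is currently running. A minimal standalone version; the enum and
// names below are hypothetical.
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>
enum class ToyQueryStatus { PENDING, RUNNING_QUERY_KERNEL, DONE };
inline std::vector<size_t> running_executor_ids(
    const std::map<std::string, std::pair<size_t, ToyQueryStatus>>& queries) {
  std::vector<size_t> res;
  for (const auto& kv : queries) {
    if (kv.second.second == ToyQueryStatus::RUNNING_QUERY_KERNEL) {
      res.push_back(kv.second.first);  // keep executors running a kernel
    }
  }
  return res;
}
// --- End of illustrative sketch ---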
4796 
4797 bool Executor::checkNonKernelTimeInterrupted() const {
4798  // this function should be called within an executor that is assigned
4799  // to the specific query thread (which indicates the session is already enrolled)
4800  // check whether this is called from a non-unitary executor
4801  if (executor_id_ == UNITARY_EXECUTOR_ID) {
4802  return false;
4803  };
4805  auto flag_it = queries_interrupt_flag_.find(current_query_session_);
4806  return !current_query_session_.empty() && flag_it != queries_interrupt_flag_.end() &&
4807  flag_it->second;
4808 }
4809 
4810 void Executor::registerExtractedQueryPlanDag(const QueryPlanDAG& query_plan_dag) {
4811  // this function is called under the recycler lock
4812  // e.g., QueryPlanDagExtractor::extractQueryPlanDagImpl()
4813  latest_query_plan_extracted_ = query_plan_dag;
4814 }
4815 
4816 const QueryPlanDAG Executor::getLatestQueryPlanDagExtracted() const {
4818  return latest_query_plan_extracted_;
4819 }
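// --- Illustrative sketch (editorial addition, not part of Execute.cpp) ---
// The pair of functions above is a small "last extracted plan" slot: the
// setter runs under an external lock, the getter returns the stored string.
// A self-contained variant that owns its own lock; ToyLatestPlanDag is a
// hypothetical name.
#include <mutex>
#include <string>
class ToyLatestPlanDag {
 public:
  void set(const std::string& dag) {
    std::lock_guard<std::mutex> lock(mutex_);
    latest_ = dag;
  }
  std::string get() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return latest_;
  }

 private:
  mutable std::mutex mutex_;
  std::string latest_;
};
// --- End of illustrative sketch ---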
4820 
4821 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
4822 
4823 // contains the interrupt flag's status per query session
4824 InterruptFlagMap Executor::queries_interrupt_flag_;
4825 // contains a list of queries per query session
4826 QuerySessionMap Executor::queries_session_map_;
4827 // session lock
4828 heavyai::shared_mutex Executor::executor_session_mutex_;
4829 
4832 
4836 
4838 std::mutex Executor::kernel_mutex_;
4839 
4842 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;
4843 // Executor has a single global result set recycler holder
4844 // which contains two recyclers related to the query result set
4845 ResultSetRecyclerHolder Executor::resultset_recycler_holder_;
4847 
4848 // Useful for debugging.
4849 std::string Executor::dumpCache() const {
4850  std::stringstream ss;
4851  ss << "colRangeCache: ";
4852  for (auto& [phys_input, exp_range] : agg_col_range_cache_.asMap()) {
4853  ss << "{" << phys_input.col_id << ", " << phys_input.table_id
4854  << "} = " << exp_range.toString() << ", ";
4855  }
4856  ss << "stringDictGenerations: ";
4857  for (auto& [key, val] : row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
4858  ss << key << " = " << val << ", ";
4859  }
4860  ss << "tableGenerations: ";
4861  for (auto& [key, val] : table_generations_.asMap()) {
4862  ss << key << " = {" << val.tuple_count << ", " << val.start_rowid << "}, ";
4863  }
4864  ss << "\n";
4865  return ss.str();
4866 }
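// --- Illustrative sketch (editorial addition, not part of Execute.cpp) ---
// dumpCache simply walks a few maps and streams "key = value" pairs into a
// string. The same pattern, runnable on its own; toy_dump and its map are
// hypothetical.
#include <map>
#include <sstream>
#include <string>
inline std::string toy_dump(const std::map<std::string, int>& cache) {
  std::stringstream ss;
  ss << "cache: ";
  for (const auto& [key, val] : cache) {  // structured bindings, as above
    ss << key << " = " << val << ", ";
  }
  ss << "\n";
  return ss.str();
}
// --- End of illustrative sketch ---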