OmniSciDB  b28c0d5765
Execute.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 
23 #ifdef HAVE_CUDA
24 #include <cuda.h>
25 #endif // HAVE_CUDA
26 #include <chrono>
27 #include <ctime>
28 #include <future>
29 #include <iostream>
30 #include <memory>
31 #include <mutex>
32 #include <numeric>
33 #include <set>
34 #include <thread>
35 
36 #include "Catalog/Catalog.h"
37 #include "CudaMgr/CudaMgr.h"
41 #include "Parser/ParserNode.h"
72 #include "Shared/checked_alloc.h"
73 #include "Shared/measure.h"
74 #include "Shared/misc.h"
75 #include "Shared/scope.h"
76 #include "Shared/shard_key.h"
77 #include "Shared/threading.h"
78 
79 bool g_enable_watchdog{false};
83 size_t g_cpu_sub_task_size{500'000};
84 bool g_enable_filter_function{true};
85 unsigned g_dynamic_watchdog_time_limit{10000};
86 bool g_allow_cpu_retry{true};
87 bool g_allow_query_step_cpu_retry{true};
88 bool g_null_div_by_zero{false};
89 unsigned g_trivial_loop_join_threshold{1000};
90 bool g_from_table_reordering{true};
91 bool g_inner_join_fragment_skipping{true};
92 extern bool g_enable_smem_group_by;
93 extern std::unique_ptr<llvm::Module> udf_gpu_module;
94 extern std::unique_ptr<llvm::Module> udf_cpu_module;
95 bool g_enable_filter_push_down{false};
96 float g_filter_push_down_low_frac{-1.0f};
97 float g_filter_push_down_high_frac{-1.0f};
98 size_t g_filter_push_down_passing_row_ubound{0};
99 bool g_enable_columnar_output{false};
100 bool g_enable_left_join_filter_hoisting{true};
101 bool g_optimize_row_initialization{true};
102 bool g_enable_overlaps_hashjoin{true};
103 bool g_enable_distance_rangejoin{true};
104 bool g_enable_hashjoin_many_to_many{false};
105 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
106 double g_overlaps_target_entries_per_bin{1.3};
107 bool g_strip_join_covered_quals{false};
108 size_t g_constrained_by_in_threshold{10};
109 size_t g_default_max_groups_buffer_entry_guess{16384};
110 size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
111 bool g_enable_window_functions{true};
112 bool g_enable_table_functions{true};
113 bool g_enable_dev_table_functions{false};
114 bool g_enable_geo_ops_on_uncompressed_coords{true};
115 bool g_enable_rf_prop_table_functions{true};
116 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
117 size_t g_min_memory_allocation_size{
118  256}; // minimum memory allocation required for projection query output buffer
119  // without pre-flight count
120 bool g_enable_bump_allocator{false};
121 double g_bump_allocator_step_reduction{0.75};
122 bool g_enable_direct_columnarization{true};
123 extern bool g_enable_string_functions;
124 bool g_enable_lazy_fetch{true};
125 bool g_enable_runtime_query_interrupt{true};
126 bool g_enable_non_kernel_time_query_interrupt{true};
127 bool g_use_estimator_result_cache{true};
128 unsigned g_pending_query_interrupt_freq{1000};
129 double g_running_query_interrupt_freq{0.1};
130 size_t g_gpu_smem_threshold{
131  4096}; // GPU shared memory threshold (in bytes): if larger
132  // buffer sizes are required, we do not use GPU shared
133  // memory optimizations. Setting this to 0 means unlimited
134  // (subject to other dynamically calculated caps).
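// Editor's note (illustrative, not in the original source): with the default of
// 4096 bytes, a group-by whose shared-memory output buffer would need more than
// 4 KB falls back to the regular global-memory path, while setting the threshold
// to 0 removes this static cap and leaves only the dynamically calculated limits.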
135 bool g_enable_smem_grouped_non_count_agg{
136  true}; // enable use of shared memory when performing group-by with select non-count
137  // aggregates
138 bool g_enable_smem_non_grouped_agg{
139  true}; // enable optimizations for using GPU shared memory in implementation of
140  // non-grouped aggregates
141 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
142  // limits the allocation for the output buffer arena
143  // and data recycler test
144 size_t g_enable_parallel_linearization{
145  10000}; // # of rows above which we try to linearize a varlen col in parallel
146 bool g_enable_data_recycler{true};
147 bool g_use_hashtable_cache{true};
148 bool g_use_query_resultset_cache{true};
149 bool g_use_chunk_metadata_cache{true};
150 bool g_allow_auto_resultset_caching{false};
151 bool g_allow_query_step_skipping{true};
152 size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
153 size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
154 size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
155 size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
156 size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
157 bool g_optimize_cuda_block_and_grid_sizes{false};
158 
159 size_t g_approx_quantile_buffer{1000};
160 size_t g_approx_quantile_centroids{300};
161 
162 bool g_enable_automatic_ir_metadata{true};
163 
164 size_t g_max_log_length{500};
165 
166 extern bool g_cache_string_hash;
167 
168 int const Executor::max_gpu_count;
169 
170 const int32_t Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES;
171 
172 std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;
173 
174 extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
175  const std::string& udf_ir_filename,
176  llvm::LLVMContext& ctx);
177 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
178  const std::string& udf_ir_filename,
179  llvm::LLVMContext& ctx,
180  bool is_gpu = false);
181 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
182  const std::string& udf_ir_string,
183  llvm::LLVMContext& ctx,
184  bool is_gpu = false);
185 
186 namespace {
187 // This function is notably different from that in RelAlgExecutor because it already
188 // expects SPI values and therefore needs to avoid that transformation.
189 void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs,
190  const Catalog_Namespace::Catalog& catalog) {
191  for (const auto [col_id, table_id] : phys_inputs) {
192  foreign_storage::populate_string_dictionary(table_id, col_id, catalog);
193  }
194 }
195 
196 bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
197  const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
198  // The fragmenter always returns at least one fragment, even when the table is empty.
199  return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
200 }
201 } // namespace
202 
203 namespace foreign_storage {
204 // Foreign tables skip the population of dictionaries during metadata scan. This function
205 // will populate a dictionary's missing entries by fetching any unpopulated chunks.
206 void populate_string_dictionary(const int32_t table_id,
207  const int32_t col_id,
208  const Catalog_Namespace::Catalog& catalog) {
209  if (const auto foreign_table = dynamic_cast<const ForeignTable*>(
210  catalog.getMetadataForTable(table_id, false))) {
211  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
212  if (col_desc->columnType.is_dict_encoded_type()) {
213  auto& fragmenter = foreign_table->fragmenter;
214  CHECK(fragmenter != nullptr);
215  if (is_empty_table(fragmenter.get())) {
216  return;
217  }
218  for (const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
219  ChunkKey chunk_key = {catalog.getDatabaseId(), table_id, col_id, frag.fragmentId};
220  // If the key is sharded across leaves, only populate fragments that are sharded
221  // to this leaf.
222  if (key_does_not_shard_to_leaf(chunk_key)) {
223  continue;
224  }
225 
226  const ChunkMetadataMap& metadata_map = frag.getChunkMetadataMap();
227  CHECK(metadata_map.find(col_id) != metadata_map.end());
228  if (auto& meta = metadata_map.at(col_id); meta->isPlaceholder()) {
229  // When this goes out of scope it will stay in CPU cache but become
230  // evictable
231  auto chunk = Chunk_NS::Chunk::getChunk(col_desc,
232  &(catalog.getDataMgr()),
233  chunk_key,
234  Data_Namespace::CPU_LEVEL,
235  0,
236  0,
237  0);
238  }
239  }
240  }
241  }
242 }
243 } // namespace foreign_storage
244 
245 Executor::Executor(const ExecutorId executor_id,
246  Data_Namespace::DataMgr* data_mgr,
247  const size_t block_size_x,
248  const size_t grid_size_x,
249  const size_t max_gpu_slab_size,
250  const std::string& debug_dir,
251  const std::string& debug_file)
252  : executor_id_(executor_id)
253  , context_(new llvm::LLVMContext())
254  , cgen_state_(new CgenState({}, false, this))
255  , block_size_x_(block_size_x)
256  , grid_size_x_(grid_size_x)
257  , max_gpu_slab_size_(max_gpu_slab_size)
258  , debug_dir_(debug_dir)
259  , debug_file_(debug_file)
260  , catalog_(nullptr)
261  , data_mgr_(data_mgr)
262  , temporary_tables_(nullptr)
263  , input_table_info_cache_(this) {
264  Executor::initialize_extension_module_sources();
266  update_extension_modules();
267 }
268 
269 void Executor::initialize_extension_module_sources() {
270  if (Executor::extension_module_sources.find(
271  Executor::ExtModuleKinds::template_module) ==
272  Executor::extension_module_sources.end()) {
273  auto root_path = heavyai::get_root_abs_path();
274  auto template_path = root_path + "/QueryEngine/RuntimeFunctions.bc";
275  CHECK(boost::filesystem::exists(template_path));
276  extension_module_sources[Executor::ExtModuleKinds::template_module] =
277  template_path;
278 #ifdef ENABLE_GEOS
279  auto rt_geos_path = root_path + "/QueryEngine/GeosRuntime.bc";
280  CHECK(boost::filesystem::exists(rt_geos_path));
281  extension_module_sources[Executor::ExtModuleKinds::rt_geos_module] =
282  rt_geos_path;
283 #endif
284 #ifdef HAVE_CUDA
285  auto rt_libdevice_path = get_cuda_libdevice_dir() + "/libdevice.10.bc";
286  if (boost::filesystem::exists(rt_libdevice_path)) {
287  extension_module_sources[Executor::ExtModuleKinds::rt_libdevice_module] =
288  rt_libdevice_path;
289  } else {
290  LOG(WARNING) << "File " << rt_libdevice_path
291  << " does not exist; support for some UDF "
292  "functions might not be available.";
293  }
294 #endif
295  }
296 }
297 
298 void Executor::reset(bool discard_runtime_modules_only) {
299  // TODO: keep cached results that do not depend on runtime UDF/UDTFs
300  auto qe = QueryEngine::getInstance();
301  qe->s_code_accessor->clear();
302  qe->s_stubs_accessor->clear();
303  qe->cpu_code_accessor->clear();
304  qe->gpu_code_accessor->clear();
305  qe->tf_code_accessor->clear();
306 
307  if (discard_runtime_modules_only) {
308  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_cpu_module);
309 #ifdef HAVE_CUDA
310  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_gpu_module);
311 #endif
312  cgen_state_->module_ = nullptr;
313  } else {
314  extension_modules_.clear();
315  cgen_state_.reset();
316  context_.reset(new llvm::LLVMContext());
317  cgen_state_.reset(new CgenState({}, false, this));
318  }
319 }
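// Editor's note (illustrative sketch, not in the original source): the two reset
// modes differ in how much state survives. A hypothetical caller that has just
// re-registered runtime UDFs would typically do
//
//   executor->reset(/*discard_runtime_modules_only=*/true);   // drop rt_udf_* modules only
//
// while a full reset also rebuilds the LLVM context and CgenState:
//
//   executor->reset(/*discard_runtime_modules_only=*/false);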
320 
321 void Executor::update_extension_modules(bool update_runtime_modules_only) {
322  auto read_module = [&](Executor::ExtModuleKinds module_kind,
323  const std::string& source) {
324  /*
325  source can be either a filename of a LLVM IR
326  or LLVM BC source, or a string containing
327  LLVM IR code.
328  */
329  CHECK(!source.empty());
330  switch (module_kind) {
331  case Executor::ExtModuleKinds::template_module:
332  case Executor::ExtModuleKinds::rt_geos_module:
333  case Executor::ExtModuleKinds::rt_libdevice_module: {
334  return read_llvm_module_from_bc_file(source, getContext());
335  }
336  case Executor::ExtModuleKinds::udf_cpu_module: {
337  return read_llvm_module_from_ir_file(source, getContext(), false);
338  }
339  case Executor::ExtModuleKinds::udf_gpu_module: {
340  return read_llvm_module_from_ir_file(source, getContext(), true);
341  }
342  case Executor::ExtModuleKinds::rt_udf_cpu_module: {
343  return read_llvm_module_from_ir_string(source, getContext(), false);
344  }
345  case Executor::ExtModuleKinds::rt_udf_gpu_module: {
346  return read_llvm_module_from_ir_string(source, getContext(), true);
347  }
348  default: {
349  UNREACHABLE();
350  return std::unique_ptr<llvm::Module>();
351  }
352  }
353  };
354  auto update_module = [&](Executor::ExtModuleKinds module_kind,
355  bool erase_not_found = false) {
356  auto it = Executor::extension_module_sources.find(module_kind);
357  if (it != Executor::extension_module_sources.end()) {
358  auto llvm_module = read_module(module_kind, it->second);
359  if (llvm_module) {
360  extension_modules_[module_kind] = std::move(llvm_module);
361  } else if (erase_not_found) {
362  extension_modules_.erase(module_kind);
363  } else {
364  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
365  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
366  << " LLVM module. The module will be unavailable.";
367  } else {
368  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
369  << " LLVM module. Using the existing module.";
370  }
371  }
372  } else {
373  if (erase_not_found) {
374  extension_modules_.erase(module_kind);
375  } else {
376  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
377  LOG(WARNING) << "Source of " << ::toString(module_kind)
378  << " LLVM module is unavailable. The module will be unavailable.";
379  } else {
380  LOG(WARNING) << "Source of " << ::toString(module_kind)
381  << " LLVM module is unavailable. Using the existing module.";
382  }
383  }
384  }
385  };
386 
387  if (!update_runtime_modules_only) {
388  // required compile-time modules, their requirements are enforced
389  // by Executor::initialize_extension_module_sources():
390  update_module(Executor::ExtModuleKinds::template_module);
391 #ifdef ENABLE_GEOS
392  update_module(Executor::ExtModuleKinds::rt_geos_module);
393 #endif
394  // load-time modules, these are optional:
395  update_module(Executor::ExtModuleKinds::udf_cpu_module, true);
396 #ifdef HAVE_CUDA
397  update_module(Executor::ExtModuleKinds::udf_gpu_module, true);
398  update_module(Executor::ExtModuleKinds::rt_libdevice_module);
399 #endif
400  }
401  // run-time modules, these are optional and erasable:
402  update_module(Executor::ExtModuleKinds::rt_udf_cpu_module, true);
403 #ifdef HAVE_CUDA
404  update_module(Executor::ExtModuleKinds::rt_udf_gpu_module, true);
405 #endif
406 }
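// Editor's note (illustrative sketch, not in the original source; assumes the
// caller has already produced LLVM IR for a runtime UDF): because run-time
// modules are read from IR strings and are erasable, making one available
// amounts to recording its source and refreshing only the run-time entries:
//
//   Executor::extension_module_sources[Executor::ExtModuleKinds::rt_udf_cpu_module] =
//       rt_udf_cpu_ir;  // hypothetical LLVM IR string produced elsewhere
//   executor->update_extension_modules(/*update_runtime_modules_only=*/true);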
407 
408 // Used by StubGenerator::generateStub
409 Executor::CgenStateManager::CgenStateManager(Executor& executor)
410  : executor_(executor)
411  , lock_queue_clock_(timer_start())
412  , lock_(executor_.compilation_mutex_)
413  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
414 {
415  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
416  executor_.cgen_state_.reset(new CgenState(0, false, &executor));
417 }
418 
419 Executor::CgenStateManager::CgenStateManager(
420  Executor& executor,
421  const bool allow_lazy_fetch,
422  const std::vector<InputTableInfo>& query_infos,
423  const PlanState::DeletedColumnsMap& deleted_cols_map,
424  const RelAlgExecutionUnit* ra_exe_unit)
425  : executor_(executor)
426  , lock_queue_clock_(timer_start())
427  , lock_(executor_.compilation_mutex_)
428  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
429 {
430  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
431  // nukeOldState creates new CgenState and PlanState instances for
432  // the subsequent code generation. It also resets
433  // kernel_queue_time_ms_ and compilation_queue_time_ms_ that we do
434  // not currently restore. Should we accumulate these timings?
435  executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
436 }
437 
438 Executor::CgenStateManager::~CgenStateManager() {
439  // prevent memory leak from hoisted literals
440  for (auto& p : executor_.cgen_state_->row_func_hoisted_literals_) {
441  auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
442  if (inst && inst->getNumUses() == 0 && inst->getParent() == nullptr) {
443  // The llvm::Value instance stored in p.first is created by the
444  // CodeGenerator::codegenHoistedConstantsPlaceholders method.
445  p.first->deleteValue();
446  }
447  }
448  executor_.cgen_state_->row_func_hoisted_literals_.clear();
449 
450  // move generated StringDictionaryTranslationMgrs and InValueBitmaps
451  // to the old CgenState instance as the execution of the generated
452  // code uses these bitmaps
453 
454  for (auto& str_dict_translation_mgr :
455  executor_.cgen_state_->str_dict_translation_mgrs_) {
456  cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
457  }
458  executor_.cgen_state_->str_dict_translation_mgrs_.clear();
459 
460  for (auto& bm : executor_.cgen_state_->in_values_bitmaps_) {
461  cgen_state_->moveInValuesBitmap(bm);
462  }
463  executor_.cgen_state_->in_values_bitmaps_.clear();
464 
465  // Delete worker module that may have been set by
466  // set_module_shallow_copy. If QueryMustRunOnCpu is thrown, the
467  // worker module is not instantiated, so the worker module needs to
468  // be deleted conditionally [see "Managing LLVM modules" comment in
469  // CgenState.h]:
470  if (executor_.cgen_state_->module_) {
471  delete executor_.cgen_state_->module_;
472  }
473 
474  // restore the old CgenState instance
475  executor_.cgen_state_.reset(cgen_state_.release());
476 }
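// Editor's note (illustrative, not in the original source): CgenStateManager is an
// RAII guard around code generation. Construction serializes on compilation_mutex_
// and swaps in a fresh CgenState; the destructor above migrates the generated
// string-dictionary translation managers and in-values bitmaps back and restores
// the previous CgenState. A hypothetical use site looks like:
//
//   {
//     Executor::CgenStateManager cgen_guard(
//         *executor, allow_lazy_fetch, query_infos, deleted_cols_map, &ra_exe_unit);
//     // ... generate code; the prior CgenState is restored on scope exit ...
//   }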
477 
478 std::shared_ptr<Executor> Executor::getExecutor(
479  const ExecutorId executor_id,
480  const std::string& debug_dir,
481  const std::string& debug_file,
482  const SystemParameters& system_parameters) {
484  auto it = executors_.find(executor_id);
485  if (it != executors_.end()) {
486  return it->second;
487  }
489  auto executor = std::make_shared<Executor>(executor_id,
490  &data_mgr,
491  system_parameters.cuda_block_size,
492  system_parameters.cuda_grid_size,
493  system_parameters.max_gpu_slab_size,
494  debug_dir,
495  debug_file);
496  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
497  return executor;
498 }
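// Editor's note (illustrative, not in the original source): executors are created
// lazily and cached by id, so repeated calls with the same id return the same
// instance. Assuming id 0 denotes the usual unitary executor:
//
//   SystemParameters params;
//   auto executor = Executor::getExecutor(/*executor_id=*/0, "", "", params);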
499 
500 void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level) {
501  switch (memory_level) {
502  case Data_Namespace::MemoryLevel::CPU_LEVEL:
503  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
504  mapd_unique_lock<mapd_shared_mutex> flush_lock(
505  execute_mutex_); // Don't flush memory while queries are running
506 
507  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
508  // The hash table cache uses CPU memory not managed by the buffer manager. In the
509  // future, we should manage these allocations with the buffer manager directly.
510  // For now, assume the user wants to purge the hash table cache when they clear
511  // CPU memory (currently used in ExecuteTest to lower memory pressure)
513  }
516  break;
517  }
518  default: {
519  throw std::runtime_error(
520  "Clearing memory levels other than the CPU level or GPU level is not "
521  "supported.");
522  }
523  }
524 }
525 
527  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
528 }
529 
530 StringDictionaryProxy* Executor::getStringDictionaryProxy(
531  const int dict_id_in,
532  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
533  const bool with_generation) const {
534  CHECK(row_set_mem_owner);
535  std::lock_guard<std::mutex> lock(
536  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
537  return row_set_mem_owner->getOrAddStringDictProxy(
538  dict_id_in, with_generation, catalog_);
539 }
540 
541 StringDictionaryProxy* RowSetMemoryOwner::getOrAddStringDictProxy(
542  const int dict_id_in,
543  const bool with_generation,
544  const Catalog_Namespace::Catalog* catalog) {
545  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
546  CHECK(catalog);
547  const auto dd = catalog->getMetadataForDict(dict_id);
548  if (dd) {
549  CHECK(dd->stringDict);
550  CHECK_LE(dd->dictNBits, 32);
551  const int64_t generation =
552  with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
553  return addStringDict(dd->stringDict, dict_id, generation);
554  }
556  if (!lit_str_dict_proxy_) {
557  DictRef literal_dict_ref(catalog->getDatabaseId(), DictRef::literalsDictId);
558  std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
559  literal_dict_ref, "", false, true, g_cache_string_hash);
560  lit_str_dict_proxy_ =
561  std::make_shared<StringDictionaryProxy>(tsd, literal_dict_ref.dictId, 0);
562  }
563  return lit_str_dict_proxy_.get();
564 }
565 
567  const int source_dict_id,
568  const int dest_dict_id,
569  const RowSetMemoryOwner::StringTranslationType translation_type,
570  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
571  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
572  const bool with_generation) const {
573  CHECK(row_set_mem_owner);
574  std::lock_guard<std::mutex> lock(
575  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
576  return row_set_mem_owner->getOrAddStringProxyTranslationMap(source_dict_id,
577  dest_dict_id,
578  with_generation,
579  translation_type,
580  string_op_infos,
581  catalog_);
582 }
583 
586  const StringDictionaryProxy* source_proxy,
587  StringDictionaryProxy* dest_proxy,
588  const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
589  const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
590  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) const {
591  CHECK(row_set_mem_owner);
592  std::lock_guard<std::mutex> lock(
593  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
594  // First translate lhs onto itself if there are string ops
595  if (!dest_string_op_infos.empty()) {
596  row_set_mem_owner->addStringProxyUnionTranslationMap(
597  dest_proxy, dest_proxy, dest_string_op_infos);
598  }
599  return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
600  source_proxy, dest_proxy, source_string_op_infos);
601 }
602 
605  const int source_dict_id,
606  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
607  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
608  const bool with_generation) const {
609  CHECK(row_set_mem_owner);
610  std::lock_guard<std::mutex> lock(
611  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
612  return row_set_mem_owner->getOrAddStringProxyNumericTranslationMap(
613  source_dict_id, with_generation, string_op_infos, catalog_);
614 }
615 
617  const int source_dict_id_in,
618  const int dest_dict_id_in,
619  const bool with_generation,
620  const RowSetMemoryOwner::StringTranslationType translation_type,
621  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
622  const Catalog_Namespace::Catalog* catalog) {
623  const auto source_proxy =
624  getOrAddStringDictProxy(source_dict_id_in, with_generation, catalog);
625  auto dest_proxy = getOrAddStringDictProxy(dest_dict_id_in, with_generation, catalog);
627  return addStringProxyIntersectionTranslationMap(
628  source_proxy, dest_proxy, string_op_infos);
629  } else {
630  return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
631  }
632 }
633 
636  const int source_dict_id_in,
637  const bool with_generation,
638  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
639  const Catalog_Namespace::Catalog* catalog) {
640  const auto source_proxy =
641  getOrAddStringDictProxy(source_dict_id_in, with_generation, catalog);
642  return addStringProxyNumericTranslationMap(source_proxy, string_op_infos);
643 }
644 
646  std::lock_guard<std::mutex> lock(state_mutex_);
647  return t_digests_
648  .emplace_back(std::make_unique<quantile::TDigest>(
650  .get();
651 }
652 
653 bool Executor::isCPUOnly() const {
654  CHECK(data_mgr_);
655  return !data_mgr_->getCudaMgr();
656 }
657 
658 const ColumnDescriptor* Executor::getColumnDescriptor(
659  const Analyzer::ColumnVar* col_var) const {
660  return get_column_descriptor_maybe(
661  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
662 }
663 
665  const Analyzer::ColumnVar* col_var,
666  int n) const {
667  const auto cd = getColumnDescriptor(col_var);
668  if (!cd || n > cd->columnType.get_physical_cols()) {
669  return nullptr;
670  }
672  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
673 }
674 
675 const Catalog_Namespace::Catalog* Executor::getCatalog() const {
676  return catalog_;
677 }
678 
679 void Executor::setCatalog(const Catalog_Namespace::Catalog* catalog) {
680  catalog_ = catalog;
681 }
682 
683 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
684  return row_set_mem_owner_;
685 }
686 
687 const TemporaryTables* Executor::getTemporaryTables() const {
688  return temporary_tables_;
689 }
690 
692  return input_table_info_cache_.getTableInfo(table_id);
693 }
694 
695 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
696  return table_generations_.getGeneration(table_id);
697 }
698 
699 ExpressionRange Executor::getColRange(const PhysicalInput& phys_input) const {
700  return agg_col_range_cache_.getColRange(phys_input);
701 }
702 
703 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
704  size_t num_bytes = 0;
705  if (!plan_state_) {
706  return 0;
707  }
708  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
709  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
710  continue;
711  }
712 
713  if (fetched_col_pair.first < 0) {
714  num_bytes += 8;
715  } else {
716  const auto cd =
717  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
718  const auto& ti = cd->columnType;
719  const auto sz = ti.get_size();
720  if (sz < 0) {
721  // for varlen types, only account for the pointer/size for each row, for now
722  if (!ti.is_logical_geo_type()) {
723  // Don't count size for logical geo types, as they are
724  // backed by physical columns
725  num_bytes += 16;
726  }
727  } else {
728  num_bytes += sz;
729  }
730  }
731  }
732  return num_bytes;
733 }
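// Editor's note (worked example, not in the original source): under the rules
// above, a fetched row consisting of a 4-byte INT, an 8-byte BIGINT and a
// variable-length, non-geo TEXT column is counted as 4 + 8 + 16 = 28 bytes,
// since varlen columns contribute only their pointer/size pair; any column from
// a negative (temporary) table id contributes a flat 8 bytes.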
734 
735 bool Executor::hasLazyFetchColumns(
736  const std::vector<Analyzer::Expr*>& target_exprs) const {
738  for (const auto target_expr : target_exprs) {
739  if (plan_state_->isLazyFetchColumn(target_expr)) {
740  return true;
741  }
742  }
743  return false;
744 }
745 
746 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
747  const std::vector<Analyzer::Expr*>& target_exprs) const {
749  CHECK(catalog_);
750  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
751  for (const auto target_expr : target_exprs) {
752  if (!plan_state_->isLazyFetchColumn(target_expr)) {
753  col_lazy_fetch_info.emplace_back(
754  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
755  } else {
756  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
757  CHECK(col_var);
758  auto col_id = col_var->get_column_id();
759  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
760  auto cd = (col_var->get_table_id() > 0)
761  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
762  : nullptr;
763  if (cd && IS_GEO(cd->columnType.get_type())) {
764  // Geo coords cols will be processed in sequence. So we only need to track the
765  // first coords col in lazy fetch info.
766  {
767  auto cd0 =
768  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
769  auto col0_ti = cd0->columnType;
770  CHECK(!cd0->isVirtualCol);
771  auto col0_var = makeExpr<Analyzer::ColumnVar>(
772  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
773  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
774  col_lazy_fetch_info.emplace_back(
775  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
776  }
777  } else {
778  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
779  const auto& col_ti = col_var->get_type_info();
780  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
781  }
782  }
783  }
784  return col_lazy_fetch_info;
785 }
786 
787 void Executor::clearMetaInfoCache() {
788  input_table_info_cache_.clear();
789  agg_col_range_cache_.clear();
790  table_generations_.clear();
791 }
792 
793 std::vector<int8_t> Executor::serializeLiterals(
794  const std::unordered_map<int, CgenState::LiteralValues>& literals,
795  const int device_id) {
796  if (literals.empty()) {
797  return {};
798  }
799  const auto dev_literals_it = literals.find(device_id);
800  CHECK(dev_literals_it != literals.end());
801  const auto& dev_literals = dev_literals_it->second;
802  size_t lit_buf_size{0};
803  std::vector<std::string> real_strings;
804  std::vector<std::vector<double>> double_array_literals;
805  std::vector<std::vector<int8_t>> align64_int8_array_literals;
806  std::vector<std::vector<int32_t>> int32_array_literals;
807  std::vector<std::vector<int8_t>> align32_int8_array_literals;
808  std::vector<std::vector<int8_t>> int8_array_literals;
809  for (const auto& lit : dev_literals) {
810  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
811  if (lit.which() == 7) {
812  const auto p = boost::get<std::string>(&lit);
813  CHECK(p);
814  real_strings.push_back(*p);
815  } else if (lit.which() == 8) {
816  const auto p = boost::get<std::vector<double>>(&lit);
817  CHECK(p);
818  double_array_literals.push_back(*p);
819  } else if (lit.which() == 9) {
820  const auto p = boost::get<std::vector<int32_t>>(&lit);
821  CHECK(p);
822  int32_array_literals.push_back(*p);
823  } else if (lit.which() == 10) {
824  const auto p = boost::get<std::vector<int8_t>>(&lit);
825  CHECK(p);
826  int8_array_literals.push_back(*p);
827  } else if (lit.which() == 11) {
828  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
829  CHECK(p);
830  if (p->second == 64) {
831  align64_int8_array_literals.push_back(p->first);
832  } else if (p->second == 32) {
833  align32_int8_array_literals.push_back(p->first);
834  } else {
835  CHECK(false);
836  }
837  }
838  }
839  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
840  throw TooManyLiterals();
841  }
842  int16_t crt_real_str_off = lit_buf_size;
843  for (const auto& real_str : real_strings) {
844  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
845  lit_buf_size += real_str.size();
846  }
847  if (double_array_literals.size() > 0) {
848  lit_buf_size = align(lit_buf_size, sizeof(double));
849  }
850  int16_t crt_double_arr_lit_off = lit_buf_size;
851  for (const auto& double_array_literal : double_array_literals) {
852  CHECK_LE(double_array_literal.size(),
853  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
854  lit_buf_size += double_array_literal.size() * sizeof(double);
855  }
856  if (align64_int8_array_literals.size() > 0) {
857  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
858  }
859  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
860  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
861  CHECK_LE(align64_int8_array_literal.size(),
862  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
863  lit_buf_size += align64_int8_array_literal.size();
864  }
865  if (int32_array_literals.size() > 0) {
866  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
867  }
868  int16_t crt_int32_arr_lit_off = lit_buf_size;
869  for (const auto& int32_array_literal : int32_array_literals) {
870  CHECK_LE(int32_array_literal.size(),
871  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
872  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
873  }
874  if (align32_int8_array_literals.size() > 0) {
875  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
876  }
877  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
878  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
879  CHECK_LE(align32_int8_array_literal.size(),
880  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
881  lit_buf_size += align32_int8_array_literal.size();
882  }
883  int16_t crt_int8_arr_lit_off = lit_buf_size;
884  for (const auto& int8_array_literal : int8_array_literals) {
885  CHECK_LE(int8_array_literal.size(),
886  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
887  lit_buf_size += int8_array_literal.size();
888  }
889  unsigned crt_real_str_idx = 0;
890  unsigned crt_double_arr_lit_idx = 0;
891  unsigned crt_align64_int8_arr_lit_idx = 0;
892  unsigned crt_int32_arr_lit_idx = 0;
893  unsigned crt_align32_int8_arr_lit_idx = 0;
894  unsigned crt_int8_arr_lit_idx = 0;
895  std::vector<int8_t> serialized(lit_buf_size);
896  size_t off{0};
897  for (const auto& lit : dev_literals) {
898  const auto lit_bytes = CgenState::literalBytes(lit);
899  off = CgenState::addAligned(off, lit_bytes);
900  switch (lit.which()) {
901  case 0: {
902  const auto p = boost::get<int8_t>(&lit);
903  CHECK(p);
904  serialized[off - lit_bytes] = *p;
905  break;
906  }
907  case 1: {
908  const auto p = boost::get<int16_t>(&lit);
909  CHECK(p);
910  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
911  break;
912  }
913  case 2: {
914  const auto p = boost::get<int32_t>(&lit);
915  CHECK(p);
916  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
917  break;
918  }
919  case 3: {
920  const auto p = boost::get<int64_t>(&lit);
921  CHECK(p);
922  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
923  break;
924  }
925  case 4: {
926  const auto p = boost::get<float>(&lit);
927  CHECK(p);
928  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
929  break;
930  }
931  case 5: {
932  const auto p = boost::get<double>(&lit);
933  CHECK(p);
934  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
935  break;
936  }
937  case 6: {
938  const auto p = boost::get<std::pair<std::string, int>>(&lit);
939  CHECK(p);
940  const auto str_id =
942  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
943  ->getOrAddTransient(p->first)
944  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
945  ->getIdOfString(p->first);
946  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
947  break;
948  }
949  case 7: {
950  const auto p = boost::get<std::string>(&lit);
951  CHECK(p);
952  int32_t off_and_len = crt_real_str_off << 16;
953  const auto& crt_real_str = real_strings[crt_real_str_idx];
954  off_and_len |= static_cast<int16_t>(crt_real_str.size());
955  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
956  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
957  ++crt_real_str_idx;
958  crt_real_str_off += crt_real_str.size();
959  break;
960  }
961  case 8: {
962  const auto p = boost::get<std::vector<double>>(&lit);
963  CHECK(p);
964  int32_t off_and_len = crt_double_arr_lit_off << 16;
965  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
966  int32_t len = crt_double_arr_lit.size();
967  CHECK_EQ((len >> 16), 0);
968  off_and_len |= static_cast<int16_t>(len);
969  int32_t double_array_bytesize = len * sizeof(double);
970  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
971  memcpy(&serialized[crt_double_arr_lit_off],
972  crt_double_arr_lit.data(),
973  double_array_bytesize);
974  ++crt_double_arr_lit_idx;
975  crt_double_arr_lit_off += double_array_bytesize;
976  break;
977  }
978  case 9: {
979  const auto p = boost::get<std::vector<int32_t>>(&lit);
980  CHECK(p);
981  int32_t off_and_len = crt_int32_arr_lit_off << 16;
982  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
983  int32_t len = crt_int32_arr_lit.size();
984  CHECK_EQ((len >> 16), 0);
985  off_and_len |= static_cast<int16_t>(len);
986  int32_t int32_array_bytesize = len * sizeof(int32_t);
987  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
988  memcpy(&serialized[crt_int32_arr_lit_off],
989  crt_int32_arr_lit.data(),
990  int32_array_bytesize);
991  ++crt_int32_arr_lit_idx;
992  crt_int32_arr_lit_off += int32_array_bytesize;
993  break;
994  }
995  case 10: {
996  const auto p = boost::get<std::vector<int8_t>>(&lit);
997  CHECK(p);
998  int32_t off_and_len = crt_int8_arr_lit_off << 16;
999  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
1000  int32_t len = crt_int8_arr_lit.size();
1001  CHECK_EQ((len >> 16), 0);
1002  off_and_len |= static_cast<int16_t>(len);
1003  int32_t int8_array_bytesize = len;
1004  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1005  memcpy(&serialized[crt_int8_arr_lit_off],
1006  crt_int8_arr_lit.data(),
1007  int8_array_bytesize);
1008  ++crt_int8_arr_lit_idx;
1009  crt_int8_arr_lit_off += int8_array_bytesize;
1010  break;
1011  }
1012  case 11: {
1013  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
1014  CHECK(p);
1015  if (p->second == 64) {
1016  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
1017  const auto& crt_align64_int8_arr_lit =
1018  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
1019  int32_t len = crt_align64_int8_arr_lit.size();
1020  CHECK_EQ((len >> 16), 0);
1021  off_and_len |= static_cast<int16_t>(len);
1022  int32_t align64_int8_array_bytesize = len;
1023  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1024  memcpy(&serialized[crt_align64_int8_arr_lit_off],
1025  crt_align64_int8_arr_lit.data(),
1026  align64_int8_array_bytesize);
1027  ++crt_align64_int8_arr_lit_idx;
1028  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
1029  } else if (p->second == 32) {
1030  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
1031  const auto& crt_align32_int8_arr_lit =
1032  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
1033  int32_t len = crt_align32_int8_arr_lit.size();
1034  CHECK_EQ((len >> 16), 0);
1035  off_and_len |= static_cast<int16_t>(len);
1036  int32_t align32_int8_array_bytesize = len;
1037  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1038  memcpy(&serialized[crt_align32_int8_arr_lit_off],
1039  crt_align32_int8_arr_lit.data(),
1040  align32_int8_array_bytesize);
1041  ++crt_align32_int8_arr_lit_idx;
1042  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
1043  } else {
1044  CHECK(false);
1045  }
1046  break;
1047  }
1048  default:
1049  CHECK(false);
1050  }
1051  }
1052  return serialized;
1053 }
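// Editor's note (worked example, not in the original source): variable-length
// literals are referenced through a packed 32-bit descriptor whose upper 16 bits
// hold the buffer offset and whose lower 16 bits hold the length, e.g. an
// 11-byte string placed at offset 64 serializes as (64 << 16) | 11 == 0x0040000b.
// This packing is why both the total literal buffer size and every literal
// length are checked against std::numeric_limits<int16_t>::max() above.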
1054 
1055 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
1056  if (device_type == ExecutorDeviceType::GPU) {
1057  return cudaMgr()->getDeviceCount();
1058  } else {
1059  return 1;
1060  }
1061 }
1062 
1063 int Executor::deviceCountForMemoryLevel(
1064  const Data_Namespace::MemoryLevel memory_level) const {
1065  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
1066  : deviceCount(ExecutorDeviceType::CPU);
1067 }
1068 
1069 // TODO(alex): remove or split
1070 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
1071  const SQLTypeInfo& ti,
1072  const int64_t agg_init_val,
1073  const int8_t out_byte_width,
1074  const int64_t* out_vec,
1075  const size_t out_vec_sz,
1076  const bool is_group_by,
1077  const bool float_argument_input) {
1078  switch (agg) {
1079  case kAVG:
1080  case kSUM:
1081  if (0 != agg_init_val) {
1082  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1083  int64_t agg_result = agg_init_val;
1084  for (size_t i = 0; i < out_vec_sz; ++i) {
1085  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
1086  }
1087  return {agg_result, 0};
1088  } else {
1089  CHECK(ti.is_fp());
1090  switch (out_byte_width) {
1091  case 4: {
1092  int agg_result = static_cast<int32_t>(agg_init_val);
1093  for (size_t i = 0; i < out_vec_sz; ++i) {
1094  agg_sum_float_skip_val(
1095  &agg_result,
1096  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1097  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1098  }
1099  const int64_t converted_bin =
1100  float_argument_input
1101  ? static_cast<int64_t>(agg_result)
1102  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1103  return {converted_bin, 0};
1104  break;
1105  }
1106  case 8: {
1107  int64_t agg_result = agg_init_val;
1108  for (size_t i = 0; i < out_vec_sz; ++i) {
1109  agg_sum_double_skip_val(
1110  &agg_result,
1111  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1112  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1113  }
1114  return {agg_result, 0};
1115  break;
1116  }
1117  default:
1118  CHECK(false);
1119  }
1120  }
1121  }
1122  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
1123  int64_t agg_result = 0;
1124  for (size_t i = 0; i < out_vec_sz; ++i) {
1125  agg_result += out_vec[i];
1126  }
1127  return {agg_result, 0};
1128  } else {
1129  CHECK(ti.is_fp());
1130  switch (out_byte_width) {
1131  case 4: {
1132  float r = 0.;
1133  for (size_t i = 0; i < out_vec_sz; ++i) {
1134  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
1135  }
1136  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
1137  const int64_t converted_bin =
1138  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
1139  return {converted_bin, 0};
1140  }
1141  case 8: {
1142  double r = 0.;
1143  for (size_t i = 0; i < out_vec_sz; ++i) {
1144  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
1145  }
1146  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
1147  }
1148  default:
1149  CHECK(false);
1150  }
1151  }
1152  break;
1153  case kCOUNT:
1154  case kCOUNT_IF: {
1155  uint64_t agg_result = 0;
1156  for (size_t i = 0; i < out_vec_sz; ++i) {
1157  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
1158  agg_result += out;
1159  }
1160  return {static_cast<int64_t>(agg_result), 0};
1161  }
1162  case kMIN: {
1163  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1164  int64_t agg_result = agg_init_val;
1165  for (size_t i = 0; i < out_vec_sz; ++i) {
1166  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
1167  }
1168  return {agg_result, 0};
1169  } else {
1170  switch (out_byte_width) {
1171  case 4: {
1172  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1173  for (size_t i = 0; i < out_vec_sz; ++i) {
1174  agg_min_float_skip_val(
1175  &agg_result,
1176  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1177  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1178  }
1179  const int64_t converted_bin =
1180  float_argument_input
1181  ? static_cast<int64_t>(agg_result)
1182  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1183  return {converted_bin, 0};
1184  }
1185  case 8: {
1186  int64_t agg_result = agg_init_val;
1187  for (size_t i = 0; i < out_vec_sz; ++i) {
1188  agg_min_double_skip_val(
1189  &agg_result,
1190  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1191  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1192  }
1193  return {agg_result, 0};
1194  }
1195  default:
1196  CHECK(false);
1197  }
1198  }
1199  }
1200  case kMAX:
1201  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1202  int64_t agg_result = agg_init_val;
1203  for (size_t i = 0; i < out_vec_sz; ++i) {
1204  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
1205  }
1206  return {agg_result, 0};
1207  } else {
1208  switch (out_byte_width) {
1209  case 4: {
1210  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1211  for (size_t i = 0; i < out_vec_sz; ++i) {
1212  agg_max_float_skip_val(
1213  &agg_result,
1214  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1215  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1216  }
1217  const int64_t converted_bin =
1218  float_argument_input ? static_cast<int64_t>(agg_result)
1219  : float_to_double_bin(agg_result, !ti.get_notnull());
1220  return {converted_bin, 0};
1221  }
1222  case 8: {
1223  int64_t agg_result = agg_init_val;
1224  for (size_t i = 0; i < out_vec_sz; ++i) {
1225  agg_max_double_skip_val(
1226  &agg_result,
1227  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1228  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1229  }
1230  return {agg_result, 0};
1231  }
1232  default:
1233  CHECK(false);
1234  }
1235  }
1236  case kSINGLE_VALUE: {
1237  int64_t agg_result = agg_init_val;
1238  for (size_t i = 0; i < out_vec_sz; ++i) {
1239  if (out_vec[i] != agg_init_val) {
1240  if (agg_result == agg_init_val) {
1241  agg_result = out_vec[i];
1242  } else if (out_vec[i] != agg_result) {
1243  return {agg_result, Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES};
1244  }
1245  }
1246  }
1247  return {agg_result, 0};
1248  }
1249  case kSAMPLE: {
1250  int64_t agg_result = agg_init_val;
1251  for (size_t i = 0; i < out_vec_sz; ++i) {
1252  if (out_vec[i] != agg_init_val) {
1253  agg_result = out_vec[i];
1254  break;
1255  }
1256  }
1257  return {agg_result, 0};
1258  }
1259  default:
1260  UNREACHABLE() << "Unsupported SQLAgg: " << agg;
1261  }
1262  abort();
1263 }
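// Editor's note (worked example, not in the original source): for an ungrouped
// SUM whose per-device partial results are {10, 20, 12} and whose init value is
// 0, the loop above returns {42, 0}. On the 4-byte floating-point path the
// partials are bit-cast out of the int64 output slots, accumulated as floats,
// and widened to a double bit pattern unless float_argument_input is set.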
1264 
1265 namespace {
1266 
1267 ResultSetPtr get_merged_result(
1268  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1269  std::vector<TargetInfo> const& targets) {
1270  auto& first = results_per_device.front().first;
1271  CHECK(first);
1272  auto const first_target_idx = result_set::first_dict_encoded_idx(targets);
1273  if (first_target_idx) {
1274  first->translateDictEncodedColumns(targets, *first_target_idx);
1275  }
1276  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
1277  const auto& next = results_per_device[dev_idx].first;
1278  CHECK(next);
1279  if (first_target_idx) {
1280  next->translateDictEncodedColumns(targets, *first_target_idx);
1281  }
1282  first->append(*next);
1283  }
1284  return std::move(first);
1285 }
1286 
1287 struct GetTargetInfo {
1288  TargetInfo operator()(Analyzer::Expr const* const target_expr) const {
1289  return get_target_info(target_expr, g_bigint_count);
1290  }
1291 };
1292 
1293 } // namespace
1294 
1295 ResultSetPtr Executor::resultsUnion(SharedKernelContext& shared_context,
1296  const RelAlgExecutionUnit& ra_exe_unit) {
1297  auto timer = DEBUG_TIMER(__func__);
1298  auto& results_per_device = shared_context.getFragmentResults();
1299  auto const targets = shared::transform<std::vector<TargetInfo>>(
1300  ra_exe_unit.target_exprs, GetTargetInfo{});
1301  if (results_per_device.empty()) {
1302  return std::make_shared<ResultSet>(targets,
1303  ExecutorDeviceType::CPU,
1304  QueryMemoryDescriptor(),
1305  row_set_mem_owner_,
1306  catalog_,
1307  blockSize(),
1308  gridSize());
1309  }
1310  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
1311  std::sort(results_per_device.begin(),
1312  results_per_device.end(),
1313  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
1314  CHECK_GE(lhs.second.size(), size_t(1));
1315  CHECK_GE(rhs.second.size(), size_t(1));
1316  return lhs.second.front() < rhs.second.front();
1317  });
1318 
1319  return get_merged_result(results_per_device, targets);
1320 }
1321 
1322 ResultSetPtr Executor::reduceMultiDeviceResults(
1323  const RelAlgExecutionUnit& ra_exe_unit,
1324  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1325  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1326  const QueryMemoryDescriptor& query_mem_desc) const {
1327  auto timer = DEBUG_TIMER(__func__);
1328  if (ra_exe_unit.estimator) {
1329  return reduce_estimator_results(ra_exe_unit, results_per_device);
1330  }
1331 
1332  if (results_per_device.empty()) {
1333  auto const targets = shared::transform<std::vector<TargetInfo>>(
1334  ra_exe_unit.target_exprs, GetTargetInfo{});
1335  return std::make_shared<ResultSet>(targets,
1336  ExecutorDeviceType::CPU,
1337  QueryMemoryDescriptor(),
1338  nullptr,
1339  catalog_,
1340  blockSize(),
1341  gridSize());
1342  }
1343 
1344  return reduceMultiDeviceResultSets(
1345  results_per_device,
1346  row_set_mem_owner,
1347  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
1348 }
1349 
1350 namespace {
1351 
1352 ReductionCode get_reduction_code(
1353  const size_t executor_id,
1354  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1355  int64_t* compilation_queue_time) {
1356  auto clock_begin = timer_start();
1357  // ResultSetReductionJIT::codegen compilation-locks if new code will be generated
1358  *compilation_queue_time = timer_stop(clock_begin);
1359  const auto& this_result_set = results_per_device[0].first;
1360  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
1361  this_result_set->getTargetInfos(),
1362  this_result_set->getTargetInitVals(),
1363  executor_id);
1364  return reduction_jit.codegen();
1365 };
1366 
1367 } // namespace
1368 
1369 ResultSetPtr Executor::reduceMultiDeviceResultSets(
1370  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1371  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1372  const QueryMemoryDescriptor& query_mem_desc) const {
1373  auto timer = DEBUG_TIMER(__func__);
1374  std::shared_ptr<ResultSet> reduced_results;
1375 
1376  const auto& first = results_per_device.front().first;
1377 
1378  if (query_mem_desc.getQueryDescriptionType() ==
1379  QueryDescriptionType::GroupByBaselineHash &&
1380  results_per_device.size() > 1) {
1381  const auto total_entry_count = std::accumulate(
1382  results_per_device.begin(),
1383  results_per_device.end(),
1384  size_t(0),
1385  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
1386  const auto& r = rs.first;
1387  return init + r->getQueryMemDesc().getEntryCount();
1388  });
1389  CHECK(total_entry_count);
1390  auto query_mem_desc = first->getQueryMemDesc();
1391  query_mem_desc.setEntryCount(total_entry_count);
1392  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
1393  ExecutorDeviceType::CPU,
1394  query_mem_desc,
1395  row_set_mem_owner,
1396  catalog_,
1397  blockSize(),
1398  gridSize());
1399  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
1400  reduced_results->initializeStorage();
1401  switch (query_mem_desc.getEffectiveKeyWidth()) {
1402  case 4:
1403  first->getStorage()->moveEntriesToBuffer<int32_t>(
1404  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1405  break;
1406  case 8:
1407  first->getStorage()->moveEntriesToBuffer<int64_t>(
1408  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1409  break;
1410  default:
1411  CHECK(false);
1412  }
1413  } else {
1414  reduced_results = first;
1415  }
1416 
1417  int64_t compilation_queue_time = 0;
1418  const auto reduction_code =
1419  get_reduction_code(executor_id_, results_per_device, &compilation_queue_time);
1420 
1421  for (size_t i = 1; i < results_per_device.size(); ++i) {
1422  reduced_results->getStorage()->reduce(
1423  *(results_per_device[i].first->getStorage()), {}, reduction_code, executor_id_);
1424  }
1425  reduced_results->addCompilationQueueTime(compilation_queue_time);
1426  reduced_results->invalidateCachedRowCount();
1427  return reduced_results;
1428 }
1429 
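// Editor's note (worked example, not in the original source): if two devices
// return baseline-hash buffers with 1,024 and 2,048 entries, the merged result
// set above is sized for 3,072 entries, the first device's entries are moved in
// at the buffer's effective key width (4 or 8 bytes), and the remaining result
// sets are folded in with the JIT-generated reduction code.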
1430 ResultSetPtr Executor::reduceSpeculativeTopN(
1431  const RelAlgExecutionUnit& ra_exe_unit,
1432  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1433  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1434  const QueryMemoryDescriptor& query_mem_desc) const {
1435  if (results_per_device.size() == 1) {
1436  return std::move(results_per_device.front().first);
1437  }
1438  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1439  SpeculativeTopNMap m;
1440  for (const auto& result : results_per_device) {
1441  auto rows = result.first;
1442  CHECK(rows);
1443  if (!rows) {
1444  continue;
1445  }
1446  SpeculativeTopNMap that(
1447  *rows,
1448  ra_exe_unit.target_exprs,
1449  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1450  m.reduce(that);
1451  }
1452  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1453  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1454  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1455 }
1456 
1457 std::unordered_set<int> get_available_gpus(const Data_Namespace::DataMgr* data_mgr) {
1458  CHECK(data_mgr);
1459  std::unordered_set<int> available_gpus;
1460  if (data_mgr->gpusPresent()) {
1461  CHECK(data_mgr->getCudaMgr());
1462  const int gpu_count = data_mgr->getCudaMgr()->getDeviceCount();
1463  CHECK_GT(gpu_count, 0);
1464  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1465  available_gpus.insert(gpu_id);
1466  }
1467  }
1468  return available_gpus;
1469 }
1470 
1471 size_t get_context_count(const ExecutorDeviceType device_type,
1472  const size_t cpu_count,
1473  const size_t gpu_count) {
1474  return device_type == ExecutorDeviceType::GPU ? gpu_count
1475  : static_cast<size_t>(cpu_count);
1476 }
1477 
1478 namespace {
1479 
1480 // Compute a very conservative guess for the output buffer entry count using no
1481 // other information than the number of tuples in each table and multiplying them
1482 // together.
1483 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1485  // Check for overflows since we're multiplying potentially big table sizes.
1486  using checked_size_t = boost::multiprecision::number<
1487  boost::multiprecision::cpp_int_backend<64,
1488  64,
1489  boost::multiprecision::unsigned_magnitude,
1490  boost::multiprecision::checked,
1491  void>>;
1492  checked_size_t max_groups_buffer_entry_guess = 1;
1493  for (const auto& query_info : query_infos) {
1494  CHECK(!query_info.info.fragments.empty());
1495  auto it = std::max_element(query_info.info.fragments.begin(),
1496  query_info.info.fragments.end(),
1497  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1498  return f1.getNumTuples() < f2.getNumTuples();
1499  });
1500  max_groups_buffer_entry_guess *= it->getNumTuples();
1501  }
1502  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1503  // baseline group layout with that many entries anyway.
1504  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1505  try {
1506  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1507  max_groups_buffer_entry_guess_cap);
1508  } catch (...) {
1509  return max_groups_buffer_entry_guess_cap;
1510  }
1511 }
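// Editor's note (worked example, not in the original source): joining a table
// whose largest fragment has 2,000,000 tuples with one whose largest fragment
// has 300 tuples yields a guess of 600,000,000, which the cap above reduces to
// 100,000,000; a genuine overflow in the checked multiplication throws and is
// likewise answered with the cap.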
1512 
1513 std::string get_table_name(const InputDescriptor& input_desc,
1514  const Catalog_Namespace::Catalog& cat) {
1515  const auto source_type = input_desc.getSourceType();
1516  if (source_type == InputSourceType::TABLE) {
1517  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1518  CHECK(td);
1519  return td->tableName;
1520  } else {
1521  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1522  }
1523 }
1524 
1525 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1526  const int device_count) {
1527  if (device_type == ExecutorDeviceType::GPU) {
1528  return device_count * Executor::high_scan_limit;
1529  }
1530  return Executor::high_scan_limit;
1531 }
1532 
1533 void checkWorkUnitWatchdog(const RelAlgExecutionUnit& ra_exe_unit,
1534  const std::vector<InputTableInfo>& table_infos,
1535  const Catalog_Namespace::Catalog& cat,
1536  const ExecutorDeviceType device_type,
1537  const int device_count) {
1538  for (const auto target_expr : ra_exe_unit.target_exprs) {
1539  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1540  return;
1541  }
1542  }
1543  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1544  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1545  // Allow a query with no scan limit to run on small tables
1546  return;
1547  }
1548  if (ra_exe_unit.use_bump_allocator) {
1549  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1550  // relative to the size of the input), so we bypass this check for now
1551  return;
1552  }
1553  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1554  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1555  (!ra_exe_unit.scan_limit ||
1556  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1557  std::vector<std::string> table_names;
1558  const auto& input_descs = ra_exe_unit.input_descs;
1559  for (const auto& input_desc : input_descs) {
1560  table_names.push_back(get_table_name(input_desc, cat));
1561  }
1562  if (!ra_exe_unit.scan_limit) {
1563  throw WatchdogException(
1564  "Projection query would require a scan without a limit on table(s): " +
1565  boost::algorithm::join(table_names, ", "));
1566  } else {
1567  throw WatchdogException(
1568  "Projection query output result set on table(s): " +
1569  boost::algorithm::join(table_names, ", ") + " would contain " +
1570  std::to_string(ra_exe_unit.scan_limit) +
1571  " rows, which is more than the current system limit of " +
1572  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1573  }
1574  }
1575 }
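// Editor's note (worked example, not in the original source): on a 4-GPU system
// the watchdog allows an unaggregated, ungrouped projection to scan at most
// 4 * Executor::high_scan_limit rows; a query with no scan limit at all (over a
// table larger than high_scan_limit) is rejected with the first
// WatchdogException above, and an oversized limit with the second.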
1576 
1577 } // namespace
1578 
1579 size_t get_loop_join_size(const std::vector<InputTableInfo>& query_infos,
1580  const RelAlgExecutionUnit& ra_exe_unit) {
1581  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1582 
1583  std::optional<size_t> inner_table_idx;
1584  for (size_t i = 0; i < query_infos.size(); ++i) {
1585  if (query_infos[i].table_id == inner_table_id) {
1586  inner_table_idx = i;
1587  break;
1588  }
1589  }
1590  CHECK(inner_table_idx);
1591  return query_infos[*inner_table_idx].info.getNumTuples();
1592 }
1593 
1594 namespace {
1595 
1596 template <typename T>
1597 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1598  std::vector<std::string> expr_strs;
1599  for (const auto& expr : expr_container) {
1600  if (!expr) {
1601  expr_strs.emplace_back("NULL");
1602  } else {
1603  expr_strs.emplace_back(expr->toString());
1604  }
1605  }
1606  return expr_strs;
1607 }
1608 
1609 template <>
1610 std::vector<std::string> expr_container_to_string(
1611  const std::list<Analyzer::OrderEntry>& expr_container) {
1612  std::vector<std::string> expr_strs;
1613  for (const auto& expr : expr_container) {
1614  expr_strs.emplace_back(expr.toString());
1615  }
1616  return expr_strs;
1617 }
1618 
1619 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1620  switch (algorithm) {
1621  case SortAlgorithm::Default:
1622  return "ResultSet";
1623  case SortAlgorithm::SpeculativeTopN:
1624  return "Speculative Top N";
1625  case SortAlgorithm::StreamingTopN:
1626  return "Streaming Top N";
1627  }
1628  UNREACHABLE();
1629  return "";
1630 }
1631 
1632 } // namespace
1633 
1634 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
1635 // todo(yoonmin): replace the cache key with a DAG representation of the query plan
1636 // instead of the ra_exec_unit description if possible
1637  std::ostringstream os;
1638  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1639  const auto& scan_desc = input_col_desc->getScanDesc();
1640  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1641  << scan_desc.getNestLevel();
1642  }
1643  if (!ra_exe_unit.simple_quals.empty()) {
1644  for (const auto& qual : ra_exe_unit.simple_quals) {
1645  if (qual) {
1646  os << qual->toString() << ",";
1647  }
1648  }
1649  }
1650  if (!ra_exe_unit.quals.empty()) {
1651  for (const auto& qual : ra_exe_unit.quals) {
1652  if (qual) {
1653  os << qual->toString() << ",";
1654  }
1655  }
1656  }
1657  if (!ra_exe_unit.join_quals.empty()) {
1658  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1659  const auto& join_condition = ra_exe_unit.join_quals[i];
1660  os << std::to_string(i) << ::toString(join_condition.type);
1661  for (const auto& qual : join_condition.quals) {
1662  if (qual) {
1663  os << qual->toString() << ",";
1664  }
1665  }
1666  }
1667  }
1668  if (!ra_exe_unit.groupby_exprs.empty()) {
1669  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1670  if (qual) {
1671  os << qual->toString() << ",";
1672  }
1673  }
1674  }
1675  for (const auto& expr : ra_exe_unit.target_exprs) {
1676  if (expr) {
1677  os << expr->toString() << ",";
1678  }
1679  }
1680  os << ::toString(ra_exe_unit.estimator == nullptr);
1681  os << std::to_string(ra_exe_unit.scan_limit);
1682  return os.str();
1683 }
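// Annotation (not part of the original source): the cache key produced above is a flat
// comma-separated string. For a hypothetical unit scanning cols 3 and 4 of table 17 at
// nest level 0 with one qual and one aggregate target it would look roughly like
//   "17,3,017,4,0(GT t17.c3 10),(SUM t17.c4),true0"
// i.e. table/col/nest-level triplets, then qual, group-by and target expressions as
// rendered by Analyzer::Expr::toString(), followed by the estimator flag and the scan
// limit. The exact rendering depends on toString() and is sketched here only for intuition.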
1684 
1685 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1686  os << "\n\tExtracted Query Plan Dag Hash: " << ra_exe_unit.query_plan_dag_hash;
1687  os << "\n\tTable/Col/Levels: ";
1688  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1689  const auto& scan_desc = input_col_desc->getScanDesc();
1690  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1691  << scan_desc.getNestLevel() << ") ";
1692  }
1693  if (!ra_exe_unit.simple_quals.empty()) {
1694  os << "\n\tSimple Quals: "
1695  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.simple_quals),
1696  ", ");
1697  }
1698  if (!ra_exe_unit.quals.empty()) {
1699  os << "\n\tQuals: "
1700  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1701  }
1702  if (!ra_exe_unit.join_quals.empty()) {
1703  os << "\n\tJoin Quals: ";
1704  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1705  const auto& join_condition = ra_exe_unit.join_quals[i];
1706  os << "\t\t" << std::to_string(i) << " " << ::toString(join_condition.type);
1707  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1708  }
1709  }
1710  if (!ra_exe_unit.groupby_exprs.empty()) {
1711  os << "\n\tGroup By: "
1712  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.groupby_exprs),
1713  ", ");
1714  }
1715  os << "\n\tProjected targets: "
1716  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.target_exprs), ", ");
1717  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
1718  os << "\n\tSort Info: ";
1719  const auto& sort_info = ra_exe_unit.sort_info;
1720  os << "\n\t Order Entries: "
1721  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1722  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1723  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1724  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1725  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1726  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
1727  if (ra_exe_unit.union_all) {
1728  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1729  }
1730  return os;
1731 }
1732 
1733 namespace {
1734 
1735  RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
1736  const size_t new_scan_limit) {
1737  return {ra_exe_unit_in.input_descs,
1738  ra_exe_unit_in.input_col_descs,
1739  ra_exe_unit_in.simple_quals,
1740  ra_exe_unit_in.quals,
1741  ra_exe_unit_in.join_quals,
1742  ra_exe_unit_in.groupby_exprs,
1743  ra_exe_unit_in.target_exprs,
1744  ra_exe_unit_in.target_exprs_original_type_infos,
1745  ra_exe_unit_in.estimator,
1746  ra_exe_unit_in.sort_info,
1747  new_scan_limit,
1748  ra_exe_unit_in.query_hint,
1749  ra_exe_unit_in.query_plan_dag_hash,
1750  ra_exe_unit_in.hash_table_build_plan_dag,
1751  ra_exe_unit_in.table_id_to_node_map,
1752  ra_exe_unit_in.use_bump_allocator,
1753  ra_exe_unit_in.union_all,
1754  ra_exe_unit_in.query_state};
1755 }
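// Annotation: replace_scan_limit() above aggregate-initializes a copy of the execution
// unit that is identical field-for-field except for the new scan limit; it backs the
// CompilationRetryNewScanLimit retry path in Executor::executeWorkUnit() below.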
1756 
1757 } // namespace
1758 
1759 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1760  const bool is_agg,
1761  const std::vector<InputTableInfo>& query_infos,
1762  const RelAlgExecutionUnit& ra_exe_unit_in,
1763  const CompilationOptions& co,
1764  const ExecutionOptions& eo,
1765  const Catalog_Namespace::Catalog& cat,
1766  RenderInfo* render_info,
1767  const bool has_cardinality_estimation,
1768  ColumnCacheMap& column_cache) {
1769  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1770 
1771  ScopeGuard cleanup_post_execution = [this] {
1772  // cleanup/unpin GPU buffer allocations
1773  // TODO: separate out this state into a single object
1774  plan_state_.reset(nullptr);
1775  if (cgen_state_) {
1776  cgen_state_->in_values_bitmaps_.clear();
1777  cgen_state_->str_dict_translation_mgrs_.clear();
1778  }
1779  };
1780 
1781  try {
1782  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1783  is_agg,
1784  true,
1785  query_infos,
1786  ra_exe_unit_in,
1787  co,
1788  eo,
1789  cat,
1790  row_set_mem_owner_,
1791  render_info,
1792  has_cardinality_estimation,
1793  column_cache);
1794  if (result) {
1795  result->setKernelQueueTime(kernel_queue_time_ms_);
1796  result->addCompilationQueueTime(compilation_queue_time_ms_);
1797  if (eo.just_validate) {
1798  result->setValidationOnlyRes();
1799  }
1800  }
1801  return result;
1802  } catch (const CompilationRetryNewScanLimit& e) {
1803  auto result =
1804  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1805  is_agg,
1806  false,
1807  query_infos,
1808  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1809  co,
1810  eo,
1811  cat,
1812  row_set_mem_owner_,
1813  render_info,
1814  has_cardinality_estimation,
1815  column_cache);
1816  if (result) {
1817  result->setKernelQueueTime(kernel_queue_time_ms_);
1818  result->addCompilationQueueTime(compilation_queue_time_ms_);
1819  if (eo.just_validate) {
1820  result->setValidationOnlyRes();
1821  }
1822  }
1823  return result;
1824  }
1825 }
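// Annotation: executeWorkUnit() attempts execution once with the single-fragment-table
// optimization enabled (third argument true). If compilation throws
// CompilationRetryNewScanLimit -- e.g. when createKernels() lowers the scan limit to the
// largest fragment size -- the unit is re-run exactly once with the new limit and the
// optimization disabled; no further retries of this kind are made.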
1826 
1827  ResultSetPtr Executor::executeWorkUnitImpl(
1828  size_t& max_groups_buffer_entry_guess,
1829  const bool is_agg,
1830  const bool allow_single_frag_table_opt,
1831  const std::vector<InputTableInfo>& query_infos,
1832  const RelAlgExecutionUnit& ra_exe_unit_in,
1833  const CompilationOptions& co,
1834  const ExecutionOptions& eo,
1835  const Catalog_Namespace::Catalog& cat,
1836  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1837  RenderInfo* render_info,
1838  const bool has_cardinality_estimation,
1839  ColumnCacheMap& column_cache) {
1840  INJECT_TIMER(Exec_executeWorkUnit);
1841  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1842  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1843  CHECK(!query_infos.empty());
1844  if (!max_groups_buffer_entry_guess) {
1845  // The query has failed the first execution attempt because of running out
1846  // of group by slots. Make the conservative choice: allocate fragment size
1847  // slots and run on the CPU.
1848  CHECK(device_type == ExecutorDeviceType::CPU);
1849  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1850  }
1851 
1852  int8_t crt_min_byte_width{MAX_BYTE_WIDTH_SUPPORTED};
1853  do {
1854  SharedKernelContext shared_context(query_infos);
1855  ColumnFetcher column_fetcher(this, column_cache);
1856  ScopeGuard scope_guard = [&column_fetcher] {
1857  column_fetcher.freeLinearizedBuf();
1858  column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
1859  };
1860  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1861  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1862 
1863  if (eo.executor_type == ExecutorType::Native) {
1864  try {
1865  INJECT_TIMER(query_step_compilation);
1866  query_mem_desc_owned =
1867  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1868  crt_min_byte_width,
1869  has_cardinality_estimation,
1870  ra_exe_unit,
1871  query_infos,
1872  deleted_cols_map,
1873  column_fetcher,
1874  {device_type,
1875  co.hoist_literals,
1876  co.opt_level,
1877  co.with_dynamic_watchdog,
1878  co.allow_lazy_fetch,
1879  co.filter_on_deleted_column,
1880  co.explain_type,
1881  co.register_intel_jit_listener},
1882  eo,
1883  render_info,
1884  this);
1885  CHECK(query_mem_desc_owned);
1886  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1887  } catch (CompilationRetryNoCompaction&) {
1888  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1889  continue;
1890  }
1891  } else {
1892  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1893  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1894  CHECK(!query_mem_desc_owned);
1895  query_mem_desc_owned.reset(
1896  new QueryMemoryDescriptor(this, 0, QueryDescriptionType::Projection, false));
1897  }
1898  if (eo.just_explain) {
1899  return executeExplain(*query_comp_desc_owned);
1900  }
1901 
1902  for (const auto target_expr : ra_exe_unit.target_exprs) {
1903  plan_state_->target_exprs_.push_back(target_expr);
1904  }
1905 
1906  if (!eo.just_validate) {
1907  int available_cpus = cpu_threads();
1908  auto available_gpus = get_available_gpus(data_mgr_);
1909 
1910  const auto context_count =
1911  get_context_count(device_type, available_cpus, available_gpus.size());
1912  try {
1913  auto kernels = createKernels(shared_context,
1914  ra_exe_unit,
1915  column_fetcher,
1916  query_infos,
1917  eo,
1918  is_agg,
1919  allow_single_frag_table_opt,
1920  context_count,
1921  *query_comp_desc_owned,
1922  *query_mem_desc_owned,
1923  render_info,
1924  available_gpus,
1925  available_cpus);
1926  launchKernels(
1927  shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
1928  } catch (QueryExecutionError& e) {
1929  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1930  e.getErrorCode() == ERR_OUT_OF_TIME) {
1931  throw QueryExecutionError(ERR_INTERRUPTED);
1932  }
1933  if (e.getErrorCode() == ERR_INTERRUPTED) {
1934  throw QueryExecutionError(ERR_INTERRUPTED);
1935  }
1936  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1937  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1938  crt_min_byte_width <<= 1;
1939  continue;
1940  }
1941  throw;
1942  }
1943  }
1944  if (is_agg) {
1945  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
1946  // update query status to let user know we are now in the reduction phase
1947  std::string curRunningSession{""};
1948  std::string curRunningQuerySubmittedTime{""};
1949  bool sessionEnrolled = false;
1950  {
1953  curRunningSession = getCurrentQuerySession(session_read_lock);
1954  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
1955  sessionEnrolled =
1956  checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
1957  }
1958  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
1959  sessionEnrolled) {
1960  updateQuerySessionStatus(curRunningSession,
1961  curRunningQuerySubmittedTime,
1962  QuerySessionStatus::QueryStatus::RUNNING_REDUCTION);
1963  }
1964  }
1965  try {
1966  return collectAllDeviceResults(shared_context,
1967  ra_exe_unit,
1968  *query_mem_desc_owned,
1969  query_comp_desc_owned->getDeviceType(),
1970  row_set_mem_owner);
1971  } catch (ReductionRanOutOfSlots&) {
1972  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1973  } catch (OverflowOrUnderflow&) {
1974  crt_min_byte_width <<= 1;
1975  continue;
1976  } catch (QueryExecutionError& e) {
1977  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
1978  << ", what(): " << e.what();
1979  throw QueryExecutionError(e.getErrorCode());
1980  }
1981  }
1982  return resultsUnion(shared_context, ra_exe_unit);
1983 
1984  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1985 
1986  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1987  ExecutorDeviceType::CPU,
1988  QueryMemoryDescriptor(),
1989  nullptr,
1990  catalog_,
1991  blockSize(),
1992  gridSize());
1993 }
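// Annotation: the do/while loop above retries the whole compile-and-launch sequence with
// a doubled minimum byte width whenever compaction fails or an aggregate over/underflows.
// Once crt_min_byte_width exceeds sizeof(int64_t) the loop gives up and an empty
// ResultSet is returned.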
1994 
1995  void Executor::executeWorkUnitPerFragment(
1996  const RelAlgExecutionUnit& ra_exe_unit_in,
1997  const InputTableInfo& table_info,
1998  const CompilationOptions& co,
1999  const ExecutionOptions& eo,
2000  const Catalog_Namespace::Catalog& cat,
2001  PerFragmentCallBack& cb,
2002  const std::set<size_t>& fragment_indexes_param) {
2003  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
2004  ColumnCacheMap column_cache;
2005 
2006  std::vector<InputTableInfo> table_infos{table_info};
2007  SharedKernelContext kernel_context(table_infos);
2008 
2009  ColumnFetcher column_fetcher(this, column_cache);
2010  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
2011  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
2012  {
2013  query_mem_desc_owned =
2014  query_comp_desc_owned->compile(0,
2015  8,
2016  /*has_cardinality_estimation=*/false,
2017  ra_exe_unit,
2018  table_infos,
2019  deleted_cols_map,
2020  column_fetcher,
2021  co,
2022  eo,
2023  nullptr,
2024  this);
2025  }
2026  CHECK(query_mem_desc_owned);
2027  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
2028  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
2029  const auto& outer_fragments = table_info.info.fragments;
2030 
2031  std::set<size_t> fragment_indexes;
2032  if (fragment_indexes_param.empty()) {
2033  // An empty `fragment_indexes_param` set implies executing
2034  // the query for all fragments in the table. In this
2035  // case, populate `fragment_indexes` with all fragment indexes.
2036  for (size_t i = 0; i < outer_fragments.size(); i++) {
2037  fragment_indexes.emplace(i);
2038  }
2039  } else {
2040  fragment_indexes = fragment_indexes_param;
2041  }
2042 
2043  {
2044  auto clock_begin = timer_start();
2045  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2046  kernel_queue_time_ms_ += timer_stop(clock_begin);
2047 
2048  for (auto fragment_index : fragment_indexes) {
2049  // We may want to consider in the future allowing this to execute on devices other
2050  // than CPU
2051  FragmentsList fragments_list{{table_id, {fragment_index}}};
2052  ExecutionKernel kernel(ra_exe_unit,
2053  co.device_type,
2054  /*device_id=*/0,
2055  eo,
2056  column_fetcher,
2057  *query_comp_desc_owned,
2058  *query_mem_desc_owned,
2059  fragments_list,
2060  ExecutorDispatchMode::KernelPerFragment,
2061  /*render_info=*/nullptr,
2062  /*rowid_lookup_key=*/-1);
2063  kernel.run(this, 0, kernel_context);
2064  }
2065  }
2066 
2067  const auto& all_fragment_results = kernel_context.getFragmentResults();
2068 
2069  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
2070  CHECK_EQ(result_fragment_indexes.size(), 1);
2071  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
2072  }
2073 }
2074 
2075  ResultSetPtr Executor::executeTableFunction(
2076  const TableFunctionExecutionUnit exe_unit,
2077  const std::vector<InputTableInfo>& table_infos,
2078  const CompilationOptions& co,
2079  const ExecutionOptions& eo,
2080  const Catalog_Namespace::Catalog& cat) {
2081  INJECT_TIMER(Exec_executeTableFunction);
2082  if (eo.just_validate) {
2083  QueryMemoryDescriptor query_mem_desc(this,
2084  /*entry_count=*/0,
2085  QueryDescriptionType::TableFunction,
2086  /*is_table_function=*/true);
2087  query_mem_desc.setOutputColumnar(true);
2088  return std::make_shared<ResultSet>(
2089  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
2090  co.device_type,
2091  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
2092  this->getRowSetMemoryOwner(),
2093  this->getCatalog(),
2094  this->blockSize(),
2095  this->gridSize());
2096  }
2097 
2098  // Avoid compile functions that set the sizer at runtime if the device is GPU
2099  // This should be fixed in the python script as well to minimize the number of
2100  // QueryMustRunOnCpu exceptions
2103  throw QueryMustRunOnCpu();
2104  }
2105 
2106  ColumnCacheMap column_cache; // Note: if we add retries to the table function
2107  // framework, we may want to move this up a level
2108 
2109  ColumnFetcher column_fetcher(this, column_cache);
2111 
2112  if (exe_unit.table_func.containsPreFlightFn()) {
2113  std::shared_ptr<CompilationContext> compilation_context;
2114  {
2115  Executor::CgenStateManager cgenstate_manager(*this,
2116  false,
2117  table_infos,
2119  nullptr); // locks compilation_mutex
2121  TableFunctionCompilationContext tf_compilation_context(this, pre_flight_co);
2122  compilation_context =
2123  tf_compilation_context.compile(exe_unit, true /* emit_only_preflight_fn*/);
2124  }
2125  exe_context.execute(exe_unit,
2126  table_infos,
2127  compilation_context,
2128  column_fetcher,
2130  this,
2131  true /* is_pre_launch_udtf */);
2132  }
2133  std::shared_ptr<CompilationContext> compilation_context;
2134  {
2135  Executor::CgenStateManager cgenstate_manager(*this,
2136  false,
2137  table_infos,
2139  nullptr); // locks compilation_mutex
2140  TableFunctionCompilationContext tf_compilation_context(this, co);
2141  compilation_context =
2142  tf_compilation_context.compile(exe_unit, false /* emit_only_preflight_fn */);
2143  }
2144  return exe_context.execute(exe_unit,
2145  table_infos,
2146  compilation_context,
2147  column_fetcher,
2148  co.device_type,
2149  this,
2150  false /* is_pre_launch_udtf */);
2151 }
2152 
2153  ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
2154  return std::make_shared<ResultSet>(query_comp_desc.getIR());
2155 }
2156 
2157  void Executor::addTransientStringLiterals(
2158  const RelAlgExecutionUnit& ra_exe_unit,
2159  const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
2160  TransientDictIdVisitor dict_id_visitor;
2161 
2162  auto visit_expr =
2163  [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
2164  if (!expr) {
2165  return;
2166  }
2167  const auto dict_id = dict_id_visitor.visit(expr);
2168  if (dict_id >= 0) {
2169  auto sdp = getStringDictionaryProxy(dict_id, row_set_mem_owner, true);
2170  CHECK(sdp);
2171  TransientStringLiteralsVisitor visitor(sdp, this);
2172  visitor.visit(expr);
2173  }
2174  };
2175 
2176  for (const auto& group_expr : ra_exe_unit.groupby_exprs) {
2177  visit_expr(group_expr.get());
2178  }
2179 
2180  for (const auto& group_expr : ra_exe_unit.quals) {
2181  visit_expr(group_expr.get());
2182  }
2183 
2184  for (const auto& group_expr : ra_exe_unit.simple_quals) {
2185  visit_expr(group_expr.get());
2186  }
2187 
2188  const auto visit_target_expr = [&](const Analyzer::Expr* target_expr) {
2189  const auto& target_type = target_expr->get_type_info();
2190  if (!target_type.is_string() || target_type.get_compression() == kENCODING_DICT) {
2191  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
2192  if (agg_expr) {
2193  if (agg_expr->get_aggtype() == kSINGLE_VALUE ||
2194  agg_expr->get_aggtype() == kSAMPLE) {
2195  visit_expr(agg_expr->get_arg());
2196  }
2197  } else {
2198  visit_expr(target_expr);
2199  }
2200  }
2201  };
2202  const auto& target_exprs = ra_exe_unit.target_exprs;
2203  std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
2204  const auto& target_exprs_union = ra_exe_unit.target_exprs_union;
2205  std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
2206 }
2207 
2208  ExecutorDeviceType Executor::getDeviceTypeForTargets(
2209  const RelAlgExecutionUnit& ra_exe_unit,
2210  const ExecutorDeviceType requested_device_type) {
2211  for (const auto target_expr : ra_exe_unit.target_exprs) {
2212  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2213  if (!ra_exe_unit.groupby_exprs.empty() &&
2214  !isArchPascalOrLater(requested_device_type)) {
2215  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
2216  agg_info.agg_arg_type.get_type() == kDOUBLE) {
2217  return ExecutorDeviceType::CPU;
2218  }
2219  }
2220  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
2221  return ExecutorDeviceType::CPU;
2222  }
2223  }
2224  return requested_device_type;
2225 }
2226 
2227 namespace {
2228 
2229 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
2230  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
2231  if (ti.is_fp()) {
2232  if (float_argument_input && ti.get_type() == kFLOAT) {
2233  int64_t float_null_val = 0;
2234  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
2235  static_cast<float>(inline_fp_null_val(ti));
2236  return float_null_val;
2237  }
2238  const auto double_null_val = inline_fp_null_val(ti);
2239  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
2240  }
2241  return inline_int_null_val(ti);
2242 }
2243 
2244 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
2245  std::vector<int64_t>& entry,
2246  const std::vector<Analyzer::Expr*>& target_exprs,
2247  const QueryMemoryDescriptor& query_mem_desc) {
2248  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
2249  const auto target_expr = target_exprs[target_idx];
2250  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2251  CHECK(agg_info.is_agg);
2252  target_infos.push_back(agg_info);
2253  if (g_cluster) {
2254  const auto executor = query_mem_desc.getExecutor();
2255  CHECK(executor);
2256  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2257  CHECK(row_set_mem_owner);
2258  const auto& count_distinct_desc =
2259  query_mem_desc.getCountDistinctDescriptor(target_idx);
2260  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
2261  CHECK(row_set_mem_owner);
2262  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
2263  count_distinct_desc.bitmapPaddedSizeBytes(),
2264  /*thread_idx=*/0); // TODO: can we detect thread idx here?
2265  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
2266  continue;
2267  }
2268  if (count_distinct_desc.impl_type_ == CountDistinctImplType::UnorderedSet) {
2269  auto count_distinct_set = new CountDistinctSet();
2270  CHECK(row_set_mem_owner);
2271  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
2272  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
2273  continue;
2274  }
2275  }
2276  const bool float_argument_input = takes_float_argument(agg_info);
2277  if (shared::is_any<kCOUNT, kCOUNT_IF, kAPPROX_COUNT_DISTINCT>(agg_info.agg_kind)) {
2278  entry.push_back(0);
2279  } else if (shared::is_any<kAVG>(agg_info.agg_kind)) {
2280  entry.push_back(0);
2281  entry.push_back(0);
2282  } else if (shared::is_any<kSINGLE_VALUE, kSAMPLE>(agg_info.agg_kind)) {
2283  if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
2284  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2285  entry.push_back(0);
2286  }
2287  } else if (agg_info.sql_type.is_varlen()) {
2288  entry.push_back(0);
2289  entry.push_back(0);
2290  } else {
2291  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2292  }
2293  } else {
2294  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2295  }
2296  }
2297 }
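// Annotation: fill_entries_for_empty_input() synthesizes the single placeholder row used
// when a non-grouped aggregate sees no input: COUNT-style aggregates get 0, AVG gets two
// zero slots (sum and count), SAMPLE/SINGLE_VALUE geometry targets get two slots per
// physical coordinate column, other varlen targets get a zero pointer/length pair, and
// everything else gets the typed null from inline_null_val(). In distributed mode
// (g_cluster) count-distinct targets additionally receive a freshly allocated bitmap or set.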
2298 
2299  ResultSetPtr build_row_for_empty_input(
2300  const std::vector<Analyzer::Expr*>& target_exprs_in,
2301  const QueryMemoryDescriptor& query_mem_desc,
2302  const ExecutorDeviceType device_type) {
2303  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
2304  std::vector<Analyzer::Expr*> target_exprs;
2305  for (const auto target_expr : target_exprs_in) {
2306  const auto target_expr_copy =
2307  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
2308  CHECK(target_expr_copy);
2309  auto ti = target_expr->get_type_info();
2310  ti.set_notnull(false);
2311  target_expr_copy->set_type_info(ti);
2312  if (target_expr_copy->get_arg()) {
2313  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
2314  arg_ti.set_notnull(false);
2315  target_expr_copy->get_arg()->set_type_info(arg_ti);
2316  }
2317  target_exprs_owned_copies.push_back(target_expr_copy);
2318  target_exprs.push_back(target_expr_copy.get());
2319  }
2320  std::vector<TargetInfo> target_infos;
2321  std::vector<int64_t> entry;
2322  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
2323  const auto executor = query_mem_desc.getExecutor();
2324  CHECK(executor);
2325  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2326  CHECK(row_set_mem_owner);
2327  auto rs = std::make_shared<ResultSet>(target_infos,
2328  device_type,
2329  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
2330  row_set_mem_owner,
2331  executor->getCatalog(),
2332  executor->blockSize(),
2333  executor->gridSize());
2334  rs->allocateStorage();
2335  rs->fillOneEntry(entry);
2336  return rs;
2337 }
2338 
2339 } // namespace
2340 
2341  ResultSetPtr Executor::collectAllDeviceResults(
2342  SharedKernelContext& shared_context,
2343  const RelAlgExecutionUnit& ra_exe_unit,
2344  const QueryMemoryDescriptor& query_mem_desc,
2345  const ExecutorDeviceType device_type,
2346  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2347  auto timer = DEBUG_TIMER(__func__);
2348  auto& result_per_device = shared_context.getFragmentResults();
2349  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
2350  QueryDescriptionType::NonGroupedAggregate) {
2351  return build_row_for_empty_input(
2352  ra_exe_unit.target_exprs, query_mem_desc, device_type);
2353  }
2354  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
2355  try {
2356  return reduceSpeculativeTopN(
2357  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2358  } catch (const std::bad_alloc&) {
2359  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
2360  }
2361  }
2362  const auto shard_count =
2363  device_type == ExecutorDeviceType::GPU
2364  ? shard_count_for_top_groups(ra_exe_unit, *catalog_)
2365  : 0;
2366 
2367  if (shard_count && !result_per_device.empty()) {
2368  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
2369  }
2370  return reduceMultiDeviceResults(
2371  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2372 }
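// Annotation: collectAllDeviceResults() picks one of four reduction paths: an empty
// non-grouped aggregate yields the synthesized placeholder row, speculative top-N units
// go through reduceSpeculativeTopN(), sharded top-group queries on GPU are stitched by
// collectAllDeviceShardedTopResults(), and everything else falls back to
// reduceMultiDeviceResults().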
2373 
2374 namespace {
2385 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
2386  const QueryMemoryDescriptor& input_query_mem_desc,
2387  const ResultSetStorage* output_storage,
2388  size_t output_row_index,
2389  const QueryMemoryDescriptor& output_query_mem_desc,
2390  const std::vector<uint32_t>& top_permutation) {
2391  const auto output_buffer = output_storage->getUnderlyingBuffer();
2392  const auto input_buffer = input_storage->getUnderlyingBuffer();
2393  for (const auto sorted_idx : top_permutation) {
2394  // permuting all group-columns in this result set into the final buffer:
2395  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
2396  group_idx++) {
2397  const auto input_column_ptr =
2398  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2399  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
2400  const auto output_column_ptr =
2401  output_buffer +
2402  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2403  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
2404  memcpy(output_column_ptr,
2405  input_column_ptr,
2406  output_query_mem_desc.groupColWidth(group_idx));
2407  }
2408  // permuting all agg-columns in this result set into the final buffer:
2409  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
2410  slot_idx++) {
2411  const auto input_column_ptr =
2412  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
2413  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2414  const auto output_column_ptr =
2415  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
2416  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2417  memcpy(output_column_ptr,
2418  input_column_ptr,
2419  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
2420  }
2421  ++output_row_index;
2422  }
2423  return output_row_index;
2424 }
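// Annotation: a worked example of the columnar offset arithmetic above, with made-up
// numbers. For a group column of width 8 bytes whose prepended offset is 0, sorted_idx 5
// is read from input_buffer + 0 + 5 * 8 = input_buffer + 40 and written to
// output_buffer + 0 + output_row_index * 8; agg slots use getColOffInBytes() and the
// padded slot width in exactly the same way.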
2425 
2435 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
2436  const ResultSetStorage* output_storage,
2437  size_t output_row_index,
2438  const QueryMemoryDescriptor& output_query_mem_desc,
2439  const std::vector<uint32_t>& top_permutation) {
2440  const auto output_buffer = output_storage->getUnderlyingBuffer();
2441  const auto input_buffer = input_storage->getUnderlyingBuffer();
2442  for (const auto sorted_idx : top_permutation) {
2443  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
2444  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
2445  row_ptr,
2446  output_query_mem_desc.getRowSize());
2447  ++output_row_index;
2448  }
2449  return output_row_index;
2450 }
2451 } // namespace
2452 
2453  // Collect top results from each device, stitch them together and sort. Partial
2454  // results from each device are guaranteed to be disjoint because we only go on
2455  // this path when one of the columns involved is a shard key.
2456  ResultSetPtr Executor::collectAllDeviceShardedTopResults(
2457  SharedKernelContext& shared_context,
2458  const RelAlgExecutionUnit& ra_exe_unit) const {
2459  auto& result_per_device = shared_context.getFragmentResults();
2460  const auto first_result_set = result_per_device.front().first;
2461  CHECK(first_result_set);
2462  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2463  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2464  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
2465  top_query_mem_desc.setEntryCount(0);
2466  for (auto& result : result_per_device) {
2467  const auto result_set = result.first;
2468  CHECK(result_set);
2469  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
2470  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2471  top_query_mem_desc.setEntryCount(new_entry_cnt);
2472  }
2473  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2474  first_result_set->getDeviceType(),
2475  top_query_mem_desc,
2476  first_result_set->getRowSetMemOwner(),
2477  catalog_,
2478  blockSize(),
2479  gridSize());
2480  auto top_storage = top_result_set->allocateStorage();
2481  size_t top_output_row_idx{0};
2482  for (auto& result : result_per_device) {
2483  const auto result_set = result.first;
2484  CHECK(result_set);
2485  const auto& top_permutation = result_set->getPermutationBuffer();
2486  CHECK_LE(top_permutation.size(), top_n);
2487  if (top_query_mem_desc.didOutputColumnar()) {
2488  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2489  result_set->getQueryMemDesc(),
2490  top_storage,
2491  top_output_row_idx,
2492  top_query_mem_desc,
2493  top_permutation);
2494  } else {
2495  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2496  top_storage,
2497  top_output_row_idx,
2498  top_query_mem_desc,
2499  top_permutation);
2500  }
2501  }
2502  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2503  return top_result_set;
2504 }
2505 
2506 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
2507  const {
2508  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
2509  const auto& join_info = plan_state_->join_info_;
2510  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2511  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2512  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
2513  id_to_cond.insert(
2514  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
2515  }
2516  return id_to_cond;
2517 }
2518 
2519 namespace {
2520 
2521 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2522  for (const auto& col : fetched_cols) {
2523  if (col.is_lazily_fetched) {
2524  return true;
2525  }
2526  }
2527  return false;
2528 }
2529 
2530 } // namespace
2531 
2532 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2533  SharedKernelContext& shared_context,
2534  const RelAlgExecutionUnit& ra_exe_unit,
2535  ColumnFetcher& column_fetcher,
2536  const std::vector<InputTableInfo>& table_infos,
2537  const ExecutionOptions& eo,
2538  const bool is_agg,
2539  const bool allow_single_frag_table_opt,
2540  const size_t context_count,
2541  const QueryCompilationDescriptor& query_comp_desc,
2542  const QueryMemoryDescriptor& query_mem_desc,
2543  RenderInfo* render_info,
2544  std::unordered_set<int>& available_gpus,
2545  int& available_cpus) {
2546  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2547 
2548  QueryFragmentDescriptor fragment_descriptor(
2549  ra_exe_unit,
2550  table_infos,
2551  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2552  ? catalog_->getDataMgr().getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
2553  : std::vector<Data_Namespace::MemoryInfo>{},
2554  eo.gpu_input_mem_limit_percent,
2555  eo.outer_fragment_indices);
2556  CHECK(!ra_exe_unit.input_descs.empty());
2557 
2558  const auto device_type = query_comp_desc.getDeviceType();
2559  const bool uses_lazy_fetch =
2560  plan_state_->allow_lazy_fetch_ &&
2561  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2562  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2563  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2564  const auto device_count = deviceCount(device_type);
2565  CHECK_GT(device_count, 0);
2566 
2567  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2568  shared_context.getFragOffsets(),
2569  device_count,
2570  device_type,
2571  use_multifrag_kernel,
2572  g_inner_join_fragment_skipping,
2573  this);
2574  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2575  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2576  }
2577 
2578  if (use_multifrag_kernel) {
2579  VLOG(1) << "Creating multifrag execution kernels";
2580  VLOG(1) << query_mem_desc.toString();
2581 
2582  // NB: We should never be on this path when the query is retried because of running
2583  // out of group by slots; also, for scan only queries on CPU we want the
2584  // high-granularity, fragment by fragment execution instead. For scan only queries on
2585  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2586  // buffer per fragment.
2587  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2588  &execution_kernels,
2589  &column_fetcher,
2590  &eo,
2591  &query_comp_desc,
2592  &query_mem_desc,
2593  render_info](const int device_id,
2594  const FragmentsList& frag_list,
2595  const int64_t rowid_lookup_key) {
2596  execution_kernels.emplace_back(
2597  std::make_unique<ExecutionKernel>(ra_exe_unit,
2598  ExecutorDeviceType::GPU,
2599  device_id,
2600  eo,
2601  column_fetcher,
2602  query_comp_desc,
2603  query_mem_desc,
2604  frag_list,
2605  ExecutorDispatchMode::MultifragmentKernel,
2606  render_info,
2607  rowid_lookup_key));
2608  };
2609  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2610  } else {
2611  VLOG(1) << "Creating one execution kernel per fragment";
2612  VLOG(1) << query_mem_desc.toString();
2613 
2614  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2615  (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
2616  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2617  const auto max_frag_size =
2618  table_infos.front().info.getFragmentNumTuplesUpperBound();
2619  if (max_frag_size < query_mem_desc.getEntryCount()) {
2620  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2621  << " to match max fragment size " << max_frag_size
2622  << " for kernel per fragment execution path.";
2623  throw CompilationRetryNewScanLimit(max_frag_size);
2624  }
2625  }
2626 
2627  size_t frag_list_idx{0};
2628  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2629  &execution_kernels,
2630  &column_fetcher,
2631  &eo,
2632  &frag_list_idx,
2633  &device_type,
2634  &query_comp_desc,
2635  &query_mem_desc,
2636  render_info](const int device_id,
2637  const FragmentsList& frag_list,
2638  const int64_t rowid_lookup_key) {
2639  if (!frag_list.size()) {
2640  return;
2641  }
2642  CHECK_GE(device_id, 0);
2643 
2644  execution_kernels.emplace_back(
2645  std::make_unique<ExecutionKernel>(ra_exe_unit,
2646  device_type,
2647  device_id,
2648  eo,
2649  column_fetcher,
2650  query_comp_desc,
2651  query_mem_desc,
2652  frag_list,
2653  ExecutorDispatchMode::KernelPerFragment,
2654  render_info,
2655  rowid_lookup_key));
2656  ++frag_list_idx;
2657  };
2658 
2659  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2660  ra_exe_unit);
2661  }
2662 
2663  return execution_kernels;
2664 }
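// Annotation: createKernels() produces either (a) one multifragment kernel per GPU when
// eo.allow_multifrag is set and lazy fetch is not in play (or the query aggregates), or
// (b) one kernel per fragment otherwise; the per-fragment path may also throw
// CompilationRetryNewScanLimit to shrink the output buffer when a single-fragment table
// is smaller than the current entry count.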
2665 
2666  void Executor::launchKernels(SharedKernelContext& shared_context,
2667  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
2668  const ExecutorDeviceType device_type) {
2669  auto clock_begin = timer_start();
2670  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2671  kernel_queue_time_ms_ += timer_stop(clock_begin);
2672 
2673  threading::task_group tg;
2674  // A hack to have an unused unit for results collection.
2675  const RelAlgExecutionUnit* ra_exe_unit =
2676  kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
2677 
2678 #ifdef HAVE_TBB
2679  if (g_enable_cpu_sub_tasks && device_type == ExecutorDeviceType::CPU) {
2680  shared_context.setThreadPool(&tg);
2681  }
2682  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
2683 #endif // HAVE_TBB
2684 
2685  VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
2686  << (device_type == ExecutorDeviceType::CPU ? "CPU"s : "GPU"s) << ".";
2687  size_t kernel_idx = 1;
2688  for (auto& kernel : kernels) {
2689  CHECK(kernel.get());
2690  tg.run([this,
2691  &kernel,
2692  &shared_context,
2694  parent_thread_id = logger::thread_id(),
2695  crt_kernel_idx = kernel_idx++] {
2697  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
2698  const size_t thread_i = crt_kernel_idx % cpu_threads();
2699  kernel->run(this, thread_i, shared_context);
2700  });
2701  }
2702  tg.wait();
2703 
2704  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
2705  // The first arg is used for GPU only, it's not our case.
2706  // TODO: add QueryExecutionContext::getRowSet() interface
2707  // for our case.
2708  if (exec_ctx) {
2709  ResultSetPtr results;
2710  if (ra_exe_unit->estimator) {
2711  results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
2712  } else {
2713  results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
2714  }
2715  shared_context.addDeviceResults(std::move(results), {});
2716  }
2717  }
2718 }
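// Annotation: launchKernels() queues every kernel on a task group, assigning thread slot
// crt_kernel_idx % cpu_threads(), waits for completion, and then folds each thread-local
// execution context into shared_context -- either the estimator result set or the row
// set produced by the kernel.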
2719 
2720  std::vector<size_t> Executor::getTableFragmentIndices(
2721  const RelAlgExecutionUnit& ra_exe_unit,
2722  const ExecutorDeviceType device_type,
2723  const size_t table_idx,
2724  const size_t outer_frag_idx,
2725  std::map<int, const TableFragments*>& selected_tables_fragments,
2726  const std::unordered_map<int, const Analyzer::BinOper*>&
2727  inner_table_id_to_join_condition) {
2728  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2729  auto table_frags_it = selected_tables_fragments.find(table_id);
2730  CHECK(table_frags_it != selected_tables_fragments.end());
2731  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2732  const auto outer_table_fragments_it =
2733  selected_tables_fragments.find(outer_input_desc.getTableId());
2734  const auto outer_table_fragments = outer_table_fragments_it->second;
2735  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2736  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2737  if (!table_idx) {
2738  return {outer_frag_idx};
2739  }
2740  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2741  auto& inner_frags = table_frags_it->second;
2742  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2743  std::vector<size_t> all_frag_ids;
2744  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2745  ++inner_frag_idx) {
2746  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2747  if (skipFragmentPair(outer_fragment_info,
2748  inner_frag_info,
2749  table_idx,
2750  inner_table_id_to_join_condition,
2751  ra_exe_unit,
2752  device_type)) {
2753  continue;
2754  }
2755  all_frag_ids.push_back(inner_frag_idx);
2756  }
2757  return all_frag_ids;
2758 }
2759 
2760 // Returns true iff the join between two fragments cannot yield any results, per
2761 // shard information. The pair can be skipped to avoid full broadcast.
2762  bool Executor::skipFragmentPair(
2763  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2764  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2765  const int table_idx,
2766  const std::unordered_map<int, const Analyzer::BinOper*>&
2767  inner_table_id_to_join_condition,
2768  const RelAlgExecutionUnit& ra_exe_unit,
2769  const ExecutorDeviceType device_type) {
2770  if (device_type != ExecutorDeviceType::GPU) {
2771  return false;
2772  }
2773  CHECK(table_idx >= 0 &&
2774  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2775  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2776  // Both tables need to be sharded the same way.
2777  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2778  outer_fragment_info.shard == inner_fragment_info.shard) {
2779  return false;
2780  }
2781  const Analyzer::BinOper* join_condition{nullptr};
2782  if (ra_exe_unit.join_quals.empty()) {
2783  CHECK(!inner_table_id_to_join_condition.empty());
2784  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2785  CHECK(condition_it != inner_table_id_to_join_condition.end());
2786  join_condition = condition_it->second;
2787  CHECK(join_condition);
2788  } else {
2789  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2790  plan_state_->join_info_.join_hash_tables_.size());
2791  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2792  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2793  table_idx) {
2794  CHECK(!join_condition);
2795  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2796  }
2797  }
2798  }
2799  if (!join_condition) {
2800  return false;
2801  }
2802  // TODO(adb): support fragment skipping based on the overlaps operator
2803  if (join_condition->is_overlaps_oper()) {
2804  return false;
2805  }
2806  size_t shard_count{0};
2807  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2808  join_condition->get_left_operand())) {
2809  auto inner_outer_pairs = HashJoin::normalizeColumnPairs(
2810  join_condition, *getCatalog(), getTemporaryTables())
2811  .first;
2812  shard_count = BaselineJoinHashTable::getShardCountForCondition(
2813  join_condition, this, inner_outer_pairs);
2814  } else {
2815  shard_count = get_shard_count(join_condition, this);
2816  }
2817  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2818  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2819  }
2820  return shard_count;
2821 }
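// Annotation: a hypothetical example of the skip logic above. On GPU, with an equi-join
// whose inner and outer tables are sharded on the join key, the pair (outer fragment in
// shard 1, inner fragment in shard 2) can never match and is skipped; pairs in the same
// shard, unsharded fragments (shard == -1), CPU execution and overlaps joins are never
// skipped.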
2822 
2823 namespace {
2824 
2825  const ColumnDescriptor* try_get_column_descriptor(const InputColDescriptor* col_desc,
2826  const Catalog_Namespace::Catalog& cat) {
2827  const int table_id = col_desc->getScanDesc().getTableId();
2828  const int col_id = col_desc->getColId();
2829  return get_column_descriptor_maybe(col_id, table_id, cat);
2830 }
2831 
2832 } // namespace
2833 
2834 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2835  const std::vector<InputDescriptor>& input_descs,
2836  const std::map<int, const TableFragments*>& all_tables_fragments) {
2837  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2838  for (auto& desc : input_descs) {
2839  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2840  CHECK(fragments_it != all_tables_fragments.end());
2841  const auto& fragments = *fragments_it->second;
2842  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2843  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2844  frag_offsets[i] = off;
2845  off += fragments[i].getNumTuples();
2846  }
2847  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2848  }
2849  return tab_id_to_frag_offsets;
2850 }
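// Annotation: a small worked example for the loop above. A table whose fragments hold
// 100, 50 and 75 rows yields the running offsets {0, 100, 150}, i.e. the global row
// offset at which each fragment starts.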
2851 
2852 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2853  Executor::getRowCountAndOffsetForAllFrags(
2854  const RelAlgExecutionUnit& ra_exe_unit,
2855  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2856  const std::vector<InputDescriptor>& input_descs,
2857  const std::map<int, const TableFragments*>& all_tables_fragments) {
2858  std::vector<std::vector<int64_t>> all_num_rows;
2859  std::vector<std::vector<uint64_t>> all_frag_offsets;
2860  const auto tab_id_to_frag_offsets =
2861  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2862  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2863  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2864  std::vector<int64_t> num_rows;
2865  std::vector<uint64_t> frag_offsets;
2866  if (!ra_exe_unit.union_all) {
2867  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2868  }
2869  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2870  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2871  const auto fragments_it =
2872  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2873  CHECK(fragments_it != all_tables_fragments.end());
2874  const auto& fragments = *fragments_it->second;
2875  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2876  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2877  const auto& fragment = fragments[frag_id];
2878  num_rows.push_back(fragment.getNumTuples());
2879  } else {
2880  size_t total_row_count{0};
2881  for (const auto& fragment : fragments) {
2882  total_row_count += fragment.getNumTuples();
2883  }
2884  num_rows.push_back(total_row_count);
2885  }
2886  const auto frag_offsets_it =
2887  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2888  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2889  const auto& offsets = frag_offsets_it->second;
2890  CHECK_LT(frag_id, offsets.size());
2891  frag_offsets.push_back(offsets[frag_id]);
2892  }
2893  all_num_rows.push_back(num_rows);
2894  // Fragment offsets of outer table should be ONLY used by rowid for now.
2895  all_frag_offsets.push_back(frag_offsets);
2896  }
2897  return {all_num_rows, all_frag_offsets};
2898 }
2899 
2900  // Only columns of a hash-joined inner fact table whose fetch is not deferred (lazy)
2901  // are fetched from all of the table's fragments.
2902  bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
2903  const RelAlgExecutionUnit& ra_exe_unit,
2904  const FragmentsList& selected_fragments) const {
2905  const auto& input_descs = ra_exe_unit.input_descs;
2906  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2907  if (nest_level < 1 ||
2908  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2909  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2910  (ra_exe_unit.join_quals.empty() &&
2911  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2912  return false;
2913  }
2914  const int table_id = inner_col_desc.getScanDesc().getTableId();
2915  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2916  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2917  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2918  return fragments.size() > 1;
2919 }
2920 
2921  bool Executor::needLinearizeAllFragments(
2922  const ColumnDescriptor* cd,
2923  const InputColDescriptor& inner_col_desc,
2924  const RelAlgExecutionUnit& ra_exe_unit,
2925  const FragmentsList& selected_fragments,
2926  const Data_Namespace::MemoryLevel memory_level) const {
2927  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2928  const int table_id = inner_col_desc.getScanDesc().getTableId();
2929  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2930  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2931  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2932  auto need_linearize =
2933  cd->columnType.is_array() ||
2935  return table_id > 0 && need_linearize && fragments.size() > 1;
2936 }
2937 
2938 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2939  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2940  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2941  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2942 }
2943 
2944  FetchResult Executor::fetchChunks(
2945  const ColumnFetcher& column_fetcher,
2946  const RelAlgExecutionUnit& ra_exe_unit,
2947  const int device_id,
2948  const Data_Namespace::MemoryLevel memory_level,
2949  const std::map<int, const TableFragments*>& all_tables_fragments,
2950  const FragmentsList& selected_fragments,
2951  const Catalog_Namespace::Catalog& cat,
2952  std::list<ChunkIter>& chunk_iterators,
2953  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2954  DeviceAllocator* device_allocator,
2955  const size_t thread_idx,
2956  const bool allow_runtime_interrupt) {
2957  auto timer = DEBUG_TIMER(__func__);
2959  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2960  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2961  std::vector<size_t> local_col_to_frag_pos;
2962  buildSelectedFragsMapping(selected_fragments_crossjoin,
2963  local_col_to_frag_pos,
2964  col_global_ids,
2965  selected_fragments,
2966  ra_exe_unit);
2967 
2968  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2969  selected_fragments_crossjoin);
2970  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2971  std::vector<std::vector<int64_t>> all_num_rows;
2972  std::vector<std::vector<uint64_t>> all_frag_offsets;
2973  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2974  std::vector<const int8_t*> frag_col_buffers(
2975  plan_state_->global_to_local_col_ids_.size());
2976  for (const auto& col_id : col_global_ids) {
2977  if (allow_runtime_interrupt) {
2978  bool isInterrupted = false;
2979  {
2982  const auto query_session = getCurrentQuerySession(session_read_lock);
2983  isInterrupted =
2984  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2985  }
2986  if (isInterrupted) {
2987  throw QueryExecutionError(ERR_INTERRUPTED);
2988  }
2989  }
2990  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2991  throw QueryExecutionError(ERR_INTERRUPTED);
2992  }
2993  CHECK(col_id);
2994  const int table_id = col_id->getScanDesc().getTableId();
2995  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2996  if (cd && cd->isVirtualCol) {
2997  CHECK_EQ("rowid", cd->columnName);
2998  continue;
2999  }
3000  const auto fragments_it = all_tables_fragments.find(table_id);
3001  CHECK(fragments_it != all_tables_fragments.end());
3002  const auto fragments = fragments_it->second;
3003  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3004  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3005  CHECK_LT(static_cast<size_t>(it->second),
3006  plan_state_->global_to_local_col_ids_.size());
3007  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
3008  if (!fragments->size()) {
3009  return {};
3010  }
3011  CHECK_LT(frag_id, fragments->size());
3012  auto memory_level_for_column = memory_level;
3013  auto tbl_col_ids =
3014  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId());
3015  if (plan_state_->columns_to_fetch_.find(tbl_col_ids) ==
3016  plan_state_->columns_to_fetch_.end()) {
3017  memory_level_for_column = Data_Namespace::CPU_LEVEL;
3018  }
3019  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3020  frag_col_buffers[it->second] =
3021  column_fetcher.getResultSetColumn(col_id.get(),
3022  memory_level_for_column,
3023  device_id,
3024  device_allocator,
3025  thread_idx);
3026  } else {
3027  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3028  // Determine if we need special treatment to linearize a multi-frag table,
3029  // i.e., a column that is classified as a varlen type such as an array.
3030  // For now, we only support fixed-length arrays that contain
3031  // geo point coordinates, but more types can be supported this way.
3032  if (needLinearizeAllFragments(
3033  cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3034  bool for_lazy_fetch = false;
3035  if (plan_state_->columns_to_not_fetch_.find(tbl_col_ids) !=
3036  plan_state_->columns_to_not_fetch_.end()) {
3037  for_lazy_fetch = true;
3038  VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
3039  << ", col_name: " << cd->columnName << ")";
3040  }
3041  frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
3042  table_id,
3043  col_id->getColId(),
3044  all_tables_fragments,
3045  chunks,
3046  chunk_iterators,
3047  for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3048  for_lazy_fetch ? 0 : device_id,
3049  device_allocator,
3050  thread_idx);
3051  } else {
3052  frag_col_buffers[it->second] =
3053  column_fetcher.getAllTableColumnFragments(table_id,
3054  col_id->getColId(),
3055  all_tables_fragments,
3056  memory_level_for_column,
3057  device_id,
3058  device_allocator,
3059  thread_idx);
3060  }
3061  } else {
3062  frag_col_buffers[it->second] =
3063  column_fetcher.getOneTableColumnFragment(table_id,
3064  frag_id,
3065  col_id->getColId(),
3066  all_tables_fragments,
3067  chunks,
3068  chunk_iterators,
3069  memory_level_for_column,
3070  device_id,
3071  device_allocator);
3072  }
3073  }
3074  }
3075  all_frag_col_buffers.push_back(frag_col_buffers);
3076  }
3077  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
3078  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
3079  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3080 }
3081 
3082 namespace {
3083 size_t get_selected_input_descs_index(int const table_id,
3084  std::vector<InputDescriptor> const& input_descs) {
3085  auto const has_table_id = [table_id](InputDescriptor const& input_desc) {
3086  return table_id == input_desc.getTableId();
3087  };
3088  return std::find_if(input_descs.begin(), input_descs.end(), has_table_id) -
3089  input_descs.begin();
3090 }
3091 
3092  size_t get_selected_input_col_descs_index(
3093  int const table_id,
3094  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3095  auto const has_table_id = [table_id](auto const& input_desc) {
3096  return table_id == input_desc->getScanDesc().getTableId();
3097  };
3098  return std::distance(
3099  input_col_descs.begin(),
3100  std::find_if(input_col_descs.begin(), input_col_descs.end(), has_table_id));
3101 }
3102 
3103 std::list<std::shared_ptr<const InputColDescriptor>> get_selected_input_col_descs(
3104  int const table_id,
3105  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3106  std::list<std::shared_ptr<const InputColDescriptor>> selected;
3107  for (auto const& input_col_desc : input_col_descs) {
3108  if (table_id == input_col_desc->getScanDesc().getTableId()) {
3109  selected.push_back(input_col_desc);
3110  }
3111  }
3112  return selected;
3113 }
3114 
3115 // Set N consecutive elements of frag_col_buffers to ptr in the range of local_col_id.
3116 void set_mod_range(std::vector<int8_t const*>& frag_col_buffers,
3117  int8_t const* const ptr,
3118  size_t const local_col_id,
3119  size_t const N) {
3120  size_t const begin = local_col_id - local_col_id % N; // N divides begin
3121  size_t const end = begin + N;
3122  CHECK_LE(end, frag_col_buffers.size()) << (void*)ptr << ' ' << local_col_id << ' ' << N;
3123  for (size_t i = begin; i < end; ++i) {
3124  frag_col_buffers[i] = ptr;
3125  }
3126 }
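// Annotation: for example, with N = 3 input descriptors and local_col_id = 4, begin is
// 4 - 4 % 3 = 3 and end is 6, so slots 3, 4 and 5 of frag_col_buffers all point at ptr.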
3127 } // namespace
3128 
3129  // fetchChunks() assumes that multiple inputs imply a JOIN.
3130  // fetchUnionChunks() assumes that multiple inputs imply a UNION ALL.
3131  FetchResult Executor::fetchUnionChunks(
3132  const ColumnFetcher& column_fetcher,
3133  const RelAlgExecutionUnit& ra_exe_unit,
3134  const int device_id,
3135  const Data_Namespace::MemoryLevel memory_level,
3136  const std::map<int, const TableFragments*>& all_tables_fragments,
3137  const FragmentsList& selected_fragments,
3138  const Catalog_Namespace::Catalog& cat,
3139  std::list<ChunkIter>& chunk_iterators,
3140  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3141  DeviceAllocator* device_allocator,
3142  const size_t thread_idx,
3143  const bool allow_runtime_interrupt) {
3144  auto timer = DEBUG_TIMER(__func__);
3146 
3147  CHECK_EQ(1u, selected_fragments.size());
3148  CHECK_LE(2u, ra_exe_unit.input_descs.size());
3149  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
3150  auto const& input_descs = ra_exe_unit.input_descs;
3151  using TableId = int;
3152  TableId const selected_table_id = selected_fragments.front().table_id;
3153  size_t const input_descs_index =
3154  get_selected_input_descs_index(selected_table_id, input_descs);
3155  CHECK_LT(input_descs_index, input_descs.size());
3156  size_t const input_col_descs_index =
3157  get_selected_input_col_descs_index(selected_table_id, ra_exe_unit.input_col_descs);
3158  CHECK_LT(input_col_descs_index, ra_exe_unit.input_col_descs.size());
3159  VLOG(2) << "selected_table_id=" << selected_table_id
3160  << " input_descs_index=" << input_descs_index
3161  << " input_col_descs_index=" << input_col_descs_index
3162  << " input_descs=" << shared::printContainer(input_descs)
3163  << " ra_exe_unit.input_col_descs="
3164  << shared::printContainer(ra_exe_unit.input_col_descs);
3165 
3166  std::list<std::shared_ptr<const InputColDescriptor>> selected_input_col_descs =
3167  get_selected_input_col_descs(selected_table_id, ra_exe_unit.input_col_descs);
3168  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3169 
3170  buildSelectedFragsMappingForUnion(
3171  selected_fragments_crossjoin, selected_fragments, ra_exe_unit);
3172 
3173  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
3174  selected_fragments_crossjoin);
3175 
3176  if (allow_runtime_interrupt) {
3177  bool isInterrupted = false;
3178  {
3181  const auto query_session = getCurrentQuerySession(session_read_lock);
3182  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3183  }
3184  if (isInterrupted) {
3185  throw QueryExecutionError(ERR_INTERRUPTED);
3186  }
3187  }
3188  std::vector<const int8_t*> frag_col_buffers(
3189  plan_state_->global_to_local_col_ids_.size());
3190  for (const auto& col_id : selected_input_col_descs) {
3191  CHECK(col_id);
3192  const auto cd = try_get_column_descriptor(col_id.get(), cat);
3193  if (cd && cd->isVirtualCol) {
3194  CHECK_EQ("rowid", cd->columnName);
3195  continue;
3196  }
3197  const auto fragments_it = all_tables_fragments.find(selected_table_id);
3198  CHECK(fragments_it != all_tables_fragments.end());
3199  const auto fragments = fragments_it->second;
3200  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3201  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3202  size_t const local_col_id = it->second;
3203  CHECK_LT(local_col_id, plan_state_->global_to_local_col_ids_.size());
3204  constexpr size_t frag_id = 0;
3205  if (fragments->empty()) {
3206  return {};
3207  }
3208  MemoryLevel const memory_level_for_column =
3209  plan_state_->columns_to_fetch_.count({selected_table_id, col_id->getColId()})
3210  ? memory_level
3211  : Data_Namespace::CPU_LEVEL;
3212  int8_t const* ptr;
3213  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3214  ptr = column_fetcher.getResultSetColumn(
3215  col_id.get(), memory_level_for_column, device_id, device_allocator, thread_idx);
3216  } else if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3217  ptr = column_fetcher.getAllTableColumnFragments(selected_table_id,
3218  col_id->getColId(),
3219  all_tables_fragments,
3220  memory_level_for_column,
3221  device_id,
3222  device_allocator,
3223  thread_idx);
3224  } else {
3225  ptr = column_fetcher.getOneTableColumnFragment(selected_table_id,
3226  frag_id,
3227  col_id->getColId(),
3228  all_tables_fragments,
3229  chunks,
3230  chunk_iterators,
3231  memory_level_for_column,
3232  device_id,
3233  device_allocator);
3234  }
3235  // Set frag_col_buffers[i] = ptr for all i in the size-input_descs.size() block containing local_col_id.
3236  set_mod_range(frag_col_buffers, ptr, local_col_id, input_descs.size());
3237  }
3238  auto const [num_rows, frag_offsets] = getRowCountAndOffsetForAllFrags(
3239  ra_exe_unit, frag_ids_crossjoin, input_descs, all_tables_fragments);
3240 
3241  VLOG(2) << "frag_col_buffers=" << shared::printContainer(frag_col_buffers)
3242  << " num_rows=" << shared::printContainer(num_rows)
3243  << " frag_offsets=" << shared::printContainer(frag_offsets)
3244  << " input_descs_index=" << input_descs_index
3245  << " input_col_descs_index=" << input_col_descs_index;
3246  return {{std::move(frag_col_buffers)},
3247  {{num_rows[0][input_descs_index]}},
3248  {{frag_offsets[0][input_descs_index]}}};
3249 }
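// Note: num_rows and frag_offsets are narrowed to the single UNION ALL input whose
// table matches selected_fragments.front().table_id (index input_descs_index), while
// frag_col_buffers keeps one slot per local column id.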
3250 
3251 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
3252  const size_t scan_idx,
3253  const RelAlgExecutionUnit& ra_exe_unit) {
3254  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
3255  scan_idx > 0 &&
3256  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
3257  !selected_fragments[scan_idx].fragment_ids.empty()) {
3258  // Fetch all fragments
3259  return {size_t(0)};
3260  }
3261 
3262  return selected_fragments[scan_idx].fragment_ids;
3263 }
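// Returning {size_t(0)} above collapses an inner join table's fragment list to a
// single placeholder entry; its columns are then fetched through the all-fragments
// path (cf. needFetchAllFragments / getAllTableColumnFragments).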
3264 
3265 void Executor::buildSelectedFragsMapping(
3266  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3267  std::vector<size_t>& local_col_to_frag_pos,
3268  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3269  const FragmentsList& selected_fragments,
3270  const RelAlgExecutionUnit& ra_exe_unit) {
3271  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3272  size_t frag_pos{0};
3273  const auto& input_descs = ra_exe_unit.input_descs;
3274  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3275  const int table_id = input_descs[scan_idx].getTableId();
3276  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
3277  selected_fragments_crossjoin.push_back(
3278  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
3279  for (const auto& col_id : col_global_ids) {
3280  CHECK(col_id);
3281  const auto& input_desc = col_id->getScanDesc();
3282  if (input_desc.getTableId() != table_id ||
3283  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3284  continue;
3285  }
3286  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3287  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3288  CHECK_LT(static_cast<size_t>(it->second),
3289  plan_state_->global_to_local_col_ids_.size());
3290  local_col_to_frag_pos[it->second] = frag_pos;
3291  }
3292  ++frag_pos;
3293  }
3294 }
3295 
3296 void Executor::buildSelectedFragsMappingForUnion(
3297  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3298  const FragmentsList& selected_fragments,
3299  const RelAlgExecutionUnit& ra_exe_unit) {
3300  const auto& input_descs = ra_exe_unit.input_descs;
3301  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3302  // selected_fragments is set in assignFragsToKernelDispatch execution_kernel.fragments
3303  if (selected_fragments[0].table_id == input_descs[scan_idx].getTableId()) {
3304  selected_fragments_crossjoin.push_back({size_t(1)});
3305  }
3306  }
3307 }
3308 
3309 namespace {
3310 
3311 class OutVecOwner {
3312  public:
3313  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
3314  ~OutVecOwner() {
3315  for (auto out : out_vec_) {
3316  delete[] out;
3317  }
3318  }
3319 
3320  private:
3321  std::vector<int64_t*> out_vec_;
3322 };
3323 } // namespace
3324 
3325 int32_t Executor::executePlanWithoutGroupBy(
3326  const RelAlgExecutionUnit& ra_exe_unit,
3327  const CompilationResult& compilation_result,
3328  const bool hoist_literals,
3329  ResultSetPtr* results,
3330  const std::vector<Analyzer::Expr*>& target_exprs,
3331  const ExecutorDeviceType device_type,
3332  std::vector<std::vector<const int8_t*>>& col_buffers,
3333  QueryExecutionContext* query_exe_context,
3334  const std::vector<std::vector<int64_t>>& num_rows,
3335  const std::vector<std::vector<uint64_t>>& frag_offsets,
3336  Data_Namespace::DataMgr* data_mgr,
3337  const int device_id,
3338  const uint32_t start_rowid,
3339  const uint32_t num_tables,
3340  const bool allow_runtime_interrupt,
3341  RenderInfo* render_info,
3342  const bool optimize_cuda_block_and_grid_sizes,
3343  const int64_t rows_to_process) {
3345  auto timer = DEBUG_TIMER(__func__);
3346  CHECK(!results || !(*results));
3347  if (col_buffers.empty()) {
3348  return 0;
3349  }
3350 
3351  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3352  if (render_info) {
3353  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
3354  // here, we are in non-insitu mode.
3355  CHECK(render_info->useCudaBuffers() || !render_info->isInSitu())
3356  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
3357  "currently unsupported.";
3358  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3359  }
3360 
3361  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3362  std::vector<int64_t*> out_vec;
3363  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3364  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3365  std::unique_ptr<OutVecOwner> output_memory_scope;
3366  if (allow_runtime_interrupt) {
3367  bool isInterrupted = false;
3368  {
3371  const auto query_session = getCurrentQuerySession(session_read_lock);
3372  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3373  }
3374  if (isInterrupted) {
3375  throw QueryExecutionError(ERR_INTERRUPTED);
3376  }
3377  }
3378  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3379  return ERR_INTERRUPTED;
3380  }
3381  if (device_type == ExecutorDeviceType::CPU) {
3382  CpuCompilationContext* cpu_generated_code =
3383  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3384  CHECK(cpu_generated_code);
3385  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
3386  cpu_generated_code,
3387  hoist_literals,
3388  hoist_buf,
3389  col_buffers,
3390  num_rows,
3391  frag_offsets,
3392  0,
3393  &error_code,
3394  num_tables,
3395  join_hash_table_ptrs,
3396  rows_to_process);
3397  output_memory_scope.reset(new OutVecOwner(out_vec));
3398  } else {
3399  GpuCompilationContext* gpu_generated_code =
3400  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3401  CHECK(gpu_generated_code);
3402  try {
3403  out_vec = query_exe_context->launchGpuCode(
3404  ra_exe_unit,
3405  gpu_generated_code,
3406  hoist_literals,
3407  hoist_buf,
3408  col_buffers,
3409  num_rows,
3410  frag_offsets,
3411  0,
3412  data_mgr,
3413  blockSize(),
3414  gridSize(),
3415  device_id,
3416  compilation_result.gpu_smem_context.getSharedMemorySize(),
3417  &error_code,
3418  num_tables,
3419  allow_runtime_interrupt,
3420  join_hash_table_ptrs,
3421  render_allocator_map_ptr,
3422  optimize_cuda_block_and_grid_sizes);
3423  output_memory_scope.reset(new OutVecOwner(out_vec));
3424  } catch (const OutOfMemory&) {
3425  return ERR_OUT_OF_GPU_MEM;
3426  } catch (const std::exception& e) {
3427  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3428  }
3429  }
3430  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3431  error_code == Executor::ERR_DIV_BY_ZERO ||
3432  error_code == Executor::ERR_OUT_OF_TIME ||
3433  error_code == Executor::ERR_INTERRUPTED ||
3435  error_code == Executor::ERR_GEOS ||
3437  return error_code;
3438  }
3439  if (ra_exe_unit.estimator) {
3440  CHECK(!error_code);
3441  if (results) {
3442  *results =
3443  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
3444  }
3445  return 0;
3446  }
3447  // Expect delayed results extraction (used for sub-fragments) for estimator only;
3448  CHECK(results);
3449  std::vector<int64_t> reduced_outs;
3450  const auto num_frags = col_buffers.size();
3451  const size_t entry_count =
3452  device_type == ExecutorDeviceType::GPU
3453  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
3454  ? 1
3455  : blockSize() * gridSize() * num_frags)
3456  : num_frags;
3457  if (size_t(1) == entry_count) {
3458  for (auto out : out_vec) {
3459  CHECK(out);
3460  reduced_outs.push_back(*out);
3461  }
3462  } else {
3463  size_t out_vec_idx = 0;
3464 
3465  for (const auto target_expr : target_exprs) {
3466  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3467  CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3468  << target_expr->toString();
3469 
3470  const int num_iterations = agg_info.sql_type.is_geometry()
3471  ? agg_info.sql_type.get_physical_coord_cols()
3472  : 1;
3473 
3474  for (int i = 0; i < num_iterations; i++) {
3475  int64_t val1;
3476  const bool float_argument_input = takes_float_argument(agg_info);
3477  if (is_distinct_target(agg_info) ||
3478  shared::is_any<kAPPROX_QUANTILE, kMODE>(agg_info.agg_kind)) {
3479  bool const check = shared::
3480  is_any<kCOUNT, kAPPROX_COUNT_DISTINCT, kAPPROX_QUANTILE, kMODE, kCOUNT_IF>(
3481  agg_info.agg_kind);
3482  CHECK(check) << agg_info.agg_kind;
3483  val1 = out_vec[out_vec_idx][0];
3484  error_code = 0;
3485  } else {
3486  const auto chosen_bytes = static_cast<size_t>(
3487  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
3488  std::tie(val1, error_code) = Executor::reduceResults(
3489  agg_info.agg_kind,
3490  agg_info.sql_type,
3491  query_exe_context->getAggInitValForIndex(out_vec_idx),
3492  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3493  out_vec[out_vec_idx],
3494  entry_count,
3495  false,
3496  float_argument_input);
3497  }
3498  if (error_code) {
3499  break;
3500  }
3501  reduced_outs.push_back(val1);
3502  if (agg_info.agg_kind == kAVG ||
3503  (agg_info.agg_kind == kSAMPLE &&
3504  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3505  const auto chosen_bytes = static_cast<size_t>(
3506  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3507  1));
3508  int64_t val2;
3509  std::tie(val2, error_code) = Executor::reduceResults(
3510  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3511  agg_info.sql_type,
3512  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3513  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3514  out_vec[out_vec_idx + 1],
3515  entry_count,
3516  false,
3517  false);
3518  if (error_code) {
3519  break;
3520  }
3521  reduced_outs.push_back(val2);
3522  ++out_vec_idx;
3523  }
3524  ++out_vec_idx;
3525  }
3526  }
3527  }
3528 
3529  if (error_code) {
3530  return error_code;
3531  }
3532 
3533  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3534  auto rows_ptr = std::shared_ptr<ResultSet>(
3535  query_exe_context->query_buffers_->result_sets_[0].release());
3536  rows_ptr->fillOneEntry(reduced_outs);
3537  *results = std::move(rows_ptr);
3538  return error_code;
3539 }
3540 
3541 namespace {
3542 
3543 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
3544  CHECK(scan_limit);
3545  return results && results->rowCount() < scan_limit;
3546 }
3547 
3548 } // namespace
3549 
3550 int32_t Executor::executePlanWithGroupBy(
3551  const RelAlgExecutionUnit& ra_exe_unit,
3552  const CompilationResult& compilation_result,
3553  const bool hoist_literals,
3554  ResultSetPtr* results,
3555  const ExecutorDeviceType device_type,
3556  std::vector<std::vector<const int8_t*>>& col_buffers,
3557  const std::vector<size_t> outer_tab_frag_ids,
3558  QueryExecutionContext* query_exe_context,
3559  const std::vector<std::vector<int64_t>>& num_rows,
3560  const std::vector<std::vector<uint64_t>>& frag_offsets,
3561  Data_Namespace::DataMgr* data_mgr,
3562  const int device_id,
3563  const int outer_table_id,
3564  const int64_t scan_limit,
3565  const uint32_t start_rowid,
3566  const uint32_t num_tables,
3567  const bool allow_runtime_interrupt,
3568  RenderInfo* render_info,
3569  const bool optimize_cuda_block_and_grid_sizes,
3570  const int64_t rows_to_process) {
3571  auto timer = DEBUG_TIMER(__func__);
3573  // TODO: get results via a separate method, but need to do something with literals.
3574  CHECK(!results || !(*results));
3575  if (col_buffers.empty()) {
3576  return 0;
3577  }
3578  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3579  // TODO(alex):
3580  // 1. Optimize size (make keys more compact).
3581  // 2. Resize on overflow.
3582  // 3. Optimize runtime.
3583  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3584  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3585  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3586  if (allow_runtime_interrupt) {
3587  bool isInterrupted = false;
3588  {
3591  const auto query_session = getCurrentQuerySession(session_read_lock);
3592  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3593  }
3594  if (isInterrupted) {
3595  throw QueryExecutionError(ERR_INTERRUPTED);
3596  }
3597  }
3598  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3599  return ERR_INTERRUPTED;
3600  }
3601 
3602  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3603  if (render_info && render_info->useCudaBuffers()) {
3604  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3605  }
3606 
3607  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3608  << " ra_exe_unit.input_descs="
3609  << shared::printContainer(ra_exe_unit.input_descs)
3610  << " ra_exe_unit.input_col_descs="
3611  << shared::printContainer(ra_exe_unit.input_col_descs)
3612  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3613  << " num_rows=" << shared::printContainer(num_rows)
3614  << " frag_offsets=" << shared::printContainer(frag_offsets)
3615  << " query_exe_context->query_buffers_->num_rows_="
3616  << query_exe_context->query_buffers_->num_rows_
3617  << " query_exe_context->query_mem_desc_.getEntryCount()="
3618  << query_exe_context->query_mem_desc_.getEntryCount()
3619  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3620  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3621  << " num_tables=" << num_tables;
3622 
3623  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3624  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3625  // with outer_table_id.
3626  if (ra_exe_unit_copy.union_all) {
3627  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
3628  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3629  ra_exe_unit_copy.input_descs.end(),
3630  [outer_table_id](auto const& a, auto const& b) {
3631  return a.getTableId() == outer_table_id &&
3632  b.getTableId() != outer_table_id;
3633  });
3634  while (!ra_exe_unit_copy.input_descs.empty() &&
3635  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3636  ra_exe_unit_copy.input_descs.pop_back();
3637  }
3638  // Filter ra_exe_unit_copy.input_col_descs.
3639  ra_exe_unit_copy.input_col_descs.remove_if(
3640  [outer_table_id](auto const& input_col_desc) {
3641  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3642  });
3643  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3644  }
3645 
3646  if (device_type == ExecutorDeviceType::CPU) {
3647  const int32_t scan_limit_for_query =
3648  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3649  const int32_t max_matched = scan_limit_for_query == 0
3650  ? query_exe_context->query_mem_desc_.getEntryCount()
3651  : scan_limit_for_query;
3652  CpuCompilationContext* cpu_generated_code =
3653  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3654  CHECK(cpu_generated_code);
3655  query_exe_context->launchCpuCode(ra_exe_unit_copy,
3656  cpu_generated_code,
3657  hoist_literals,
3658  hoist_buf,
3659  col_buffers,
3660  num_rows,
3661  frag_offsets,
3662  max_matched,
3663  &error_code,
3664  num_tables,
3665  join_hash_table_ptrs,
3666  rows_to_process);
3667  } else {
3668  try {
3669  GpuCompilationContext* gpu_generated_code =
3670  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3671  CHECK(gpu_generated_code);
3672  query_exe_context->launchGpuCode(
3673  ra_exe_unit_copy,
3674  gpu_generated_code,
3675  hoist_literals,
3676  hoist_buf,
3677  col_buffers,
3678  num_rows,
3679  frag_offsets,
3680  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3681  data_mgr,
3682  blockSize(),
3683  gridSize(),
3684  device_id,
3685  compilation_result.gpu_smem_context.getSharedMemorySize(),
3686  &error_code,
3687  num_tables,
3688  allow_runtime_interrupt,
3689  join_hash_table_ptrs,
3690  render_allocator_map_ptr,
3691  optimize_cuda_block_and_grid_sizes);
3692  } catch (const OutOfMemory&) {
3693  return ERR_OUT_OF_GPU_MEM;
3694  } catch (const OutOfRenderMemory&) {
3695  return ERR_OUT_OF_RENDER_MEM;
3696  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3697  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
3698  } catch (const std::exception& e) {
3699  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3700  }
3701  }
3702 
3703  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3704  error_code == Executor::ERR_DIV_BY_ZERO ||
3705  error_code == Executor::ERR_OUT_OF_TIME ||
3706  error_code == Executor::ERR_INTERRUPTED ||
3708  error_code == Executor::ERR_GEOS ||
3710  return error_code;
3711  }
3712 
3713  if (results && error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3714  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3715  *results = query_exe_context->getRowSet(ra_exe_unit_copy,
3716  query_exe_context->query_mem_desc_);
3717  CHECK(*results);
3718  VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
3719  (*results)->holdLiterals(hoist_buf);
3720  }
3721  if (error_code < 0 && render_allocator_map_ptr) {
3722  auto const adjusted_scan_limit =
3723  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3724  // More rows passed the filter than available slots. We don't have a count to check,
3725  // so assume we met the limit if a scan limit is set
3726  if (adjusted_scan_limit != 0) {
3727  return 0;
3728  } else {
3729  return error_code;
3730  }
3731  }
3732  if (results && error_code &&
3733  (!scan_limit || check_rows_less_than_needed(*results, scan_limit))) {
3734  return error_code; // unlucky, not enough results and we ran out of slots
3735  }
3736 
3737  return 0;
3738 }
3739 
3740 std::vector<int8_t*> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3741  const int device_id) {
3742  std::vector<int8_t*> table_ptrs;
3743  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3744  for (auto hash_table : join_hash_tables) {
3745  if (!hash_table) {
3746  CHECK(table_ptrs.empty());
3747  return {};
3748  }
3749  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3750  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3751  }
3752  return table_ptrs;
3753 }
3754 
3755 void Executor::nukeOldState(const bool allow_lazy_fetch,
3756  const std::vector<InputTableInfo>& query_infos,
3757  const PlanState::DeletedColumnsMap& deleted_cols_map,
3758  const RelAlgExecutionUnit* ra_exe_unit) {
3761  const bool contains_left_deep_outer_join =
3762  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3763  ra_exe_unit->join_quals.end(),
3764  [](const JoinCondition& join_condition) {
3765  return join_condition.type == JoinType::LEFT;
3766  }) != ra_exe_unit->join_quals.end();
3767  cgen_state_.reset(
3768  new CgenState(query_infos.size(), contains_left_deep_outer_join, this));
3769  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3770  query_infos,
3771  deleted_cols_map,
3772  this));
3773 }
3774 
3775 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3776  const std::vector<InputTableInfo>& query_infos) {
3778  const auto ld_count = input_descs.size();
3779  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3780  for (size_t i = 0; i < ld_count; ++i) {
3781  CHECK_LT(i, query_infos.size());
3782  const auto frag_count = query_infos[i].info.fragments.size();
3783  if (i > 0) {
3784  cgen_state_->frag_offsets_.push_back(nullptr);
3785  } else {
3786  if (frag_count > 1) {
3787  cgen_state_->frag_offsets_.push_back(cgen_state_->ir_builder_.CreateLoad(
3788  frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));
3789  } else {
3790  cgen_state_->frag_offsets_.push_back(nullptr);
3791  }
3792  }
3793  }
3794 }
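// Only the outermost input (i == 0), and only when it spans more than one fragment,
// gets a live load of frag_row_off; every other nest level receives a nullptr
// placeholder.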
3795 
3796 Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
3797  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3798  const std::vector<InputTableInfo>& query_infos,
3799  const MemoryLevel memory_level,
3800  const JoinType join_type,
3801  const HashType preferred_hash_type,
3802  ColumnCacheMap& column_cache,
3803  const HashTableBuildDagMap& hashtable_build_dag_map,
3804  const RegisteredQueryHint& query_hint,
3805  const TableIdToNodeMap& table_id_to_node_map) {
3806  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3807  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3808  }
3809  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3810  throw QueryExecutionError(ERR_INTERRUPTED);
3811  }
3812  try {
3813  auto tbl = HashJoin::getInstance(qual_bin_oper,
3814  query_infos,
3815  memory_level,
3816  join_type,
3817  preferred_hash_type,
3818  deviceCountForMemoryLevel(memory_level),
3819  column_cache,
3820  this,
3821  hashtable_build_dag_map,
3822  query_hint,
3823  table_id_to_node_map);
3824  return {tbl, ""};
3825  } catch (const HashJoinFail& e) {
3826  return {nullptr, e.what()};
3827  }
3828 }
3829 
3830 int8_t Executor::warpSize() const {
3831  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3832  CHECK(!dev_props.empty());
3833  return dev_props.front().warpSize;
3834 }
3835 
3836 // TODO(adb): should these three functions have consistent semantics if cuda mgr does not
3837 // exist?
3838 unsigned Executor::gridSize() const {
3839  CHECK(data_mgr_);
3840  const auto cuda_mgr = data_mgr_->getCudaMgr();
3841  if (!cuda_mgr) {
3842  return 0;
3843  }
3844  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3845 }
3846 
3847 unsigned Executor::numBlocksPerMP() const {
3848  return std::max((unsigned)2,
3849  shared::ceil_div(grid_size_x_, cudaMgr()->getMinNumMPsForAllDevices()));
3850 }
3851 
3852 unsigned Executor::blockSize() const {
3853  CHECK(data_mgr_);
3854  const auto cuda_mgr = data_mgr_->getCudaMgr();
3855  if (!cuda_mgr) {
3856  return 0;
3857  }
3858  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3859  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3860 }
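// Example of the defaults above (hypothetical device): with grid_size_x_ and
// block_size_x_ left at 0 on a GPU reporting 20 SMs and maxThreadsPerBlock == 1024,
// gridSize() returns 40 and blockSize() returns 1024.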
3861 
3862 void Executor::setGridSize(unsigned grid_size) {
3863  grid_size_x_ = grid_size;
3864 }
3865 
3866 void Executor::resetGridSize() {
3867  grid_size_x_ = 0;
3868 }
3869 
3870 void Executor::setBlockSize(unsigned block_size) {
3871  block_size_x_ = block_size;
3872 }
3873 
3874 void Executor::resetBlockSize() {
3875  block_size_x_ = 0;
3876 }
3877 
3878 size_t Executor::maxGpuSlabSize() const {
3879  return max_gpu_slab_size_;
3880 }
3881 
3882 int64_t Executor::deviceCycles(int milliseconds) const {
3883  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3884  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3885 }
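// e.g. a 1.3 GHz device clock (clockKhz == 1'300'000) over 100 ms yields
// 1'300'000 * 100 == 130'000'000 cycles.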
3886 
3887 llvm::Value* Executor::castToFP(llvm::Value* value,
3888  SQLTypeInfo const& from_ti,
3889  SQLTypeInfo const& to_ti) {
3891  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3892  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3893  llvm::Type* fp_type{nullptr};
3894  switch (to_ti.get_size()) {
3895  case 4:
3896  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3897  break;
3898  case 8:
3899  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3900  break;
3901  default:
3902  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3903  }
3904  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3905  if (from_ti.get_scale()) {
3906  value = cgen_state_->ir_builder_.CreateFDiv(
3907  value,
3908  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3909  }
3910  }
3911  return value;
3912 }
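// Example: casting a DECIMAL(10,2) value (stored as an integer) to DOUBLE emits an
// SIToFP to double followed by an FDiv by exp_to_scale(2) == 100, so a stored 12345
// becomes 123.45.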
3913 
3914 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3916  CHECK(val->getType()->isPointerTy());
3917 
3918  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3919  const auto val_type = val_ptr_type->getPointerElementType();
3920  size_t val_width = 0;
3921  if (val_type->isIntegerTy()) {
3922  val_width = val_type->getIntegerBitWidth();
3923  } else {
3924  if (val_type->isFloatTy()) {
3925  val_width = 32;
3926  } else {
3927  CHECK(val_type->isDoubleTy());
3928  val_width = 64;
3929  }
3930  }
3931  CHECK_LT(size_t(0), val_width);
3932  if (bitWidth == val_width) {
3933  return val;
3934  }
3935  return cgen_state_->ir_builder_.CreateBitCast(
3936  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3937 }
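// Example: a float* operand with bitWidth == 64 is bitcast to i64*, while an i32*
// operand with bitWidth == 32 is returned unchanged since val_width already matches.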
3938 
3939 #define EXECUTE_INCLUDE
3940 #include "ArrayOps.cpp"
3941 #include "DateAdd.cpp"
3942 #include "GeoOps.cpp"
3943 #include "RowFunctionOps.cpp"
3944 #include "StringFunctions.cpp"
3946 #undef EXECUTE_INCLUDE
3947 
3948 namespace {
3949 void add_deleted_col_to_map(PlanState::DeletedColumnsMap& deleted_cols_map,
3950  const ColumnDescriptor* deleted_cd) {
3951  auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3952  if (deleted_cols_it == deleted_cols_map.end()) {
3953  CHECK(
3954  deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd)).second);
3955  } else {
3956  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3957  }
3958 }
3959 } // namespace
3960 
3961 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3962  const RelAlgExecutionUnit& ra_exe_unit,
3963  const CompilationOptions& co) {
3964  if (!co.filter_on_deleted_column) {
3965  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3966  }
3967  auto ra_exe_unit_with_deleted = ra_exe_unit;
3968  PlanState::DeletedColumnsMap deleted_cols_map;
3969  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3970  if (input_table.getSourceType() != InputSourceType::TABLE) {
3971  continue;
3972  }
3973  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3974  CHECK(td);
3975  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3976  if (!deleted_cd) {
3977  continue;
3978  }
3979  CHECK(deleted_cd->columnType.is_boolean());
3980  // check deleted column is not already present
3981  bool found = false;
3982  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3983  if (input_col.get()->getColId() == deleted_cd->columnId &&
3984  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3985  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3986  found = true;
3987  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3988  break;
3989  }
3990  }
3991  if (!found) {
3992  // add deleted column
3993  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3994  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3995  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3996  }
3997  }
3998  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3999 }
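// In short: for each physical TABLE input whose table has a delete column with rows
// actually deleted, addDeletedColumn appends that boolean column to input_col_descs
// (unless already present) so the generated filter can drop deleted rows; when
// co.filter_on_deleted_column is unset, the unit is returned unchanged.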
4000 
4001 namespace {
4002 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_value` will return `true` for safe
4003 // scaled epoch value and `false` for overflow/underflow values as the first argument of
4004 // return type.
4005 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
4006  const int64_t chunk_min,
4007  const int64_t chunk_max,
4008  const SQLTypeInfo& lhs_type,
4009  const SQLTypeInfo& rhs_type) {
4010  const int32_t ldim = lhs_type.get_dimension();
4011  const int32_t rdim = rhs_type.get_dimension();
4012  CHECK(ldim != rdim);
4013  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
4014  if (ldim > rdim) {
4015  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
4016  return {true, chunk_min / scale, chunk_max / scale};
4017  }
4018 
4019  using checked_int64_t = boost::multiprecision::number<
4020  boost::multiprecision::cpp_int_backend<64,
4021  64,
4022  boost::multiprecision::signed_magnitude,
4023  boost::multiprecision::checked,
4024  void>>;
4025 
4026  try {
4027  auto ret =
4028  std::make_tuple(true,
4029  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
4030  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
4031  return ret;
4032  } catch (const std::overflow_error& e) {
4033  // noop
4034  }
4035  return std::make_tuple(false, chunk_min, chunk_max);
4036 }
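// Example: comparing a TIMESTAMP(0) column (ldim == 0) with a TIMESTAMP(3) constant
// (rdim == 3) multiplies chunk_min/chunk_max by
// DateTimeUtils::get_timestamp_precision_scale(3) == 1000 using checked 64-bit
// arithmetic; on overflow {false, chunk_min, chunk_max} is returned and the caller
// gives up on skipping the fragment.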
4037 
4038 } // namespace
4039 
4040 bool Executor::isFragmentFullyDeleted(
4041  const int table_id,
4042  const Fragmenter_Namespace::FragmentInfo& fragment) {
4043  // Skip temporary tables
4044  if (table_id < 0) {
4045  return false;
4046  }
4047 
4048  const auto td = catalog_->getMetadataForTable(fragment.physicalTableId);
4049  CHECK(td);
4050  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
4051  if (!deleted_cd) {
4052  return false;
4053  }
4054 
4055  const auto& chunk_type = deleted_cd->columnType;
4056  CHECK(chunk_type.is_boolean());
4057 
4058  const auto deleted_col_id = deleted_cd->columnId;
4059  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
4060  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
4061  const int64_t chunk_min =
4062  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4063  const int64_t chunk_max =
4064  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4065  if (chunk_min == 1 && chunk_max == 1) { // Fragment is fully deleted if metadata says
4066  // the full bytemap is true (signifying all rows deleted)
4067  return true;
4068  }
4069  }
4070  return false;
4071 }
4072 
4073 Executor::FragmentSkipStatus Executor::canSkipFragmentForFpQual(
4074  const Analyzer::BinOper* comp_expr,
4075  const Analyzer::ColumnVar* lhs_col,
4076  const Fragmenter_Namespace::FragmentInfo& fragment,
4077  const Analyzer::Constant* rhs_const) const {
4078  const int col_id = lhs_col->get_column_id();
4079  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4080  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4081  return FragmentSkipStatus::INVALID;
4082  }
4083  double chunk_min{0.};
4084  double chunk_max{0.};
4085  const auto& chunk_type = lhs_col->get_type_info();
4086  chunk_min = extract_min_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4087  chunk_max = extract_max_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4088  if (chunk_min > chunk_max) {
4089  return FragmentSkipStatus::INVALID;
4090  }
4091 
4092  const auto datum_fp = rhs_const->get_constval();
4093  const auto rhs_type = rhs_const->get_type_info().get_type();
4094  CHECK(rhs_type == kFLOAT || rhs_type == kDOUBLE);
4095 
4096  // Do we need to codegen the constant like the integer path does?
4097  const auto rhs_val = rhs_type == kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
4098 
4099  // Todo: dedup the following comparison code with the integer/timestamp path; it is
4100  // slightly tricky to do cleanly as we do not have rowid on this path
4101  switch (comp_expr->get_optype()) {
4102  case kGE:
4103  if (chunk_max < rhs_val) {
4104  return FragmentSkipStatus::SKIPPABLE;
4105  }
4106  break;
4107  case kGT:
4108  if (chunk_max <= rhs_val) {
4109  return FragmentSkipStatus::SKIPPABLE;
4110  }
4111  break;
4112  case kLE:
4113  if (chunk_min > rhs_val) {
4114  return FragmentSkipStatus::SKIPPABLE;
4115  }
4116  break;
4117  case kLT:
4118  if (chunk_min >= rhs_val) {
4119  return FragmentSkipStatus::SKIPPABLE;
4120  }
4121  break;
4122  case kEQ:
4123  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4124  return FragmentSkipStatus::SKIPPABLE;
4125  }
4126  break;
4127  default:
4128  break;
4129  }
4130  return FragmentSkipStatus::NOT_SKIPPABLE;
4131 }
4132 
4133 std::pair<bool, int64_t> Executor::skipFragment(
4134  const InputDescriptor& table_desc,
4135  const Fragmenter_Namespace::FragmentInfo& fragment,
4136  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
4137  const std::vector<uint64_t>& frag_offsets,
4138  const size_t frag_idx) {
4139  const int table_id = table_desc.getTableId();
4140 
4141  // First check to see if all of fragment is deleted, in which case we know we can skip
4142  if (isFragmentFullyDeleted(table_id, fragment)) {
4143  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
4144  << ", fragment id: " << frag_idx;
4145  return {true, -1};
4146  }
4147 
4148  for (const auto& simple_qual : simple_quals) {
4149  const auto comp_expr =
4150  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4151  if (!comp_expr) {
4152  // is this possible?
4153  return {false, -1};
4154  }
4155  const auto lhs = comp_expr->get_left_operand();
4156  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4157  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
4158  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
4159  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
4160  if (lhs_uexpr) {
4161  CHECK(lhs_uexpr->get_optype() ==
4162  kCAST); // We should have only been passed a cast expression
4163  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
4164  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
4165  continue;
4166  }
4167  } else {
4168  continue;
4169  }
4170  }
4171  const auto rhs = comp_expr->get_right_operand();
4172  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4173  if (!rhs_const) {
4174  // is this possible?
4175  return {false, -1};
4176  }
4177  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
4178  !lhs->get_type_info().is_fp()) {
4179  continue;
4180  }
4181  if (lhs->get_type_info().is_fp()) {
4182  const auto fragment_skip_status =
4183  canSkipFragmentForFpQual(comp_expr.get(), lhs_col, fragment, rhs_const);
4184  switch (fragment_skip_status) {
4185  case FragmentSkipStatus::SKIPPABLE:
4186  return {true, -1};
4187  case FragmentSkipStatus::INVALID:
4188  return {false, -1};
4189  case FragmentSkipStatus::NOT_SKIPPABLE:
4190  continue;
4191  default:
4192  UNREACHABLE();
4193  }
4194  }
4195 
4196  // Everything below is logic for integer and integer-backed timestamps
4197  // TODO: Factor out into separate function per canSkipFragmentForFpQual above
4198 
4199  if (lhs_col->get_type_info().is_timestamp() &&
4200  rhs_const->get_type_info().is_any<kTIME>()) {
4201  // when casting from a timestamp to time
4202  // is not possible to get a valid range
4203  // so we can't skip any fragment
4204  continue;
4205  }
4206 
4207  const int col_id = lhs_col->get_column_id();
4208  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4209  int64_t chunk_min{0};
4210  int64_t chunk_max{0};
4211  bool is_rowid{false};
4212  size_t start_rowid{0};
4213  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4214  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
4215  if (cd->isVirtualCol) {
4216  CHECK(cd->columnName == "rowid");
4217  const auto& table_generation = getTableGeneration(table_id);
4218  start_rowid = table_generation.start_rowid;
4219  chunk_min = frag_offsets[frag_idx] + start_rowid;
4220  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
4221  is_rowid = true;
4222  }
4223  } else {
4224  const auto& chunk_type = lhs_col->get_type_info();
4225  chunk_min =
4226  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4227  chunk_max =
4228  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4229  }
4230  if (chunk_min > chunk_max) {
4231  // invalid metadata range, do not skip fragment
4232  return {false, -1};
4233  }
4234  if (lhs->get_type_info().is_timestamp() &&
4235  (lhs_col->get_type_info().get_dimension() !=
4236  rhs_const->get_type_info().get_dimension()) &&
4237  (lhs_col->get_type_info().is_high_precision_timestamp() ||
4238  rhs_const->get_type_info().is_high_precision_timestamp())) {
4239  // If original timestamp lhs col has different precision,
4240  // column metadata holds value in original precision
4241  // therefore adjust rhs value to match lhs precision
4242 
4243  // Note(Wamsi): We adjust rhs const value instead of lhs value to not
4244  // artificially limit the lhs column range. RHS overflow/underflow has already
4245  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
4246  bool is_valid;
4247  std::tie(is_valid, chunk_min, chunk_max) =
4248  get_hpt_overflow_underflow_safe_scaled_values(
4249  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
4250  if (!is_valid) {
4251  VLOG(4) << "Overflow/underflow detected in fragment-skipping logic.\nChunk min "
4252  "value: "
4253  << std::to_string(chunk_min)
4254  << "\nChunk max value: " << std::to_string(chunk_max)
4255  << "\nLHS col precision is: "
4256  << std::to_string(lhs_col->get_type_info().get_dimension())
4257  << "\nRHS precision is: "
4258  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
4259  return {false, -1};
4260  }
4261  }
4262  if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
4263  // It is obvious that a cast from timestamp to date is happening here,
4264  // so we have to correct the chunk min and max values to lower the precision as of
4265  // the date
4266  chunk_min = DateTruncateHighPrecisionToDate(
4267  chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
4268  chunk_max = DateTruncateHighPrecisionToDate(
4269  chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
4270  }
4271  llvm::LLVMContext local_context;
4272  CgenState local_cgen_state(local_context);
4273  CodeGenerator code_generator(&local_cgen_state, nullptr);
4274 
4275  const auto rhs_val =
4276  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
4277 
4278  switch (comp_expr->get_optype()) {
4279  case kGE:
4280  if (chunk_max < rhs_val) {
4281  return {true, -1};
4282  }
4283  break;
4284  case kGT:
4285  if (chunk_max <= rhs_val) {
4286  return {true, -1};
4287  }
4288  break;
4289  case kLE:
4290  if (chunk_min > rhs_val) {
4291  return {true, -1};
4292  }
4293  break;
4294  case kLT:
4295  if (chunk_min >= rhs_val) {
4296  return {true, -1};
4297  }
4298  break;
4299  case kEQ:
4300  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4301  return {true, -1};
4302  } else if (is_rowid) {
4303  return {false, rhs_val - start_rowid};
4304  }
4305  break;
4306  default:
4307  break;
4308  }
4309  }
4310  return {false, -1};
4311 }
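// Example: for a fragment whose metadata records a column range of [10, 20], the
// qual `x > 100` hits the kGT case with chunk_max (20) <= rhs_val (100), so the
// fragment is skipped ({true, -1}); an equality qual on rowid instead yields
// {false, rhs_val - start_rowid} so execution can jump directly to that row.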
4312 
4313 /*
4314  * The skipFragmentInnerJoins function processes all quals stored in the execution unit's
4315  * join_quals and gather all the ones that meet the "simple_qual" characteristics
4316  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
4317  * to decide whether the fragment should be skipped or not. The fragment will be skipped
4318  * if at least one of these skipFragment calls returns true in its first value.
4319  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
4320  * second value only if its first value is "false".
4321  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
4322  * i.e., we only skip if none of the quals trigger the code to update the
4323  * rowid_lookup_key
4324  * - Only AND operations are valid and considered:
4325  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
4326  * the skip
4327  * - `select * from t1,t2 where (A or B) and C`: only C is considered
4328  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
4329  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
4330  * implemented differently, which causes the first one to skip correctly, but the second
4331  * one will not skip.
4332  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
4333  * possible
4334  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
4335  * projection, no skipping
4336  */
4337 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
4338  const InputDescriptor& table_desc,
4339  const RelAlgExecutionUnit& ra_exe_unit,
4340  const Fragmenter_Namespace::FragmentInfo& fragment,
4341  const std::vector<uint64_t>& frag_offsets,
4342  const size_t frag_idx) {
4343  std::pair<bool, int64_t> skip_frag{false, -1};
4344  for (auto& inner_join : ra_exe_unit.join_quals) {
4345  if (inner_join.type != JoinType::INNER) {
4346  continue;
4347  }
4348 
4349  // extracting all the conjunctive simple_quals from the quals stored for the inner
4350  // join
4351  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
4352  for (auto& qual : inner_join.quals) {
4353  auto temp_qual = qual_to_conjunctive_form(qual);
4354  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
4355  temp_qual.simple_quals.begin(),
4356  temp_qual.simple_quals.end());
4357  }
4358  auto temp_skip_frag = skipFragment(
4359  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
4360  if (temp_skip_frag.second != -1) {
4361  skip_frag.second = temp_skip_frag.second;
4362  return skip_frag;
4363  } else {
4364  skip_frag.first = skip_frag.first || temp_skip_frag.first;
4365  }
4366  }
4367  return skip_frag;
4368 }
4369 
4370 AggregatedColRange Executor::computeColRangesCache(
4371  const std::unordered_set<PhysicalInput>& phys_inputs) {
4372  AggregatedColRange agg_col_range_cache;
4373  CHECK(catalog_);
4374  std::unordered_set<int> phys_table_ids;
4375  for (const auto& phys_input : phys_inputs) {
4376  phys_table_ids.insert(phys_input.table_id);
4377  }
4378  std::vector<InputTableInfo> query_infos;
4379  for (const int table_id : phys_table_ids) {
4380  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
4381  }
4382  for (const auto& phys_input : phys_inputs) {
4383  const auto cd =
4384  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4385  CHECK(cd);
4386  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
4387  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
4388  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
4389  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
4390  agg_col_range_cache.setColRange(phys_input, col_range);
4391  }
4392  }
4393  return agg_col_range_cache;
4394 }
4395 
4396 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
4397  const std::unordered_set<PhysicalInput>& phys_inputs) {
4398  StringDictionaryGenerations string_dictionary_generations;
4399  CHECK(catalog_);
4400  // Foreign tables may have not populated dictionaries for encoded columns. If this is
4401  // the case then we need to populate them here to make sure that the generations are set
4402  // correctly.
4403  prepare_string_dictionaries(phys_inputs, *catalog_);
4404  for (const auto& phys_input : phys_inputs) {
4405  const auto cd =
4406  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4407  CHECK(cd);
4408  const auto& col_ti =
4409  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
4410  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
4411  const int dict_id = col_ti.get_comp_param();
4412  const auto dd = catalog_->getMetadataForDict(dict_id);
4413  CHECK(dd && dd->stringDict);
4414  string_dictionary_generations.setGeneration(dict_id,
4415  dd->stringDict->storageEntryCount());
4416  }
4417  }
4418  return string_dictionary_generations;
4419 }
4420 
4421 TableGenerations Executor::computeTableGenerations(
4422  std::unordered_set<int> phys_table_ids) {
4423  TableGenerations table_generations;
4424  for (const int table_id : phys_table_ids) {
4425  const auto table_info = getTableInfo(table_id);
4426  table_generations.setGeneration(
4427  table_id,
4428  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
4429  }
4430  return table_generations;
4431 }
4432 
4433 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
4434  const std::unordered_set<int>& phys_table_ids) {
4435  CHECK(catalog_);
4436  row_set_mem_owner_ =
4437  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
4438  row_set_mem_owner_->setDictionaryGenerations(
4439  computeStringDictionaryGenerations(phys_inputs));
4440  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
4441  table_generations_ = computeTableGenerations(phys_table_ids);
4442 }
4443 
4445  return recycler_mutex_;
4446 }
4447 
4449  return query_plan_dag_cache_;
4450 }
4451 
4454 }
4455 
4457  return executor_session_mutex_;
4458 }
4459 
4462  return current_query_session_;
4463 }
4464 
4466  const QuerySessionId& candidate_query_session,
4468  // a candidate query session is considered the current query session
4469  // only if it is non-empty and equal to current_query_session_
4470  return !candidate_query_session.empty() &&
4471  (current_query_session_ == candidate_query_session);
4472 }
4473 
4474 // used only for testing
4476  const QuerySessionId& candidate_query_session,
4478  if (queries_session_map_.count(candidate_query_session) &&
4479  !queries_session_map_.at(candidate_query_session).empty()) {
4480  return queries_session_map_.at(candidate_query_session)
4481  .begin()
4482  ->second.getQueryStatus();
4483  }
4484  return QuerySessionStatus::QueryStatus::UNDEFINED;
4485 }
4486 
4490 }
4491 
4493  const QuerySessionId& query_session_id,
4494  const std::string& query_str,
4495  const std::string& query_submitted_time) {
4496  if (!query_session_id.empty()) {
4497  // if session is valid, do update 1) the exact executor id and 2) query status
4500  query_session_id, query_submitted_time, executor_id_, write_lock);
4501  updateQuerySessionStatusWithLock(query_session_id,
4502  query_submitted_time,
4503  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
4504  write_lock);
4505  }
4506  return {query_session_id, query_str};
4507 }
4508 
4510  // check whether we are okay to execute the "pending" query
4511  // i.e., before running the query check if this query session is "ALREADY" interrupted
4513  if (query_session.empty()) {
4514  return;
4515  }
4516  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
4517  // something went wrong, since we assume this is the caller's responsibility
4518  // (call this function only for an enrolled query session)
4519  if (!queries_session_map_.count(query_session)) {
4520  VLOG(1) << "Interrupting pending query is not available since the query session is "
4521  "not enrolled";
4522  } else {
4523  // here the query session is enrolled but the interrupt flag is not registered
4524  VLOG(1)
4525  << "Interrupting pending query is not available since its interrupt flag is "
4526  "not registered";
4527  }
4528  return;
4529  }
4530  if (queries_interrupt_flag_[query_session]) {
4532  }
4533 }
4534 
4536  const std::string& submitted_time_str) {
4538  // clear the interrupt-related info for a finished query
4539  if (query_session.empty()) {
4540  return;
4541  }
4542  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
4543  if (query_session.compare(current_query_session_) == 0) {
4544  invalidateRunningQuerySession(session_write_lock);
4545  resetInterrupt();
4546  }
4547 }
4548 
4550  const QuerySessionId& query_session,
4551  const std::string& submitted_time_str,
4552  const QuerySessionStatus::QueryStatus new_query_status) {
4553  // update the running query session's the current status
4555  if (query_session.empty()) {
4556  return;
4557  }
4558  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4559  current_query_session_ = query_session;
4560  }
4562  query_session, submitted_time_str, new_query_status, session_write_lock);
4563 }
4564 
4566  const QuerySessionId& query_session,
4567  const std::string& query_str,
4568  const std::string& submitted_time_str,
4569  const size_t executor_id,
4570  const QuerySessionStatus::QueryStatus query_session_status) {
4571  // enroll the query session into the Executor's session map
4573  if (query_session.empty()) {
4574  return;
4575  }
4576 
4577  addToQuerySessionList(query_session,
4578  query_str,
4579  submitted_time_str,
4580  executor_id,
4581  query_session_status,
4582  session_write_lock);
4583 
4584  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4585  current_query_session_ = query_session;
4586  }
4587 }
4588 
4591  return queries_session_map_.size();
4592 }
4593 
4595  const QuerySessionId& query_session,
4596  const std::string& query_str,
4597  const std::string& submitted_time_str,
4598  const size_t executor_id,
4599  const QuerySessionStatus::QueryStatus query_status,
4601  // an internal API that enrolls the query session into the Executor's session map
4602  if (queries_session_map_.count(query_session)) {
4603  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
4604  queries_session_map_.at(query_session).erase(submitted_time_str);
4605  queries_session_map_.at(query_session)
4606  .emplace(submitted_time_str,
4607  QuerySessionStatus(query_session,
4608  executor_id,
4609  query_str,
4610  submitted_time_str,
4611  query_status));
4612  } else {
4613  queries_session_map_.at(query_session)
4614  .emplace(submitted_time_str,
4615  QuerySessionStatus(query_session,
4616  executor_id,
4617  query_str,
4618  submitted_time_str,
4619  query_status));
4620  }
4621  } else {
4622  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4623  executor_per_query_map.emplace(
4624  submitted_time_str,
4626  query_session, executor_id, query_str, submitted_time_str, query_status));
4627  queries_session_map_.emplace(query_session, executor_per_query_map);
4628  }
4629  return queries_interrupt_flag_.emplace(query_session, false).second;
4630 }
4631 
4633  const QuerySessionId& query_session,
4634  const std::string& submitted_time_str,
4635  const QuerySessionStatus::QueryStatus updated_query_status,
4637  // an internal API that updates query session status
4638  if (query_session.empty()) {
4639  return false;
4640  }
4641  if (queries_session_map_.count(query_session)) {
4642  for (auto& query_status : queries_session_map_.at(query_session)) {
4643  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4644  // no time difference --> found the target query status
4645  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4646  auto prev_status = query_status.second.getQueryStatus();
4647  if (prev_status == updated_query_status) {
4648  return false;
4649  }
4650  query_status.second.setQueryStatus(updated_query_status);
4651  return true;
4652  }
4653  }
4654  }
4655  return false;
4656 }
4657 
4659  const QuerySessionId& query_session,
4660  const std::string& submitted_time_str,
4661  const size_t executor_id,
4663  // update the executor id of the query session
4664  if (query_session.empty()) {
4665  return false;
4666  }
4667  if (queries_session_map_.count(query_session)) {
4668  auto storage = queries_session_map_.at(query_session);
4669  for (auto it = storage.begin(); it != storage.end(); it++) {
4670  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4671  // no time difference --> found the target query status
4672  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4673  queries_session_map_.at(query_session)
4674  .at(submitted_time_str)
4675  .setExecutorId(executor_id);
4676  return true;
4677  }
4678  }
4679  }
4680  return false;
4681 }
4682 
4684  const QuerySessionId& query_session,
4685  const std::string& submitted_time_str,
4687  if (query_session.empty()) {
4688  return false;
4689  }
4690  if (queries_session_map_.count(query_session)) {
4691  auto& storage = queries_session_map_.at(query_session);
4692  if (storage.size() > 1) {
4693  // in this case we only remove query executor info
4694  for (auto it = storage.begin(); it != storage.end(); it++) {
4695  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4696  // no time difference && have the same executor id--> found the target query
4697  if (it->second.getExecutorId() == executor_id_ &&
4698  submitted_time_str.compare(target_submitted_t_str) == 0) {
4699  storage.erase(it);
4700  return true;
4701  }
4702  }
4703  } else if (storage.size() == 1) {
4704  // here this session only has a single query executor
4705  // so we clear both executor info and its interrupt flag
4706  queries_session_map_.erase(query_session);
4707  queries_interrupt_flag_.erase(query_session);
4708  if (interrupted_.load()) {
4709  interrupted_.store(false);
4710  }
4711  return true;
4712  }
4713  }
4714  return false;
4715 }
4716 
4718  const QuerySessionId& query_session,
4720  if (query_session.empty()) {
4721  return;
4722  }
4723  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
4724  queries_interrupt_flag_[query_session] = true;
4725  }
4726 }
4727 
4729  const QuerySessionId& query_session,
4731  if (query_session.empty()) {
4732  return false;
4733  }
4734  auto flag_it = queries_interrupt_flag_.find(query_session);
4735  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4736  flag_it->second;
4737 }
4738 
4740  const QuerySessionId& query_session,
4742  if (query_session.empty()) {
4743  return false;
4744  }
4745  return !query_session.empty() && queries_session_map_.count(query_session);
4746 }
4747 
4747 
4748 void Executor::enableRuntimeQueryInterrupt(
4749  const double runtime_query_check_freq,
4750  const unsigned pending_query_check_freq) const {
4751  // The only scenario in which we intentionally call this function is
4752  // to allow runtime query interrupt in QueryRunner for test cases.
4753  // Because the test machine's default settings do not allow runtime query interrupt,
4754  // we have to turn it on within test code if necessary.
4756  g_pending_query_interrupt_freq = pending_query_check_freq;
4757  g_running_query_interrupt_freq = runtime_query_check_freq;
4760  }
4761 }
4762 
4763 void Executor::addToCardinalityCache(const std::string& cache_key,
4764  const size_t cache_value) {
4767  cardinality_cache_[cache_key] = cache_value;
4768  VLOG(1) << "Put estimated cardinality to the cache";
4769  }
4770 }
4771 
4775  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
4776  VLOG(1) << "Reuse cached cardinality";
4777  return {true, cardinality_cache_[cache_key]};
4778  }
4779  return {false, -1};
4780 }
4781 
4782 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
4783  const QuerySessionId& query_session,
4785  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
4786  auto& query_infos = queries_session_map_.at(query_session);
4787  std::vector<QuerySessionStatus> ret;
4788  for (auto& info : query_infos) {
4789  ret.push_back(QuerySessionStatus(query_session,
4790  info.second.getExecutorId(),
4791  info.second.getQueryStr(),
4792  info.second.getQuerySubmittedTime(),
4793  info.second.getQueryStatus()));
4794  }
4795  return ret;
4796  }
4797  return {};
4798 }
4799 
4800 const std::vector<size_t> Executor::getExecutorIdsRunningQuery(
4801  const QuerySessionId& interrupt_session) const {
4802  std::vector<size_t> res;
4804  auto it = queries_session_map_.find(interrupt_session);
4805  if (it != queries_session_map_.end()) {
4806  for (auto& kv : it->second) {
4807  if (kv.second.getQueryStatus() ==
4808  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4809  res.push_back(kv.second.getExecutorId());
4810  }
4811  }
4812  }
4813  return res;
4814 }
4815 
4817  // this function should be called within an executor which is assigned
4818  // to the specific query thread (that indicates we already enroll the session)
4819  // check whether this is called from non unitary executor
4820  if (executor_id_ == UNITARY_EXECUTOR_ID) {
4821  return false;
4822  };
4824  auto flag_it = queries_interrupt_flag_.find(current_query_session_);
4825  return !current_query_session_.empty() && flag_it != queries_interrupt_flag_.end() &&
4826  flag_it->second;
4827 }
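
This check is intended for long-running work that happens outside a query kernel (fetching, linearization, reduction), where the per-kernel interrupt path is not active. A hedged sketch of the polling pattern; the surrounding loop and the use of QueryExecutionError are illustrative:

// Illustrative sketch only; not part of Execute.cpp.
for (size_t frag_idx = 0; frag_idx < fragments.size(); ++frag_idx) {
  if (allow_runtime_interrupt && checkNonKernelTimeInterrupted()) {
    throw QueryExecutionError(Executor::ERR_INTERRUPTED);
  }
  // ... fetch or linearize fragment `frag_idx` ...
}
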
4828 
4829 void Executor::registerExtractedQueryPlanDag(const QueryPlanDAG& query_plan_dag) {
4830  // this function is called under the recycler lock,
4831  // e.g., QueryPlanDagExtractor::extractQueryPlanDagImpl()
4832  latest_query_plan_extracted_ = query_plan_dag;
4833 }
4834 
4835 const QueryPlanDAG Executor::getLatestQueryPlanDagExtracted() const {
4836  heavyai::shared_lock<heavyai::shared_mutex> lock(recycler_mutex_);
4837  return latest_query_plan_extracted_;
4838 }
4839 
4840 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
4841 
4842 // contains the interrupt flag's status per query session
4843 InterruptFlagMap Executor::queries_interrupt_flag_;
4844 // contains a list of queries per query session
4845 QuerySessionMap Executor::queries_session_map_;
4846 // session lock
4847 heavyai::shared_mutex Executor::executor_session_mutex_;
4848 
4849 heavyai::shared_mutex Executor::execute_mutex_;
4850 heavyai::shared_mutex Executor::executors_cache_mutex_;
4851 
4852 std::mutex Executor::gpu_active_modules_mutex_;
4853 uint32_t Executor::gpu_active_modules_device_mask_{0x0};
4854 void* Executor::gpu_active_modules_[max_gpu_count];
4855 
4856 std::mutex Executor::compilation_mutex_;
4857 std::mutex Executor::kernel_mutex_;
4858 
4859 QueryPlanDagCache Executor::query_plan_dag_cache_;
4860 heavyai::shared_mutex Executor::recycler_mutex_;
4861 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;
4862 // Executor has a single global result set recycler holder
4863 // which contains two recyclers related to query resultsets
4864 ResultSetRecyclerHolder Executor::resultset_recycler_holder_;
4865 QueryPlanDAG Executor::latest_query_plan_extracted_{EMPTY_QUERY_PLAN};
4866 
4867 // Useful for debugging.
4868 std::string Executor::dumpCache() const {
4869  std::stringstream ss;
4870  ss << "colRangeCache: ";
4871  for (auto& [phys_input, exp_range] : agg_col_range_cache_.asMap()) {
4872  ss << "{" << phys_input.col_id << ", " << phys_input.table_id
4873  << "} = " << exp_range.toString() << ", ";
4874  }
4875  ss << "stringDictGenerations: ";
4876  for (auto& [key, val] : row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
4877  ss << "{" << key << "} = " << val << ", ";
4878  }
4879  ss << "tableGenerations: ";
4880  for (auto& [key, val] : table_generations_.asMap()) {
4881  ss << "{" << key << "} = {" << val.tuple_count << ", " << val.start_rowid << "}, ";
4882  }
4883  ss << "\n";
4884  return ss.str();
4885 }
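
As the comment notes, dumpCache() is a debugging aid; a typical use is simply to log its output while investigating stale column ranges or generations, e.g.:

// Illustrative sketch only; not part of Execute.cpp.
VLOG(1) << "Executor cache state: " << executor->dumpCache();
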