OmniSciDB  72c90bc290
Execute.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 
23 #ifdef HAVE_CUDA
24 #include <cuda.h>
25 #endif // HAVE_CUDA
26 #include <chrono>
27 #include <ctime>
28 #include <future>
29 #include <iostream>
30 #include <memory>
31 #include <mutex>
32 #include <numeric>
33 #include <set>
34 #include <thread>
35 
36 #include "Catalog/Catalog.h"
37 #include "CudaMgr/CudaMgr.h"
41 #include "Parser/ParserNode.h"
73 #include "Shared/checked_alloc.h"
74 #include "Shared/measure.h"
75 #include "Shared/misc.h"
76 #include "Shared/scope.h"
77 #include "Shared/shard_key.h"
78 #include "Shared/threading.h"
79 
80 bool g_enable_watchdog{false};
86 size_t g_cpu_sub_task_size{500'000};
87 bool g_enable_filter_function{true};
88 unsigned g_dynamic_watchdog_time_limit{10000};
89 bool g_allow_cpu_retry{true};
90 bool g_allow_query_step_cpu_retry{true};
91 bool g_null_div_by_zero{false};
92 unsigned g_trivial_loop_join_threshold{1000};
93 bool g_from_table_reordering{true};
94 bool g_inner_join_fragment_skipping{true};
95 extern bool g_enable_smem_group_by;
96 extern std::unique_ptr<llvm::Module> udf_gpu_module;
97 extern std::unique_ptr<llvm::Module> udf_cpu_module;
98 bool g_enable_filter_push_down{false};
99 float g_filter_push_down_low_frac{-1.0f};
100 float g_filter_push_down_high_frac{-1.0f};
101 size_t g_filter_push_down_passing_row_ubound{0};
102 bool g_enable_columnar_output{false};
103 bool g_enable_left_join_filter_hoisting{true};
104 bool g_optimize_row_initialization{true};
105 bool g_enable_bbox_intersect_hashjoin{true};
106 size_t g_num_tuple_threshold_switch_to_baseline{100000};
107 size_t g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline{100};
108 bool g_enable_distance_rangejoin{true};
109 bool g_enable_hashjoin_many_to_many{true};
110 size_t g_bbox_intersect_max_table_size_bytes{1024 * 1024 * 1024};
111 double g_bbox_intersect_target_entries_per_bin{1.3};
112 bool g_strip_join_covered_quals{false};
113 size_t g_constrained_by_in_threshold{10};
114 size_t g_default_max_groups_buffer_entry_guess{16384};
115 size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
116 bool g_enable_window_functions{true};
117 bool g_enable_table_functions{true};
118 bool g_enable_ml_functions{true};
119 bool g_restrict_ml_model_metadata_to_superusers{false};
120 bool g_enable_dev_table_functions{false};
121 bool g_enable_geo_ops_on_uncompressed_coords{true};
122 bool g_enable_rf_prop_table_functions{true};
123 bool g_allow_memory_status_log{true};
124 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
125 size_t g_min_memory_allocation_size{
126  256}; // minimum memory allocation required for projection query output buffer
127  // without pre-flight count
128 bool g_enable_bump_allocator{false};
129 double g_bump_allocator_step_reduction{0.75};
130 bool g_enable_direct_columnarization{true};
131 extern bool g_enable_string_functions;
132 bool g_enable_lazy_fetch{true};
133 bool g_enable_runtime_query_interrupt{true};
134 bool g_enable_non_kernel_time_query_interrupt{true};
135 bool g_use_estimator_result_cache{true};
136 unsigned g_pending_query_interrupt_freq{1000};
137 double g_running_query_interrupt_freq{0.1};
138 size_t g_gpu_smem_threshold{
139  4096}; // GPU shared memory threshold (in bytes); if larger
140  // buffer sizes are required we do not use GPU shared
141  // memory optimizations. Setting this to 0 means unlimited
142  // (subject to other dynamically calculated caps)
143 bool g_enable_smem_grouped_non_count_agg{
144  true}; // enable use of shared memory when performing group-by with select non-count
145  // aggregates
146 bool g_enable_smem_non_grouped_agg{
147  true}; // enable optimizations for using GPU shared memory in implementation of
148  // non-grouped aggregates
149 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
150  // limits the allocation for the output buffer arena
151  // and data recycler test
152 size_t g_enable_parallel_linearization{
153  10000}; // row-count threshold at which varlen column linearization is done in parallel
154 bool g_enable_data_recycler{true};
155 bool g_use_hashtable_cache{true};
156 bool g_use_query_resultset_cache{true};
157 bool g_use_chunk_metadata_cache{true};
158 bool g_allow_auto_resultset_caching{false};
159 bool g_allow_query_step_skipping{true};
160 size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
161 size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
162 size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
163 size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
164 size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
165 bool g_optimize_cuda_block_and_grid_sizes{false};
166 
167 size_t g_approx_quantile_buffer{1000};
168 size_t g_approx_quantile_centroids{300};
169 
170 bool g_enable_automatic_ir_metadata{true};
171 
172 size_t g_max_log_length{500};
173 
174 bool g_enable_executor_resource_mgr{true};
175 
176 double g_executor_resource_mgr_cpu_result_mem_ratio{0.8};
177 size_t g_executor_resource_mgr_cpu_result_mem_bytes{Executor::auto_cpu_mem_bytes};
178 double g_executor_resource_mgr_per_query_max_cpu_slots_ratio{0.9};
179 double g_executor_resource_mgr_per_query_max_cpu_result_mem_ratio{0.8};
180 
181 // Todo: rework ConcurrentResourceGrantPolicy and ExecutorResourcePool to allow
182 // thresholds for concurrent oversubscription, rather than just boolean allowed/disallowed
183 bool g_executor_resource_mgr_allow_cpu_kernel_concurrency{true};
184 bool g_executor_resource_mgr_allow_cpu_gpu_kernel_concurrency{true};
185 // Whether a single query can oversubscribe CPU slots should be controlled with
186 // g_executor_resource_mgr_per_query_max_cpu_slots_ratio
187 bool g_executor_resource_mgr_allow_cpu_slot_oversubscription_concurrency{false};
188 // Whether a single query can oversubscribe CPU memory should be controlled with
189 // g_executor_resource_mgr_per_query_max_cpu_result_mem_ratio
190 bool g_executor_resource_mgr_allow_cpu_result_mem_oversubscription_concurrency{false};
191 double g_executor_resource_mgr_max_available_resource_use_ratio{0.8};
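// Illustrative sketch (not part of the original source): how the per-query CPU slot
// ratio above maps to a concrete slot cap. The names total_cpu_slots and
// per_query_slot_cap are hypothetical.
//
//   const size_t total_cpu_slots = std::thread::hardware_concurrency();
//   const auto per_query_slot_cap = static_cast<size_t>(
//       g_executor_resource_mgr_per_query_max_cpu_slots_ratio * total_cpu_slots);
//   // e.g. with the default ratio of 0.9 and 16 hardware threads, a single query
//   // may be granted at most 14 CPU slots.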
192 
193 extern bool g_cache_string_hash;
194 extern bool g_allow_memory_status_log;
195 
196 int const Executor::max_gpu_count;
197 
198 const int32_t Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES;
199 
200 std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;
201 
202 extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
203  const std::string& udf_ir_filename,
204  llvm::LLVMContext& ctx);
205 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
206  const std::string& udf_ir_filename,
207  llvm::LLVMContext& ctx,
208  bool is_gpu = false);
209 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
210  const std::string& udf_ir_string,
211  llvm::LLVMContext& ctx,
212  bool is_gpu = false);
213 
214 namespace {
215 // This function is notably different from that in RelAlgExecutor because it already
216 // expects SPI values and therefore needs to avoid that transformation.
217 void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs) {
218  for (const auto [col_id, table_id, db_id] : phys_inputs) {
219  foreign_storage::populate_string_dictionary(table_id, col_id, db_id);
220  }
221 }
222 
223 bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
224  const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
225  // The fragmenter always returns at least one fragment, even when the table is empty.
226  return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
227 }
228 } // namespace
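// Illustrative sketch (not part of the original source): prepare_string_dictionaries
// above is driven by the physical inputs of a query; phys_inputs here is a
// hypothetical, already-collected set of {col_id, table_id, db_id} triples.
//
//   std::unordered_set<PhysicalInput> phys_inputs = /* gathered from the query plan */;
//   prepare_string_dictionaries(phys_inputs);  // populate FSI dictionaries up front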
229 
230 namespace foreign_storage {
231 // Foreign tables skip the population of dictionaries during metadata scan. This function
232 // will populate a dictionary's missing entries by fetching any unpopulated chunks.
233 void populate_string_dictionary(int32_t table_id, int32_t col_id, int32_t db_id) {
234  const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
235  CHECK(catalog);
236  if (const auto foreign_table = dynamic_cast<const ForeignTable*>(
237  catalog->getMetadataForTable(table_id, false))) {
238  const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
239  if (col_desc->columnType.is_dict_encoded_type()) {
240  auto& fragmenter = foreign_table->fragmenter;
241  CHECK(fragmenter != nullptr);
242  if (is_empty_table(fragmenter.get())) {
243  return;
244  }
245  for (const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
246  ChunkKey chunk_key = {db_id, table_id, col_id, frag.fragmentId};
247  // If the key is sharded across leaves, only populate fragments that are sharded
248  // to this leaf.
249  if (key_does_not_shard_to_leaf(chunk_key)) {
250  continue;
251  }
252 
253  const ChunkMetadataMap& metadata_map = frag.getChunkMetadataMap();
254  CHECK(metadata_map.find(col_id) != metadata_map.end());
255  if (auto& meta = metadata_map.at(col_id); meta->isPlaceholder()) {
256  // When this chunk goes out of scope it will stay in the CPU cache but become
257  // evictable
258  auto chunk = Chunk_NS::Chunk::getChunk(col_desc,
259  &(catalog->getDataMgr()),
260  chunk_key,
261  Data_Namespace::CPU_LEVEL,
262  0,
263  0,
264  0);
265  }
266  }
267  }
268  }
269 }
270 } // namespace foreign_storage
271 
272 Executor::Executor(const ExecutorId executor_id,
273  Data_Namespace::DataMgr* data_mgr,
274  const size_t block_size_x,
275  const size_t grid_size_x,
276  const size_t max_gpu_slab_size,
277  const std::string& debug_dir,
278  const std::string& debug_file)
279  : executor_id_(executor_id)
280  , context_(new llvm::LLVMContext())
281  , cgen_state_(new CgenState({}, false, this))
282  , block_size_x_(block_size_x)
283  , grid_size_x_(grid_size_x)
284  , max_gpu_slab_size_(max_gpu_slab_size)
285  , debug_dir_(debug_dir)
286  , debug_file_(debug_file)
287  , data_mgr_(data_mgr)
288  , temporary_tables_(nullptr)
291  update_extension_modules();
292 }
293 
294 void Executor::initialize_extension_module_sources() {
295  if (Executor::extension_module_sources.find(
296  Executor::ExtModuleKinds::template_module) ==
297  Executor::extension_module_sources.end()) {
298  auto root_path = heavyai::get_root_abs_path();
299  auto template_path = root_path + "/QueryEngine/RuntimeFunctions.bc";
300  CHECK(boost::filesystem::exists(template_path));
301  extension_module_sources[Executor::ExtModuleKinds::template_module] =
302  template_path;
303 #ifdef ENABLE_GEOS
304  auto rt_geos_path = root_path + "/QueryEngine/GeosRuntime.bc";
305  CHECK(boost::filesystem::exists(rt_geos_path));
306  extension_module_sources[Executor::ExtModuleKinds::rt_geos_module] =
307  rt_geos_path;
308 #endif
309 #ifdef HAVE_CUDA
310  auto rt_libdevice_path = get_cuda_libdevice_dir() + "/libdevice.10.bc";
311  if (boost::filesystem::exists(rt_libdevice_path)) {
312  extension_module_sources[Executor::ExtModuleKinds::rt_libdevice_module] =
313  rt_libdevice_path;
314  } else {
315  LOG(WARNING) << "File " << rt_libdevice_path
316  << " does not exist; support for some UDF "
317  "functions might not be available.";
318  }
319 #endif
320  }
321 }
322 
323 void Executor::reset(bool discard_runtime_modules_only) {
324  // TODO: keep cached results that do not depend on runtime UDF/UDTFs
325  auto qe = QueryEngine::getInstance();
326  qe->s_code_accessor->clear();
327  qe->s_stubs_accessor->clear();
328  qe->cpu_code_accessor->clear();
329  qe->gpu_code_accessor->clear();
330  qe->tf_code_accessor->clear();
331 
332  if (discard_runtime_modules_only) {
333  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_cpu_module);
334 #ifdef HAVE_CUDA
335  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_gpu_module);
336 #endif
337  cgen_state_->module_ = nullptr;
338  } else {
339  extension_modules_.clear();
340  cgen_state_.reset();
341  context_.reset(new llvm::LLVMContext());
342  cgen_state_.reset(new CgenState({}, false, this));
343  }
344 }
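// Illustrative sketch (not part of the original source): a caller that unregisters
// runtime UDFs would typically drop only the runtime modules and then rebuild them,
// leaving the compile-time template/UDF modules intact (the call sequence is an
// assumption, not a documented protocol):
//
//   executor->reset(/*discard_runtime_modules_only=*/true);
//   executor->update_extension_modules(/*update_runtime_modules_only=*/true);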
345 
346 void Executor::update_extension_modules(bool update_runtime_modules_only) {
347  auto read_module = [&](Executor::ExtModuleKinds module_kind,
348  const std::string& source) {
349  /*
350  source can be either a filename of a LLVM IR
351  or LLVM BC source, or a string containing
352  LLVM IR code.
353  */
354  CHECK(!source.empty());
355  switch (module_kind) {
356  case Executor::ExtModuleKinds::template_module:
357  case Executor::ExtModuleKinds::rt_geos_module:
358  case Executor::ExtModuleKinds::rt_libdevice_module: {
359  return read_llvm_module_from_bc_file(source, getContext());
360  }
361  case Executor::ExtModuleKinds::udf_cpu_module: {
362  return read_llvm_module_from_ir_file(source, getContext(), false);
363  }
364  case Executor::ExtModuleKinds::udf_gpu_module: {
365  return read_llvm_module_from_ir_file(source, getContext(), true);
366  }
367  case Executor::ExtModuleKinds::rt_udf_cpu_module: {
368  return read_llvm_module_from_ir_string(source, getContext(), false);
369  }
370  case Executor::ExtModuleKinds::rt_udf_gpu_module: {
371  return read_llvm_module_from_ir_string(source, getContext(), true);
372  }
373  default: {
374  UNREACHABLE();
375  return std::unique_ptr<llvm::Module>();
376  }
377  }
378  };
379  auto update_module = [&](Executor::ExtModuleKinds module_kind,
380  bool erase_not_found = false) {
381  auto it = Executor::extension_module_sources.find(module_kind);
382  if (it != Executor::extension_module_sources.end()) {
383  auto llvm_module = read_module(module_kind, it->second);
384  if (llvm_module) {
385  extension_modules_[module_kind] = std::move(llvm_module);
386  } else if (erase_not_found) {
387  extension_modules_.erase(module_kind);
388  } else {
389  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
390  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
391  << " LLVM module. The module will be unavailable.";
392  } else {
393  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
394  << " LLVM module. Using the existing module.";
395  }
396  }
397  } else {
398  if (erase_not_found) {
399  extension_modules_.erase(module_kind);
400  } else {
401  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
402  LOG(WARNING) << "Source of " << ::toString(module_kind)
403  << " LLVM module is unavailable. The module will be unavailable.";
404  } else {
405  LOG(WARNING) << "Source of " << ::toString(module_kind)
406  << " LLVM module is unavailable. Using the existing module.";
407  }
408  }
409  }
410  };
411 
412  if (!update_runtime_modules_only) {
413  // required compile-time modules, their requirements are enforced
414  // by Executor::initialize_extension_module_sources():
415  update_module(Executor::ExtModuleKinds::template_module);
416 #ifdef ENABLE_GEOS
417  update_module(Executor::ExtModuleKinds::rt_geos_module);
418 #endif
419  // load-time modules, these are optional:
420  update_module(Executor::ExtModuleKinds::udf_cpu_module, true);
421 #ifdef HAVE_CUDA
422  update_module(Executor::ExtModuleKinds::udf_gpu_module, true);
423  update_module(Executor::ExtModuleKinds::rt_libdevice_module);
424 #endif
425  }
426  // run-time modules, these are optional and erasable:
427  update_module(Executor::ExtModuleKinds::rt_udf_cpu_module, true);
428 #ifdef HAVE_CUDA
429  update_module(Executor::ExtModuleKinds::rt_udf_gpu_module, true);
430 #endif
431 }
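// Illustrative sketch (not part of the original source): registering a runtime CPU UDF
// module and refreshing only the runtime modules. udf_ir_string is a hypothetical
// string holding LLVM IR text.
//
//   Executor::extension_module_sources[Executor::ExtModuleKinds::rt_udf_cpu_module] =
//       udf_ir_string;
//   executor->update_extension_modules(/*update_runtime_modules_only=*/true);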
432 
433 // Used by StubGenerator::generateStub
434 Executor::CgenStateManager::CgenStateManager(Executor& executor)
435  : executor_(executor)
436  , lock_queue_clock_(timer_start())
437  , lock_(executor_.compilation_mutex_)
438  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
439 {
440  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
441  executor_.cgen_state_.reset(new CgenState(0, false, &executor));
442 }
443 
444 Executor::CgenStateManager::CgenStateManager(
445  Executor& executor,
446  const bool allow_lazy_fetch,
447  const std::vector<InputTableInfo>& query_infos,
448  const PlanState::DeletedColumnsMap& deleted_cols_map,
449  const RelAlgExecutionUnit* ra_exe_unit)
450  : executor_(executor)
451  , lock_queue_clock_(timer_start())
452  , lock_(executor_.compilation_mutex_)
453  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
454 {
455  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
456  // nukeOldState creates new CgenState and PlanState instances for
457  // the subsequent code generation. It also resets
458  // kernel_queue_time_ms_ and compilation_queue_time_ms_ that we do
459  // not currently restore. Should we accumulate these timings?
460  executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
461 }
462 
463 Executor::CgenStateManager::~CgenStateManager() {
464  // prevent memory leak from hoisted literals
465  for (auto& p : executor_.cgen_state_->row_func_hoisted_literals_) {
466  auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
467  if (inst && inst->getNumUses() == 0 && inst->getParent() == nullptr) {
468  // The llvm::Value instance stored in p.first is created by the
469  // CodeGenerator::codegenHoistedConstantsPlaceholders method.
470  p.first->deleteValue();
471  }
472  }
473  executor_.cgen_state_->row_func_hoisted_literals_.clear();
474 
475  // move generated StringDictionaryTranslationMgrs and InValueBitmaps
476  // to the old CgenState instance as the execution of the generated
477  // code uses these bitmaps
478 
479  for (auto& bm : executor_.cgen_state_->in_values_bitmaps_) {
480  cgen_state_->moveInValuesBitmap(bm);
481  }
482  executor_.cgen_state_->in_values_bitmaps_.clear();
483 
484  for (auto& str_dict_translation_mgr :
485  executor_.cgen_state_->str_dict_translation_mgrs_) {
486  cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
487  }
488  executor_.cgen_state_->str_dict_translation_mgrs_.clear();
489 
490  for (auto& tree_model_prediction_mgr :
491  executor_.cgen_state_->tree_model_prediction_mgrs_) {
492  cgen_state_->moveTreeModelPredictionMgr(std::move(tree_model_prediction_mgr));
493  }
494  executor_.cgen_state_->tree_model_prediction_mgrs_.clear();
495 
496  // Delete worker module that may have been set by
497  // set_module_shallow_copy. If QueryMustRunOnCpu is thrown, the
498  // worker module is not instantiated, so the worker module needs to
499  // be deleted conditionally [see "Managing LLVM modules" comment in
500  // CgenState.h]:
501  if (executor_.cgen_state_->module_) {
502  delete executor_.cgen_state_->module_;
503  }
504 
505  // restore the old CgenState instance
506  executor_.cgen_state_.reset(cgen_state_.release());
507 }
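// Illustrative sketch (not part of the original source): CgenStateManager acts as an
// RAII guard around code generation. Constructing it stores the current CgenState and
// installs a fresh one under the compilation lock; the destructor above moves the
// generated bitmaps/translation managers back and restores the old state.
//
//   {
//     Executor::CgenStateManager cgen_guard(*executor);
//     // ... generate code against the executor's fresh CgenState ...
//   }  // previous CgenState restored here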
508 
509 std::shared_ptr<Executor> Executor::getExecutor(
510  const ExecutorId executor_id,
511  const std::string& debug_dir,
512  const std::string& debug_file,
513  const SystemParameters& system_parameters) {
515  auto it = executors_.find(executor_id);
516  if (it != executors_.end()) {
517  return it->second;
518  }
519  auto& data_mgr = Catalog_Namespace::SysCatalog::instance().getDataMgr();
520  auto executor = std::make_shared<Executor>(executor_id,
521  &data_mgr,
522  system_parameters.cuda_block_size,
523  system_parameters.cuda_grid_size,
524  system_parameters.max_gpu_slab_size,
525  debug_dir,
526  debug_file);
527  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
528  return executor;
529 }
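// Illustrative sketch (not part of the original source): executors are cached by id,
// so a second lookup with the same id returns the same instance. The id value and
// empty debug paths are placeholders.
//
//   SystemParameters sys_params;
//   const ExecutorId executor_id{1};
//   auto executor = Executor::getExecutor(executor_id, "", "", sys_params);
//   CHECK_EQ(executor.get(), Executor::getExecutor(executor_id, "", "", sys_params).get());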
530 
532  switch (memory_level) {
536  execute_mutex_); // Don't flush memory while queries are running
537 
538  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
539  // The hash table cache uses CPU memory not managed by the buffer manager. In the
540  // future, we should manage these allocations with the buffer manager directly.
541  // For now, assume the user wants to purge the hash table cache when they clear
542  // CPU memory (currently used in ExecuteTest to lower memory pressure)
543  // TODO: Move JoinHashTableCacheInvalidator to Executor::clearExternalCaches();
545  }
546  Executor::clearExternalCaches(true, nullptr, 0);
548  break;
549  }
550  default: {
551  throw std::runtime_error(
552  "Clearing memory levels other than the CPU level or GPU level is not "
553  "supported.");
554  }
555  }
556 }
557 
559  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
560 }
561 
562 StringDictionaryProxy* Executor::getStringDictionaryProxy(
563  const shared::StringDictKey& dict_id_in,
564  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
565  const bool with_generation) const {
566  CHECK(row_set_mem_owner);
567  std::lock_guard<std::mutex> lock(
568  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
569  return row_set_mem_owner->getOrAddStringDictProxy(dict_id_in, with_generation);
570 }
571 
572 StringDictionaryProxy* RowSetMemoryOwner::getOrAddStringDictProxy(
573  const shared::StringDictKey& dict_key_in,
574  const bool with_generation) {
575  const int dict_id{dict_key_in.dict_id < 0 ? REGULAR_DICT(dict_key_in.dict_id)
576  : dict_key_in.dict_id};
577  const auto catalog =
578  Catalog_Namespace::SysCatalog::instance().getCatalog(dict_key_in.db_id);
579  if (catalog) {
580  const auto dd = catalog->getMetadataForDict(dict_id);
581  if (dd) {
582  auto dict_key = dict_key_in;
583  dict_key.dict_id = dict_id;
584  CHECK(dd->stringDict);
585  CHECK_LE(dd->dictNBits, 32);
586  const int64_t generation =
587  with_generation ? string_dictionary_generations_.getGeneration(dict_key) : -1;
588  return addStringDict(dd->stringDict, dict_key, generation);
589  }
590  }
592  if (!lit_str_dict_proxy_) {
593  DictRef literal_dict_ref(dict_key_in.db_id, DictRef::literalsDictId);
594  std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
595  literal_dict_ref, "", false, true, g_cache_string_hash);
596  lit_str_dict_proxy_ = std::make_shared<StringDictionaryProxy>(
597  tsd, shared::StringDictKey{literal_dict_ref.dbId, literal_dict_ref.dictId}, 0);
598  }
599  return lit_str_dict_proxy_.get();
600 }
601 
603  const shared::StringDictKey& source_dict_key,
604  const shared::StringDictKey& dest_dict_key,
605  const RowSetMemoryOwner::StringTranslationType translation_type,
606  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
607  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
608  const bool with_generation) const {
609  CHECK(row_set_mem_owner);
610  std::lock_guard<std::mutex> lock(
611  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
612  return row_set_mem_owner->getOrAddStringProxyTranslationMap(
613  source_dict_key, dest_dict_key, with_generation, translation_type, string_op_infos);
614 }
615 
618  const StringDictionaryProxy* source_proxy,
619  StringDictionaryProxy* dest_proxy,
620  const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
621  const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
622  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) const {
623  CHECK(row_set_mem_owner);
624  std::lock_guard<std::mutex> lock(
625  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
626  // First translate lhs onto itself if there are string ops
627  if (!dest_string_op_infos.empty()) {
628  row_set_mem_owner->addStringProxyUnionTranslationMap(
629  dest_proxy, dest_proxy, dest_string_op_infos);
630  }
631  return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
632  source_proxy, dest_proxy, source_string_op_infos);
633 }
634 
637  const shared::StringDictKey& source_dict_key,
638  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
639  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
640  const bool with_generation) const {
641  CHECK(row_set_mem_owner);
642  std::lock_guard<std::mutex> lock(
643  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
644  return row_set_mem_owner->getOrAddStringProxyNumericTranslationMap(
645  source_dict_key, with_generation, string_op_infos);
646 }
647 
649  const shared::StringDictKey& source_dict_key_in,
650  const shared::StringDictKey& dest_dict_key_in,
651  const bool with_generation,
652  const RowSetMemoryOwner::StringTranslationType translation_type,
653  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
654  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
655  const auto dest_proxy = getOrAddStringDictProxy(dest_dict_key_in, with_generation);
657  return addStringProxyIntersectionTranslationMap(
658  source_proxy, dest_proxy, string_op_infos);
659  } else {
660  return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
661  }
662 }
663 
666  const shared::StringDictKey& source_dict_key_in,
667  const bool with_generation,
668  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
669  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
670  return addStringProxyNumericTranslationMap(source_proxy, string_op_infos);
671 }
672 
674  std::lock_guard<std::mutex> lock(state_mutex_);
675  return t_digests_
676  .emplace_back(std::make_unique<quantile::TDigest>(
678  .get();
679 }
680 
681 bool Executor::isCPUOnly() const {
682  CHECK(data_mgr_);
683  return !data_mgr_->getCudaMgr();
684 }
685 
686 const ColumnDescriptor* Executor::getColumnDescriptor(
687  const Analyzer::ColumnVar* col_var) const {
688  return get_column_descriptor_maybe(col_var->getColumnKey());
689 }
690 
692  const Analyzer::ColumnVar* col_var,
693  int n) const {
694  const auto cd = getColumnDescriptor(col_var);
695  if (!cd || n > cd->columnType.get_physical_cols()) {
696  return nullptr;
697  }
698  auto column_key = col_var->getColumnKey();
699  column_key.column_id += n;
700  return get_column_descriptor_maybe(column_key);
701 }
702 
703 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
704  return row_set_mem_owner_;
705 }
706 
708  return temporary_tables_;
709 }
710 
712  const shared::TableKey& table_key) const {
713  return input_table_info_cache_.getTableInfo(table_key);
714 }
715 
717  const shared::TableKey& table_key) const {
718  return table_generations_.getGeneration(table_key);
719 }
720 
722  return agg_col_range_cache_.getColRange(phys_input);
723 }
724 
725 namespace {
726 
727 void log_system_memory_info_impl(std::string const& mem_log,
728  size_t executor_id,
729  size_t log_time_ms,
730  std::string const& log_tag,
731  size_t const thread_idx) {
732  std::ostringstream oss;
733  oss << mem_log;
734  oss << " (" << log_tag << ", EXECUTOR-" << executor_id << ", THREAD-" << thread_idx
735  << ", TOOK: " << log_time_ms << " ms)";
736  VLOG(1) << oss.str();
737 }
738 } // namespace
739 
740 void Executor::logSystemCPUMemoryStatus(std::string const& log_tag,
741  size_t const thread_idx) const {
742  if (g_allow_memory_status_log && getDataMgr()) {
743  auto timer = timer_start();
744  std::ostringstream oss;
745  oss << getDataMgr()->getSystemMemoryUsage();
746  log_system_memory_info_impl(
747  oss.str(), executor_id_, timer_stop(timer), log_tag, thread_idx);
748  }
749 }
750 
751 void Executor::logSystemGPUMemoryStatus(std::string const& log_tag,
752  size_t const thread_idx) const {
753 #ifdef HAVE_CUDA
754  if (g_allow_memory_status_log && getDataMgr() && getDataMgr()->gpusPresent() &&
755  getDataMgr()->getCudaMgr()) {
756  auto timer = timer_start();
757  auto mem_log = getDataMgr()->getCudaMgr()->getCudaMemoryUsageInString();
758  log_system_memory_info_impl(
759  mem_log, executor_id_, timer_stop(timer), log_tag, thread_idx);
760  }
761 #endif
762 }
763 
764 namespace {
765 
766 size_t get_col_byte_width(const shared::ColumnKey& column_key) {
767  if (column_key.table_id < 0) {
768  // We have an intermediate results table
769 
770  // Todo(todd): Get more accurate representation of column width
771  // for intermediate tables
772  return size_t(8);
773  } else {
774  const auto cd = Catalog_Namespace::get_metadata_for_column(column_key);
775  const auto& ti = cd->columnType;
776  const auto sz = ti.get_size();
777  if (sz < 0) {
778  // for varlen types, only account for the pointer/size for each row, for now
779  if (ti.is_logical_geo_type()) {
780  // Don't count size for logical geo types, as they are
781  // backed by physical columns
782  return size_t(0);
783  } else {
784  return size_t(16);
785  }
786  } else {
787  return sz;
788  }
789  }
790 }
791 
792 } // anonymous namespace
793 
794 std::map<shared::ColumnKey, size_t> Executor::getColumnByteWidthMap(
795  const std::set<shared::TableKey>& table_ids_to_fetch,
796  const bool include_lazy_fetched_cols) const {
797  std::map<shared::ColumnKey, size_t> col_byte_width_map;
798 
799  for (const auto& fetched_col : plan_state_->getColumnsToFetch()) {
800  if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
801  continue;
802  }
803  const size_t col_byte_width = get_col_byte_width(fetched_col);
804  CHECK(col_byte_width_map.insert({fetched_col, col_byte_width}).second);
805  }
806  if (include_lazy_fetched_cols) {
807  for (const auto& lazy_fetched_col : plan_state_->getColumnsToNotFetch()) {
808  if (table_ids_to_fetch.count({lazy_fetched_col.db_id, lazy_fetched_col.table_id}) ==
809  0) {
810  continue;
811  }
812  const size_t col_byte_width = get_col_byte_width(lazy_fetched_col);
813  CHECK(col_byte_width_map.insert({lazy_fetched_col, col_byte_width}).second);
814  }
815  }
816  return col_byte_width_map;
817 }
818 
819 size_t Executor::getNumBytesForFetchedRow(
820  const std::set<shared::TableKey>& table_ids_to_fetch) const {
821  size_t num_bytes = 0;
822  if (!plan_state_) {
823  return 0;
824  }
825  for (const auto& fetched_col : plan_state_->getColumnsToFetch()) {
826  if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
827  continue;
828  }
829 
830  if (fetched_col.table_id < 0) {
831  num_bytes += 8;
832  } else {
833  const auto cd = Catalog_Namespace::get_metadata_for_column(
834  {fetched_col.db_id, fetched_col.table_id, fetched_col.column_id});
835  const auto& ti = cd->columnType;
836  const auto sz = ti.get_size();
837  if (sz < 0) {
838  // for varlen types, only account for the pointer/size for each row, for now
839  if (!ti.is_logical_geo_type()) {
840  // Don't count size for logical geo types, as they are
841  // backed by physical columns
842  num_bytes += 16;
843  }
844  } else {
845  num_bytes += sz;
846  }
847  }
848  }
849  return num_bytes;
850 }
851 
853  const ExecutorDeviceType device_type,
854  const std::vector<InputDescriptor>& input_descs,
855  const std::vector<InputTableInfo>& query_infos,
856  const std::vector<std::pair<int32_t, FragmentsList>>& kernel_fragment_lists) const {
857  using TableFragmentId = std::pair<shared::TableKey, int32_t>;
858  using TableFragmentSizeMap = std::map<TableFragmentId, size_t>;
859 
860  /* Calculate bytes per column */
861 
862  // Only fetch lhs table ids for now...
863  // Allows us to cleanly lower number of kernels in flight to save
864  // buffer pool space, but is not a perfect estimate when big rhs
865  // join tables are involved. Will revisit.
866 
867  std::set<shared::TableKey> lhs_table_keys;
868  for (const auto& input_desc : input_descs) {
869  if (input_desc.getNestLevel() == 0) {
870  lhs_table_keys.insert(input_desc.getTableKey());
871  }
872  }
873 
874  const bool include_lazy_fetch_cols = device_type == ExecutorDeviceType::CPU;
875  const auto column_byte_width_map =
876  getColumnByteWidthMap(lhs_table_keys, include_lazy_fetch_cols);
877 
878  /* Calculate the byte width per row (sum of all columns widths)
879  Assumes each fragment touches the same columns, which is a DB-wide
880  invariant for now */
881 
882  size_t const byte_width_per_row =
883  std::accumulate(column_byte_width_map.begin(),
884  column_byte_width_map.end(),
885  size_t(0),
886  [](size_t sum, auto& col_entry) { return sum + col_entry.second; });
887 
888  /* Calculate num tuples for all fragments */
889 
890  TableFragmentSizeMap all_table_fragments_size_map;
891 
892  for (auto& query_info : query_infos) {
893  const auto& table_key = query_info.table_key;
894  for (const auto& frag : query_info.info.fragments) {
895  const int32_t frag_id = frag.fragmentId;
896  const TableFragmentId table_frag_id = std::make_pair(table_key, frag_id);
897  const size_t fragment_num_tuples = frag.getNumTuples(); // num_tuples;
898  all_table_fragments_size_map.insert(
899  std::make_pair(table_frag_id, fragment_num_tuples));
900  }
901  }
902 
903  /* Calculate num tuples only for fragments actually touched by query
904  Also calculate the num bytes needed for each kernel */
905 
906  TableFragmentSizeMap query_table_fragments_size_map;
907  std::vector<size_t> bytes_per_kernel;
908  bytes_per_kernel.reserve(kernel_fragment_lists.size());
909 
910  size_t max_kernel_bytes{0};
911 
912  for (auto& kernel_frag_list : kernel_fragment_lists) {
913  size_t kernel_bytes{0};
914  const auto frag_list = kernel_frag_list.second;
915  for (const auto& table_frags : frag_list) {
916  const auto& table_key = table_frags.table_key;
917  for (const size_t frag_id : table_frags.fragment_ids) {
918  const TableFragmentId table_frag_id = std::make_pair(table_key, frag_id);
919  const size_t fragment_num_tuples = all_table_fragments_size_map[table_frag_id];
920  kernel_bytes += fragment_num_tuples * byte_width_per_row;
921  query_table_fragments_size_map.insert(
922  std::make_pair(table_frag_id, fragment_num_tuples));
923  }
924  }
925  bytes_per_kernel.emplace_back(kernel_bytes);
926  if (kernel_bytes > max_kernel_bytes) {
927  max_kernel_bytes = kernel_bytes;
928  }
929  }
930 
931  /* Calculate bytes per chunk touched by the query */
932 
933  std::map<ChunkKey, size_t> all_chunks_byte_sizes_map;
934  constexpr int32_t subkey_min = std::numeric_limits<int32_t>::min();
935 
936  for (const auto& col_byte_width_entry : column_byte_width_map) {
937  // Build a chunk key prefix of (db_id, table_id, column_id)
938  const int32_t db_id = col_byte_width_entry.first.db_id;
939  const int32_t table_id = col_byte_width_entry.first.table_id;
940  const int32_t col_id = col_byte_width_entry.first.column_id;
941  const size_t col_byte_width = col_byte_width_entry.second;
942  const shared::TableKey table_key(db_id, table_id);
943 
944  const auto frag_start =
945  query_table_fragments_size_map.lower_bound({table_key, subkey_min});
946  for (auto frag_itr = frag_start; frag_itr != query_table_fragments_size_map.end() &&
947  frag_itr->first.first == table_key;
948  frag_itr++) {
949  const ChunkKey chunk_key = {db_id, table_id, col_id, frag_itr->first.second};
950  const size_t chunk_byte_size = col_byte_width * frag_itr->second;
951  all_chunks_byte_sizes_map.insert({chunk_key, chunk_byte_size});
952  }
953  }
954 
955  size_t total_chunk_bytes{0};
956  const size_t num_chunks = all_chunks_byte_sizes_map.size();
957  std::vector<std::pair<ChunkKey, size_t>> chunks_with_byte_sizes;
958  chunks_with_byte_sizes.reserve(num_chunks);
959  for (const auto& chunk_byte_size_entry : all_chunks_byte_sizes_map) {
960  chunks_with_byte_sizes.emplace_back(
961  std::make_pair(chunk_byte_size_entry.first, chunk_byte_size_entry.second));
962  // Add here, post mapping of the chunks, to make sure chunks are deduped and we get an
963  // accurate size estimate
964  total_chunk_bytes += chunk_byte_size_entry.second;
965  }
966  // Don't allow scaling of bytes per kernel launches for GPU yet as we're not set up for
967  // this at this point
968  const bool bytes_scales_per_kernel = device_type == ExecutorDeviceType::CPU;
969 
970  // Return ChunkRequestInfo
971 
972  return {device_type,
973  chunks_with_byte_sizes,
974  num_chunks,
975  total_chunk_bytes,
976  bytes_per_kernel,
977  max_kernel_bytes,
978  bytes_scales_per_kernel};
979 }
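// Illustrative worked example (not part of the original source) of the sizing logic
// above: with a summed per-row byte width of 24 bytes (say one 8-byte BIGINT plus one
// 16-byte varlen pointer/size pair) and a kernel covering two fragments of 1,000,000
// rows each, that kernel is charged 2 * 1,000,000 * 24 = 48,000,000 bytes, while each
// touched (db, table, column, fragment) chunk contributes its column width times the
// fragment's tuple count to the deduplicated total_chunk_bytes.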
980 
982  const std::vector<Analyzer::Expr*>& target_exprs) const {
984  for (const auto target_expr : target_exprs) {
985  if (plan_state_->isLazyFetchColumn(target_expr)) {
986  return true;
987  }
988  }
989  return false;
990 }
991 
992 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
993  const std::vector<Analyzer::Expr*>& target_exprs) const {
995  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
996  for (const auto target_expr : target_exprs) {
997  if (!plan_state_->isLazyFetchColumn(target_expr)) {
998  col_lazy_fetch_info.emplace_back(
999  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
1000  } else {
1001  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
1002  CHECK(col_var);
1003  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
1004  const auto cd = get_column_descriptor_maybe(col_var->getColumnKey());
1005  if (cd && IS_GEO(cd->columnType.get_type())) {
1006  // Geo coords cols will be processed in sequence. So we only need to track the
1007  // first coords col in lazy fetch info.
1008  {
1009  auto col_key = col_var->getColumnKey();
1010  col_key.column_id += 1;
1011  const auto cd0 = get_column_descriptor(col_key);
1012  const auto col0_ti = cd0->columnType;
1013  CHECK(!cd0->isVirtualCol);
1014  const auto col0_var = makeExpr<Analyzer::ColumnVar>(col0_ti, col_key, rte_idx);
1015  const auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
1016  col_lazy_fetch_info.emplace_back(
1017  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
1018  }
1019  } else {
1020  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
1021  const auto& col_ti = col_var->get_type_info();
1022  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
1023  }
1024  }
1025  }
1026  return col_lazy_fetch_info;
1027 }
1028 
1033 }
1034 
1035 std::vector<int8_t> Executor::serializeLiterals(
1036  const std::unordered_map<int, CgenState::LiteralValues>& literals,
1037  const int device_id) {
1038  if (literals.empty()) {
1039  return {};
1040  }
1041  const auto dev_literals_it = literals.find(device_id);
1042  CHECK(dev_literals_it != literals.end());
1043  const auto& dev_literals = dev_literals_it->second;
1044  size_t lit_buf_size{0};
1045  std::vector<std::string> real_strings;
1046  std::vector<std::vector<double>> double_array_literals;
1047  std::vector<std::vector<int8_t>> align64_int8_array_literals;
1048  std::vector<std::vector<int32_t>> int32_array_literals;
1049  std::vector<std::vector<int8_t>> align32_int8_array_literals;
1050  std::vector<std::vector<int8_t>> int8_array_literals;
1051  for (const auto& lit : dev_literals) {
1052  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
1053  if (lit.which() == 7) {
1054  const auto p = boost::get<std::string>(&lit);
1055  CHECK(p);
1056  real_strings.push_back(*p);
1057  } else if (lit.which() == 8) {
1058  const auto p = boost::get<std::vector<double>>(&lit);
1059  CHECK(p);
1060  double_array_literals.push_back(*p);
1061  } else if (lit.which() == 9) {
1062  const auto p = boost::get<std::vector<int32_t>>(&lit);
1063  CHECK(p);
1064  int32_array_literals.push_back(*p);
1065  } else if (lit.which() == 10) {
1066  const auto p = boost::get<std::vector<int8_t>>(&lit);
1067  CHECK(p);
1068  int8_array_literals.push_back(*p);
1069  } else if (lit.which() == 11) {
1070  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
1071  CHECK(p);
1072  if (p->second == 64) {
1073  align64_int8_array_literals.push_back(p->first);
1074  } else if (p->second == 32) {
1075  align32_int8_array_literals.push_back(p->first);
1076  } else {
1077  CHECK(false);
1078  }
1079  }
1080  }
1081  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
1082  throw TooManyLiterals();
1083  }
1084  int16_t crt_real_str_off = lit_buf_size;
1085  for (const auto& real_str : real_strings) {
1086  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1087  lit_buf_size += real_str.size();
1088  }
1089  if (double_array_literals.size() > 0) {
1090  lit_buf_size = align(lit_buf_size, sizeof(double));
1091  }
1092  int16_t crt_double_arr_lit_off = lit_buf_size;
1093  for (const auto& double_array_literal : double_array_literals) {
1094  CHECK_LE(double_array_literal.size(),
1095  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1096  lit_buf_size += double_array_literal.size() * sizeof(double);
1097  }
1098  if (align64_int8_array_literals.size() > 0) {
1099  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
1100  }
1101  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
1102  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
1103  CHECK_LE(align64_int8_array_literal.size(),
1104  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1105  lit_buf_size += align64_int8_array_literal.size();
1106  }
1107  if (int32_array_literals.size() > 0) {
1108  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
1109  }
1110  int16_t crt_int32_arr_lit_off = lit_buf_size;
1111  for (const auto& int32_array_literal : int32_array_literals) {
1112  CHECK_LE(int32_array_literal.size(),
1113  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1114  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
1115  }
1116  if (align32_int8_array_literals.size() > 0) {
1117  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
1118  }
1119  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
1120  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
1121  CHECK_LE(align32_int8_array_literal.size(),
1122  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1123  lit_buf_size += align32_int8_array_literal.size();
1124  }
1125  int16_t crt_int8_arr_lit_off = lit_buf_size;
1126  for (const auto& int8_array_literal : int8_array_literals) {
1127  CHECK_LE(int8_array_literal.size(),
1128  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1129  lit_buf_size += int8_array_literal.size();
1130  }
1131  unsigned crt_real_str_idx = 0;
1132  unsigned crt_double_arr_lit_idx = 0;
1133  unsigned crt_align64_int8_arr_lit_idx = 0;
1134  unsigned crt_int32_arr_lit_idx = 0;
1135  unsigned crt_align32_int8_arr_lit_idx = 0;
1136  unsigned crt_int8_arr_lit_idx = 0;
1137  std::vector<int8_t> serialized(lit_buf_size);
1138  size_t off{0};
1139  for (const auto& lit : dev_literals) {
1140  const auto lit_bytes = CgenState::literalBytes(lit);
1141  off = CgenState::addAligned(off, lit_bytes);
1142  switch (lit.which()) {
1143  case 0: {
1144  const auto p = boost::get<int8_t>(&lit);
1145  CHECK(p);
1146  serialized[off - lit_bytes] = *p;
1147  break;
1148  }
1149  case 1: {
1150  const auto p = boost::get<int16_t>(&lit);
1151  CHECK(p);
1152  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1153  break;
1154  }
1155  case 2: {
1156  const auto p = boost::get<int32_t>(&lit);
1157  CHECK(p);
1158  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1159  break;
1160  }
1161  case 3: {
1162  const auto p = boost::get<int64_t>(&lit);
1163  CHECK(p);
1164  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1165  break;
1166  }
1167  case 4: {
1168  const auto p = boost::get<float>(&lit);
1169  CHECK(p);
1170  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1171  break;
1172  }
1173  case 5: {
1174  const auto p = boost::get<double>(&lit);
1175  CHECK(p);
1176  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1177  break;
1178  }
1179  case 6: {
1180  const auto p = boost::get<std::pair<std::string, shared::StringDictKey>>(&lit);
1181  CHECK(p);
1182  const auto str_id =
1183  g_enable_string_functions
1184  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
1185  ->getOrAddTransient(p->first)
1186  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
1187  ->getIdOfString(p->first);
1188  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
1189  break;
1190  }
1191  case 7: {
1192  const auto p = boost::get<std::string>(&lit);
1193  CHECK(p);
1194  int32_t off_and_len = crt_real_str_off << 16;
1195  const auto& crt_real_str = real_strings[crt_real_str_idx];
1196  off_and_len |= static_cast<int16_t>(crt_real_str.size());
1197  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1198  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
1199  ++crt_real_str_idx;
1200  crt_real_str_off += crt_real_str.size();
1201  break;
1202  }
1203  case 8: {
1204  const auto p = boost::get<std::vector<double>>(&lit);
1205  CHECK(p);
1206  int32_t off_and_len = crt_double_arr_lit_off << 16;
1207  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
1208  int32_t len = crt_double_arr_lit.size();
1209  CHECK_EQ((len >> 16), 0);
1210  off_and_len |= static_cast<int16_t>(len);
1211  int32_t double_array_bytesize = len * sizeof(double);
1212  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1213  memcpy(&serialized[crt_double_arr_lit_off],
1214  crt_double_arr_lit.data(),
1215  double_array_bytesize);
1216  ++crt_double_arr_lit_idx;
1217  crt_double_arr_lit_off += double_array_bytesize;
1218  break;
1219  }
1220  case 9: {
1221  const auto p = boost::get<std::vector<int32_t>>(&lit);
1222  CHECK(p);
1223  int32_t off_and_len = crt_int32_arr_lit_off << 16;
1224  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
1225  int32_t len = crt_int32_arr_lit.size();
1226  CHECK_EQ((len >> 16), 0);
1227  off_and_len |= static_cast<int16_t>(len);
1228  int32_t int32_array_bytesize = len * sizeof(int32_t);
1229  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1230  memcpy(&serialized[crt_int32_arr_lit_off],
1231  crt_int32_arr_lit.data(),
1232  int32_array_bytesize);
1233  ++crt_int32_arr_lit_idx;
1234  crt_int32_arr_lit_off += int32_array_bytesize;
1235  break;
1236  }
1237  case 10: {
1238  const auto p = boost::get<std::vector<int8_t>>(&lit);
1239  CHECK(p);
1240  int32_t off_and_len = crt_int8_arr_lit_off << 16;
1241  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
1242  int32_t len = crt_int8_arr_lit.size();
1243  CHECK_EQ((len >> 16), 0);
1244  off_and_len |= static_cast<int16_t>(len);
1245  int32_t int8_array_bytesize = len;
1246  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1247  memcpy(&serialized[crt_int8_arr_lit_off],
1248  crt_int8_arr_lit.data(),
1249  int8_array_bytesize);
1250  ++crt_int8_arr_lit_idx;
1251  crt_int8_arr_lit_off += int8_array_bytesize;
1252  break;
1253  }
1254  case 11: {
1255  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
1256  CHECK(p);
1257  if (p->second == 64) {
1258  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
1259  const auto& crt_align64_int8_arr_lit =
1260  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
1261  int32_t len = crt_align64_int8_arr_lit.size();
1262  CHECK_EQ((len >> 16), 0);
1263  off_and_len |= static_cast<int16_t>(len);
1264  int32_t align64_int8_array_bytesize = len;
1265  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1266  memcpy(&serialized[crt_align64_int8_arr_lit_off],
1267  crt_align64_int8_arr_lit.data(),
1268  align64_int8_array_bytesize);
1269  ++crt_align64_int8_arr_lit_idx;
1270  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
1271  } else if (p->second == 32) {
1272  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
1273  const auto& crt_align32_int8_arr_lit =
1274  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
1275  int32_t len = crt_align32_int8_arr_lit.size();
1276  CHECK_EQ((len >> 16), 0);
1277  off_and_len |= static_cast<int16_t>(len);
1278  int32_t align32_int8_array_bytesize = len;
1279  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1280  memcpy(&serialized[crt_align32_int8_arr_lit_off],
1281  crt_align32_int8_arr_lit.data(),
1282  align32_int8_array_bytesize);
1283  ++crt_align32_int8_arr_lit_idx;
1284  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
1285  } else {
1286  CHECK(false);
1287  }
1288  break;
1289  }
1290  default:
1291  CHECK(false);
1292  }
1293  }
1294  return serialized;
1295 }
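// Illustrative sketch (not part of the original source): variable-length literals are
// referenced from their fixed-size slot by a packed 32-bit descriptor, offset in the
// high 16 bits and length (or element count) in the low 16 bits, as assembled above.
// Unpacking is the mirror operation:
//
//   const int32_t off_and_len = /* value read from the 4-byte literal slot */;
//   const auto payload_off = static_cast<int16_t>(off_and_len >> 16);
//   const auto payload_len = static_cast<int16_t>(off_and_len & 0xffff);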
1296 
1297 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
1298  if (device_type == ExecutorDeviceType::GPU) {
1299  return cudaMgr()->getDeviceCount();
1300  } else {
1301  return 1;
1302  }
1303 }
1304 
1305 int Executor::deviceCountForMemoryLevel(
1306  const Data_Namespace::MemoryLevel memory_level) const {
1307  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
1308  : deviceCount(ExecutorDeviceType::CPU);
1309 }
1310 
1311 // TODO(alex): remove or split
1312 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
1313  const SQLTypeInfo& ti,
1314  const int64_t agg_init_val,
1315  const int8_t out_byte_width,
1316  const int64_t* out_vec,
1317  const size_t out_vec_sz,
1318  const bool is_group_by,
1319  const bool float_argument_input) {
1320  switch (agg) {
1321  case kAVG:
1322  case kSUM:
1323  case kSUM_IF:
1324  if (0 != agg_init_val) {
1325  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1326  int64_t agg_result = agg_init_val;
1327  for (size_t i = 0; i < out_vec_sz; ++i) {
1328  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
1329  }
1330  return {agg_result, 0};
1331  } else {
1332  CHECK(ti.is_fp());
1333  switch (out_byte_width) {
1334  case 4: {
1335  int agg_result = static_cast<int32_t>(agg_init_val);
1336  for (size_t i = 0; i < out_vec_sz; ++i) {
1337  agg_sum_float_skip_val(
1338  &agg_result,
1339  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1340  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1341  }
1342  const int64_t converted_bin =
1343  float_argument_input
1344  ? static_cast<int64_t>(agg_result)
1345  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1346  return {converted_bin, 0};
1347  break;
1348  }
1349  case 8: {
1350  int64_t agg_result = agg_init_val;
1351  for (size_t i = 0; i < out_vec_sz; ++i) {
1352  agg_sum_double_skip_val(
1353  &agg_result,
1354  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1355  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1356  }
1357  return {agg_result, 0};
1358  break;
1359  }
1360  default:
1361  CHECK(false);
1362  }
1363  }
1364  }
1365  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
1366  int64_t agg_result = 0;
1367  for (size_t i = 0; i < out_vec_sz; ++i) {
1368  agg_result += out_vec[i];
1369  }
1370  return {agg_result, 0};
1371  } else {
1372  CHECK(ti.is_fp());
1373  switch (out_byte_width) {
1374  case 4: {
1375  float r = 0.;
1376  for (size_t i = 0; i < out_vec_sz; ++i) {
1377  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
1378  }
1379  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
1380  const int64_t converted_bin =
1381  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
1382  return {converted_bin, 0};
1383  }
1384  case 8: {
1385  double r = 0.;
1386  for (size_t i = 0; i < out_vec_sz; ++i) {
1387  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
1388  }
1389  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
1390  }
1391  default:
1392  CHECK(false);
1393  }
1394  }
1395  break;
1396  case kCOUNT:
1397  case kCOUNT_IF: {
1398  uint64_t agg_result = 0;
1399  for (size_t i = 0; i < out_vec_sz; ++i) {
1400  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
1401  agg_result += out;
1402  }
1403  return {static_cast<int64_t>(agg_result), 0};
1404  }
1405  case kMIN: {
1406  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1407  int64_t agg_result = agg_init_val;
1408  for (size_t i = 0; i < out_vec_sz; ++i) {
1409  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
1410  }
1411  return {agg_result, 0};
1412  } else {
1413  switch (out_byte_width) {
1414  case 4: {
1415  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1416  for (size_t i = 0; i < out_vec_sz; ++i) {
1417  agg_min_float_skip_val(
1418  &agg_result,
1419  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1420  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1421  }
1422  const int64_t converted_bin =
1423  float_argument_input
1424  ? static_cast<int64_t>(agg_result)
1425  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1426  return {converted_bin, 0};
1427  }
1428  case 8: {
1429  int64_t agg_result = agg_init_val;
1430  for (size_t i = 0; i < out_vec_sz; ++i) {
1431  agg_min_double_skip_val(
1432  &agg_result,
1433  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1434  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1435  }
1436  return {agg_result, 0};
1437  }
1438  default:
1439  CHECK(false);
1440  }
1441  }
1442  }
1443  case kMAX:
1444  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1445  int64_t agg_result = agg_init_val;
1446  for (size_t i = 0; i < out_vec_sz; ++i) {
1447  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
1448  }
1449  return {agg_result, 0};
1450  } else {
1451  switch (out_byte_width) {
1452  case 4: {
1453  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1454  for (size_t i = 0; i < out_vec_sz; ++i) {
1455  agg_max_float_skip_val(
1456  &agg_result,
1457  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1458  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1459  }
1460  const int64_t converted_bin =
1461  float_argument_input ? static_cast<int64_t>(agg_result)
1462  : float_to_double_bin(agg_result, !ti.get_notnull());
1463  return {converted_bin, 0};
1464  }
1465  case 8: {
1466  int64_t agg_result = agg_init_val;
1467  for (size_t i = 0; i < out_vec_sz; ++i) {
1468  agg_max_double_skip_val(
1469  &agg_result,
1470  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1471  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1472  }
1473  return {agg_result, 0};
1474  }
1475  default:
1476  CHECK(false);
1477  }
1478  }
1479  case kSINGLE_VALUE: {
1480  int64_t agg_result = agg_init_val;
1481  for (size_t i = 0; i < out_vec_sz; ++i) {
1482  if (out_vec[i] != agg_init_val) {
1483  if (agg_result == agg_init_val) {
1484  agg_result = out_vec[i];
1485  } else if (out_vec[i] != agg_result) {
1486  return {agg_result, Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES};
1487  }
1488  }
1489  }
1490  return {agg_result, 0};
1491  }
1492  case kSAMPLE: {
1493  int64_t agg_result = agg_init_val;
1494  for (size_t i = 0; i < out_vec_sz; ++i) {
1495  if (out_vec[i] != agg_init_val) {
1496  agg_result = out_vec[i];
1497  break;
1498  }
1499  }
1500  return {agg_result, 0};
1501  }
1502  default:
1503  UNREACHABLE() << "Unsupported SQLAgg: " << agg;
1504  }
1505  abort();
1506 }
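// Illustrative sketch (not part of the original source): reducing per-device partial
// results for a non-grouped SUM over a not-null BIGINT column. The input values are
// made up for the example.
//
//   const std::vector<int64_t> out_vec{10, 20, 12};  // one partial sum per device
//   const auto [value, error_code] = Executor::reduceResults(kSUM,
//                                                            SQLTypeInfo(kBIGINT, true),
//                                                            /*agg_init_val=*/0,
//                                                            /*out_byte_width=*/8,
//                                                            out_vec.data(),
//                                                            out_vec.size(),
//                                                            /*is_group_by=*/false,
//                                                            /*float_argument_input=*/false);
//   // value == 42, error_code == 0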
1507 
1508 namespace {
1509 
1510 ResultSetPtr get_merged_result(
1511  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1512  std::vector<TargetInfo> const& targets) {
1513  auto& first = results_per_device.front().first;
1514  CHECK(first);
1515  auto const first_target_idx = result_set::first_dict_encoded_idx(targets);
1516  if (first_target_idx) {
1517  first->translateDictEncodedColumns(targets, *first_target_idx);
1518  }
1519  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
1520  const auto& next = results_per_device[dev_idx].first;
1521  CHECK(next);
1522  if (first_target_idx) {
1523  next->translateDictEncodedColumns(targets, *first_target_idx);
1524  }
1525  first->append(*next);
1526  }
1527  return std::move(first);
1528 }
1529 
1530 struct GetTargetInfo {
1531  TargetInfo operator()(Analyzer::Expr const* const target_expr) const {
1532  return get_target_info(target_expr, g_bigint_count);
1533  }
1534 };
1535 
1536 } // namespace
1537 
1538 ResultSetPtr Executor::resultsUnion(SharedKernelContext& shared_context,
1539  const RelAlgExecutionUnit& ra_exe_unit) {
1540  auto timer = DEBUG_TIMER(__func__);
1541  auto& results_per_device = shared_context.getFragmentResults();
1542  auto const targets = shared::transform<std::vector<TargetInfo>>(
1543  ra_exe_unit.target_exprs, GetTargetInfo{});
1544  if (results_per_device.empty()) {
1545  return std::make_shared<ResultSet>(targets,
1549  blockSize(),
1550  gridSize());
1551  }
1552  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
1553  std::sort(results_per_device.begin(),
1554  results_per_device.end(),
1555  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
1556  CHECK_GE(lhs.second.size(), size_t(1));
1557  CHECK_GE(rhs.second.size(), size_t(1));
1558  return lhs.second.front() < rhs.second.front();
1559  });
1560 
1561  return get_merged_result(results_per_device, targets);
1562 }
1563 
1564 ResultSetPtr Executor::reduceMultiDeviceResults(
1565  const RelAlgExecutionUnit& ra_exe_unit,
1566  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1567  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1568  const QueryMemoryDescriptor& query_mem_desc) const {
1569  auto timer = DEBUG_TIMER(__func__);
1570  if (ra_exe_unit.estimator) {
1571  return reduce_estimator_results(ra_exe_unit, results_per_device);
1572  }
1573 
1574  if (results_per_device.empty()) {
1575  auto const targets = shared::transform<std::vector<TargetInfo>>(
1576  ra_exe_unit.target_exprs, GetTargetInfo{});
1577  return std::make_shared<ResultSet>(targets,
1580  nullptr,
1581  blockSize(),
1582  gridSize());
1583  }
1584 
1585  if (query_mem_desc.threadsCanReuseGroupByBuffers()) {
1586  auto unique_results = getUniqueThreadSharedResultSets(results_per_device);
1587  return reduceMultiDeviceResultSets(
1588  unique_results,
1589  row_set_mem_owner,
1590  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
1591  }
1592  return reduceMultiDeviceResultSets(
1593  results_per_device,
1594  row_set_mem_owner,
1595  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
1596 }
1597 
1598 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>
1599 Executor::getUniqueThreadSharedResultSets(
1600  const std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device)
1601  const {
1602  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> unique_thread_results;
1603  if (results_per_device.empty()) {
1604  return unique_thread_results;
1605  }
1606  auto max_ti = [](int acc, auto& e) { return std::max(acc, e.first->getThreadIdx()); };
1607  int const max_thread_idx =
1608  std::accumulate(results_per_device.begin(), results_per_device.end(), -1, max_ti);
1609  std::vector<bool> seen_thread_idxs(max_thread_idx + 1, false);
1610  for (const auto& result : results_per_device) {
1611  const int32_t result_thread_idx = result.first->getThreadIdx();
1612  if (!seen_thread_idxs[result_thread_idx]) {
1613  seen_thread_idxs[result_thread_idx] = true;
1614  unique_thread_results.emplace_back(result);
1615  }
1616  }
1617  return unique_thread_results;
1618 }
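
// The de-duplication above keeps only the first result set observed for each thread
// index: when threads reuse group-by buffers, later results for the same thread alias
// the same buffer and would otherwise be reduced twice. A standalone sketch of the same
// idea over bare thread indexes (unique_by_thread is a hypothetical helper):

#include <algorithm>
#include <cstdint>
#include <vector>

std::vector<int32_t> unique_by_thread(const std::vector<int32_t>& thread_idx_per_result) {
  int32_t max_thread_idx = -1;
  for (const int32_t t : thread_idx_per_result) {
    max_thread_idx = std::max(max_thread_idx, t);
  }
  std::vector<bool> seen(max_thread_idx + 1, false);
  std::vector<int32_t> kept;
  for (const int32_t t : thread_idx_per_result) {
    if (!seen[t]) {
      seen[t] = true;  // first occurrence wins; later aliases are dropped
      kept.push_back(t);
    }
  }
  return kept;
}
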
1619 
1620 namespace {
1621 
1623  const size_t executor_id,
1624  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1625  int64_t* compilation_queue_time) {
1626  auto clock_begin = timer_start();
1627  // ResultSetReductionJIT::codegen takes the compilation lock if new code must be generated
1628  *compilation_queue_time = timer_stop(clock_begin);
1629  const auto& this_result_set = results_per_device[0].first;
1630  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
1631  this_result_set->getTargetInfos(),
1632  this_result_set->getTargetInitVals(),
1633  executor_id);
1634  return reduction_jit.codegen();
1635 };
1636 
1637 } // namespace
1638 
1640  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1641  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1642  const QueryMemoryDescriptor& query_mem_desc) const {
1643  auto timer = DEBUG_TIMER(__func__);
1644  std::shared_ptr<ResultSet> reduced_results;
1645 
1646  const auto& first = results_per_device.front().first;
1647 
1648  if (query_mem_desc.getQueryDescriptionType() ==
1650  results_per_device.size() > 1) {
1651  const auto total_entry_count = std::accumulate(
1652  results_per_device.begin(),
1653  results_per_device.end(),
1654  size_t(0),
1655  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
1656  const auto& r = rs.first;
1657  return init + r->getQueryMemDesc().getEntryCount();
1658  });
1659  CHECK(total_entry_count);
1660  auto query_mem_desc = first->getQueryMemDesc();
1661  query_mem_desc.setEntryCount(total_entry_count);
1662  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
1665  row_set_mem_owner,
1666  blockSize(),
1667  gridSize());
1668  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
1669  reduced_results->initializeStorage();
1670  switch (query_mem_desc.getEffectiveKeyWidth()) {
1671  case 4:
1672  first->getStorage()->moveEntriesToBuffer<int32_t>(
1673  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1674  break;
1675  case 8:
1676  first->getStorage()->moveEntriesToBuffer<int64_t>(
1677  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1678  break;
1679  default:
1680  CHECK(false);
1681  }
1682  } else {
1683  reduced_results = first;
1684  }
1685 
1686  int64_t compilation_queue_time = 0;
1687  const auto reduction_code =
1688  get_reduction_code(executor_id_, results_per_device, &compilation_queue_time);
1689 
1690  for (size_t i = 1; i < results_per_device.size(); ++i) {
1691  reduced_results->getStorage()->reduce(
1692  *(results_per_device[i].first->getStorage()), {}, reduction_code, executor_id_);
1693  }
1694  reduced_results->addCompilationQueueTime(compilation_queue_time);
1695  reduced_results->invalidateCachedRowCount();
1696  return reduced_results;
1697 }
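
// Once the (possibly widened) target buffer is prepared, the reduction above is a left
// fold: every other device's storage is folded into the first one using the JIT'd
// reduction code. A trivially simplified standalone analogue of that fold, with plain
// addition standing in for the generated per-slot reduction (reduce_partials is
// hypothetical):

#include <cstddef>
#include <cstdint>
#include <vector>

std::vector<int64_t> reduce_partials(std::vector<std::vector<int64_t>> partials) {
  auto& reduced = partials.front();
  for (size_t i = 1; i < partials.size(); ++i) {
    for (size_t slot = 0; slot < reduced.size(); ++slot) {
      reduced[slot] += partials[i][slot];  // fold device i into device 0, slot by slot
    }
  }
  return reduced;
}
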
1698 
1699 ResultSetPtr Executor::reduceSpeculativeTopN(
1700  const RelAlgExecutionUnit& ra_exe_unit,
1701  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1702  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1703  const QueryMemoryDescriptor& query_mem_desc) const {
1704  if (results_per_device.size() == 1) {
1705  return std::move(results_per_device.front().first);
1706  }
1707  const auto top_n =
1708  ra_exe_unit.sort_info.limit.value_or(0) + ra_exe_unit.sort_info.offset;
1709  SpeculativeTopNMap m;
1710  for (const auto& result : results_per_device) {
1711  auto rows = result.first;
1712  CHECK(rows);
1713  if (!rows) {
1714  continue;
1715  }
1716  SpeculativeTopNMap that(
1717  *rows,
1718  ra_exe_unit.target_exprs,
1719  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1720  m.reduce(that);
1721  }
1722  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1723  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1724  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1725 }
1726 
1727 std::unordered_set<int> get_available_gpus(const Data_Namespace::DataMgr* data_mgr) {
1728  CHECK(data_mgr);
1729  std::unordered_set<int> available_gpus;
1730  if (data_mgr->gpusPresent()) {
1731  CHECK(data_mgr->getCudaMgr());
1732  const int gpu_count = data_mgr->getCudaMgr()->getDeviceCount();
1733  CHECK_GT(gpu_count, 0);
1734  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1735  available_gpus.insert(gpu_id);
1736  }
1737  }
1738  return available_gpus;
1739 }
1740 
1741 size_t get_context_count(const ExecutorDeviceType device_type,
1742  const size_t cpu_count,
1743  const size_t gpu_count) {
1744  return device_type == ExecutorDeviceType::GPU ? gpu_count
1745  : static_cast<size_t>(cpu_count);
1746 }
1747 
1748 namespace {
1749 
1750 // Compute a very conservative output buffer entry count using no information other
1751 // than the number of tuples in each input table: simply multiply those tuple counts
1752 // together.
1753 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos,
1754  const RelAlgExecutionUnit& ra_exe_unit) {
1755  // we can use filtered_count_all's result if available
1756  if (ra_exe_unit.scan_limit) {
1757  VLOG(1)
1758  << "Exploiting a result of filtered count query as output buffer entry count: "
1759  << ra_exe_unit.scan_limit;
1760  return ra_exe_unit.scan_limit;
1761  }
1763  using checked_size_t = boost::multiprecision::number<
1764  boost::multiprecision::cpp_int_backend<64,
1765  64,
1766  boost::multiprecision::unsigned_magnitude,
1767  boost::multiprecision::checked,
1768  void>>;
1769  checked_size_t checked_max_groups_buffer_entry_guess = 1;
1770  // Cap the rough approximation at 100M entries; it's unlikely we can do a great job
1771  // for the baseline group layout with that many entries anyway.
1772  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1773  // Check for overflows since we're multiplying potentially big table sizes.
1774  try {
1775  for (const auto& table_info : query_infos) {
1776  CHECK(!table_info.info.fragments.empty());
1777  checked_size_t table_cardinality = 0;
1778  std::for_each(table_info.info.fragments.begin(),
1779  table_info.info.fragments.end(),
1780  [&table_cardinality](const FragmentInfo& frag_info) {
1781  table_cardinality += frag_info.getNumTuples();
1782  });
1783  checked_max_groups_buffer_entry_guess *= table_cardinality;
1784  }
1785  } catch (...) {
1786  checked_max_groups_buffer_entry_guess = max_groups_buffer_entry_guess_cap;
1787  VLOG(1) << "Detect overflow when approximating output buffer entry count, "
1788  "resetting it as "
1789  << max_groups_buffer_entry_guess_cap;
1790  }
1791  size_t max_groups_buffer_entry_guess =
1792  std::min(static_cast<size_t>(checked_max_groups_buffer_entry_guess),
1793  max_groups_buffer_entry_guess_cap);
1794  VLOG(1) << "Set an approximated output entry count as: "
1795  << max_groups_buffer_entry_guess;
1796  return max_groups_buffer_entry_guess;
1797 }
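
// The guess above is just the product of the input table cardinalities, computed in a
// checked 64-bit type so an overflow falls back to the 100M cap instead of silently
// wrapping. A standalone sketch of the same idea without boost::multiprecision, assuming
// a GCC/Clang toolchain for __builtin_mul_overflow (guess_entry_count is hypothetical):

#include <algorithm>
#include <cstddef>
#include <vector>

size_t guess_entry_count(const std::vector<size_t>& table_cardinalities) {
  constexpr size_t kCap = 100'000'000;  // cap the rough approximation at 100M entries
  size_t guess = 1;
  for (const size_t cardinality : table_cardinalities) {
    if (__builtin_mul_overflow(guess, cardinality, &guess)) {
      return kCap;  // the true product overflowed, so it is certainly above the cap
    }
  }
  return std::min(guess, kCap);
}
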
1798 
1799 std::string get_table_name(const InputDescriptor& input_desc) {
1800  const auto source_type = input_desc.getSourceType();
1801  if (source_type == InputSourceType::TABLE) {
1802  const auto& table_key = input_desc.getTableKey();
1803  CHECK_GT(table_key.table_id, 0);
1804  const auto td = Catalog_Namespace::get_metadata_for_table(table_key);
1805  CHECK(td);
1806  return td->tableName;
1807  } else {
1808  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableKey().table_id);
1809  }
1810 }
1811 
1813  size_t watchdog_max_projected_rows_per_device,
1814  const ExecutorDeviceType device_type,
1815  const int device_count) {
1816  if (device_type == ExecutorDeviceType::GPU) {
1817  return device_count * watchdog_max_projected_rows_per_device;
1818  }
1819  return watchdog_max_projected_rows_per_device;
1820 }
1821 
1822 void checkWorkUnitWatchdog(const RelAlgExecutionUnit& ra_exe_unit,
1823  const std::vector<InputTableInfo>& table_infos,
1824  const ExecutorDeviceType device_type,
1825  const int device_count) {
1826  for (const auto target_expr : ra_exe_unit.target_exprs) {
1827  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1828  return;
1829  }
1830  }
1831  size_t watchdog_max_projected_rows_per_device =
1833  if (ra_exe_unit.query_hint.isHintRegistered(
1835  watchdog_max_projected_rows_per_device =
1837  VLOG(1) << "Set the watchdog per device maximum projection limit: "
1838  << watchdog_max_projected_rows_per_device << " by a query hint";
1839  }
1840  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1841  table_infos.front().info.getPhysicalNumTuples() <
1842  watchdog_max_projected_rows_per_device) {
1843  // Allow a query with no scan limit to run on small tables
1844  return;
1845  }
1846  if (ra_exe_unit.use_bump_allocator) {
1847  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1848  // relative to the size of the input), so we bypass this check for now
1849  return;
1850  }
1851  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1852  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1853  (!ra_exe_unit.scan_limit ||
1854  ra_exe_unit.scan_limit >
1856  watchdog_max_projected_rows_per_device, device_type, device_count))) {
1857  std::vector<std::string> table_names;
1858  const auto& input_descs = ra_exe_unit.input_descs;
1859  for (const auto& input_desc : input_descs) {
1860  table_names.push_back(get_table_name(input_desc));
1861  }
1862  if (!ra_exe_unit.scan_limit) {
1863  throw WatchdogException(
1864  "Projection query would require a scan without a limit on table(s): " +
1865  boost::algorithm::join(table_names, ", "));
1866  } else {
1867  throw WatchdogException(
1868  "Projection query output result set on table(s): " +
1869  boost::algorithm::join(table_names, ", ") + " would contain " +
1870  std::to_string(ra_exe_unit.scan_limit) +
1871  " rows, which is more than the current system limit of " +
1873  watchdog_max_projected_rows_per_device, device_type, device_count)));
1874  }
1875  }
1876 }
1877 
1878 } // namespace
1879 
1880 size_t get_loop_join_size(const std::vector<InputTableInfo>& query_infos,
1881  const RelAlgExecutionUnit& ra_exe_unit) {
1882  const auto inner_table_key = ra_exe_unit.input_descs.back().getTableKey();
1883 
1884  std::optional<size_t> inner_table_idx;
1885  for (size_t i = 0; i < query_infos.size(); ++i) {
1886  if (query_infos[i].table_key == inner_table_key) {
1887  inner_table_idx = i;
1888  break;
1889  }
1890  }
1891  CHECK(inner_table_idx);
1892  return query_infos[*inner_table_idx].info.getNumTuples();
1893 }
1894 
1895 namespace {
1896 
1897 template <typename T>
1898 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1899  std::vector<std::string> expr_strs;
1900  for (const auto& expr : expr_container) {
1901  if (!expr) {
1902  expr_strs.emplace_back("NULL");
1903  } else {
1904  expr_strs.emplace_back(expr->toString());
1905  }
1906  }
1907  return expr_strs;
1908 }
1909 
1910 template <>
1911 std::vector<std::string> expr_container_to_string(
1912  const std::list<Analyzer::OrderEntry>& expr_container) {
1913  std::vector<std::string> expr_strs;
1914  for (const auto& expr : expr_container) {
1915  expr_strs.emplace_back(expr.toString());
1916  }
1917  return expr_strs;
1918 }
1919 
1920 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1921  switch (algorithm) {
1922  case SortAlgorithm::Default:
1923  return "ResultSet";
1924  case SortAlgorithm::SpeculativeTopN:
1925  return "Speculative Top N";
1926  case SortAlgorithm::StreamingTopN:
1927  return "Streaming Top N";
1928  }
1929  UNREACHABLE();
1930  return "";
1931 }
1932 
1933 } // namespace
1934 
1936  // todo(yoonmin): if possible, replace the cache key with a DAG representation of
1937  // the query plan instead of the ra_exe_unit description
1938  std::ostringstream os;
1939  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1940  const auto& scan_desc = input_col_desc->getScanDesc();
1941  os << scan_desc.getTableKey() << "," << input_col_desc->getColId() << ","
1942  << scan_desc.getNestLevel();
1943  table_keys.emplace(scan_desc.getTableKey());
1944  }
1945  if (!ra_exe_unit.simple_quals.empty()) {
1946  for (const auto& qual : ra_exe_unit.simple_quals) {
1947  if (qual) {
1948  os << qual->toString() << ",";
1949  }
1950  }
1951  }
1952  if (!ra_exe_unit.quals.empty()) {
1953  for (const auto& qual : ra_exe_unit.quals) {
1954  if (qual) {
1955  os << qual->toString() << ",";
1956  }
1957  }
1958  }
1959  if (!ra_exe_unit.join_quals.empty()) {
1960  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1961  const auto& join_condition = ra_exe_unit.join_quals[i];
1962  os << std::to_string(i) << ::toString(join_condition.type);
1963  for (const auto& qual : join_condition.quals) {
1964  if (qual) {
1965  os << qual->toString() << ",";
1966  }
1967  }
1968  }
1969  }
1970  if (!ra_exe_unit.groupby_exprs.empty()) {
1971  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1972  if (qual) {
1973  os << qual->toString() << ",";
1974  }
1975  }
1976  }
1977  for (const auto& expr : ra_exe_unit.target_exprs) {
1978  if (expr) {
1979  os << expr->toString() << ",";
1980  }
1981  }
1982  os << ::toString(ra_exe_unit.estimator == nullptr);
1983  os << std::to_string(ra_exe_unit.scan_limit);
1984  key = os.str();
1985 }
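
// The cache key built above is simply the concatenated textual description of the
// unit's input columns, qualifiers, group-by and target expressions; equality is string
// equality and the hash is boost::hash_value of that string. A minimal standalone
// analogue (SimpleCacheKey is a hypothetical stand-in for the real key type):

#include <boost/functional/hash.hpp>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

struct SimpleCacheKey {
  std::string key;
  explicit SimpleCacheKey(const std::vector<std::string>& parts) {
    std::ostringstream os;
    for (const auto& part : parts) {
      os << part << ",";  // order-sensitive concatenation, mirroring the code above
    }
    key = os.str();
  }
  bool operator==(const SimpleCacheKey& other) const { return key == other.key; }
  size_t hash() const { return boost::hash_value(key); }
};
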
1986 
1988  return key == other.key;
1989 }
1990 
1992  return boost::hash_value(key);
1993 }
1994 
1996  return table_keys.find(table_key) != table_keys.end();
1997 }
1998 
1999 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
2000  os << "\n\tExtracted Query Plan Dag Hash: " << ra_exe_unit.query_plan_dag_hash;
2001  os << "\n\tTable/Col/Levels: ";
2002  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
2003  const auto& scan_desc = input_col_desc->getScanDesc();
2004  os << "(" << scan_desc.getTableKey() << ", " << input_col_desc->getColId() << ", "
2005  << scan_desc.getNestLevel() << ") ";
2006  }
2007  if (!ra_exe_unit.simple_quals.empty()) {
2008  os << "\n\tSimple Quals: "
2010  ", ");
2011  }
2012  if (!ra_exe_unit.quals.empty()) {
2013  os << "\n\tQuals: "
2014  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
2015  }
2016  if (!ra_exe_unit.join_quals.empty()) {
2017  os << "\n\tJoin Quals: ";
2018  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
2019  const auto& join_condition = ra_exe_unit.join_quals[i];
2020  os << "\t\t" << std::to_string(i) << " " << ::toString(join_condition.type);
2021  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
2022  }
2023  }
2024  if (!ra_exe_unit.groupby_exprs.empty()) {
2025  os << "\n\tGroup By: "
2027  ", ");
2028  }
2029  os << "\n\tProjected targets: "
2031  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
2032  os << "\n\tSort Info: ";
2033  const auto& sort_info = ra_exe_unit.sort_info;
2034  os << "\n\t Order Entries: "
2035  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
2036  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
2037  std::string limit_str = sort_info.limit ? std::to_string(*sort_info.limit) : "N/A";
2038  os << "\n\t Limit: " << limit_str;
2039  os << "\n\t Offset: " << std::to_string(sort_info.offset);
2040  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
2041  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
2042  if (ra_exe_unit.union_all) {
2043  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
2044  }
2045  return os;
2046 }
2047 
2048 namespace {
2049 
2050 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
2051  const size_t new_scan_limit) {
2052  return {ra_exe_unit_in.input_descs,
2053  ra_exe_unit_in.input_col_descs,
2054  ra_exe_unit_in.simple_quals,
2055  ra_exe_unit_in.quals,
2056  ra_exe_unit_in.join_quals,
2057  ra_exe_unit_in.groupby_exprs,
2058  ra_exe_unit_in.target_exprs,
2059  ra_exe_unit_in.target_exprs_original_type_infos,
2060  ra_exe_unit_in.estimator,
2061  ra_exe_unit_in.sort_info,
2062  new_scan_limit,
2063  ra_exe_unit_in.query_hint,
2064  ra_exe_unit_in.query_plan_dag_hash,
2065  ra_exe_unit_in.hash_table_build_plan_dag,
2066  ra_exe_unit_in.table_id_to_node_map,
2067  ra_exe_unit_in.use_bump_allocator,
2068  ra_exe_unit_in.union_all,
2069  ra_exe_unit_in.query_state};
2070 }
2071 
2072 } // namespace
2073 
2074 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
2075  const bool is_agg,
2076  const std::vector<InputTableInfo>& query_infos,
2077  const RelAlgExecutionUnit& ra_exe_unit_in,
2078  const CompilationOptions& co,
2079  const ExecutionOptions& eo,
2080  RenderInfo* render_info,
2081  const bool has_cardinality_estimation,
2082  ColumnCacheMap& column_cache) {
2083  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
2084  ScopeGuard cleanup_post_execution = [this] {
2085  // cleanup/unpin GPU buffer allocations
2086  // TODO: separate out this state into a single object
2087  plan_state_.reset(nullptr);
2088  if (cgen_state_) {
2089  cgen_state_->in_values_bitmaps_.clear();
2090  cgen_state_->str_dict_translation_mgrs_.clear();
2091  cgen_state_->tree_model_prediction_mgrs_.clear();
2092  }
2093  row_set_mem_owner_->clearNonOwnedGroupByBuffers();
2094  };
2095 
2096  try {
2097  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
2098  is_agg,
2099  true,
2100  query_infos,
2101  ra_exe_unit_in,
2102  co,
2103  eo,
2105  render_info,
2106  has_cardinality_estimation,
2107  column_cache);
2108  if (result) {
2109  result->setKernelQueueTime(kernel_queue_time_ms_);
2110  result->addCompilationQueueTime(compilation_queue_time_ms_);
2111  if (eo.just_validate) {
2112  result->setValidationOnlyRes();
2113  }
2114  }
2115  return result;
2116  } catch (const CompilationRetryNewScanLimit& e) {
2117  auto result =
2118  executeWorkUnitImpl(max_groups_buffer_entry_guess,
2119  is_agg,
2120  false,
2121  query_infos,
2122  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
2123  co,
2124  eo,
2126  render_info,
2127  has_cardinality_estimation,
2128  column_cache);
2129  if (result) {
2130  result->setKernelQueueTime(kernel_queue_time_ms_);
2131  result->addCompilationQueueTime(compilation_queue_time_ms_);
2132  if (eo.just_validate) {
2133  result->setValidationOnlyRes();
2134  }
2135  }
2136  return result;
2137  }
2138 }
2139 
2140 ResultSetPtr Executor::executeWorkUnitImpl(
2141  size_t& max_groups_buffer_entry_guess,
2142  const bool is_agg,
2143  const bool allow_single_frag_table_opt,
2144  const std::vector<InputTableInfo>& query_infos,
2145  const RelAlgExecutionUnit& ra_exe_unit_in,
2146  const CompilationOptions& co,
2147  const ExecutionOptions& eo,
2148  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
2149  RenderInfo* render_info,
2150  const bool has_cardinality_estimation,
2151  ColumnCacheMap& column_cache) {
2152  INJECT_TIMER(Exec_executeWorkUnit);
2153  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
2154  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
2155  CHECK(!query_infos.empty());
2156  if (!max_groups_buffer_entry_guess) {
2157  // The query has failed the first execution attempt because of running out
2158  // of group by slots. Make the conservative choice: allocate fragment size
2159  // slots and run on the CPU.
2160  CHECK(device_type == ExecutorDeviceType::CPU);
2161  max_groups_buffer_entry_guess =
2162  compute_buffer_entry_guess(query_infos, ra_exe_unit_in);
2163  }
2164 
2165  int8_t crt_min_byte_width{MAX_BYTE_WIDTH_SUPPORTED};
2166  CompilationOptions copied_co = co;
2167  copied_co.device_type = device_type;
2168  do {
2169  SharedKernelContext shared_context(query_infos);
2170  ColumnFetcher column_fetcher(this, column_cache);
2171  ScopeGuard scope_guard = [&column_fetcher] {
2172  column_fetcher.freeLinearizedBuf();
2173  column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
2174  };
2175  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
2176  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
2177  if (eo.executor_type == ExecutorType::Native) {
2178  try {
2179  INJECT_TIMER(query_step_compilation);
2180  query_mem_desc_owned =
2181  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
2182  crt_min_byte_width,
2183  has_cardinality_estimation,
2184  ra_exe_unit,
2185  query_infos,
2186  deleted_cols_map,
2187  column_fetcher,
2188  copied_co,
2189  eo,
2190  render_info,
2191  this);
2192  CHECK(query_mem_desc_owned);
2193  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
2194  } catch (CompilationRetryNoCompaction& e) {
2195  VLOG(1) << e.what();
2196  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
2197  continue;
2198  }
2199  } else {
2200  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
2201  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
2202  CHECK(!query_mem_desc_owned);
2203  query_mem_desc_owned.reset(
2205  }
2206  if (eo.just_explain) {
2207  return executeExplain(*query_comp_desc_owned);
2208  }
2209 
2210  if (query_mem_desc_owned->canUsePerDeviceCardinality(ra_exe_unit)) {
2211  auto const max_rows_per_device =
2212  query_mem_desc_owned->getMaxPerDeviceCardinality(ra_exe_unit);
2213  if (max_rows_per_device && *max_rows_per_device >= 0 &&
2214  *max_rows_per_device < query_mem_desc_owned->getEntryCount()) {
2215  VLOG(1) << "Setting the max per device cardinality of {max_rows_per_device} as "
2216  "the new scan limit: "
2217  << *max_rows_per_device;
2218  throw CompilationRetryNewScanLimit(*max_rows_per_device);
2219  }
2220  }
2221 
2222  if (!eo.just_validate) {
2223  int available_cpus = cpu_threads();
2224  auto available_gpus = get_available_gpus(data_mgr_);
2225 
2226  const auto context_count =
2227  get_context_count(device_type, available_cpus, available_gpus.size());
2228  try {
2229  auto kernels = createKernels(shared_context,
2230  ra_exe_unit,
2231  column_fetcher,
2232  query_infos,
2233  eo,
2234  is_agg,
2235  allow_single_frag_table_opt,
2236  context_count,
2237  *query_comp_desc_owned,
2238  *query_mem_desc_owned,
2239  render_info,
2240  available_gpus,
2241  available_cpus);
2243  launchKernelsViaResourceMgr(shared_context,
2244  std::move(kernels),
2245  query_comp_desc_owned->getDeviceType(),
2246  ra_exe_unit.input_descs,
2247  *query_mem_desc_owned);
2248  } else {
2250  shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
2251  }
2252 
2253  } catch (QueryExecutionError& e) {
2254  if (eo.with_dynamic_watchdog && interrupted_.load() &&
2255  e.getErrorCode() == ERR_OUT_OF_TIME) {
2257  }
2258  if (e.getErrorCode() == ERR_INTERRUPTED) {
2260  }
2262  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
2263  crt_min_byte_width <<= 1;
2264  continue;
2265  }
2266  throw;
2267  }
2268  }
2269  if (is_agg) {
2270  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
2271  // update query status to let user know we are now in the reduction phase
2272  std::string curRunningSession{""};
2273  std::string curRunningQuerySubmittedTime{""};
2274  bool sessionEnrolled = false;
2275  {
2278  curRunningSession = getCurrentQuerySession(session_read_lock);
2279  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
2280  sessionEnrolled =
2281  checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
2282  }
2283  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
2284  sessionEnrolled) {
2285  updateQuerySessionStatus(curRunningSession,
2286  curRunningQuerySubmittedTime,
2288  }
2289  }
2290  try {
2291  if (eo.estimate_output_cardinality) {
2292  for (const auto& result : shared_context.getFragmentResults()) {
2293  auto row = result.first->getNextRow(false, false);
2294  CHECK_EQ(1u, row.size());
2295  auto scalar_r = boost::get<ScalarTargetValue>(&row[0]);
2296  CHECK(scalar_r);
2297  auto p = boost::get<int64_t>(scalar_r);
2298  CHECK(p);
2299  // todo(yoonmin): sort the frag_ids to make them consistent for later use
2300  auto frag_ids = result.second;
2301  VLOG(1) << "Filtered cardinality for fragments-{" << ::toString(result.second)
2302  << "} : " << static_cast<size_t>(*p);
2303  ra_exe_unit_in.per_device_cardinality.emplace_back(result.second,
2304  static_cast<size_t>(*p));
2305  result.first->moveToBegin();
2306  }
2307  }
2308  return collectAllDeviceResults(shared_context,
2309  ra_exe_unit,
2310  *query_mem_desc_owned,
2311  query_comp_desc_owned->getDeviceType(),
2312  row_set_mem_owner);
2313  } catch (ReductionRanOutOfSlots&) {
2315  } catch (OverflowOrUnderflow&) {
2316  crt_min_byte_width <<= 1;
2317  continue;
2318  } catch (QueryExecutionError& e) {
2319  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
2320  << ", what(): " << e.what();
2321  throw QueryExecutionError(e.getErrorCode());
2322  }
2323  }
2324  return resultsUnion(shared_context, ra_exe_unit);
2325 
2326  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
2327 
2328  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2331  nullptr,
2332  blockSize(),
2333  gridSize());
2334 }
2335 
2336 void Executor::executeWorkUnitPerFragment(
2337  const RelAlgExecutionUnit& ra_exe_unit_in,
2338  const InputTableInfo& table_info,
2339  const CompilationOptions& co,
2340  const ExecutionOptions& eo,
2342  PerFragmentCallBack& cb,
2343  const std::set<size_t>& fragment_indexes_param) {
2344  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
2345  ColumnCacheMap column_cache;
2346 
2347  std::vector<InputTableInfo> table_infos{table_info};
2348  SharedKernelContext kernel_context(table_infos);
2349 
2350  ColumnFetcher column_fetcher(this, column_cache);
2351  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
2352  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
2353  {
2354  query_mem_desc_owned =
2355  query_comp_desc_owned->compile(0,
2356  8,
2357  /*has_cardinality_estimation=*/false,
2358  ra_exe_unit,
2359  table_infos,
2360  deleted_cols_map,
2361  column_fetcher,
2362  co,
2363  eo,
2364  nullptr,
2365  this);
2366  }
2367  CHECK(query_mem_desc_owned);
2368  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
2369  const auto table_key = ra_exe_unit.input_descs[0].getTableKey();
2370  const auto& outer_fragments = table_info.info.fragments;
2371 
2372  std::set<size_t> fragment_indexes;
2373  if (fragment_indexes_param.empty()) {
2374  // An empty `fragment_indexes_param` set implies executing
2375  // the query for all fragments in the table. In this
2376  // case, populate `fragment_indexes` with all fragment indexes.
2377  for (size_t i = 0; i < outer_fragments.size(); i++) {
2378  fragment_indexes.emplace(i);
2379  }
2380  } else {
2381  fragment_indexes = fragment_indexes_param;
2382  }
2383 
2384  {
2385  auto clock_begin = timer_start();
2386  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2387  kernel_queue_time_ms_ += timer_stop(clock_begin);
2388 
2389  for (auto fragment_index : fragment_indexes) {
2390  // In the future, we may want to consider allowing this to execute on devices other
2391  // than CPU.
2392  FragmentsList fragments_list{{table_key, {fragment_index}}};
2393  ExecutionKernel kernel(ra_exe_unit,
2394  co.device_type,
2395  /*device_id=*/0,
2396  eo,
2397  column_fetcher,
2398  *query_comp_desc_owned,
2399  *query_mem_desc_owned,
2400  fragments_list,
2402  /*render_info=*/nullptr,
2403  /*rowid_lookup_key=*/-1);
2404  kernel.run(this, 0, kernel_context);
2405  }
2406  }
2407 
2408  const auto& all_fragment_results = kernel_context.getFragmentResults();
2409 
2410  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
2411  CHECK_EQ(result_fragment_indexes.size(), 1);
2412  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
2413  }
2414 }
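
// The per-fragment path above compiles the unit once, then runs one CPU kernel per
// selected fragment and hands each partial result to the supplied callback along with
// the fragment it came from. A minimal standalone sketch of that callback-per-partition
// shape (FragmentResult and for_each_fragment are hypothetical stand-ins):

#include <cstddef>
#include <functional>
#include <vector>

struct FragmentResult {  // stand-in for a (result set, fragment info) pair
  size_t fragment_index;
  size_t row_count;
};

void for_each_fragment(const std::vector<size_t>& fragment_indexes,
                       const std::function<FragmentResult(size_t)>& run_kernel,
                       const std::function<void(const FragmentResult&)>& callback) {
  for (const size_t idx : fragment_indexes) {
    callback(run_kernel(idx));  // one kernel per fragment, result handed straight to cb
  }
}
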
2415 
2417  const TableFunctionExecutionUnit exe_unit,
2418  const std::vector<InputTableInfo>& table_infos,
2419  const CompilationOptions& co,
2420  const ExecutionOptions& eo) {
2421  INJECT_TIMER(Exec_executeTableFunction);
2422  if (eo.just_validate) {
2424  /*entry_count=*/0,
2426  return std::make_shared<ResultSet>(
2427  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
2428  co.device_type,
2429  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
2430  this->getRowSetMemoryOwner(),
2431  this->blockSize(),
2432  this->gridSize());
2433  }
2434 
2435  // Avoid compiling functions that set the sizer at runtime if the device is GPU.
2436  // This should also be fixed in the python script to minimize the number of
2437  // QueryMustRunOnCpu exceptions.
2440  throw QueryMustRunOnCpu();
2441  }
2442 
2443  ColumnCacheMap column_cache; // Note: if we add retries to the table function
2444  // framework, we may want to move this up a level
2445 
2446  ColumnFetcher column_fetcher(this, column_cache);
2448 
2449  if (exe_unit.table_func.containsPreFlightFn()) {
2450  std::shared_ptr<CompilationContext> compilation_context;
2451  {
2452  Executor::CgenStateManager cgenstate_manager(*this,
2453  false,
2454  table_infos,
2456  nullptr); // locks compilation_mutex
2458  TableFunctionCompilationContext tf_compilation_context(this, pre_flight_co);
2459  compilation_context =
2460  tf_compilation_context.compile(exe_unit, true /* emit_only_preflight_fn*/);
2461  }
2462  exe_context.execute(exe_unit,
2463  table_infos,
2464  compilation_context,
2465  column_fetcher,
2467  this,
2468  true /* is_pre_launch_udtf */);
2469  }
2470  std::shared_ptr<CompilationContext> compilation_context;
2471  {
2472  Executor::CgenStateManager cgenstate_manager(*this,
2473  false,
2474  table_infos,
2476  nullptr); // locks compilation_mutex
2477  TableFunctionCompilationContext tf_compilation_context(this, co);
2478  compilation_context =
2479  tf_compilation_context.compile(exe_unit, false /* emit_only_preflight_fn */);
2480  }
2481  return exe_context.execute(exe_unit,
2482  table_infos,
2483  compilation_context,
2484  column_fetcher,
2485  co.device_type,
2486  this,
2487  false /* is_pre_launch_udtf */);
2488 }
2489 
2490 ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
2491  return std::make_shared<ResultSet>(query_comp_desc.getIR());
2492 }
2493 
2495  const RelAlgExecutionUnit& ra_exe_unit,
2496  const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
2497  TransientDictIdVisitor dict_id_visitor;
2498 
2499  auto visit_expr =
2500  [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
2501  if (!expr) {
2502  return;
2503  }
2504  const auto& dict_key = dict_id_visitor.visit(expr);
2505  if (dict_key.dict_id >= 0) {
2506  auto sdp = getStringDictionaryProxy(dict_key, row_set_mem_owner, true);
2507  CHECK(sdp);
2508  TransientStringLiteralsVisitor visitor(sdp, this);
2509  visitor.visit(expr);
2510  }
2511  };
2512 
2513  for (const auto& group_expr : ra_exe_unit.groupby_exprs) {
2514  visit_expr(group_expr.get());
2515  }
2516 
2517  for (const auto& group_expr : ra_exe_unit.quals) {
2518  visit_expr(group_expr.get());
2519  }
2520 
2521  for (const auto& group_expr : ra_exe_unit.simple_quals) {
2522  visit_expr(group_expr.get());
2523  }
2524 
2525  const auto visit_target_expr = [&](const Analyzer::Expr* target_expr) {
2526  const auto& target_type = target_expr->get_type_info();
2527  if (!target_type.is_string() || target_type.get_compression() == kENCODING_DICT) {
2528  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
2529  if (agg_expr) {
2530  // The following agg types require taking into account transient string values
2531  if (agg_expr->get_is_distinct() || agg_expr->get_aggtype() == kSINGLE_VALUE ||
2532  agg_expr->get_aggtype() == kSAMPLE || agg_expr->get_aggtype() == kMODE) {
2533  visit_expr(agg_expr->get_arg());
2534  }
2535  } else {
2536  visit_expr(target_expr);
2537  }
2538  }
2539  };
2540  const auto& target_exprs = ra_exe_unit.target_exprs;
2541  std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
2542  const auto& target_exprs_union = ra_exe_unit.target_exprs_union;
2543  std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
2544 }
2545 
2546 ExecutorDeviceType Executor::getDeviceTypeForTargets(
2547  const RelAlgExecutionUnit& ra_exe_unit,
2548  const ExecutorDeviceType requested_device_type) {
2549  if (!getDataMgr()->gpusPresent()) {
2550  return ExecutorDeviceType::CPU;
2551  }
2552  for (const auto target_expr : ra_exe_unit.target_exprs) {
2553  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2554  if (!ra_exe_unit.groupby_exprs.empty() &&
2555  !isArchPascalOrLater(requested_device_type)) {
2556  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM ||
2557  agg_info.agg_kind == kSUM_IF) &&
2558  agg_info.agg_arg_type.get_type() == kDOUBLE) {
2559  return ExecutorDeviceType::CPU;
2560  }
2561  }
2562  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
2563  return ExecutorDeviceType::CPU;
2564  }
2565  }
2566  return requested_device_type;
2567 }
2568 
2569 namespace {
2570 
2571 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
2572  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
2573  if (ti.is_fp()) {
2574  if (float_argument_input && ti.get_type() == kFLOAT) {
2575  int64_t float_null_val = 0;
2576  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
2577  static_cast<float>(inline_fp_null_val(ti));
2578  return float_null_val;
2579  }
2580  const auto double_null_val = inline_fp_null_val(ti);
2581  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
2582  }
2583  return inline_int_null_val(ti);
2584 }
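
// inline_null_val stores floating-point NULL sentinels by bit-reinterpreting them into
// the 64-bit slot, with a float sentinel occupying the low 32 bits. A standalone sketch
// of the same type punning done with memcpy, which sidesteps strict-aliasing concerns
// (both helpers are hypothetical):

#include <cstdint>
#include <cstring>

int64_t encode_float_null_in_slot(const float float_null_sentinel) {
  int64_t slot = 0;
  std::memcpy(&slot, &float_null_sentinel, sizeof(float));  // low 32 bits hold the float
  return slot;
}

int64_t encode_double_null_in_slot(const double double_null_sentinel) {
  int64_t slot = 0;
  std::memcpy(&slot, &double_null_sentinel, sizeof(double));
  return slot;
}
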
2585 
2586 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
2587  std::vector<int64_t>& entry,
2588  const std::vector<Analyzer::Expr*>& target_exprs,
2589  const QueryMemoryDescriptor& query_mem_desc) {
2590  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
2591  const auto target_expr = target_exprs[target_idx];
2592  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2593  CHECK(agg_info.is_agg);
2594  target_infos.push_back(agg_info);
2595  if (g_cluster) {
2596  const auto executor = query_mem_desc.getExecutor();
2597  CHECK(executor);
2598  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2599  CHECK(row_set_mem_owner);
2600  const auto& count_distinct_desc =
2601  query_mem_desc.getCountDistinctDescriptor(target_idx);
2602  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
2603  CHECK(row_set_mem_owner);
2604  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
2605  count_distinct_desc.bitmapPaddedSizeBytes(),
2606  /*thread_idx=*/0); // TODO: can we detect thread idx here?
2607  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
2608  continue;
2609  }
2610  if (count_distinct_desc.impl_type_ == CountDistinctImplType::UnorderedSet) {
2611  auto count_distinct_set = new CountDistinctSet();
2612  CHECK(row_set_mem_owner);
2613  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
2614  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
2615  continue;
2616  }
2617  }
2618  const bool float_argument_input = takes_float_argument(agg_info);
2619  if (shared::is_any<kCOUNT, kCOUNT_IF, kAPPROX_COUNT_DISTINCT>(agg_info.agg_kind)) {
2620  entry.push_back(0);
2621  } else if (shared::is_any<kAVG>(agg_info.agg_kind)) {
2622  entry.push_back(0);
2623  entry.push_back(0);
2624  } else if (shared::is_any<kSINGLE_VALUE, kSAMPLE>(agg_info.agg_kind)) {
2625  if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
2626  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2627  entry.push_back(0);
2628  }
2629  } else if (agg_info.sql_type.is_varlen()) {
2630  entry.push_back(0);
2631  entry.push_back(0);
2632  } else {
2633  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2634  }
2635  } else {
2636  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2637  }
2638  }
2639 }
2640 
2642  const std::vector<Analyzer::Expr*>& target_exprs_in,
2644  const ExecutorDeviceType device_type) {
2645  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
2646  std::vector<Analyzer::Expr*> target_exprs;
2647  for (const auto target_expr : target_exprs_in) {
2648  const auto target_expr_copy =
2649  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
2650  CHECK(target_expr_copy);
2651  auto ti = target_expr->get_type_info();
2652  ti.set_notnull(false);
2653  target_expr_copy->set_type_info(ti);
2654  if (target_expr_copy->get_arg()) {
2655  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
2656  arg_ti.set_notnull(false);
2657  target_expr_copy->get_arg()->set_type_info(arg_ti);
2658  }
2659  target_exprs_owned_copies.push_back(target_expr_copy);
2660  target_exprs.push_back(target_expr_copy.get());
2661  }
2662  std::vector<TargetInfo> target_infos;
2663  std::vector<int64_t> entry;
2664  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
2665  const auto executor = query_mem_desc.getExecutor();
2666  CHECK(executor);
2667  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2668  CHECK(row_set_mem_owner);
2669  auto rs = std::make_shared<ResultSet>(target_infos,
2670  device_type,
2672  row_set_mem_owner,
2673  executor->blockSize(),
2674  executor->gridSize());
2675  rs->allocateStorage();
2676  rs->fillOneEntry(entry);
2677  return rs;
2678 }
2679 
2680 } // namespace
2681 
2682 ResultSetPtr Executor::collectAllDeviceResults(
2683  SharedKernelContext& shared_context,
2684  const RelAlgExecutionUnit& ra_exe_unit,
2686  const ExecutorDeviceType device_type,
2687  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2688  auto timer = DEBUG_TIMER(__func__);
2689  auto& result_per_device = shared_context.getFragmentResults();
2690  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
2693  ra_exe_unit.target_exprs, query_mem_desc, device_type);
2694  }
2695  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
2696  try {
2697  return reduceSpeculativeTopN(
2698  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2699  } catch (const std::bad_alloc&) {
2700  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
2701  }
2702  }
2703  const auto shard_count =
2704  device_type == ExecutorDeviceType::GPU
2706  : 0;
2707 
2708  if (shard_count && !result_per_device.empty()) {
2709  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit, device_type);
2710  }
2711  return reduceMultiDeviceResults(
2712  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2713 }
2714 
2715 namespace {
2726 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
2727  const QueryMemoryDescriptor& input_query_mem_desc,
2728  const ResultSetStorage* output_storage,
2729  size_t output_row_index,
2730  const QueryMemoryDescriptor& output_query_mem_desc,
2731  const std::vector<uint32_t>& top_permutation) {
2732  const auto output_buffer = output_storage->getUnderlyingBuffer();
2733  const auto input_buffer = input_storage->getUnderlyingBuffer();
2734  for (const auto sorted_idx : top_permutation) {
2735  // permuting all group-columns in this result set into the final buffer:
2736  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
2737  group_idx++) {
2738  const auto input_column_ptr =
2739  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2740  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
2741  const auto output_column_ptr =
2742  output_buffer +
2743  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2744  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
2745  memcpy(output_column_ptr,
2746  input_column_ptr,
2747  output_query_mem_desc.groupColWidth(group_idx));
2748  }
2749  // permuting all agg-columns in this result set into the final buffer:
2750  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
2751  slot_idx++) {
2752  const auto input_column_ptr =
2753  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
2754  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2755  const auto output_column_ptr =
2756  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
2757  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2758  memcpy(output_column_ptr,
2759  input_column_ptr,
2760  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
2761  }
2762  ++output_row_index;
2763  }
2764  return output_row_index;
2765 }
2766 
2776 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
2777  const ResultSetStorage* output_storage,
2778  size_t output_row_index,
2779  const QueryMemoryDescriptor& output_query_mem_desc,
2780  const std::vector<uint32_t>& top_permutation) {
2781  const auto output_buffer = output_storage->getUnderlyingBuffer();
2782  const auto input_buffer = input_storage->getUnderlyingBuffer();
2783  for (const auto sorted_idx : top_permutation) {
2784  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
2785  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
2786  row_ptr,
2787  output_query_mem_desc.getRowSize());
2788  ++output_row_index;
2789  }
2790  return output_row_index;
2791 }
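
// Both permute helpers above copy entries out of a device buffer in the order given by
// the sorted permutation. In the row-wise layout this reduces to one memcpy per permuted
// row; a standalone sketch over plain byte buffers (permute_rows is hypothetical):

#include <cstdint>
#include <cstring>
#include <vector>

std::vector<uint8_t> permute_rows(const std::vector<uint8_t>& input,
                                  const size_t row_size,
                                  const std::vector<uint32_t>& permutation) {
  std::vector<uint8_t> output(permutation.size() * row_size);
  size_t out_row = 0;
  for (const uint32_t sorted_idx : permutation) {
    std::memcpy(output.data() + out_row * row_size,  // next free output row
                input.data() + sorted_idx * row_size,
                row_size);
    ++out_row;
  }
  return output;
}
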
2792 } // namespace
2793 
2794 // Collect the top results from each device, stitch them together, and sort. Partial
2795 // results from each device are guaranteed to be disjoint because we only take this
2796 // path when one of the columns involved is a shard key.
2797 ResultSetPtr Executor::collectAllDeviceShardedTopResults(
2798  SharedKernelContext& shared_context,
2799  const RelAlgExecutionUnit& ra_exe_unit,
2800  const ExecutorDeviceType device_type) const {
2801  auto& result_per_device = shared_context.getFragmentResults();
2802  const auto first_result_set = result_per_device.front().first;
2803  CHECK(first_result_set);
2804  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2805  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2806  const auto top_n =
2807  ra_exe_unit.sort_info.limit.value_or(0) + ra_exe_unit.sort_info.offset;
2808  top_query_mem_desc.setEntryCount(0);
2809  for (auto& result : result_per_device) {
2810  const auto result_set = result.first;
2811  CHECK(result_set);
2812  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, device_type, this);
2813  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2814  top_query_mem_desc.setEntryCount(new_entry_cnt);
2815  }
2816  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2817  first_result_set->getDeviceType(),
2818  top_query_mem_desc,
2819  first_result_set->getRowSetMemOwner(),
2820  blockSize(),
2821  gridSize());
2822  auto top_storage = top_result_set->allocateStorage();
2823  size_t top_output_row_idx{0};
2824  for (auto& result : result_per_device) {
2825  const auto result_set = result.first;
2826  CHECK(result_set);
2827  const auto& top_permutation = result_set->getPermutationBuffer();
2828  CHECK_LE(top_permutation.size(), top_n);
2829  if (top_query_mem_desc.didOutputColumnar()) {
2830  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2831  result_set->getQueryMemDesc(),
2832  top_storage,
2833  top_output_row_idx,
2834  top_query_mem_desc,
2835  top_permutation);
2836  } else {
2837  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2838  top_storage,
2839  top_output_row_idx,
2840  top_query_mem_desc,
2841  top_permutation);
2842  }
2843  }
2844  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2845  return top_result_set;
2846 }
2847 
2848 std::unordered_map<shared::TableKey, const Analyzer::BinOper*>
2850  std::unordered_map<shared::TableKey, const Analyzer::BinOper*> id_to_cond;
2851  const auto& join_info = plan_state_->join_info_;
2852  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2853  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2854  const auto& inner_table_key = join_info.join_hash_tables_[i]->getInnerTableId();
2855  id_to_cond.insert(
2856  std::make_pair(inner_table_key, join_info.equi_join_tautologies_[i].get()));
2857  }
2858  return id_to_cond;
2859 }
2860 
2861 namespace {
2862 
2863 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2864  for (const auto& col : fetched_cols) {
2865  if (col.is_lazily_fetched) {
2866  return true;
2867  }
2868  }
2869  return false;
2870 }
2871 
2872 } // namespace
2873 
2874 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2875  SharedKernelContext& shared_context,
2876  const RelAlgExecutionUnit& ra_exe_unit,
2877  ColumnFetcher& column_fetcher,
2878  const std::vector<InputTableInfo>& table_infos,
2879  const ExecutionOptions& eo,
2880  const bool is_agg,
2881  const bool allow_single_frag_table_opt,
2882  const size_t context_count,
2883  const QueryCompilationDescriptor& query_comp_desc,
2885  RenderInfo* render_info,
2886  std::unordered_set<int>& available_gpus,
2887  int& available_cpus) {
2888  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2889 
2890  QueryFragmentDescriptor fragment_descriptor(
2891  ra_exe_unit,
2892  table_infos,
2893  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2895  : std::vector<Data_Namespace::MemoryInfo>{},
2898  CHECK(!ra_exe_unit.input_descs.empty());
2899 
2900  const auto device_type = query_comp_desc.getDeviceType();
2901  const bool uses_lazy_fetch =
2902  plan_state_->allow_lazy_fetch_ &&
2904  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2905  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2906  const auto device_count = deviceCount(device_type);
2907  CHECK_GT(device_count, 0);
2908 
2909  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2910  shared_context.getFragOffsets(),
2911  device_count,
2912  device_type,
2913  use_multifrag_kernel,
2915  this);
2916  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2917  checkWorkUnitWatchdog(ra_exe_unit, table_infos, device_type, device_count);
2918  }
2919 
2920  if (use_multifrag_kernel) {
2921  VLOG(1) << "Creating multifrag execution kernels";
2922  VLOG(1) << query_mem_desc.toString();
2923 
2924  // NB: We should never be on this path when the query is retried because of running
2925  // out of group by slots; also, for scan only queries on CPU we want the
2926  // high-granularity, fragment by fragment execution instead. For scan only queries on
2927  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2928  // buffer per fragment.
2929  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2930  &execution_kernels,
2931  &column_fetcher,
2932  &eo,
2933  &query_comp_desc,
2934  &query_mem_desc,
2935  render_info](const int device_id,
2936  const FragmentsList& frag_list,
2937  const int64_t rowid_lookup_key) {
2938  execution_kernels.emplace_back(
2939  std::make_unique<ExecutionKernel>(ra_exe_unit,
2941  device_id,
2942  eo,
2943  column_fetcher,
2944  query_comp_desc,
2945  query_mem_desc,
2946  frag_list,
2948  render_info,
2949  rowid_lookup_key));
2950  };
2951  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2952  } else {
2953  VLOG(1) << "Creating one execution kernel per fragment";
2954  VLOG(1) << query_mem_desc.toString();
2955 
2956  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2958  table_infos.size() == 1 && table_infos.front().table_key.table_id > 0) {
2959  const auto max_frag_size =
2960  table_infos.front().info.getFragmentNumTuplesUpperBound();
2961  if (max_frag_size < query_mem_desc.getEntryCount()) {
2962  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2963  << " to match max fragment size " << max_frag_size
2964  << " for kernel per fragment execution path.";
2965  throw CompilationRetryNewScanLimit(max_frag_size);
2966  }
2967  }
2968 
2969  size_t frag_list_idx{0};
2970  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2971  &execution_kernels,
2972  &column_fetcher,
2973  &eo,
2974  &frag_list_idx,
2975  &device_type,
2976  &query_comp_desc,
2977  &query_mem_desc,
2978  render_info](const int device_id,
2979  const FragmentsList& frag_list,
2980  const int64_t rowid_lookup_key) {
2981  if (!frag_list.size()) {
2982  return;
2983  }
2984  CHECK_GE(device_id, 0);
2985 
2986  execution_kernels.emplace_back(
2987  std::make_unique<ExecutionKernel>(ra_exe_unit,
2988  device_type,
2989  device_id,
2990  eo,
2991  column_fetcher,
2992  query_comp_desc,
2993  query_mem_desc,
2994  frag_list,
2996  render_info,
2997  rowid_lookup_key));
2998  ++frag_list_idx;
2999  };
3000 
3001  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
3002  ra_exe_unit);
3003  }
3004 
3005  return execution_kernels;
3006 }
3007 
3008 void Executor::launchKernelsImpl(SharedKernelContext& shared_context,
3009  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
3010  const ExecutorDeviceType device_type,
3011  const size_t requested_num_threads) {
3012 #ifdef HAVE_TBB
3013  const size_t num_threads =
3014  requested_num_threads == Executor::auto_num_threads
3015  ? std::min(kernels.size(), static_cast<size_t>(cpu_threads()))
3016  : requested_num_threads;
3017  tbb::task_arena local_arena(num_threads);
3018 #else
3019  const size_t num_threads = cpu_threads();
3020 #endif
3021  shared_context.setNumAllocatedThreads(num_threads);
3022  LOG(EXECUTOR) << "Launching query step with " << num_threads << " threads.";
3024  // A hack to have unused unit for results collection.
3025  const RelAlgExecutionUnit* ra_exe_unit =
3026  kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
3027 
3028 #ifdef HAVE_TBB
3029  if (g_enable_cpu_sub_tasks && device_type == ExecutorDeviceType::CPU) {
3030  shared_context.setThreadPool(&tg);
3031  }
3032  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
3033 #endif // HAVE_TBB
3034 
3035  VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
3036  << (device_type == ExecutorDeviceType::CPU ? "CPU"s : "GPU"s)
3037  << " using pool of " << num_threads << " threads.";
3038  size_t kernel_idx = 1;
3039 
3040  for (auto& kernel : kernels) {
3041  CHECK(kernel.get());
3042 #ifdef HAVE_TBB
3043  local_arena.execute([&] {
3044 #endif
3045  tg.run([this,
3046  &kernel,
3047  &shared_context,
3048  parent_thread_local_ids = logger::thread_local_ids(),
3049  num_threads,
3050  crt_kernel_idx = kernel_idx++] {
3051  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
3052  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
3053  // Keep thread_idx monotonic in kernel launch order, so that optimizations such as
3054  // launching kernels whose data is already in the pool first become possible
3055 #ifdef HAVE_TBB
3056  const size_t old_thread_idx = crt_kernel_idx % num_threads;
3057  const size_t thread_idx = tbb::this_task_arena::current_thread_index();
3058  LOG(EXECUTOR) << "Thread idx: " << thread_idx
3059  << " Old thread idx: " << old_thread_idx;
3060 #else
3061  const size_t thread_idx = crt_kernel_idx % num_threads;
3062 #endif
3063  kernel->run(this, thread_idx, shared_context);
3064  });
3065 #ifdef HAVE_TBB
3066  }); // local_arena.execute[&]
3067 #endif
3068  }
3069 #ifdef HAVE_TBB
3070  local_arena.execute([&] { tg.wait(); });
3071 #else
3072  tg.wait();
3073 #endif
3074 
3075  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
3076  // The first arg is used for GPU only; that's not the case here.
3077  // TODO: add QueryExecutionContext::getRowSet() interface
3078  // for our case.
3079  if (exec_ctx) {
3080  ResultSetPtr results;
3081  if (ra_exe_unit->estimator) {
3082  results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
3083  } else {
3084  results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
3085  }
3086  shared_context.addDeviceResults(std::move(results), {});
3087  }
3088  }
3089 }
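
// Kernel launch above creates one task per kernel and runs them inside a
// tbb::task_arena limited to the granted thread count; each task derives a stable
// thread_idx from its launch order so output-buffer assignment stays deterministic.
// A minimal standalone sketch of that pattern, assuming TBB is available (run_kernels
// is hypothetical):

#include <cstddef>
#include <vector>
#include <tbb/task_arena.h>
#include <tbb/task_group.h>

void run_kernels(const size_t num_kernels,
                 const size_t num_threads,
                 std::vector<size_t>& thread_idx_used) {
  thread_idx_used.assign(num_kernels, 0);
  tbb::task_arena arena(static_cast<int>(num_threads));  // cap concurrency at the grant
  tbb::task_group tg;
  for (size_t kernel_idx = 0; kernel_idx < num_kernels; ++kernel_idx) {
    arena.execute([&thread_idx_used, &tg, num_threads, kernel_idx] {
      tg.run([&thread_idx_used, num_threads, kernel_idx] {
        // a stable per-kernel thread index derived from launch order
        thread_idx_used[kernel_idx] = kernel_idx % num_threads;
      });
    });
  }
  arena.execute([&tg] { tg.wait(); });  // wait inside the arena, as the code above does
}
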
3090 
3092  SharedKernelContext& shared_context,
3093  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
3094  const ExecutorDeviceType device_type) {
3095  auto clock_begin = timer_start();
3096  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
3097  kernel_queue_time_ms_ += timer_stop(clock_begin);
3098 
3100  shared_context, std::move(kernels), device_type, Executor::auto_num_threads);
3101 }
3102 
3103 void Executor::launchKernelsViaResourceMgr(
3104  SharedKernelContext& shared_context,
3105  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
3106  const ExecutorDeviceType device_type,
3107  const std::vector<InputDescriptor>& input_descs,
3109  // CPU queries in general, plus some GPU queries, i.e. certain types of top-k sorts,
3110  // can generate more kernels than cores/GPU devices, so for now handle this by
3111  // capping the number of requested GPU slots at the number of actual GPUs
3112  const size_t num_kernels = kernels.size();
3113  constexpr bool cap_slots = false;
3114  const size_t num_compute_slots =
3115  cap_slots
3116  ? std::min(num_kernels,
3118  ->get_resource_info(
3119  device_type == ExecutorDeviceType::GPU
3122  .second)
3123  : num_kernels;
3124  const size_t cpu_result_mem_bytes_per_kernel =
3125  query_mem_desc.getBufferSizeBytes(device_type);
3126 
3127  std::vector<std::pair<int32_t, FragmentsList>> kernel_fragments_list;
3128  kernel_fragments_list.reserve(num_kernels);
3129  for (auto& kernel : kernels) {
3130  const auto device_id = kernel->get_chosen_device_id();
3131  const auto frag_list = kernel->get_fragment_list();
3132  if (!frag_list.empty()) {
3133  kernel_fragments_list.emplace_back(std::make_pair(device_id, frag_list));
3134  }
3135  }
3136  const auto chunk_request_info = getChunkRequestInfo(
3137  device_type, input_descs, shared_context.getQueryInfos(), kernel_fragments_list);
3138 
3139  auto gen_resource_request_info = [device_type,
3140  num_compute_slots,
3141  cpu_result_mem_bytes_per_kernel,
3142  &chunk_request_info,
3143  &query_mem_desc]() {
3144  if (device_type == ExecutorDeviceType::GPU) {
3146  device_type,
3147  static_cast<size_t>(0), // priority_level
3148  static_cast<size_t>(0), // cpu_slots
3149  static_cast<size_t>(0), // min_cpu_slots,
3150  num_compute_slots, // gpu_slots
3151  num_compute_slots, // min_gpu_slots
3152  cpu_result_mem_bytes_per_kernel * num_compute_slots, // cpu_result_mem,
3153  cpu_result_mem_bytes_per_kernel * num_compute_slots, // min_cpu_result_mem,
3154  chunk_request_info, // chunks needed
3155  false); // output_buffers_reusable_intra_thread
3156  } else {
3157  const size_t min_cpu_slots{1};
3158  const size_t min_cpu_result_mem =
3159  query_mem_desc.threadsCanReuseGroupByBuffers()
3160  ? cpu_result_mem_bytes_per_kernel * min_cpu_slots
3161  : cpu_result_mem_bytes_per_kernel * num_compute_slots;
3163  device_type,
3164  static_cast<size_t>(0), // priority_level
3165  num_compute_slots, // cpu_slots
3166  min_cpu_slots, // min_cpu_slots
3167  size_t(0), // gpu_slots
3168  size_t(0), // min_gpu_slots
3169  cpu_result_mem_bytes_per_kernel * num_compute_slots, // cpu_result_mem
3170  min_cpu_result_mem, // min_cpu_result_mem
3171  chunk_request_info, // chunks needed
3172  query_mem_desc
3173  .threadsCanReuseGroupByBuffers()); // output_buffers_reusable_intra_thread
3174  }
3175  };
3176 
3177  const auto resource_request_info = gen_resource_request_info();
3178 
3179  auto clock_begin = timer_start();
3180  const bool is_empty_request =
3181  resource_request_info.cpu_slots == 0UL && resource_request_info.gpu_slots == 0UL;
3182  auto resource_handle =
3183  is_empty_request ? nullptr
3184  : executor_resource_mgr_->request_resources(resource_request_info);
3185  const auto num_cpu_threads =
3186  is_empty_request ? 0UL : resource_handle->get_resource_grant().cpu_slots;
3187  if (device_type == ExecutorDeviceType::GPU) {
3188  const auto num_gpu_slots =
3189  is_empty_request ? 0UL : resource_handle->get_resource_grant().gpu_slots;
3190  VLOG(1) << "In Executor::LaunchKernels executor " << getExecutorId() << " requested "
3191  << "between " << resource_request_info.min_gpu_slots << " and "
3192  << resource_request_info.gpu_slots << " GPU slots, and was granted "
3193  << num_gpu_slots << " GPU slots.";
3194  } else {
3195  VLOG(1) << "In Executor::LaunchKernels executor " << getExecutorId() << " requested "
3196  << "between " << resource_request_info.min_cpu_slots << " and "
3197  << resource_request_info.cpu_slots << " CPU slots, and was granted "
3198  << num_cpu_threads << " CPU slots.";
3199  }
3200  kernel_queue_time_ms_ += timer_stop(clock_begin);
3201  launchKernelsImpl(shared_context, std::move(kernels), device_type, num_cpu_threads);
3202 }
3203 
3205  const RelAlgExecutionUnit& ra_exe_unit,
3206  const ExecutorDeviceType device_type,
3207  const size_t table_idx,
3208  const size_t outer_frag_idx,
3209  std::map<shared::TableKey, const TableFragments*>& selected_tables_fragments,
3210  const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
3211  inner_table_id_to_join_condition) {
3212  const auto& table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
3213  auto table_frags_it = selected_tables_fragments.find(table_key);
3214  CHECK(table_frags_it != selected_tables_fragments.end());
3215  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
3216  const auto outer_table_fragments_it =
3217  selected_tables_fragments.find(outer_input_desc.getTableKey());
3218  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
3219  const auto outer_table_fragments = outer_table_fragments_it->second;
3220  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
3221  if (!table_idx) {
3222  return {outer_frag_idx};
3223  }
3224  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
3225  auto& inner_frags = table_frags_it->second;
3226  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
3227  std::vector<size_t> all_frag_ids;
3228  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
3229  ++inner_frag_idx) {
3230  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
3231  if (skipFragmentPair(outer_fragment_info,
3232  inner_frag_info,
3233  table_idx,
3234  inner_table_id_to_join_condition,
3235  ra_exe_unit,
3236  device_type)) {
3237  continue;
3238  }
3239  all_frag_ids.push_back(inner_frag_idx);
3240  }
3241  return all_frag_ids;
3242 }
3243 
3244 // Returns true iff the join between two fragments cannot yield any results, per
3245 // shard information. The pair can be skipped to avoid full broadcast.
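// Illustrative example (hypothetical layout): with both tables sharded on the equi-join
// key, an outer fragment holding shard 2 and an inner fragment holding shard 5 cannot
// produce matching rows, so the pair is skipped.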
3247  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
3248  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
3249  const int table_idx,
3250  const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
3251  inner_table_id_to_join_condition,
3252  const RelAlgExecutionUnit& ra_exe_unit,
3253  const ExecutorDeviceType device_type) {
3254  if (device_type != ExecutorDeviceType::GPU) {
3255  return false;
3256  }
3257  CHECK(table_idx >= 0 &&
3258  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
3259  const auto& inner_table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
3260  // Both tables need to be sharded the same way.
3261  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
3262  outer_fragment_info.shard == inner_fragment_info.shard) {
3263  return false;
3264  }
3265  const Analyzer::BinOper* join_condition{nullptr};
3266  if (ra_exe_unit.join_quals.empty()) {
3267  CHECK(!inner_table_id_to_join_condition.empty());
3268  auto condition_it = inner_table_id_to_join_condition.find(inner_table_key);
3269  CHECK(condition_it != inner_table_id_to_join_condition.end());
3270  join_condition = condition_it->second;
3271  CHECK(join_condition);
3272  } else {
3273  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
3274  plan_state_->join_info_.join_hash_tables_.size());
3275  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
3276  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
3277  table_idx) {
3278  CHECK(!join_condition);
3279  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
3280  }
3281  }
3282  }
3283  if (!join_condition) {
3284  return false;
3285  }
3286  // TODO(adb): support fragment skipping based on the bounding box intersect operator
3287  if (join_condition->is_bbox_intersect_oper()) {
3288  return false;
3289  }
3290  size_t shard_count{0};
3291  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
3292  join_condition->get_left_operand())) {
3293  auto inner_outer_pairs =
3294  HashJoin::normalizeColumnPairs(join_condition, getTemporaryTables()).first;
3296  join_condition, this, inner_outer_pairs);
3297  } else {
3298  shard_count = get_shard_count(join_condition, this);
3299  }
3300  if (shard_count && !ra_exe_unit.join_quals.empty()) {
3301  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
3302  }
3303  return shard_count;
3304 }
3305 
3306 namespace {
3307 
3309  const auto& table_key = col_desc->getScanDesc().getTableKey();
3310  const auto col_id = col_desc->getColId();
3311  return get_column_descriptor_maybe({table_key, col_id});
3312 }
3313 
3314 } // namespace
3315 
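// Computes, for each input table, the starting row offset of every fragment as a running
// sum of fragment tuple counts. Example (illustrative): fragments with 100, 250, and 50
// rows yield offsets {0, 100, 350}.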
3316 std::map<shared::TableKey, std::vector<uint64_t>> get_table_id_to_frag_offsets(
3317  const std::vector<InputDescriptor>& input_descs,
3318  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
3319  std::map<shared::TableKey, std::vector<uint64_t>> tab_id_to_frag_offsets;
3320  for (auto& desc : input_descs) {
3321  const auto fragments_it = all_tables_fragments.find(desc.getTableKey());
3322  CHECK(fragments_it != all_tables_fragments.end());
3323  const auto& fragments = *fragments_it->second;
3324  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
3325  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
3326  frag_offsets[i] = off;
3327  off += fragments[i].getNumTuples();
3328  }
3329  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableKey(), frag_offsets));
3330  }
3331  return tab_id_to_frag_offsets;
3332 }
3333 
3334 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
3336  const RelAlgExecutionUnit& ra_exe_unit,
3337  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
3338  const std::vector<InputDescriptor>& input_descs,
3339  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
3340  std::vector<std::vector<int64_t>> all_num_rows;
3341  std::vector<std::vector<uint64_t>> all_frag_offsets;
3342  const auto tab_id_to_frag_offsets =
3343  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
3344  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
3345  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
3346  std::vector<int64_t> num_rows;
3347  std::vector<uint64_t> frag_offsets;
3348  if (!ra_exe_unit.union_all) {
3349  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
3350  }
3351  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
3352  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
3353  const auto fragments_it =
3354  all_tables_fragments.find(input_descs[tab_idx].getTableKey());
3355  CHECK(fragments_it != all_tables_fragments.end());
3356  const auto& fragments = *fragments_it->second;
3357  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
3358  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
3359  const auto& fragment = fragments[frag_id];
3360  num_rows.push_back(fragment.getNumTuples());
3361  } else {
3362  size_t total_row_count{0};
3363  for (const auto& fragment : fragments) {
3364  total_row_count += fragment.getNumTuples();
3365  }
3366  num_rows.push_back(total_row_count);
3367  }
3368  const auto frag_offsets_it =
3369  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableKey());
3370  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
3371  const auto& offsets = frag_offsets_it->second;
3372  CHECK_LT(frag_id, offsets.size());
3373  frag_offsets.push_back(offsets[frag_id]);
3374  }
3375  all_num_rows.push_back(num_rows);
3376  // Fragment offsets of the outer table should ONLY be used by rowid for now.
3377  all_frag_offsets.push_back(frag_offsets);
3378  }
3379  return {all_num_rows, all_frag_offsets};
3380 }
3381 
3382 // Columns of a hash-joined inner fact table whose fetch is not deferred are fetched
3383 // from all of the table's fragments.
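// Example (illustrative): an inner (nest level >= 1) table column with selected fragments
// {0, 1, 2} returns true, so the whole column is fetched at once; a single selected
// fragment returns false and only that fragment is fetched.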
3385  const RelAlgExecutionUnit& ra_exe_unit,
3386  const FragmentsList& selected_fragments) const {
3387  const auto& input_descs = ra_exe_unit.input_descs;
3388  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
3389  if (nest_level < 1 ||
3390  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
3391  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
3392  (ra_exe_unit.join_quals.empty() &&
3393  plan_state_->isLazyFetchColumn(inner_col_desc))) {
3394  return false;
3395  }
3396  const auto& table_key = inner_col_desc.getScanDesc().getTableKey();
3397  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
3398  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
3399  const auto& fragments = selected_fragments[nest_level].fragment_ids;
3400  return fragments.size() > 1;
3401 }
3402 
3404  const ColumnDescriptor* cd,
3405  const InputColDescriptor& inner_col_desc,
3406  const RelAlgExecutionUnit& ra_exe_unit,
3407  const FragmentsList& selected_fragments,
3408  const Data_Namespace::MemoryLevel memory_level) const {
3409  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
3410  const auto& table_key = inner_col_desc.getScanDesc().getTableKey();
3411  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
3412  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
3413  const auto& fragments = selected_fragments[nest_level].fragment_ids;
3414  auto need_linearize =
3415  cd->columnType.is_array() ||
3417  return table_key.table_id > 0 && need_linearize && fragments.size() > 1;
3418 }
3419 
3420 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
3421  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
3422  << " num_rows" << shared::printContainer(fetch_result.num_rows)
3423  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
3424 }
3425 
3427  const ColumnFetcher& column_fetcher,
3428  const RelAlgExecutionUnit& ra_exe_unit,
3429  const int device_id,
3430  const Data_Namespace::MemoryLevel memory_level,
3431  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
3432  const FragmentsList& selected_fragments,
3433  std::list<ChunkIter>& chunk_iterators,
3434  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3435  DeviceAllocator* device_allocator,
3436  const size_t thread_idx,
3437  const bool allow_runtime_interrupt) {
3438  auto timer = DEBUG_TIMER(__func__);
3440  const auto& col_global_ids = ra_exe_unit.input_col_descs;
3441  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3442  std::vector<size_t> local_col_to_frag_pos;
3443  buildSelectedFragsMapping(selected_fragments_crossjoin,
3444  local_col_to_frag_pos,
3445  col_global_ids,
3446  selected_fragments,
3447  ra_exe_unit);
3448 
3450  selected_fragments_crossjoin);
3451  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
3452  std::vector<std::vector<int64_t>> all_num_rows;
3453  std::vector<std::vector<uint64_t>> all_frag_offsets;
3454  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
3455  std::vector<const int8_t*> frag_col_buffers(
3456  plan_state_->global_to_local_col_ids_.size());
3457  for (const auto& col_id : col_global_ids) {
3458  if (allow_runtime_interrupt) {
3459  bool isInterrupted = false;
3460  {
3463  const auto query_session = getCurrentQuerySession(session_read_lock);
3464  isInterrupted =
3465  checkIsQuerySessionInterrupted(query_session, session_read_lock);
3466  }
3467  if (isInterrupted) {
3469  }
3470  }
3471  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3473  }
3474  CHECK(col_id);
3475  const auto cd = try_get_column_descriptor(col_id.get());
3476  if (cd && cd->isVirtualCol) {
3477  CHECK_EQ("rowid", cd->columnName);
3478  continue;
3479  }
3480  const auto& table_key = col_id->getScanDesc().getTableKey();
3481  const auto fragments_it = all_tables_fragments.find(table_key);
3482  CHECK(fragments_it != all_tables_fragments.end());
3483  const auto fragments = fragments_it->second;
3484  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3485  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3486  CHECK_LT(static_cast<size_t>(it->second),
3487  plan_state_->global_to_local_col_ids_.size());
3488  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
3489  if (!fragments->size()) {
3490  return {};
3491  }
3492  CHECK_LT(frag_id, fragments->size());
3493  auto memory_level_for_column = memory_level;
3494  const shared::ColumnKey tbl_col_key{col_id->getScanDesc().getTableKey(),
3495  col_id->getColId()};
3496  if (!plan_state_->isColumnToFetch(tbl_col_key)) {
3497  memory_level_for_column = Data_Namespace::CPU_LEVEL;
3498  }
3499  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3500  frag_col_buffers[it->second] =
3501  column_fetcher.getResultSetColumn(col_id.get(),
3502  memory_level_for_column,
3503  device_id,
3504  device_allocator,
3505  thread_idx);
3506  } else {
3507  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3508  // determine if we need special treatment to linearize a multi-frag table,
3509  // i.e., a column that is classified as a varlen type such as an array;
3510  // for now, we only support fixed-length arrays that contain
3511  // geo point coordinates, but more types can be supported in this way
3513  cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3514  bool for_lazy_fetch = false;
3515  if (plan_state_->isColumnToNotFetch(tbl_col_key)) {
3516  for_lazy_fetch = true;
3517  VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
3518  << ", col_name: " << cd->columnName << ")";
3519  }
3520  frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
3521  col_id->getScanDesc().getTableKey(),
3522  col_id->getColId(),
3523  all_tables_fragments,
3524  chunks,
3525  chunk_iterators,
3526  for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3527  for_lazy_fetch ? 0 : device_id,
3528  device_allocator,
3529  thread_idx);
3530  } else {
3531  frag_col_buffers[it->second] = column_fetcher.getAllTableColumnFragments(
3532  col_id->getScanDesc().getTableKey(),
3533  col_id->getColId(),
3534  all_tables_fragments,
3535  memory_level_for_column,
3536  device_id,
3537  device_allocator,
3538  thread_idx);
3539  }
3540  } else {
3541  frag_col_buffers[it->second] = column_fetcher.getOneTableColumnFragment(
3542  col_id->getScanDesc().getTableKey(),
3543  frag_id,
3544  col_id->getColId(),
3545  all_tables_fragments,
3546  chunks,
3547  chunk_iterators,
3548  memory_level_for_column,
3549  device_id,
3550  device_allocator);
3551  }
3552  }
3553  }
3554  all_frag_col_buffers.push_back(frag_col_buffers);
3555  }
3556  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
3557  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
3558  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3559 }
3560 
3561 namespace {
3563  std::vector<InputDescriptor> const& input_descs) {
3564  auto const has_table_key = [&table_key](InputDescriptor const& input_desc) {
3565  return table_key == input_desc.getTableKey();
3566  };
3567  return std::find_if(input_descs.begin(), input_descs.end(), has_table_key) -
3568  input_descs.begin();
3569 }
3570 
3572  const shared::TableKey& table_key,
3573  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3574  auto const has_table_key = [&table_key](auto const& input_desc) {
3575  return table_key == input_desc->getScanDesc().getTableKey();
3576  };
3577  return std::distance(
3578  input_col_descs.begin(),
3579  std::find_if(input_col_descs.begin(), input_col_descs.end(), has_table_key));
3580 }
3581 
3582 std::list<std::shared_ptr<const InputColDescriptor>> get_selected_input_col_descs(
3583  const shared::TableKey& table_key,
3584  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3585  std::list<std::shared_ptr<const InputColDescriptor>> selected;
3586  for (auto const& input_col_desc : input_col_descs) {
3587  if (table_key == input_col_desc->getScanDesc().getTableKey()) {
3588  selected.push_back(input_col_desc);
3589  }
3590  }
3591  return selected;
3592 }
3593 
3594 // Set the N consecutive elements of frag_col_buffers in the N-aligned block containing local_col_id to ptr.
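// Example (illustrative): with N == 2 and local_col_id == 5, begin == 4 and end == 6, so
// frag_col_buffers[4] and frag_col_buffers[5] are both set to ptr.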
3595 void set_mod_range(std::vector<int8_t const*>& frag_col_buffers,
3596  int8_t const* const ptr,
3597  size_t const local_col_id,
3598  size_t const N) {
3599  size_t const begin = local_col_id - local_col_id % N; // N divides begin
3600  size_t const end = begin + N;
3601  CHECK_LE(end, frag_col_buffers.size()) << (void*)ptr << ' ' << local_col_id << ' ' << N;
3602  for (size_t i = begin; i < end; ++i) {
3603  frag_col_buffers[i] = ptr;
3604  }
3605 }
3606 } // namespace
3607 
3608 // fetchChunks() assumes that multiple inputs implies a JOIN.
3609 // fetchUnionChunks() assumes that multiple inputs implies a UNION ALL.
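// fetchUnionChunks() fetches the columns of a single selected table per call and uses
// set_mod_range() (above) to fill the corresponding block of frag_col_buffers entries.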
3611  const ColumnFetcher& column_fetcher,
3612  const RelAlgExecutionUnit& ra_exe_unit,
3613  const int device_id,
3614  const Data_Namespace::MemoryLevel memory_level,
3615  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
3616  const FragmentsList& selected_fragments,
3617  std::list<ChunkIter>& chunk_iterators,
3618  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3619  DeviceAllocator* device_allocator,
3620  const size_t thread_idx,
3621  const bool allow_runtime_interrupt) {
3622  auto timer = DEBUG_TIMER(__func__);
3624 
3625  CHECK_EQ(1u, selected_fragments.size());
3626  CHECK_LE(2u, ra_exe_unit.input_descs.size());
3627  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
3628  auto const& input_descs = ra_exe_unit.input_descs;
3629  const auto& selected_table_key = selected_fragments.front().table_key;
3630  size_t const input_descs_index =
3631  get_selected_input_descs_index(selected_table_key, input_descs);
3632  CHECK_LT(input_descs_index, input_descs.size());
3633  size_t const input_col_descs_index =
3634  get_selected_input_col_descs_index(selected_table_key, ra_exe_unit.input_col_descs);
3635  CHECK_LT(input_col_descs_index, ra_exe_unit.input_col_descs.size());
3636  VLOG(2) << "selected_table_key=" << selected_table_key
3637  << " input_descs_index=" << input_descs_index
3638  << " input_col_descs_index=" << input_col_descs_index
3639  << " input_descs=" << shared::printContainer(input_descs)
3640  << " ra_exe_unit.input_col_descs="
3641  << shared::printContainer(ra_exe_unit.input_col_descs);
3642 
3643  std::list<std::shared_ptr<const InputColDescriptor>> selected_input_col_descs =
3644  get_selected_input_col_descs(selected_table_key, ra_exe_unit.input_col_descs);
3645  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3646 
3648  selected_fragments_crossjoin, selected_fragments, ra_exe_unit);
3649 
3651  selected_fragments_crossjoin);
3652 
3653  if (allow_runtime_interrupt) {
3654  bool isInterrupted = false;
3655  {
3658  const auto query_session = getCurrentQuerySession(session_read_lock);
3659  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3660  }
3661  if (isInterrupted) {
3663  }
3664  }
3665  std::vector<const int8_t*> frag_col_buffers(
3666  plan_state_->global_to_local_col_ids_.size());
3667  for (const auto& col_id : selected_input_col_descs) {
3668  CHECK(col_id);
3669  const auto cd = try_get_column_descriptor(col_id.get());
3670  if (cd && cd->isVirtualCol) {
3671  CHECK_EQ("rowid", cd->columnName);
3672  continue;
3673  }
3674  const auto fragments_it = all_tables_fragments.find(selected_table_key);
3675  CHECK(fragments_it != all_tables_fragments.end());
3676  const auto fragments = fragments_it->second;
3677  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3678  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3679  size_t const local_col_id = it->second;
3680  CHECK_LT(local_col_id, plan_state_->global_to_local_col_ids_.size());
3681  constexpr size_t frag_id = 0;
3682  if (fragments->empty()) {
3683  return {};
3684  }
3685  MemoryLevel const memory_level_for_column =
3686  plan_state_->isColumnToFetch({selected_table_key, col_id->getColId()})
3687  ? memory_level
3689  int8_t const* ptr;
3690  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3691  ptr = column_fetcher.getResultSetColumn(
3692  col_id.get(), memory_level_for_column, device_id, device_allocator, thread_idx);
3693  } else if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3694  ptr = column_fetcher.getAllTableColumnFragments(selected_table_key,
3695  col_id->getColId(),
3696  all_tables_fragments,
3697  memory_level_for_column,
3698  device_id,
3699  device_allocator,
3700  thread_idx);
3701  } else {
3702  ptr = column_fetcher.getOneTableColumnFragment(selected_table_key,
3703  frag_id,
3704  col_id->getColId(),
3705  all_tables_fragments,
3706  chunks,
3707  chunk_iterators,
3708  memory_level_for_column,
3709  device_id,
3710  device_allocator);
3711  }
3712  // Set frag_col_buffers[i] = ptr for every i in the input_descs.size()-aligned block containing local_col_id.
3713  set_mod_range(frag_col_buffers, ptr, local_col_id, input_descs.size());
3714  }
3715  auto const [num_rows, frag_offsets] = getRowCountAndOffsetForAllFrags(
3716  ra_exe_unit, frag_ids_crossjoin, input_descs, all_tables_fragments);
3717 
3718  VLOG(2) << "frag_col_buffers=" << shared::printContainer(frag_col_buffers)
3719  << " num_rows=" << shared::printContainer(num_rows)
3720  << " frag_offsets=" << shared::printContainer(frag_offsets)
3721  << " input_descs_index=" << input_descs_index
3722  << " input_col_descs_index=" << input_col_descs_index;
3723  return {{std::move(frag_col_buffers)},
3724  {{num_rows[0][input_descs_index]}},
3725  {{frag_offsets[0][input_descs_index]}}};
3726 }
3727 
3728 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
3729  const size_t scan_idx,
3730  const RelAlgExecutionUnit& ra_exe_unit) {
3731  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
3732  scan_idx > 0 &&
3733  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
3734  !selected_fragments[scan_idx].fragment_ids.empty()) {
3735  // Fetch all fragments
3736  return {size_t(0)};
3737  }
3738 
3739  return selected_fragments[scan_idx].fragment_ids;
3740 }
3741 
3743  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3744  std::vector<size_t>& local_col_to_frag_pos,
3745  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3746  const FragmentsList& selected_fragments,
3747  const RelAlgExecutionUnit& ra_exe_unit) {
3748  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3749  size_t frag_pos{0};
3750  const auto& input_descs = ra_exe_unit.input_descs;
3751  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3752  const auto& table_key = input_descs[scan_idx].getTableKey();
3753  CHECK_EQ(selected_fragments[scan_idx].table_key, table_key);
3754  selected_fragments_crossjoin.push_back(
3755  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
3756  for (const auto& col_id : col_global_ids) {
3757  CHECK(col_id);
3758  const auto& input_desc = col_id->getScanDesc();
3759  if (input_desc.getTableKey() != table_key ||
3760  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3761  continue;
3762  }
3763  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3764  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3765  CHECK_LT(static_cast<size_t>(it->second),
3766  plan_state_->global_to_local_col_ids_.size());
3767  local_col_to_frag_pos[it->second] = frag_pos;
3768  }
3769  ++frag_pos;
3770  }
3771 }
3772 
3774  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3775  const FragmentsList& selected_fragments,
3776  const RelAlgExecutionUnit& ra_exe_unit) {
3777  const auto& input_descs = ra_exe_unit.input_descs;
3778  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3779  // selected_fragments is set from execution_kernel.fragments in assignFragsToKernelDispatch
3780  if (selected_fragments[0].table_key == input_descs[scan_idx].getTableKey()) {
3781  selected_fragments_crossjoin.push_back({size_t(1)});
3782  }
3783  }
3784 }
3785 
3786 namespace {
3787 
3789  public:
3790  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
3792  for (auto out : out_vec_) {
3793  delete[] out;
3794  }
3795  }
3796 
3797  private:
3798  std::vector<int64_t*> out_vec_;
3799 };
3800 } // namespace
3801 
3803  const RelAlgExecutionUnit& ra_exe_unit,
3804  const CompilationResult& compilation_result,
3805  const bool hoist_literals,
3806  ResultSetPtr* results,
3807  const std::vector<Analyzer::Expr*>& target_exprs,
3808  const ExecutorDeviceType device_type,
3809  std::vector<std::vector<const int8_t*>>& col_buffers,
3810  QueryExecutionContext* query_exe_context,
3811  const std::vector<std::vector<int64_t>>& num_rows,
3812  const std::vector<std::vector<uint64_t>>& frag_offsets,
3813  Data_Namespace::DataMgr* data_mgr,
3814  const int device_id,
3815  const uint32_t start_rowid,
3816  const uint32_t num_tables,
3817  const bool allow_runtime_interrupt,
3818  RenderInfo* render_info,
3819  const bool optimize_cuda_block_and_grid_sizes,
3820  const int64_t rows_to_process) {
3822  auto timer = DEBUG_TIMER(__func__);
3823  CHECK(!results || !(*results));
3824  if (col_buffers.empty()) {
3825  return 0;
3826  }
3827 
3828  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3829  if (render_info) {
3830  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
3831  // here, we are in non-insitu mode.
3832  CHECK(render_info->useCudaBuffers() || !render_info->isInSitu())
3833  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
3834  "currently unsupported.";
3835  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3836  }
3837 
3838  int32_t error_code = 0;
3839  std::vector<int64_t*> out_vec;
3840  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3841  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3842  std::unique_ptr<OutVecOwner> output_memory_scope;
3843  if (allow_runtime_interrupt) {
3844  bool isInterrupted = false;
3845  {
3848  const auto query_session = getCurrentQuerySession(session_read_lock);
3849  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3850  }
3851  if (isInterrupted) {
3853  }
3854  }
3855  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3857  }
3858  if (device_type == ExecutorDeviceType::CPU) {
3859  CpuCompilationContext* cpu_generated_code =
3860  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3861  CHECK(cpu_generated_code);
3862  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
3863  cpu_generated_code,
3864  hoist_literals,
3865  hoist_buf,
3866  col_buffers,
3867  num_rows,
3868  frag_offsets,
3869  0,
3870  &error_code,
3871  start_rowid,
3872  num_tables,
3873  join_hash_table_ptrs,
3874  rows_to_process);
3875  output_memory_scope.reset(new OutVecOwner(out_vec));
3876  } else {
3877  GpuCompilationContext* gpu_generated_code =
3878  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3879  CHECK(gpu_generated_code);
3880  try {
3881  out_vec = query_exe_context->launchGpuCode(
3882  ra_exe_unit,
3883  gpu_generated_code,
3884  hoist_literals,
3885  hoist_buf,
3886  col_buffers,
3887  num_rows,
3888  frag_offsets,
3889  0,
3890  data_mgr,
3891  blockSize(),
3892  gridSize(),
3893  device_id,
3894  compilation_result.gpu_smem_context.getSharedMemorySize(),
3895  &error_code,
3896  num_tables,
3897  allow_runtime_interrupt,
3898  join_hash_table_ptrs,
3899  render_allocator_map_ptr,
3900  optimize_cuda_block_and_grid_sizes);
3901  output_memory_scope.reset(new OutVecOwner(out_vec));
3902  } catch (const OutOfMemory&) {
3903  return ERR_OUT_OF_GPU_MEM;
3904  } catch (const std::exception& e) {
3905  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3906  }
3907  }
3908  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3909  error_code == Executor::ERR_DIV_BY_ZERO ||
3910  error_code == Executor::ERR_OUT_OF_TIME ||
3911  error_code == Executor::ERR_INTERRUPTED ||
3913  error_code == Executor::ERR_GEOS ||
3915  return error_code;
3916  }
3917  if (ra_exe_unit.estimator) {
3918  CHECK(!error_code);
3919  if (results) {
3920  *results =
3921  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
3922  }
3923  return 0;
3924  }
3925  // Expect delayed results extraction (used for sub-fragments) only for the estimator case.
3926  CHECK(results);
3927  std::vector<int64_t> reduced_outs;
3928  const auto num_frags = col_buffers.size();
3929  const size_t entry_count =
3930  device_type == ExecutorDeviceType::GPU
3931  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
3932  ? 1
3933  : blockSize() * gridSize() * num_frags)
3934  : num_frags;
3935  if (size_t(1) == entry_count) {
3936  for (auto out : out_vec) {
3937  CHECK(out);
3938  reduced_outs.push_back(*out);
3939  }
3940  } else {
3941  size_t out_vec_idx = 0;
3942 
3943  for (const auto target_expr : target_exprs) {
3944  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3945  CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3946  << target_expr->toString();
3947 
3948  const int num_iterations = agg_info.sql_type.is_geometry()
3949  ? agg_info.sql_type.get_physical_coord_cols()
3950  : 1;
3951 
3952  for (int i = 0; i < num_iterations; i++) {
3953  int64_t val1;
3954  const bool float_argument_input = takes_float_argument(agg_info);
3955  if (is_distinct_target(agg_info) ||
3956  shared::is_any<kAPPROX_QUANTILE, kMODE>(agg_info.agg_kind)) {
3957  bool const check = shared::
3958  is_any<kCOUNT, kAPPROX_COUNT_DISTINCT, kAPPROX_QUANTILE, kMODE, kCOUNT_IF>(
3959  agg_info.agg_kind);
3960  CHECK(check) << agg_info.agg_kind;
3961  val1 = out_vec[out_vec_idx][0];
3962  error_code = 0;
3963  } else {
3964  const auto chosen_bytes = static_cast<size_t>(
3965  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
3966  std::tie(val1, error_code) = Executor::reduceResults(
3967  agg_info.agg_kind,
3968  agg_info.sql_type,
3969  query_exe_context->getAggInitValForIndex(out_vec_idx),
3970  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3971  out_vec[out_vec_idx],
3972  entry_count,
3973  false,
3974  float_argument_input);
3975  }
3976  if (error_code) {
3977  break;
3978  }
3979  reduced_outs.push_back(val1);
3980  if (agg_info.agg_kind == kAVG ||
3981  (agg_info.agg_kind == kSAMPLE &&
3982  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3983  const auto chosen_bytes = static_cast<size_t>(
3984  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3985  1));
3986  int64_t val2;
3987  std::tie(val2, error_code) = Executor::reduceResults(
3988  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3989  agg_info.sql_type,
3990  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3991  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3992  out_vec[out_vec_idx + 1],
3993  entry_count,
3994  false,
3995  false);
3996  if (error_code) {
3997  break;
3998  }
3999  reduced_outs.push_back(val2);
4000  ++out_vec_idx;
4001  }
4002  ++out_vec_idx;
4003  }
4004  }
4005  }
4006 
4007  if (error_code) {
4008  return error_code;
4009  }
4010 
4011  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
4012  auto rows_ptr = std::shared_ptr<ResultSet>(
4013  query_exe_context->query_buffers_->result_sets_[0].release());
4014  rows_ptr->fillOneEntry(reduced_outs);
4015  *results = std::move(rows_ptr);
4016  return error_code;
4017 }
4018 
4019 namespace {
4020 
4021 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
4022  CHECK(scan_limit);
4023  return results && results->rowCount() < scan_limit;
4024 }
4025 
4026 } // namespace
4027 
4029  const RelAlgExecutionUnit& ra_exe_unit,
4030  const CompilationResult& compilation_result,
4031  const bool hoist_literals,
4032  ResultSetPtr* results,
4033  const ExecutorDeviceType device_type,
4034  std::vector<std::vector<const int8_t*>>& col_buffers,
4035  const std::vector<size_t> outer_tab_frag_ids,
4036  QueryExecutionContext* query_exe_context,
4037  const std::vector<std::vector<int64_t>>& num_rows,
4038  const std::vector<std::vector<uint64_t>>& frag_offsets,
4039  Data_Namespace::DataMgr* data_mgr,
4040  const int device_id,
4041  const shared::TableKey& outer_table_key,
4042  const int64_t scan_limit,
4043  const uint32_t start_rowid,
4044  const uint32_t num_tables,
4045  const bool allow_runtime_interrupt,
4046  RenderInfo* render_info,
4047  const bool optimize_cuda_block_and_grid_sizes,
4048  const int64_t rows_to_process) {
4049  auto timer = DEBUG_TIMER(__func__);
4051  // TODO: get results via a separate method, but need to do something with literals.
4052  CHECK(!results || !(*results));
4053  if (col_buffers.empty()) {
4054  return 0;
4055  }
4056  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
4057  // TODO(alex):
4058  // 1. Optimize size (make keys more compact).
4059  // 2. Resize on overflow.
4060  // 3. Optimize runtime.
4061  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
4062  int32_t error_code = 0;
4063  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
4064  if (allow_runtime_interrupt) {
4065  bool isInterrupted = false;
4066  {
4069  const auto query_session = getCurrentQuerySession(session_read_lock);
4070  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
4071  }
4072  if (isInterrupted) {
4074  }
4075  }
4076  if (g_enable_dynamic_watchdog && interrupted_.load()) {
4077  return ERR_INTERRUPTED;
4078  }
4079 
4080  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
4081  if (render_info && render_info->useCudaBuffers()) {
4082  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
4083  }
4084 
4085  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
4086  << " ra_exe_unit.input_descs="
4087  << shared::printContainer(ra_exe_unit.input_descs)
4088  << " ra_exe_unit.input_col_descs="
4089  << shared::printContainer(ra_exe_unit.input_col_descs)
4090  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
4091  << " num_rows=" << shared::printContainer(num_rows)
4092  << " frag_offsets=" << shared::printContainer(frag_offsets)
4093  << " query_exe_context->query_buffers_->num_rows_="
4094  << query_exe_context->query_buffers_->num_rows_
4095  << " query_exe_context->query_mem_desc_.getEntryCount()="
4096  << query_exe_context->query_mem_desc_.getEntryCount()
4097  << " device_id=" << device_id << " outer_table_key=" << outer_table_key
4098  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
4099  << " num_tables=" << num_tables;
4100 
4101  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
4102  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
4103  // with outer_table_id.
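  // Illustration (hypothetical tables): for UNION ALL over tables A and B with
  // outer_table_key referring to A, the stable_sort moves A's input descriptors to the
  // front, the pop_back loop drops B's, and the remove_if keeps only A's column descriptors.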
4104  if (ra_exe_unit_copy.union_all) {
4105  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
4106  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
4107  ra_exe_unit_copy.input_descs.end(),
4108  [outer_table_key](auto const& a, auto const& b) {
4109  return a.getTableKey() == outer_table_key &&
4110  b.getTableKey() != outer_table_key;
4111  });
4112  while (!ra_exe_unit_copy.input_descs.empty() &&
4113  ra_exe_unit_copy.input_descs.back().getTableKey() != outer_table_key) {
4114  ra_exe_unit_copy.input_descs.pop_back();
4115  }
4116  // Filter ra_exe_unit_copy.input_col_descs.
4117  ra_exe_unit_copy.input_col_descs.remove_if(
4118  [outer_table_key](auto const& input_col_desc) {
4119  return input_col_desc->getScanDesc().getTableKey() != outer_table_key;
4120  });
4121  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
4122  }
4123 
4124  if (device_type == ExecutorDeviceType::CPU) {
4125  const int32_t scan_limit_for_query =
4126  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
4127  const int32_t max_matched = scan_limit_for_query == 0
4128  ? query_exe_context->query_mem_desc_.getEntryCount()
4129  : scan_limit_for_query;
4130  CpuCompilationContext* cpu_generated_code =
4131  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
4132  CHECK(cpu_generated_code);
4133  query_exe_context->launchCpuCode(ra_exe_unit_copy,
4134  cpu_generated_code,
4135  hoist_literals,
4136  hoist_buf,
4137  col_buffers,
4138  num_rows,
4139  frag_offsets,
4140  max_matched,
4141  &error_code,
4142  start_rowid,
4143  num_tables,
4144  join_hash_table_ptrs,
4145  rows_to_process);
4146  } else {
4147  try {
4148  GpuCompilationContext* gpu_generated_code =
4149  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
4150  CHECK(gpu_generated_code);
4151  query_exe_context->launchGpuCode(
4152  ra_exe_unit_copy,
4153  gpu_generated_code,
4154  hoist_literals,
4155  hoist_buf,
4156  col_buffers,
4157  num_rows,
4158  frag_offsets,
4159  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
4160  data_mgr,
4161  blockSize(),
4162  gridSize(),
4163  device_id,
4164  compilation_result.gpu_smem_context.getSharedMemorySize(),
4165  &error_code,
4166  num_tables,
4167  allow_runtime_interrupt,
4168  join_hash_table_ptrs,
4169  render_allocator_map_ptr,
4170  optimize_cuda_block_and_grid_sizes);
4171  } catch (const OutOfMemory&) {
4172  return ERR_OUT_OF_GPU_MEM;
4173  } catch (const OutOfRenderMemory&) {
4174  return ERR_OUT_OF_RENDER_MEM;
4175  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
4177  } catch (const std::exception& e) {
4178  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
4179  }
4180  }
4181 
4182  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
4183  error_code == Executor::ERR_DIV_BY_ZERO ||
4184  error_code == Executor::ERR_OUT_OF_TIME ||
4185  error_code == Executor::ERR_INTERRUPTED ||
4187  error_code == Executor::ERR_GEOS ||
4189  return error_code;
4190  }
4191 
4192  if (results && error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
4193  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
4194  *results = query_exe_context->getRowSet(ra_exe_unit_copy,
4195  query_exe_context->query_mem_desc_);
4196  CHECK(*results);
4197  VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
4198  (*results)->holdLiterals(hoist_buf);
4199  }
4200  if (error_code < 0 && render_allocator_map_ptr) {
4201  auto const adjusted_scan_limit =
4202  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
4203  // More rows passed the filter than available slots. We don't have a count to check,
4204  // so assume we met the limit if a scan limit is set
4205  if (adjusted_scan_limit != 0) {
4206  return 0;
4207  } else {
4208  return error_code;
4209  }
4210  }
4211  if (results && error_code &&
4212  (!scan_limit || check_rows_less_than_needed(*results, scan_limit))) {
4213  return error_code; // unlucky, not enough results and we ran out of slots
4214  }
4215 
4216  return 0;
4217 }
4218 
4219 std::vector<int8_t*> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
4220  const int device_id) {
4221  std::vector<int8_t*> table_ptrs;
4222  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
4223  for (auto hash_table : join_hash_tables) {
4224  if (!hash_table) {
4225  CHECK(table_ptrs.empty());
4226  return {};
4227  }
4228  table_ptrs.push_back(hash_table->getJoinHashBuffer(
4229  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
4230  }
4231  return table_ptrs;
4232 }
4233 
4234 void Executor::nukeOldState(const bool allow_lazy_fetch,
4235  const std::vector<InputTableInfo>& query_infos,
4236  const PlanState::DeletedColumnsMap& deleted_cols_map,
4237  const RelAlgExecutionUnit* ra_exe_unit) {
4240  const bool contains_left_deep_outer_join =
4241  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
4242  ra_exe_unit->join_quals.end(),
4243  [](const JoinCondition& join_condition) {
4244  return join_condition.type == JoinType::LEFT;
4245  }) != ra_exe_unit->join_quals.end();
4246  cgen_state_.reset(
4247  new CgenState(query_infos.size(), contains_left_deep_outer_join, this));
4248  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
4249  query_infos,
4250  deleted_cols_map,
4251  this));
4252 }
4253 
4254 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
4255  const std::vector<InputTableInfo>& query_infos) {
4257  const auto ld_count = input_descs.size();
4258  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
4259  for (size_t i = 0; i < ld_count; ++i) {
4260  CHECK_LT(i, query_infos.size());
4261  const auto frag_count = query_infos[i].info.fragments.size();
4262  if (i > 0) {
4263  cgen_state_->frag_offsets_.push_back(nullptr);
4264  } else {
4265  if (frag_count > 1) {
4266  cgen_state_->frag_offsets_.push_back(cgen_state_->ir_builder_.CreateLoad(
4267  frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));
4268  } else {
4269  cgen_state_->frag_offsets_.push_back(nullptr);
4270  }
4271  }
4272  }
4273 }
4274 
4276  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
4277  const std::vector<InputTableInfo>& query_infos,
4278  const MemoryLevel memory_level,
4279  const JoinType join_type,
4280  const HashType preferred_hash_type,
4281  ColumnCacheMap& column_cache,
4282  const HashTableBuildDagMap& hashtable_build_dag_map,
4283  const RegisteredQueryHint& query_hint,
4284  const TableIdToNodeMap& table_id_to_node_map) {
4285  if (!g_enable_bbox_intersect_hashjoin && qual_bin_oper->is_bbox_intersect_oper()) {
4286  return {nullptr,
4287  "Bounding box intersection disabled, attempting to fall back to loop join"};
4288  }
4289  if (g_enable_dynamic_watchdog && interrupted_.load()) {
4291  }
4292  try {
4293  auto tbl = HashJoin::getInstance(qual_bin_oper,
4294  query_infos,
4295  memory_level,
4296  join_type,
4297  preferred_hash_type,
4298  deviceCountForMemoryLevel(memory_level),
4299  column_cache,
4300  this,
4301  hashtable_build_dag_map,
4302  query_hint,
4303  table_id_to_node_map);
4304  return {tbl, ""};
4305  } catch (const HashJoinFail& e) {
4306  return {nullptr, e.what()};
4307  }
4308 }
4309 
4310 int8_t Executor::warpSize() const {
4311  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
4312  CHECK(!dev_props.empty());
4313  return dev_props.front().warpSize;
4314 }
4315 
4316 // TODO(adb): should these three functions have consistent semantics if cuda mgr does not
4317 // exist?
4318 unsigned Executor::gridSize() const {
4319  CHECK(data_mgr_);
4320  const auto cuda_mgr = data_mgr_->getCudaMgr();
4321  if (!cuda_mgr) {
4322  return 0;
4323  }
4324  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
4325 }
4326 
4327 unsigned Executor::numBlocksPerMP() const {
4328  return std::max((unsigned)2,
4329  shared::ceil_div(grid_size_x_, cudaMgr()->getMinNumMPsForAllDevices()));
4330 }
4331 
4332 unsigned Executor::blockSize() const {
4333  CHECK(data_mgr_);
4334  const auto cuda_mgr = data_mgr_->getCudaMgr();
4335  if (!cuda_mgr) {
4336  return 0;
4337  }
4338  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
4339  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
4340 }
4341 
4342 void Executor::setGridSize(unsigned grid_size) {
4343  grid_size_x_ = grid_size;
4344 }
4345 
4347  grid_size_x_ = 0;
4348 }
4349 
4350 void Executor::setBlockSize(unsigned block_size) {
4351  block_size_x_ = block_size;
4352 }
4353 
4355  block_size_x_ = 0;
4356 }
4357 
4359  return max_gpu_slab_size_;
4360 }
4361 
4362 int64_t Executor::deviceCycles(int milliseconds) const {
4363  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
4364  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
4365 }
4366 
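// castToFP converts an integral (possibly scaled decimal) value to the requested FP type.
// Example (illustrative): a DECIMAL(10,2) value stored as 12345 becomes 12345.0 via
// SIToFP and then 123.45 after the FDiv by 10^2.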
4367 llvm::Value* Executor::castToFP(llvm::Value* value,
4368  SQLTypeInfo const& from_ti,
4369  SQLTypeInfo const& to_ti) {
4371  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
4372  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
4373  llvm::Type* fp_type{nullptr};
4374  switch (to_ti.get_size()) {
4375  case 4:
4376  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
4377  break;
4378  case 8:
4379  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
4380  break;
4381  default:
4382  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
4383  }
4384  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
4385  if (from_ti.get_scale()) {
4386  value = cgen_state_->ir_builder_.CreateFDiv(
4387  value,
4388  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
4389  }
4390  }
4391  return value;
4392 }
4393 
4394 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
4396  CHECK(val->getType()->isPointerTy());
4397 
4398  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
4399  const auto val_type = val_ptr_type->getPointerElementType();
4400  size_t val_width = 0;
4401  if (val_type->isIntegerTy()) {
4402  val_width = val_type->getIntegerBitWidth();
4403  } else {
4404  if (val_type->isFloatTy()) {
4405  val_width = 32;
4406  } else {
4407  CHECK(val_type->isDoubleTy());
4408  val_width = 64;
4409  }
4410  }
4411  CHECK_LT(size_t(0), val_width);
4412  if (bitWidth == val_width) {
4413  return val;
4414  }
4415  return cgen_state_->ir_builder_.CreateBitCast(
4416  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
4417 }
4418 
4419 #define EXECUTE_INCLUDE
4420 #include "ArrayOps.cpp"
4421 #include "DateAdd.cpp"
4422 #include "GeoOps.cpp"
4423 #include "RowFunctionOps.cpp"
4424 #include "StringFunctions.cpp"
4426 #undef EXECUTE_INCLUDE
4427 
4428 namespace {
4430  const ColumnDescriptor* deleted_cd,
4431  const shared::TableKey& table_key) {
4432  auto deleted_cols_it = deleted_cols_map.find(table_key);
4433  if (deleted_cols_it == deleted_cols_map.end()) {
4434  CHECK(deleted_cols_map.insert(std::make_pair(table_key, deleted_cd)).second);
4435  } else {
4436  CHECK_EQ(deleted_cd, deleted_cols_it->second);
4437  }
4438 }
4439 } // namespace
4440 
4441 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
4442  const RelAlgExecutionUnit& ra_exe_unit,
4443  const CompilationOptions& co) {
4444  if (!co.filter_on_deleted_column) {
4445  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
4446  }
4447  auto ra_exe_unit_with_deleted = ra_exe_unit;
4448  PlanState::DeletedColumnsMap deleted_cols_map;
4449  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
4450  if (input_table.getSourceType() != InputSourceType::TABLE) {
4451  continue;
4452  }
4453  const auto& table_key = input_table.getTableKey();
4454  const auto catalog =
4456  CHECK(catalog);
4457  const auto td = catalog->getMetadataForTable(table_key.table_id);
4458  CHECK(td);
4459  const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
4460  if (!deleted_cd) {
4461  continue;
4462  }
4463  CHECK(deleted_cd->columnType.is_boolean());
4464  // check deleted column is not already present
4465  bool found = false;
4466  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
4467  if (input_col.get()->getColId() == deleted_cd->columnId &&
4468  input_col.get()->getScanDesc().getTableKey() == table_key &&
4469  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
4470  found = true;
4471  add_deleted_col_to_map(deleted_cols_map, deleted_cd, table_key);
4472  break;
4473  }
4474  }
4475  if (!found) {
4476  // add deleted column
4477  ra_exe_unit_with_deleted.input_col_descs.emplace_back(
4478  new InputColDescriptor(deleted_cd->columnId,
4479  deleted_cd->tableId,
4480  table_key.db_id,
4481  input_table.getNestLevel()));
4482  add_deleted_col_to_map(deleted_cols_map, deleted_cd, table_key);
4483  }
4484  }
4485  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
4486 }
4487 
4488 namespace {
4489 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` will return `true` for a
4490 // safe scaled epoch value and `false` for overflow/underflow values as the first element
4491 // of the returned tuple.
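// Example (illustrative): comparing a TIMESTAMP(0) column against a TIMESTAMP(9) constant
// gives ldim == 0, rdim == 9, and scale == 10^9; the chunk min/max are multiplied by 10^9
// through checked_int64_t, and {false, chunk_min, chunk_max} is returned on overflow.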
4492 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
4493  const int64_t chunk_min,
4494  const int64_t chunk_max,
4495  const SQLTypeInfo& lhs_type,
4496  const SQLTypeInfo& rhs_type) {
4497  const int32_t ldim = lhs_type.get_dimension();
4498  const int32_t rdim = rhs_type.get_dimension();
4499  CHECK(ldim != rdim);
4500  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
4501  if (ldim > rdim) {
4502  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
4503  return {true, chunk_min / scale, chunk_max / scale};
4504  }
4505 
4506  using checked_int64_t = boost::multiprecision::number<
4507  boost::multiprecision::cpp_int_backend<64,
4508  64,
4509  boost::multiprecision::signed_magnitude,
4510  boost::multiprecision::checked,
4511  void>>;
4512 
4513  try {
4514  auto ret =
4515  std::make_tuple(true,
4516  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
4517  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
4518  return ret;
4519  } catch (const std::overflow_error& e) {
4520  // noop
4521  }
4522  return std::make_tuple(false, chunk_min, chunk_max);
4523 }
4524 
4525 } // namespace
4526 
4528  const InputDescriptor& table_desc,
4529  const Fragmenter_Namespace::FragmentInfo& fragment) {
4530  // Skip temporary tables
4531  const auto& table_key = table_desc.getTableKey();
4532  if (table_key.table_id < 0) {
4533  return false;
4534  }
4535 
4536  const auto catalog =
4538  CHECK(catalog);
4539  const auto td = catalog->getMetadataForTable(fragment.physicalTableId);
4540  CHECK(td);
4541  const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
4542  if (!deleted_cd) {
4543  return false;
4544  }
4545 
4546  const auto& chunk_type = deleted_cd->columnType;
4547  CHECK(chunk_type.is_boolean());
4548 
4549  const auto deleted_col_id = deleted_cd->columnId;
4550  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
4551  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
4552  const int64_t chunk_min =
4553  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4554  const int64_t chunk_max =
4555  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4556  if (chunk_min == 1 && chunk_max == 1) { // Skip the fragment if metadata says the full
4557  // delete bitmap is true (signifying all rows deleted)
4558  return true;
4559  }
4560  }
4561  return false;
4562 }
4563 
4565  const Analyzer::BinOper* comp_expr,
4566  const Analyzer::ColumnVar* lhs_col,
4567  const Fragmenter_Namespace::FragmentInfo& fragment,
4568  const Analyzer::Constant* rhs_const) const {
4569  auto col_id = lhs_col->getColumnKey().column_id;
4570  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4571  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4573  }
4574  double chunk_min{0.};
4575  double chunk_max{0.};
4576  const auto& chunk_type = lhs_col->get_type_info();
4577  chunk_min = extract_min_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4578  chunk_max = extract_max_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4579  if (chunk_min > chunk_max) {
4581  }
4582 
4583  const auto datum_fp = rhs_const->get_constval();
4584  const auto rhs_type = rhs_const->get_type_info().get_type();
4585  CHECK(rhs_type == kFLOAT || rhs_type == kDOUBLE);
4586 
4587  // Do we need to codegen the constant like the integer path does?
4588  const auto rhs_val = rhs_type == kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
4589 
4590  // Todo: dedup the following comparison code with the integer/timestamp path; it is
4591  // slightly tricky to do cleanly as we do not have rowid on this path
4592  switch (comp_expr->get_optype()) {
4593  case kGE:
4594  if (chunk_max < rhs_val) {
4596  }
4597  break;
4598  case kGT:
4599  if (chunk_max <= rhs_val) {
4601  }
4602  break;
4603  case kLE:
4604  if (chunk_min > rhs_val) {
4606  }
4607  break;
4608  case kLT:
4609  if (chunk_min >= rhs_val) {
4611  }
4612  break;
4613  case kEQ:
4614  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4616  }
4617  break;
4618  default:
4619  break;
4620  }
4622 }
4623 
4624 std::pair<bool, int64_t> Executor::skipFragment(
4625  const InputDescriptor& table_desc,
4626  const Fragmenter_Namespace::FragmentInfo& fragment,
4627  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
4628  const std::vector<uint64_t>& frag_offsets,
4629  const size_t frag_idx) {
4630  // First check to see if all of fragment is deleted, in which case we know we can skip
4631  if (isFragmentFullyDeleted(table_desc, fragment)) {
4632  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
4633  << ", fragment id: " << frag_idx;
4634  return {true, -1};
4635  }
4636 
4637  for (const auto& simple_qual : simple_quals) {
4638  const auto comp_expr =
4639  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4640  if (!comp_expr) {
4641  // is this possible?
4642  return {false, -1};
4643  }
4644  const auto lhs = comp_expr->get_left_operand();
4645  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4646  if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4647  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
4648  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
4649  if (lhs_uexpr) {
4650  CHECK(lhs_uexpr->get_optype() ==
4651  kCAST); // We should have only been passed a cast expression
4652  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
4653  if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4654  continue;
4655  }
4656  } else {
4657  continue;
4658  }
4659  }
4660  const auto rhs = comp_expr->get_right_operand();
4661  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4662  if (!rhs_const) {
4663  // is this possible?
4664  return {false, -1};
4665  }
4666  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
4667  !lhs->get_type_info().is_fp()) {
4668  continue;
4669  }
4670  if (lhs->get_type_info().is_fp()) {
4671  const auto fragment_skip_status =
4672  canSkipFragmentForFpQual(comp_expr.get(), lhs_col, fragment, rhs_const);
4673  switch (fragment_skip_status) {
4675  return {true, -1};
4677  return {false, -1};
4679  continue;
4680  default:
4681  UNREACHABLE();
4682  }
4683  }
4684 
4685  // Everything below is logic for integer and integer-backed timestamps
4686  // TODO: Factor out into separate function per canSkipFragmentForFpQual above
4687 
4688  if (lhs_col->get_type_info().is_timestamp() &&
4689  rhs_const->get_type_info().is_any<kTIME>()) {
4690  // when casting from a timestamp to time
4691  // is not possible to get a valid range
4692  // so we can't skip any fragment
4693  continue;
4694  }
4695 
4696  const int col_id = lhs_col->getColumnKey().column_id;
4697  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4698  int64_t chunk_min{0};
4699  int64_t chunk_max{0};
4700  bool is_rowid{false};
4701  size_t start_rowid{0};
4702  const auto& table_key = table_desc.getTableKey();
4703  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4704  auto cd = get_column_descriptor({table_key, col_id});
4705  if (cd->isVirtualCol) {
4706  CHECK(cd->columnName == "rowid");
4707  const auto& table_generation = getTableGeneration(table_key);
4708  start_rowid = table_generation.start_rowid;
4709  chunk_min = frag_offsets[frag_idx] + start_rowid;
4710  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
4711  is_rowid = true;
4712  }
4713  } else {
4714  const auto& chunk_type = lhs_col->get_type_info();
4715  chunk_min =
4716  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4717  chunk_max =
4718  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4719  }
4720  if (chunk_min > chunk_max) {
4721  // invalid metadata range, do not skip fragment
4722  return {false, -1};
4723  }
4724  if (lhs->get_type_info().is_timestamp() &&
4725  (lhs_col->get_type_info().get_dimension() !=
4726  rhs_const->get_type_info().get_dimension()) &&
4727  (lhs_col->get_type_info().is_high_precision_timestamp() ||
4728  rhs_const->get_type_info().is_high_precision_timestamp())) {
4729  // If the original timestamp lhs col has a different precision,
4730  // the column metadata holds values in the original precision,
4731  // therefore adjust the rhs value to match the lhs precision
4732 
4733  // Note(Wamsi): We adjust the rhs const value instead of the lhs value so as not to
4734  // artificially limit the lhs column range. RHS overflow/underflow has already
4735  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
4736  bool is_valid;
4737  std::tie(is_valid, chunk_min, chunk_max) =
4738  get_hpt_overflow_underflow_safe_scaled_values(
4739  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
4740  if (!is_valid) {
4741  VLOG(4) << "Overflow/underflow detected in fragment skipping logic.\nChunk min "
4742  "value: "
4743  << std::to_string(chunk_min)
4744  << "\nChunk max value: " << std::to_string(chunk_max)
4745  << "\nLHS col precision is: "
4746  << std::to_string(lhs_col->get_type_info().get_dimension())
4747  << "\nRHS precision is: "
4748  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
4749  return {false, -1};
4750  }
4751  }
4752  if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
4753  // It is obvious that a cast from timestamp to date is happening here,
4754  // so we have to lower the precision of the chunk min and max values to
4755  // that of the date
4756  chunk_min = DateTruncateHighPrecisionToDate(
4757  chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
4758  chunk_max = DateTruncateHighPrecisionToDate(
4759  chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
4760  }
4761  llvm::LLVMContext local_context;
4762  CgenState local_cgen_state(local_context);
4763  CodeGenerator code_generator(&local_cgen_state, nullptr);
4764 
4765  const auto rhs_val =
4766  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
4767 
4768  switch (comp_expr->get_optype()) {
4769  case kGE:
4770  if (chunk_max < rhs_val) {
4771  return {true, -1};
4772  }
4773  break;
4774  case kGT:
4775  if (chunk_max <= rhs_val) {
4776  return {true, -1};
4777  }
4778  break;
4779  case kLE:
4780  if (chunk_min > rhs_val) {
4781  return {true, -1};
4782  }
4783  break;
4784  case kLT:
4785  if (chunk_min >= rhs_val) {
4786  return {true, -1};
4787  }
4788  break;
4789  case kEQ:
4790  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4791  return {true, -1};
4792  } else if (is_rowid) {
4793  return {false, rhs_val - start_rowid};
4794  }
4795  break;
4796  default:
4797  break;
4798  }
4799  }
4800  return {false, -1};
4801 }
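
// A minimal illustrative sketch (hypothetical, not part of Execute.cpp): the per-fragment
// skip test above boils down to comparing the chunk metadata range [chunk_min, chunk_max]
// of the qualifying column against the right-hand-side literal; a fragment whose stats
// cannot possibly satisfy the predicate is skipped without reading its data. The helper
// below only mirrors the kGE/kGT/kLE/kLT/kEQ cases.
#include <cstdint>

enum class SkipOp { GE, GT, LE, LT, EQ };

// Returns true when no row in [chunk_min, chunk_max] can satisfy `col <op> rhs`.
inline bool chunk_stats_allow_skip(SkipOp op,
                                   int64_t chunk_min,
                                   int64_t chunk_max,
                                   int64_t rhs) {
  switch (op) {
    case SkipOp::GE:
      return chunk_max < rhs;
    case SkipOp::GT:
      return chunk_max <= rhs;
    case SkipOp::LE:
      return chunk_min > rhs;
    case SkipOp::LT:
      return chunk_min >= rhs;
    case SkipOp::EQ:
      return chunk_min > rhs || chunk_max < rhs;
  }
  return false;
}
// Example: a fragment with stats [0, 5] can be skipped for `x >= 10`, because
// chunk_stats_allow_skip(SkipOp::GE, 0, 5, 10) == true.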
4802 
4803 /*
4804  * skipFragmentInnerJoins processes all the quals stored in the execution unit's
4805  * join_quals and gathers all the ones that meet the "simple_qual" criteria
4806  * (logical expressions with AND operations, etc.). It then uses the skipFragment
4807  * function to decide whether the fragment should be skipped. The fragment is skipped
4808  * if at least one of these skipFragment calls returns true as its first value.
4809  * - The code depends on skipFragment's output having a meaningful (anything but -1)
4810  * second value only if its first value is "false".
4811  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
4812  * i.e., we only skip if none of the quals trigger the code to update the
4813  * rowid_lookup_key.
4814  * - Only AND operations are valid and considered:
4815  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
4816  * the skip
4817  * - `select * from t1,t2 where (A or B) and C`: only C is considered
4818  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
4819  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
4820  * implemented differently, which causes the first one to skip correctly while the
4821  * second one does not skip:
4822  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
4823  * possible
4824  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
4825  * projection, no skipping
4826  */
4827 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
4828  const InputDescriptor& table_desc,
4829  const RelAlgExecutionUnit& ra_exe_unit,
4830  const Fragmenter_Namespace::FragmentInfo& fragment,
4831  const std::vector<uint64_t>& frag_offsets,
4832  const size_t frag_idx) {
4833  std::pair<bool, int64_t> skip_frag{false, -1};
4834  for (auto& inner_join : ra_exe_unit.join_quals) {
4835  if (inner_join.type != JoinType::INNER) {
4836  continue;
4837  }
4838 
4839  // extracting all the conjunctive simple_quals from the quals stored for the inner
4840  // join
4841  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
4842  for (auto& qual : inner_join.quals) {
4843  auto temp_qual = qual_to_conjunctive_form(qual);
4844  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
4845  temp_qual.simple_quals.begin(),
4846  temp_qual.simple_quals.end());
4847  }
4848  auto temp_skip_frag = skipFragment(
4849  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
4850  if (temp_skip_frag.second != -1) {
4851  skip_frag.second = temp_skip_frag.second;
4852  return skip_frag;
4853  } else {
4854  skip_frag.first = skip_frag.first || temp_skip_frag.first;
4855  }
4856  }
4857  return skip_frag;
4858 }
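
// A minimal illustrative sketch (hypothetical, not part of Execute.cpp): the loop above
// combines per-qual verdicts with the priority described in the comment block, i.e. a
// usable rowid lookup key ({false, n > -1}) wins over a skip verdict ({true, -1}). The
// standalone helper below only demonstrates that folding rule.
#include <cstdint>
#include <utility>
#include <vector>

using SkipVerdict = std::pair<bool, int64_t>;  // {skip_fragment, rowid_lookup_key}

inline SkipVerdict fold_skip_verdicts(const std::vector<SkipVerdict>& per_qual) {
  SkipVerdict folded{false, -1};
  for (const auto& verdict : per_qual) {
    if (verdict.second != -1) {
      // a usable rowid lookup key takes precedence and ends the fold
      return {folded.first, verdict.second};
    }
    folded.first = folded.first || verdict.first;
  }
  return folded;
}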
4859 
4860 AggregatedColRange Executor::computeColRangesCache(
4861  const std::unordered_set<PhysicalInput>& phys_inputs) {
4862  AggregatedColRange agg_col_range_cache;
4863  std::unordered_set<shared::TableKey> phys_table_keys;
4864  for (const auto& phys_input : phys_inputs) {
4865  phys_table_keys.emplace(phys_input.db_id, phys_input.table_id);
4866  }
4867  std::vector<InputTableInfo> query_infos;
4868  for (const auto& table_key : phys_table_keys) {
4869  query_infos.emplace_back(InputTableInfo{table_key, getTableInfo(table_key)});
4870  }
4871  for (const auto& phys_input : phys_inputs) {
4872  auto db_id = phys_input.db_id;
4873  auto table_id = phys_input.table_id;
4874  auto column_id = phys_input.col_id;
4875  const auto cd =
4876  Catalog_Namespace::get_metadata_for_column({db_id, table_id, column_id});
4877  CHECK(cd);
4878  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
4879  const auto col_var = std::make_unique<Analyzer::ColumnVar>(
4880  cd->columnType, shared::ColumnKey{db_id, table_id, column_id}, 0);
4881  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
4882  agg_col_range_cache.setColRange(phys_input, col_range);
4883  }
4884  }
4885  return agg_col_range_cache;
4886 }
4887 
4888 StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
4889  const std::unordered_set<PhysicalInput>& phys_inputs) {
4890  StringDictionaryGenerations string_dictionary_generations;
4891  // Foreign tables may not have populated dictionaries for encoded columns. If this is
4892  // the case then we need to populate them here to make sure that the generations are
4893  // set correctly.
4894  prepare_string_dictionaries(phys_inputs);
4895  for (const auto& phys_input : phys_inputs) {
4896  const auto catalog =
4897  Catalog_Namespace::SysCatalog::instance().getCatalog(phys_input.db_id);
4898  CHECK(catalog);
4899  const auto cd = catalog->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4900  CHECK(cd);
4901  const auto& col_ti =
4902  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
4903  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
4904  const auto& dict_key = col_ti.getStringDictKey();
4905  const auto dd = catalog->getMetadataForDict(dict_key.dict_id);
4906  CHECK(dd && dd->stringDict);
4907  string_dictionary_generations.setGeneration(dict_key,
4908  dd->stringDict->storageEntryCount());
4909  }
4910  }
4911  return string_dictionary_generations;
4912 }
4913 
4914 TableGenerations Executor::computeTableGenerations(
4915  const std::unordered_set<shared::TableKey>& phys_table_keys) {
4916  TableGenerations table_generations;
4917  for (const auto& table_key : phys_table_keys) {
4918  const auto table_info = getTableInfo(table_key);
4919  table_generations.setGeneration(
4920  table_key,
4921  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
4922  }
4923  return table_generations;
4924 }
4925 
4926 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
4927  const std::unordered_set<shared::TableKey>& phys_table_ids) {
4928  row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(
4929  Executor::getArenaBlockSize(), executor_id_);
4930  row_set_mem_owner_->setDictionaryGenerations(
4931  computeStringDictionaryGenerations(phys_inputs));
4932  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
4933  table_generations_ = computeTableGenerations(phys_table_ids);
4934 }
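
// A minimal illustrative sketch (hypothetical, not part of Execute.cpp): setupCaching
// precomputes per-query state keyed by the query's physical inputs: column ranges,
// string dictionary generations (the dictionary entry count observed at query start),
// and table generations (tuple count and starting rowid). A generation is essentially a
// snapshot watermark; the struct below only illustrates the shape of that cached state.
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

struct CachedGenerationsSketch {
  // dictionary key -> number of dictionary entries visible to this query
  std::unordered_map<std::string, uint64_t> string_dict_generations;
  // table key -> {tuple_count, start_rowid} visible to this query
  std::unordered_map<std::string, std::pair<int64_t, int64_t>> table_generations;
};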
4935 
4936 heavyai::shared_mutex& Executor::getDataRecyclerLock() {
4937  return recycler_mutex_;
4938 }
4939 
4940 QueryPlanDagCache& Executor::getQueryPlanDagCache() {
4941  return query_plan_dag_cache_;
4942 }
4943 
4944 ResultSetRecyclerHolder& Executor::getResultSetRecyclerHolder() {
4945  return resultset_recycler_holder_;
4946 }
4947 
4948 heavyai::shared_mutex& Executor::getSessionLock() {
4949  return executor_session_mutex_;
4950 }
4951 
4952 QuerySessionId& Executor::getCurrentQuerySession(
4953  heavyai::shared_lock<heavyai::shared_mutex>& read_lock) {
4954  return current_query_session_;
4955 }
4956 
4957 bool Executor::checkCurrentQuerySession(
4958  const QuerySessionId& candidate_query_session,
4959  heavyai::shared_lock<heavyai::shared_mutex>& read_lock) {
4960  // the candidate session matches the current query session only when it is
4961  // non-empty and equal to current_query_session_
4962  return !candidate_query_session.empty() &&
4963  (current_query_session_ == candidate_query_session);
4964 }
4965 
4966 // used only for testing
4967 QuerySessionStatus::QueryStatus Executor::getQuerySessionStatus(
4968  const QuerySessionId& candidate_query_session,
4969  heavyai::shared_lock<heavyai::shared_mutex>& read_lock) {
4970  if (queries_session_map_.count(candidate_query_session) &&
4971  !queries_session_map_.at(candidate_query_session).empty()) {
4972  return queries_session_map_.at(candidate_query_session)
4973  .begin()
4974  ->second.getQueryStatus();
4975  }
4976  return QuerySessionStatus::QueryStatus::UNDEFINED;
4977 }
4978 
4979 void Executor::invalidateRunningQuerySession(
4980  heavyai::unique_lock<heavyai::shared_mutex>& write_lock) {
4981  current_query_session_ = "";
4982 }
4983 
4984 CurrentQueryStatus Executor::attachExecutorToQuerySession(
4985  const QuerySessionId& query_session_id,
4986  const std::string& query_str,
4987  const std::string& query_submitted_time) {
4988  if (!query_session_id.empty()) {
4989  // if session is valid, do update 1) the exact executor id and 2) query status
4990  heavyai::unique_lock<heavyai::shared_mutex> write_lock(executor_session_mutex_);
4991  updateQuerySessionExecutorAssignment(
4992  query_session_id, query_submitted_time, executor_id_, write_lock);
4993  updateQuerySessionStatusWithLock(query_session_id,
4994  query_submitted_time,
4995  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
4996  write_lock);
4997  }
4998  return {query_session_id, query_str};
4999 }
5000 
5001 void Executor::checkPendingQueryStatus(const QuerySessionId& query_session) {
5002  // check whether we are okay to execute the "pending" query
5003  // i.e., before running the query check if this query session is "ALREADY" interrupted
5004  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(executor_session_mutex_);
5005  if (query_session.empty()) {
5006  return;
5007  }
5008  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
5009  // something went wrong, since we assume this is the caller's responsibility
5010  // (this function should be called only for an enrolled query session)
5011  if (!queries_session_map_.count(query_session)) {
5012  VLOG(1) << "Interrupting pending query is not available since the query session is "
5013  "not enrolled";
5014  } else {
5015  // here the query session is enrolled but the interrupt flag is not registered
5016  VLOG(1)
5017  << "Interrupting pending query is not available since its interrupt flag is "
5018  "not registered";
5019  }
5020  return;
5021  }
5022  if (queries_interrupt_flag_[query_session]) {
5023  throw QueryExecutionError(ERR_INTERRUPTED);
5024  }
5025 }
5026 
5027 void Executor::clearQuerySessionStatus(const QuerySessionId& query_session,
5028  const std::string& submitted_time_str) {
5029  heavyai::unique_lock<heavyai::shared_mutex> session_write_lock(executor_session_mutex_);
5030  // clear the interrupt-related info for a finished query
5031  if (query_session.empty()) {
5032  return;
5033  }
5034  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
5035  if (query_session.compare(current_query_session_) == 0) {
5036  invalidateRunningQuerySession(session_write_lock);
5037  resetInterrupt();
5038  }
5039 }
5040 
5041 void Executor::updateQuerySessionStatus(
5042  const QuerySessionId& query_session,
5043  const std::string& submitted_time_str,
5044  const QuerySessionStatus::QueryStatus new_query_status) {
5045  // update the running query session's the current status
5046  heavyai::unique_lock<heavyai::shared_mutex> session_write_lock(executor_session_mutex_);
5047  if (query_session.empty()) {
5048  return;
5049  }
5050  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
5051  current_query_session_ = query_session;
5052  }
5053  updateQuerySessionStatusWithLock(
5054  query_session, submitted_time_str, new_query_status, session_write_lock);
5055 }
5056 
5057 void Executor::enrollQuerySession(
5058  const QuerySessionId& query_session,
5059  const std::string& query_str,
5060  const std::string& submitted_time_str,
5061  const size_t executor_id,
5062  const QuerySessionStatus::QueryStatus query_session_status) {
5063  // enroll the query session into the Executor's session map
5064  heavyai::unique_lock<heavyai::shared_mutex> session_write_lock(executor_session_mutex_);
5065  if (query_session.empty()) {
5066  return;
5067  }
5068 
5069  addToQuerySessionList(query_session,
5070  query_str,
5071  submitted_time_str,
5072  executor_id,
5073  query_session_status,
5074  session_write_lock);
5075 
5076  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
5077  current_query_session_ = query_session;
5078  }
5079 }
5080 
5081 size_t Executor::getNumCurentSessionsEnrolled() const {
5082  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(executor_session_mutex_);
5083  return queries_session_map_.size();
5084 }
5085 
5086 bool Executor::addToQuerySessionList(
5087  const QuerySessionId& query_session,
5088  const std::string& query_str,
5089  const std::string& submitted_time_str,
5090  const size_t executor_id,
5091  const QuerySessionStatus::QueryStatus query_status,
5092  heavyai::unique_lock<heavyai::shared_mutex>& write_lock) {
5093  // an internal API that enrolls the query session into the Executor's session map
5094  if (queries_session_map_.count(query_session)) {
5095  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
5096  queries_session_map_.at(query_session).erase(submitted_time_str);
5097  queries_session_map_.at(query_session)
5098  .emplace(submitted_time_str,
5099  QuerySessionStatus(query_session,
5100  executor_id,
5101  query_str,
5102  submitted_time_str,
5103  query_status));
5104  } else {
5105  queries_session_map_.at(query_session)
5106  .emplace(submitted_time_str,
5107  QuerySessionStatus(query_session,
5108  executor_id,
5109  query_str,
5110  submitted_time_str,
5111  query_status));
5112  }
5113  } else {
5114  std::map<std::string, QuerySessionStatus> executor_per_query_map;
5115  executor_per_query_map.emplace(
5116  submitted_time_str,
5117  QuerySessionStatus(
5118  query_session, executor_id, query_str, submitted_time_str, query_status));
5119  queries_session_map_.emplace(query_session, executor_per_query_map);
5120  }
5121  return queries_interrupt_flag_.emplace(query_session, false).second;
5122 }
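
// A minimal illustrative sketch (hypothetical, not part of Execute.cpp):
// queries_session_map_ is a two-level map, session id -> (submitted time -> status), so
// one session can track several outstanding queries. The snippet below only shows the
// erase-then-emplace behavior addToQuerySessionList applies for an existing
// (session, submitted_time) pair, using plain strings as placeholder status values.
#include <map>
#include <string>

int main() {
  std::map<std::string, std::map<std::string, std::string>> sessions;
  const std::string session = "session-1";
  const std::string submitted = "2024-01-01 00:00:00";
  sessions[session].erase(submitted);              // drop any stale entry first
  sessions[session].emplace(submitted, "PENDING_QUEUE");
  return sessions.at(session).count(submitted) == 1 ? 0 : 1;
}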
5123 
5124 bool Executor::updateQuerySessionStatusWithLock(
5125  const QuerySessionId& query_session,
5126  const std::string& submitted_time_str,
5127  const QuerySessionStatus::QueryStatus updated_query_status,
5128  heavyai::unique_lock<heavyai::shared_mutex>& write_lock) {
5129  // an internal API that updates the query session status
5130  if (query_session.empty()) {
5131  return false;
5132  }
5133  if (queries_session_map_.count(query_session)) {
5134  for (auto& query_status : queries_session_map_.at(query_session)) {
5135  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
5136  // no time difference --> found the target query status
5137  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
5138  auto prev_status = query_status.second.getQueryStatus();
5139  if (prev_status == updated_query_status) {
5140  return false;
5141  }
5142  query_status.second.setQueryStatus(updated_query_status);
5143  return true;
5144  }
5145  }
5146  }
5147  return false;
5148 }
5149 
5150 bool Executor::updateQuerySessionExecutorAssignment(
5151  const QuerySessionId& query_session,
5152  const std::string& submitted_time_str,
5153  const size_t executor_id,
5154  heavyai::unique_lock<heavyai::shared_mutex>& write_lock) {
5155  // update the executor id of the query session
5156  if (query_session.empty()) {
5157  return false;
5158  }
5159  if (queries_session_map_.count(query_session)) {
5160  auto storage = queries_session_map_.at(query_session);
5161  for (auto it = storage.begin(); it != storage.end(); it++) {
5162  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
5163  // no time difference --> found the target query status
5164  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
5165  queries_session_map_.at(query_session)
5166  .at(submitted_time_str)
5167  .setExecutorId(executor_id);
5168  return true;
5169  }
5170  }
5171  }
5172  return false;
5173 }
5174 
5175 bool Executor::removeFromQuerySessionList(
5176  const QuerySessionId& query_session,
5177  const std::string& submitted_time_str,
5178  heavyai::unique_lock<heavyai::shared_mutex>& write_lock) {
5179  if (query_session.empty()) {
5180  return false;
5181  }
5182  if (queries_session_map_.count(query_session)) {
5183  auto& storage = queries_session_map_.at(query_session);
5184  if (storage.size() > 1) {
5185  // in this case we only remove query executor info
5186  for (auto it = storage.begin(); it != storage.end(); it++) {
5187  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
5188  // no time difference && have the same executor id--> found the target query
5189  if (it->second.getExecutorId() == executor_id_ &&
5190  submitted_time_str.compare(target_submitted_t_str) == 0) {
5191  storage.erase(it);
5192  return true;
5193  }
5194  }
5195  } else if (storage.size() == 1) {
5196  // here this session only has a single query executor
5197  // so we clear both executor info and its interrupt flag
5198  queries_session_map_.erase(query_session);
5199  queries_interrupt_flag_.erase(query_session);
5200  if (interrupted_.load()) {
5201  interrupted_.store(false);
5202  }
5203  return true;
5204  }
5205  }
5206  return false;
5207 }
5208 
5209 void Executor::setQuerySessionAsInterrupted(
5210  const QuerySessionId& query_session,
5211  heavyai::unique_lock<heavyai::shared_mutex>& write_lock) {
5212  if (query_session.empty()) {
5213  return;
5214  }
5215  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
5216  queries_interrupt_flag_[query_session] = true;
5217  }
5218 }
5219 
5220 bool Executor::checkIsQuerySessionInterrupted(
5221  const QuerySessionId& query_session,
5222  heavyai::shared_lock<heavyai::shared_mutex>& read_lock) {
5223  if (query_session.empty()) {
5224  return false;
5225  }
5226  auto flag_it = queries_interrupt_flag_.find(query_session);
5227  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
5228  flag_it->second;
5229 }
5230 
5231 bool Executor::checkIsQuerySessionEnrolled(
5232  const QuerySessionId& query_session,
5233  heavyai::shared_lock<heavyai::shared_mutex>& read_lock) {
5234  if (query_session.empty()) {
5235  return false;
5236  }
5237  return !query_session.empty() && queries_session_map_.count(query_session);
5238 }
5239 
5240 void Executor::enableRuntimeQueryInterrupt(
5241  const double runtime_query_check_freq,
5242  const unsigned pending_query_check_freq) const {
5243  // The only scenario in which we intentionally call this function is
5244  // to allow runtime query interrupt in QueryRunner for test cases.
5245  // Because a test machine's default setting does not allow runtime query interrupt,
5246  // we have to turn it on within test code if necessary.
5247  g_enable_runtime_query_interrupt = true;
5248  g_pending_query_interrupt_freq = pending_query_check_freq;
5249  g_running_query_interrupt_freq = runtime_query_check_freq;
5252  }
5253 }
5254 
5255 void Executor::addToCardinalityCache(const CardinalityCacheKey& cache_key,
5256  const size_t cache_value) {
5257  if (g_use_estimator_result_cache) {
5258  heavyai::unique_lock<heavyai::shared_mutex> lock(recycler_mutex_);
5259  cardinality_cache_[cache_key] = cache_value;
5260  VLOG(1) << "Put estimated cardinality to the cache";
5261  }
5262 }
5263 
5264 Executor::CachedCardinality Executor::getCachedCardinality(
5265  const CardinalityCacheKey& cache_key) {
5266  heavyai::shared_lock<heavyai::shared_mutex> lock(recycler_mutex_);
5267  if (g_use_estimator_result_cache &&
5268  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
5269  VLOG(1) << "Reuse cached cardinality";
5270  return {true, cardinality_cache_[cache_key]};
5271  }
5272  return {false, -1};
5273 }
5274 
5275 void Executor::clearCardinalityCache() {
5276  if (g_use_estimator_result_cache) {
5277  heavyai::unique_lock<heavyai::shared_mutex> lock(recycler_mutex_);
5278  cardinality_cache_.clear();
5279  }
5280 }
5281 
5282 void Executor::invalidateCardinalityCacheForTable(const shared::TableKey& table_key) {
5283  heavyai::unique_lock<heavyai::shared_mutex> lock(recycler_mutex_);
5284  if (g_use_estimator_result_cache) {
5285  for (auto it = cardinality_cache_.begin(); it != cardinality_cache_.end();) {
5286  if (it->first.containsTableKey(table_key)) {
5287  it = cardinality_cache_.erase(it);
5288  } else {
5289  it++;
5290  }
5291  }
5292  }
5293 }
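
// A minimal illustrative sketch (hypothetical, not part of Execute.cpp): the cardinality
// cache is a simple keyed memo. addToCardinalityCache stores an estimated row count under
// a key derived from the execution unit, and getCachedCardinality returns a {hit, value}
// pair. The helper below shows the same lookup pattern with a plain map.
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>

using CachedCardinalitySketch = std::pair<bool, size_t>;  // {hit, cardinality}

inline CachedCardinalitySketch lookup_or_miss(
    const std::unordered_map<std::string, size_t>& cache, const std::string& key) {
  auto it = cache.find(key);
  return it != cache.end() ? CachedCardinalitySketch{true, it->second}
                           : CachedCardinalitySketch{false, 0};
}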
5294 
5295 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
5296  const QuerySessionId& query_session,
5297  heavyai::shared_lock<heavyai::shared_mutex>& read_lock) {
5298  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
5299  auto& query_infos = queries_session_map_.at(query_session);
5300  std::vector<QuerySessionStatus> ret;
5301  for (auto& info : query_infos) {
5302  ret.emplace_back(query_session,
5303  info.second.getExecutorId(),
5304  info.second.getQueryStr(),
5305  info.second.getQuerySubmittedTime(),
5306  info.second.getQueryStatus());
5307  }
5308  return ret;
5309  }
5310  return {};
5311 }
5312 
5313 const std::vector<size_t> Executor::getExecutorIdsRunningQuery(
5314  const QuerySessionId& interrupt_session) const {
5315  std::vector<size_t> res;
5316  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(executor_session_mutex_);
5317  auto it = queries_session_map_.find(interrupt_session);
5318  if (it != queries_session_map_.end()) {
5319  for (auto& kv : it->second) {
5320  if (kv.second.getQueryStatus() ==
5321  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
5322  res.push_back(kv.second.getExecutorId());
5323  }
5324  }
5325  }
5326  return res;
5327 }
5328 
5329 bool Executor::checkNonKernelTimeInterrupted() const {
5330  // this function should be called within an executor which is assigned
5331  // to a specific query thread (which indicates we have already enrolled the session)
5332  // check whether this is called from a non-unitary executor
5333  if (executor_id_ == UNITARY_EXECUTOR_ID) {
5334  return false;
5335  };
5336  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(executor_session_mutex_);
5337  auto flag_it = queries_interrupt_flag_.find(current_query_session_);
5338  return !current_query_session_.empty() && flag_it != queries_interrupt_flag_.end() &&
5339  flag_it->second;
5340 }
5341 
5342 void Executor::registerExtractedQueryPlanDag(const QueryPlanDAG& query_plan_dag) {
5343  // this function is called under the recycler lock
5344  // e.g., QueryPlanDagExtractor::extractQueryPlanDagImpl()
5345  latest_query_plan_extracted_ = query_plan_dag;
5346 }
5347 
5348 const QueryPlanDAG Executor::getLatestQueryPlanDagExtracted() const {
5349  heavyai::shared_lock<heavyai::shared_mutex> lock(recycler_mutex_);
5350  return latest_query_plan_extracted_;
5351 }
5352 
5353 void Executor::init_resource_mgr(
5354  const size_t num_cpu_slots,
5355  const size_t num_gpu_slots,
5356  const size_t cpu_result_mem,
5357  const size_t cpu_buffer_pool_mem,
5358  const size_t gpu_buffer_pool_mem,
5359  const double per_query_max_cpu_slots_ratio,
5360  const double per_query_max_cpu_result_mem_ratio,
5361  const bool allow_cpu_kernel_concurrency,
5362  const bool allow_cpu_gpu_kernel_concurrency,
5363  const bool allow_cpu_slot_oversubscription_concurrency,
5364  const bool allow_cpu_result_mem_oversubscription_concurrency,
5365  const double max_available_resource_use_ratio) {
5366  const double per_query_max_pinned_cpu_buffer_pool_mem_ratio{1.0};
5367  const double per_query_max_pageable_cpu_buffer_pool_mem_ratio{0.5};
5368  executor_resource_mgr_ = ExecutorResourceMgr_Namespace::generate_executor_resource_mgr(
5369  num_cpu_slots,
5370  num_gpu_slots,
5371  cpu_result_mem,
5372  cpu_buffer_pool_mem,
5373  gpu_buffer_pool_mem,
5374  per_query_max_cpu_slots_ratio,
5375  per_query_max_cpu_result_mem_ratio,
5376  per_query_max_pinned_cpu_buffer_pool_mem_ratio,
5377  per_query_max_pageable_cpu_buffer_pool_mem_ratio,
5378  allow_cpu_kernel_concurrency,
5379  allow_cpu_gpu_kernel_concurrency,
5380  allow_cpu_slot_oversubscription_concurrency,
5381  true, // allow_gpu_slot_oversubscription
5382  allow_cpu_result_mem_oversubscription_concurrency,
5383  max_available_resource_use_ratio);
5384 }
5385 
5386 void Executor::pause_executor_queue() {
5387  if (!g_enable_executor_resource_mgr) {
5388  throw std::runtime_error(
5389  "Executor queue cannot be paused as it requires Executor Resource Manager to be "
5390  "enabled");
5391  }
5392  executor_resource_mgr_->pause_process_queue();
5393 }
5394 
5395 void Executor::resume_executor_queue() {
5396  if (!g_enable_executor_resource_mgr) {
5397  throw std::runtime_error(
5398  "Executor queue cannot be resumed as it requires Executor Resource Manager to be "
5399  "enabled");
5400  }
5401  executor_resource_mgr_->resume_process_queue();
5402 }
5403 
5404 size_t Executor::get_executor_resource_pool_total_resource_quantity(
5405  const ExecutorResourceMgr_Namespace::ResourceType resource_type) {
5406  if (!g_enable_executor_resource_mgr) {
5407  throw std::runtime_error(
5408  "ExecutorResourceMgr must be enabled to obtain executor resource pool stats.");
5409  }
5410  return executor_resource_mgr_->get_resource_info(resource_type).second;
5411 }
5412 
5413 ExecutorResourceMgr_Namespace::ResourcePoolInfo
5414 Executor::get_executor_resource_pool_info() {
5415  if (!g_enable_executor_resource_mgr) {
5416  throw std::runtime_error(
5417  "ExecutorResourceMgr must be enabled to obtain executor resource pool stats.");
5418  }
5419  return executor_resource_mgr_->get_resource_info();
5420 }
5421 
5422 void Executor::set_executor_resource_pool_resource(
5423  const ExecutorResourceMgr_Namespace::ResourceType resource_type,
5424  const size_t resource_quantity) {
5425  if (!g_enable_executor_resource_mgr) {
5426  throw std::runtime_error(
5427  "ExecutorResourceMgr must be enabled to set executor resource pool resource.");
5428  }
5429  executor_resource_mgr_->set_resource(resource_type, resource_quantity);
5430 }
5431 
5432 ExecutorResourceMgr_Namespace::ConcurrentResourceGrantPolicy
5433 Executor::get_concurrent_resource_grant_policy(
5434  const ExecutorResourceMgr_Namespace::ResourceType resource_type) {
5435  if (!g_enable_executor_resource_mgr) {
5436  throw std::runtime_error(
5437  "ExecutorResourceMgr must be enabled to set executor concurrent resource grant "
5438  "policy.");
5439  }
5440  return executor_resource_mgr_->get_concurrent_resource_grant_policy(resource_type);
5441 }
5442 
5443 void Executor::set_concurrent_resource_grant_policy(
5444  const ExecutorResourceMgr_Namespace::ConcurrentResourceGrantPolicy&
5445  concurrent_resource_grant_policy) {
5446  if (!g_enable_executor_resource_mgr) {
5447  throw std::runtime_error(
5448  "ExecutorResourceMgr must be enabled to set executor concurrent resource grant "
5449  "policy.");
5450  }
5451  executor_resource_mgr_->set_concurrent_resource_grant_policy(
5452  concurrent_resource_grant_policy);
5453 }
5454 
5455 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
5456 
5457 // contain the interrupt flag's status per query session
5458 InterruptFlagMap Executor::queries_interrupt_flag_;
5459 // contain a list of queries per query session
5460 QuerySessionMap Executor::queries_session_map_;
5461 // session lock
5462 heavyai::shared_mutex Executor::executor_session_mutex_;
5463 
5466 
5470 
5472 std::mutex Executor::kernel_mutex_;
5473 
5474 std::shared_ptr<ExecutorResourceMgr_Namespace::ExecutorResourceMgr>
5475  Executor::executor_resource_mgr_ = nullptr;
5476 
5479 std::unordered_map<CardinalityCacheKey, size_t> Executor::cardinality_cache_;
5480 // Executor has a single global result set recycler holder
5481 // which contains two recyclers related to query resultset
5484 
5485 // Useful for debugging.
5486 std::string Executor::dumpCache() const {
5487  std::stringstream ss;
5488  ss << "colRangeCache: ";
5489  for (auto& [phys_input, exp_range] : agg_col_range_cache_.asMap()) {
5490  ss << "{" << phys_input.col_id << ", " << phys_input.table_id
5491  << "} = " << exp_range.toString() << ", ";
5492  }
5493  ss << "stringDictGenerations: ";
5494  for (auto& [key, val] : row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
5495  ss << key << " = " << val << ", ";
5496  }
5497  ss << "tableGenerations: ";
5498  for (auto& [key, val] : table_generations_.asMap()) {
5499  ss << key << " = {" << val.tuple_count << ", " << val.start_rowid << "}, ";
5500  }
5501  ss << "\n";
5502  return ss.str();
5503 }
void logSystemGPUMemoryStatus(std::string const &tag, size_t const thread_idx) const
Definition: Execute.cpp:751
A container for various stats about the current state of the ExecutorResourcePool. Note that ExecutorResourcePool does not persist a struct of this type, but rather builds one on the fly when ExecutorResourcePool::get_resource_info() is called.
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:235
void executeWorkUnitPerFragment(const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb, const std::set< size_t > &fragment_indexes_param)
Compiles and dispatches a work unit per fragment processing results with the per fragment callback...
Definition: Execute.cpp:2336
bool is_agg(const Analyzer::Expr *expr)
std::vector< Analyzer::Expr * > target_exprs
A container to store requested and minimum neccessary resource requests across all resource types cur...
AggregatedColRange computeColRangesCache(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:4860
void enableRuntimeQueryInterrupt(const double runtime_query_check_freq, const unsigned pending_query_check_freq) const
Definition: Execute.cpp:5240
constexpr size_t kArenaBlockOverhead
SQLAgg
Definition: sqldefs.h:73
#define CHECK_EQ(x, y)
Definition: Logger.h:301
const QueryPlanDAG getLatestQueryPlanDagExtracted() const
Definition: Execute.cpp:5348
std::vector< std::unique_ptr< ExecutionKernel > > createKernels(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
Definition: Execute.cpp:2874
int32_t getNestLevel() const
size_t getBufferSizeBytes(const RelAlgExecutionUnit &ra_exe_unit, const unsigned thread_count, const ExecutorDeviceType device_type) const
std::shared_ptr< ExecutorResourceMgr > generate_executor_resource_mgr(const size_t num_cpu_slots, const size_t num_gpu_slots, const size_t cpu_result_mem, const size_t cpu_buffer_pool_mem, const size_t gpu_buffer_pool_mem, const double per_query_max_cpu_slots_ratio, const double per_query_max_cpu_result_mem_ratio, const double per_query_max_pinned_cpu_buffer_pool_mem_ratio, const double per_query_max_pageable_cpu_buffer_pool_mem_ratio, const bool allow_cpu_kernel_concurrency, const bool allow_cpu_gpu_kernel_concurrency, const bool allow_cpu_slot_oversubscription_concurrency, const bool allow_gpu_slot_oversubscription, const bool allow_cpu_result_mem_oversubscription_concurrency, const double max_available_resource_use_ratio)
Convenience factory-esque method that allows us to use the same logic to generate an ExecutorResource...
std::vector< int > ChunkKey
Definition: types.h:36
double g_running_query_interrupt_freq
Definition: Execute.cpp:137
ExtModuleKinds
Definition: Execute.h:518
robin_hood::unordered_set< int64_t > CountDistinctSet
Definition: CountDistinct.h:35
std::string get_cuda_libdevice_dir(void)
Definition: CudaMgr.cpp:612
void reduce(SpeculativeTopNMap &that)
static heavyai::shared_mutex execute_mutex_
Definition: Execute.h:1585
static QuerySessionMap queries_session_map_
Definition: Execute.h:1580
CudaMgr_Namespace::CudaMgr * cudaMgr() const
Definition: Execute.h:865
void log_system_memory_info_impl(std::string const &mem_log, size_t executor_id, size_t log_time_ms, std::string const &log_tag, size_t const thread_idx)
Definition: Execute.cpp:727
unsigned ceil_div(unsigned const dividend, unsigned const divisor)
Definition: misc.h:329
bool checkIsQuerySessionInterrupted(const std::string &query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
Definition: Execute.cpp:5220
int64_t kernel_queue_time_ms_
Definition: Execute.h:1562
JoinType
Definition: sqldefs.h:174
size_t maxGpuSlabSize() const
Definition: Execute.cpp:4358
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
bool useCudaBuffers() const
Definition: RenderInfo.cpp:54
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
Data_Namespace::DataMgr * data_mgr_
Definition: Execute.h:1558
std::vector< size_t > getTableFragmentIndices(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< shared::TableKey, const TableFragments * > &selected_tables_fragments, const std::unordered_map< shared::TableKey, const Analyzer::BinOper * > &inner_table_id_to_join_condition)
Definition: Execute.cpp:3204
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
ExecutorDeviceType getDeviceType() const
int64_t compilation_queue_time_ms_
Definition: Execute.h:1563
std::string cat(Ts &&...args)
size_t g_cpu_sub_task_size
Definition: Execute.cpp:86
ResultSetPtr get_merged_result(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, std::vector< TargetInfo > const &targets)
Definition: Execute.cpp:1510
block_size_x_(block_size_x)
static void initialize_extension_module_sources()
Definition: Execute.cpp:294
void checkPendingQueryStatus(const QuerySessionId &query_session)
Definition: Execute.cpp:5001
const StringDictionaryProxy::IdMap * getJoinIntersectionStringProxyTranslationMap(const StringDictionaryProxy *source_proxy, StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &source_string_op_infos, const std::vector< StringOps_Namespace::StringOpInfo > &dest_source_string_op_infos, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner) const
Definition: Execute.cpp:617
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1623
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
Definition: sqltypes.h:76
std::vector< int8_t * > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
Definition: Execute.cpp:4219
std::unordered_map< shared::TableKey, const ColumnDescriptor * > DeletedColumnsMap
Definition: PlanState.h:44
bool g_allow_memory_status_log
Definition: Execute.cpp:123
void setEntryCount(const size_t val)
input_table_info_cache_(this)
Definition: Execute.cpp:289
grid_size_x_(grid_size_x)
void set_mod_range(std::vector< int8_t const * > &frag_col_buffers, int8_t const *const ptr, size_t const local_col_id, size_t const N)
Definition: Execute.cpp:3595
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1822
const std::vector< uint64_t > & getFragOffsets()
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 64, 64, boost::multiprecision::signed_magnitude, boost::multiprecision::checked, void >> checked_int64_t
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner() const
Definition: Execute.cpp:703
std::atomic< bool > interrupted_
Definition: Execute.h:1543
void setGeneration(const shared::StringDictKey &dict_key, const uint64_t generation)
std::map< shared::TableKey, std::vector< uint64_t > > get_table_id_to_frag_offsets(const std::vector< InputDescriptor > &input_descs, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:3316
static ResultSetRecyclerHolder resultset_recycler_holder_
Definition: Execute.h:1608
std::tuple< bool, int64_t, int64_t > get_hpt_overflow_underflow_safe_scaled_values(const int64_t chunk_min, const int64_t chunk_max, const SQLTypeInfo &lhs_type, const SQLTypeInfo &rhs_type)
Definition: Execute.cpp:4492
const StringDictionaryProxy::IdMap * getOrAddStringProxyTranslationMap(const shared::StringDictKey &source_dict_id_in, const shared::StringDictKey &dest_dict_id_in, const bool with_generation, const StringTranslationType translation_map_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
Definition: Execute.cpp:648
std::string get_root_abs_path()
std::string toString() const
QueryPlanHash query_plan_dag_hash
double extract_max_stat_fp_type(const ChunkStats &stats, const SQLTypeInfo &ti)
static const int max_gpu_count
Definition: Execute.h:1535
GpuSharedMemoryContext gpu_smem_context
OutVecOwner(const std::vector< int64_t * > &out_vec)
Definition: Execute.cpp:3790
static ExecutorResourceMgr_Namespace::ResourcePoolInfo get_executor_resource_pool_info()
Definition: Execute.cpp:5414
const std::optional< bool > union_all
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:136
int64_t float_to_double_bin(int32_t val, bool nullable=false)
const table_functions::TableFunction table_func
std::map< const QuerySessionId, std::map< std::string, QuerySessionStatus >> QuerySessionMap
Definition: Execute.h:155
#define LOG(tag)
Definition: Logger.h:285
void freeLinearizedBuf()
std::string QueryPlanDAG
std::vector< size_t > outer_fragment_indices
bool isArchPascalOrLater(const ExecutorDeviceType dt) const
Definition: Execute.h:872
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:57
SystemMemoryUsage getSystemMemoryUsage() const
Definition: DataMgr.cpp:123
bool is_fp() const
Definition: sqltypes.h:571
HOST DEVICE int get_scale() const
Definition: sqltypes.h:396
Cache for physical column ranges. Set by the aggregator on the leaves.
std::pair< QuerySessionId, std::string > CurrentQueryStatus
Definition: Execute.h:87
size_t getDeviceBasedWatchdogScanLimit(size_t watchdog_max_projected_rows_per_device, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1812
void prepare_string_dictionaries(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:217
Definition: sqldefs.h:34
static std::shared_ptr< ExecutorResourceMgr_Namespace::ExecutorResourceMgr > executor_resource_mgr_
Definition: Execute.h:1645
size_t getSharedMemorySize() const
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr * > &target_exprs) const
Definition: Execute.cpp:992
void updateQuerySessionStatus(const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus new_query_status)
Definition: Execute.cpp:5041
void clearMemory(const MemoryLevel memLevel)
Definition: DataMgr.cpp:465
Definition: sqldefs.h:35
std::unordered_set< int > get_available_gpus(const Data_Namespace::DataMgr *data_mgr)
Definition: Execute.cpp:1727
std::string join(T const &container, std::string const &delim)
std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > addDeletedColumn(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
Definition: Execute.cpp:4441
void addDeviceResults(ResultSetPtr &&device_results, std::vector< size_t > outer_table_fragment_ids)
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t start_rowid, const uint32_t num_tables, const std::vector< int8_t * > &join_hash_tables, const int64_t num_rows_to_process=-1)
std::vector< InputDescriptor > input_descs
bool hasLazyFetchColumns(const std::vector< Analyzer::Expr * > &target_exprs) const
Definition: Execute.cpp:981
#define UNREACHABLE()
Definition: Logger.h:338
size_t g_preflight_count_query_threshold
Definition: Execute.cpp:84
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
void launchKernelsImpl(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels, const ExecutorDeviceType device_type, const size_t requested_num_threads)
Definition: Execute.cpp:3008
const TableDescriptor * get_metadata_for_table(const ::shared::TableKey &table_key, bool populate_fragmenter)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
std::unique_ptr< llvm::Module > read_llvm_module_from_ir_string(const std::string &udf_ir_string, llvm::LLVMContext &ctx, bool is_gpu=false)
bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter *fragmenter)
Definition: Execute.cpp:223
std::optional< size_t > first_dict_encoded_idx(std::vector< TargetInfo > const &)
Definition: ResultSet.cpp:1593
ExpressionRange getColRange(const PhysicalInput &) const
debug_dir_(debug_dir)
const ColumnDescriptor * get_metadata_for_column(const ::shared::ColumnKey &column_key)
#define CHECK_GE(x, y)
Definition: Logger.h:306
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
ResourceType
Stores the resource type for a ExecutorResourcePool request.
FetchResult fetchUnionChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< shared::TableKey, const TableFragments * > &, const FragmentsList &selected_fragments, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:3610
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type) const
Definition: Execute.cpp:2797
static std::pair< int64_t, int32_t > reduceResults(const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
Definition: Execute.cpp:1312
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
Definition: sqldefs.h:48
Definition: sqldefs.h:29
Functions to support geospatial operations used by the executor.
const StringDictionaryProxy::IdMap * getStringProxyTranslationMap(const shared::StringDictKey &source_dict_key, const shared::StringDictKey &dest_dict_key, const RowSetMemoryOwner::StringTranslationType translation_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool with_generation) const
Definition: Execute.cpp:602
QuerySessionId current_query_session_
Definition: Execute.h:1576
ResultSetRecyclerHolder & getResultSetRecyclerHolder()
Definition: Execute.cpp:4944
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:417
heavyai::shared_mutex & getSessionLock()
Definition: Execute.cpp:4948
static const int32_t ERR_GEOS
Definition: Execute.h:1629
ExecutorResourceMgr_Namespace::ChunkRequestInfo getChunkRequestInfo(const ExecutorDeviceType device_type, const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos, const std::vector< std::pair< int32_t, FragmentsList >> &device_fragment_lists) const
Determines a unique list of chunks and their associated byte sizes for a given query plan...
Definition: Execute.cpp:852
AggregatedColRange agg_col_range_cache_
Definition: Execute.h:1572
std::shared_ptr< ResultSet > ResultSetPtr
static void * gpu_active_modules_[max_gpu_count]
Definition: Execute.h:1541
std::vector< FragmentInfo > fragments
Definition: Fragmenter.h:171
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1502
void fill_entries_for_empty_input(std::vector< TargetInfo > &target_infos, std::vector< int64_t > &entry, const std::vector< Analyzer::Expr * > &target_exprs, const QueryMemoryDescriptor &query_mem_desc)
Definition: Execute.cpp:2586
std::string toString(const QueryDescriptionType &type)
Definition: Types.h:64
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:81
void enrollQuerySession(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted_time_str, const size_t executor_id, const QuerySessionStatus::QueryStatus query_session_status)
Definition: Execute.cpp:5057
static void init_resource_mgr(const size_t num_cpu_slots, const size_t num_gpu_slots, const size_t cpu_result_mem, const size_t cpu_buffer_pool_mem, const size_t gpu_buffer_pool_mem, const double per_query_max_cpu_slots_ratio, const double per_query_max_cpu_result_mem_ratio, const bool allow_cpu_kernel_concurrency, const bool allow_cpu_gpu_kernel_concurrency, const bool allow_cpu_slot_oversubscription_concurrency, const bool allow_cpu_result_mem_oversubscription, const double max_available_resource_use_ratio)
Definition: Execute.cpp:5353
T visit(const Analyzer::Expr *expr) const
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
bool takes_float_argument(const TargetInfo &target_info)
Definition: TargetInfo.h:106
Specifies the policies for resource grants in the presence of other requests, both under situations o...
int32_t executePlanWithoutGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr *results, const std::vector< Analyzer::Expr * > &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info, const bool optimize_cuda_block_and_grid_sizes, const int64_t rows_to_process=-1)
Definition: Execute.cpp:3802
static uint32_t gpu_active_modules_device_mask_
Definition: Execute.h:1540
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
FragmentSkipStatus canSkipFragmentForFpQual(const Analyzer::BinOper *comp_expr, const Analyzer::ColumnVar *lhs_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Analyzer::Constant *rhs_const) const
Definition: Execute.cpp:4564
static void invalidateCaches()
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:1297
quantile::TDigest * nullTDigest(double const q)
Definition: Execute.cpp:673
SortAlgorithm algorithm
llvm::Value * castToIntPtrTyIn(llvm::Value *val, const size_t bit_width)
Definition: Execute.cpp:4394
void reset(bool discard_runtime_modules_only=false)
Definition: Execute.cpp:323
const int8_t * getOneTableColumnFragment(const shared::TableKey &table_key, const int frag_id, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
static std::mutex kernel_mutex_
Definition: Execute.h:1641
unsigned numBlocksPerMP() const
Definition: Execute.cpp:4327
StringDictionaryProxy * getStringDictionaryProxy(const shared::StringDictKey &dict_key, const bool with_generation) const
Definition: Execute.h:578
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
bool is_number() const
Definition: sqltypes.h:574
#define CHECK_GT(x, y)
Definition: Logger.h:305
Container for compilation results and assorted options for a single execution unit.
bool isCPUOnly() const
Definition: Execute.cpp:681
void resetGridSize()
Definition: Execute.cpp:4346
bool checkCurrentQuerySession(const std::string &candidate_query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
Definition: Execute.cpp:4957
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
void addTransientStringLiterals(const RelAlgExecutionUnit &ra_exe_unit, const std::shared_ptr< RowSetMemoryOwner > &row_set_mem_owner)
Definition: Execute.cpp:2494
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:2776
std::vector< FragmentsPerTable > FragmentsList
int64_t extract_max_stat_int_type(const ChunkStats &stats, const SQLTypeInfo &ti)
bool is_time() const
Definition: sqltypes.h:577
bool needFetchAllFragments(const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
Definition: Execute.cpp:3384
TargetInfo get_target_info(const Analyzer::Expr *target_expr, const bool bigint_count)
Definition: TargetInfo.h:92
RUNTIME_EXPORT void agg_sum_float_skip_val(int32_t *agg, const float val, const float skip_val)
ExecutorDeviceType
std::string to_string(char const *&&v)
static size_t literalBytes(const CgenState::LiteralValue &lit)
Definition: CgenState.h:418
bool updateQuerySessionStatusWithLock(const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus updated_query_status, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
Definition: Execute.cpp:5124
ResultSetPtr executeWorkUnit(size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:2074
static void clearCardinalityCache()
Definition: Execute.cpp:5275
bool checkNonKernelTimeInterrupted() const
Definition: Execute.cpp:5329
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:531
const int8_t * getAllTableColumnFragments(const shared::TableKey &table_key, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
int8_t * getUnderlyingBuffer() const
bool g_inner_join_fragment_skipping
Definition: Execute.cpp:94
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::string &submitted_time_str, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
Definition: Execute.cpp:5175
std::vector< Analyzer::Expr * > target_exprs_union
static void resume_executor_queue()
Definition: Execute.cpp:5395
constexpr double a
Definition: Utm.h:32
CardinalityCacheKey(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1935
bool g_enable_string_functions
std::unordered_map< shared::TableKey, const Analyzer::BinOper * > getInnerTabIdToJoinCond() const
Definition: Execute.cpp:2849
Definition: sqldefs.h:75
std::shared_lock< T > shared_lock
std::unique_ptr< QueryMemoryInitializer > query_buffers_
size_t g_watchdog_none_encoded_string_translation_limit
Definition: Execute.cpp:82
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:509
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:4254
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags(const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments)
Definition: Execute.cpp:3335
std::map< int, std::shared_ptr< ChunkMetadata >> ChunkMetadataMap
size_t compute_buffer_entry_guess(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1753
tuple rows
Definition: report.py:114
bool g_enable_executor_resource_mgr
Definition: Execute.cpp:174
SQLOps get_optype() const
Definition: Analyzer.h:452
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
This file contains the class specification and related data structures for Catalog.
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:1627
const ExecutorId executor_id_
Definition: Execute.h:1476
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:234
bool updateQuerySessionExecutorAssignment(const QuerySessionId &query_session, const std::string &submitted_time_str, const size_t executor_id, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
Definition: Execute.cpp:5150
int8_t warpSize() const
Definition: Execute.cpp:4310
RUNTIME_EXPORT void agg_sum_double_skip_val(int64_t *agg, const double val, const double skip_val)
std::map< QuerySessionId, bool > InterruptFlagMap
Definition: Execute.h:88
const StringDictionaryProxy::TranslationMap< Datum > * getOrAddStringProxyNumericTranslationMap(const shared::StringDictKey &source_dict_id_in, const bool with_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
Definition: Execute.cpp:665
const size_t max_gpu_slab_size_
Definition: Execute.h:1554
TargetInfo operator()(Analyzer::Expr const *const target_expr) const
Definition: Execute.cpp:1531
bool g_is_test_env
Definition: Execute.cpp:149
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1699
ResultSetPtr collectAllDeviceResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
Definition: Execute.cpp:2682
const ColumnDescriptor * getPhysicalColumnDescriptor(const Analyzer::ColumnVar *, int) const
Definition: Execute.cpp:691
int8_t groupColWidth(const size_t key_idx) const
bool key_does_not_shard_to_leaf(const ChunkKey &key)
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1615
void setGeneration(const shared::TableKey &table_key, const TableGeneration &generation)
std::vector< std::pair< std::vector< size_t >, size_t > > per_device_cardinality
static SysCatalog & instance()
Definition: SysCatalog.h:343
max_gpu_slab_size_(max_gpu_slab_size)
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
static void set_executor_resource_pool_resource(const ExecutorResourceMgr_Namespace::ResourceType resource_type, const size_t resource_quantity)
Definition: Execute.cpp:5422
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:168
Classes representing a parse tree.
void logSystemCPUMemoryStatus(std::string const &tag, size_t const thread_idx) const
Definition: Execute.cpp:740
int getDeviceCount() const
Definition: CudaMgr.h:90
size_t get_context_count(const ExecutorDeviceType device_type, const size_t cpu_count, const size_t gpu_count)
Definition: Execute.cpp:1741
int64_t deviceCycles(int milliseconds) const
Definition: Execute.cpp:4362
ExecutorType executor_type
void init(LogOptions const &log_opts)
Definition: Logger.cpp:364
std::mutex str_dict_mutex_
Definition: Execute.h:1545
bool is_integer() const
Definition: sqltypes.h:565
#define INJECT_TIMER(DESC)
Definition: measure.h:96
double extract_min_stat_fp_type(const ChunkStats &stats, const SQLTypeInfo &ti)
static size_t addAligned(const size_t off_in, const size_t alignment)
Definition: CgenState.h:449
#define CHECK_NE(x, y)
Definition: Logger.h:302
const ColumnDescriptor * get_column_descriptor_maybe(const shared::ColumnKey &column_key)
Definition: Execute.h:241
Fragmenter_Namespace::TableInfo getTableInfo(const shared::TableKey &table_key) const
Definition: Execute.cpp:711
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1619
const JoinQualsPerNestingLevel join_quals
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1564
std::vector< std::string > expr_container_to_string(const T &expr_container)
Definition: Execute.cpp:1898
static void set_concurrent_resource_grant_policy(const ExecutorResourceMgr_Namespace::ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy)
Definition: Execute.cpp:5443
std::unique_ptr< llvm::Module > read_llvm_module_from_ir_file(const std::string &udf_ir_filename, llvm::LLVMContext &ctx, bool is_gpu=false)
bool threadsCanReuseGroupByBuffers() const
void populate_string_dictionary(int32_t table_id, int32_t col_id, int32_t db_id)
Definition: Execute.cpp:233
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
Definition: Execute.h:1533
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:86
static QueryPlanDAG latest_query_plan_extracted_
Definition: Execute.h:1612
void addToCardinalityCache(const CardinalityCacheKey &cache_key, const size_t cache_value)
Definition: Execute.cpp:5255
friend class QueryMemoryDescriptor
Definition: Execute.h:1658
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr *results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const shared::TableKey &outer_table_key, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info, const bool optimize_cuda_block_and_grid_sizes, const int64_t rows_to_process=-1)
Definition: Execute.cpp:4028
std::optional< size_t > limit
int64_t extract_min_stat_int_type(const ChunkStats &stats, const SQLTypeInfo &ti)
size_t getNumCurentSessionsEnrolled() const
Definition: Execute.cpp:5081
TableIdToNodeMap table_id_to_node_map
int getColId() const
int64_t inline_null_val(const SQLTypeInfo &ti, const bool float_argument_input)
Definition: Execute.cpp:2571
const ColumnDescriptor * get_column_descriptor(const shared::ColumnKey &column_key)
Definition: Execute.h:213
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1621
std::list< Analyzer::OrderEntry > order_entries
CachedCardinality getCachedCardinality(const CardinalityCacheKey &cache_key)
Definition: Execute.cpp:5264
size_t g_watchdog_max_projected_rows_per_device
Definition: Execute.cpp:83
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1578
executor_(executor)
std::unique_lock< T > unique_lock
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1532
RUNTIME_EXPORT void agg_max_double_skip_val(int64_t *agg, const double val, const double skip_val)
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1622
void add_deleted_col_to_map(PlanState::DeletedColumnsMap &deleted_cols_map, const ColumnDescriptor *deleted_cd, const shared::TableKey &table_key)
Definition: Execute.cpp:4429
static const ExecutorResourceMgr_Namespace::ConcurrentResourceGrantPolicy get_concurrent_resource_grant_policy(const ExecutorResourceMgr_Namespace::ResourceType resource_type)
Definition: Execute.cpp:5433
bool g_bigint_count
Definition: sqldefs.h:77
Checked json field retrieval.
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
ResultSetPtr executeTableFunction(const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo)
Compiles and dispatches a table function; that is, a function that takes as input one or more columns...
Definition: Execute.cpp:2416
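The brief above describes a compile-then-dispatch flow for table functions. A minimal conceptual sketch of that flow follows, using simplified stand-in types (ColumnView, CompiledTableFn, compile_row_doubler are illustrative only and not the real TableFunctionExecutionUnit/CompilationOptions API; the real path JIT-compiles the UDTF to native code):
#include <cstddef>
#include <functional>
#include <vector>

struct ColumnView {
  const double* data;
  std::size_t size;
};

using CompiledTableFn =
    std::function<std::vector<double>(const std::vector<ColumnView>&)>;

// "Compile" step (stand-in): here compilation just returns a callable.
CompiledTableFn compile_row_doubler() {
  return [](const std::vector<ColumnView>& inputs) {
    std::vector<double> out;
    if (!inputs.empty()) {
      out.reserve(inputs[0].size);
      for (std::size_t i = 0; i < inputs[0].size; ++i) {
        out.push_back(inputs[0].data[i] * 2.0);  // trivial per-row transform
      }
    }
    return out;
  };
}

// Dispatch step: hand the compiled kernel the fetched input columns and
// collect its output column as the result.
std::vector<double> dispatch_table_function(const CompiledTableFn& fn,
                                            const std::vector<ColumnView>& inputs) {
  return fn(inputs);
}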
bool g_enable_watchdog
llvm::Value * castToFP(llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
Definition: Execute.cpp:4367
size_t get_selected_input_descs_index(const shared::TableKey &table_key, std::vector< InputDescriptor > const &input_descs)
Definition: Execute.cpp:3562
std::pair< bool, size_t > CachedCardinality
Definition: Execute.h:1403
size_t watchdog_max_projected_rows_per_device
Definition: QueryHint.h:336
void setupCaching(const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< shared::TableKey > &phys_table_keys)
Definition: Execute.cpp:4926
static void invalidateCardinalityCacheForTable(const shared::TableKey &table_key)
Definition: Execute.cpp:5282
bool is_dict_encoded_type() const
Definition: sqltypes.h:653
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:102
size_t g_approx_quantile_buffer
Definition: Execute.cpp:167
const ColumnDescriptor * getColumnDescriptor(const Analyzer::ColumnVar *) const
Definition: Execute.cpp:686
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
bool containsTableKey(const shared::TableKey &table_key) const
Definition: Execute.cpp:1995
bool checkIsQuerySessionEnrolled(const QuerySessionId &query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
Definition: Execute.cpp:5231
specifies the content in-memory of a row in the column metadata table
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
int64_t getAggInitValForIndex(const size_t index) const
const ChunkMetadataMap & getChunkMetadataMap() const
bool is_boolean() const
Definition: sqltypes.h:580
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1628
const std::shared_ptr< Analyzer::Estimator > estimator
static std::map< int, std::shared_ptr< Executor > > executors_
Definition: Execute.h:1581
unsigned grid_size_x_
Definition: Execute.h:1553
QuerySessionStatus::QueryStatus getQuerySessionStatus(const QuerySessionId &candidate_query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
Definition: Execute.cpp:4967
#define AUTOMATIC_IR_METADATA(CGENSTATE)
size_t get_selected_input_col_descs_index(const shared::TableKey &table_key, std::list< std::shared_ptr< InputColDescriptor const >> const &input_col_descs)
Definition: Execute.cpp:3571
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1616
This file includes the class specification for the buffer manager (BufferMgr), and related data struc...
static const size_t auto_num_threads
Definition: Execute.h:1536
const TemporaryTables * getTemporaryTables()
Definition: Execute.h:573
int32_t getOrAddTransient(const std::string &)
RUNTIME_EXPORT void agg_min_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
Definition: Execute.cpp:2050
size_t hash() const
Definition: Execute.cpp:1991
std::string key
Definition: Execute.h:402
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:79
size_t get_col_byte_width(const shared::ColumnKey &column_key)
Definition: Execute.cpp:766
QueryDescriptionType getQueryDescriptionType() const
void freeTemporaryCpuLinearizedIdxBuf()
std::shared_ptr< CompilationContext > generated_code
const shared::TableKey & getTableKey() const
QueryMemoryDescriptor query_mem_desc_
ExecutorDeviceType device_type
void launchKernelsLocked(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels, const ExecutorDeviceType device_type)
Definition: Execute.cpp:3091
virtual ReductionCode codegen() const
Executor(const ExecutorId id, Data_Namespace::DataMgr *data_mgr, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:272
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
static std::unordered_map< CardinalityCacheKey, size_t > cardinality_cache_
Definition: Execute.h:1607
std::string dumpCache() const
Definition: Execute.cpp:5486
Definition: sqldefs.h:33
std::string sort_algorithm_to_string(const SortAlgorithm algorithm)
Definition: Execute.cpp:1920
InputSourceType getSourceType() const
std::unordered_map< shared::TableKey, const RelAlgNode * > TableIdToNodeMap
const std::vector< size_t > getExecutorIdsRunningQuery(const QuerySessionId &interrupt_session) const
Definition: Execute.cpp:5313
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK_LT(x, y)
Definition: Logger.h:303
ResultSetPtr resultsUnion(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1538
void registerExtractedQueryPlanDag(const QueryPlanDAG &query_plan_dag)
Definition: Execute.cpp:5342
#define REGULAR_DICT(TRANSIENTID)
Definition: sqltypes.h:323
static llvm::ConstantInt * codegenIntConst(const Analyzer::Constant *constant, CgenState *cgen_state)
Definition: ConstantIR.cpp:89
ReductionCode get_reduction_code(const size_t executor_id, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, int64_t *compilation_queue_time)
Definition: Execute.cpp:1622
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Definition: Execute.cpp:4275
QuerySessionId & getCurrentQuerySession(heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
Definition: Execute.cpp:4952
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:3728
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const std::shared_ptr< CompilationContext > &compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor, bool is_pre_launch_udtf)
const shared::ColumnKey & getColumnKey() const
Definition: Analyzer.h:198
size_t ExecutorId
Definition: Execute.h:422
#define CHECK_LE(x, y)
Definition: Logger.h:304
void setGridSize(unsigned grid_size)
Definition: Execute.cpp:4342
static heavyai::shared_mutex recycler_mutex_
Definition: Execute.h:1605
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
Definition: Execute.cpp:1035
InputTableInfoCache input_table_info_cache_
Definition: Execute.h:1571
size_t getNumBytesForFetchedRow(const std::set< shared::TableKey > &table_keys_to_fetch) const
bool g_enable_bbox_intersect_hashjoin
Definition: Execute.cpp:105
void setBlockSize(unsigned block_size)
Definition: Execute.cpp:4350
Speculative top N algorithm.
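As a hedged illustration of the non-speculative core of a top-N sort: keep the N best values in a min-heap and evict the smallest as better candidates arrive. The speculative variant referenced here adds a validation/fallback step (re-running with a full sort when its guess proves wrong), which this sketch omits:
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

std::vector<int64_t> top_n_desc(const std::vector<int64_t>& vals, std::size_t n) {
  // Min-heap of the current best n values: the smallest of the "best" sits
  // on top and is evicted when a larger value arrives.
  std::priority_queue<int64_t, std::vector<int64_t>, std::greater<int64_t>> heap;
  for (int64_t v : vals) {
    if (heap.size() < n) {
      heap.push(v);
    } else if (v > heap.top()) {
      heap.pop();
      heap.push(v);
    }
  }
  std::vector<int64_t> out;
  while (!heap.empty()) {
    out.push_back(heap.top());
    heap.pop();
  }
  std::reverse(out.begin(), out.end());  // largest first
  return out;
}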
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit)
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::chrono::steady_clock::time_point lock_queue_clock_
Definition: Execute.h:1496
bool isHintRegistered(const QueryHint hint) const
Definition: QueryHint.h:383
Datum get_constval() const
Definition: Analyzer.h:348
std::pair< bool, int64_t > skipFragment(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:4624
std::unordered_map< size_t, SQLTypeInfo > target_exprs_original_type_infos
const int8_t * linearizeColumnFragments(const shared::TableKey &table_key, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
const TableGeneration & getGeneration(const shared::TableKey &table_key) const
Definition: sqldefs.h:78
unsigned gridSize() const
Definition: Execute.cpp:4318
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
Fragmenter_Namespace::TableInfo getTableInfo(const shared::TableKey &table_key)
std::unordered_map< int, CgenState::LiteralValues > literal_values
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:393
TableGenerations computeTableGenerations(const std::unordered_set< shared::TableKey > &phys_table_keys)
Definition: Execute.cpp:4914
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:33
static std::map< ExtModuleKinds, std::string > extension_module_sources
Definition: Execute.h:528
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:2726
StringDictionaryGenerations computeStringDictionaryGenerations(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:4888
unsigned block_size_x_
Definition: Execute.h:1552
static const int32_t ERR_WIDTH_BUCKET_INVALID_ARGUMENT
Definition: Execute.h:1630
Data_Namespace::DataMgr * getDataMgr() const
Definition: Execute.h:623
bool needLinearizeAllFragments(const ColumnDescriptor *cd, const InputColDescriptor &inner_col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments, const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:3403
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
Definition: ResultSet.cpp:766
bool g_cache_string_hash
std::vector< MemoryInfo > getMemoryInfo(const MemoryLevel memLevel) const
Definition: DataMgr.cpp:380
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:4234
static heavyai::shared_mutex executor_session_mutex_
Definition: Execute.h:1574
RUNTIME_EXPORT ALWAYS_INLINE DEVICE int64_t DateTruncateHighPrecisionToDate(const int64_t timeval, const int64_t scale)
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
Definition: Execute.cpp:4021
std::vector< std::vector< const int8_t * > > col_buffers
Definition: ColumnFetcher.h:42
std::pair< bool, int64_t > skipFragmentInnerJoins(const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
Definition: Execute.cpp:4827
void buildSelectedFragsMapping(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:3742
TableGenerations table_generations_
Definition: Execute.h:1573
data_mgr_(data_mgr)
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr * > &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
Definition: Execute.cpp:2641
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
Definition: RelAlgDag.cpp:3525
std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> PerFragmentCallBack
Definition: Execute.h:890
void resetInterrupt()
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool g_use_estimator_result_cache
Definition: Execute.cpp:135
const std::vector< InputTableInfo > & getQueryInfos() const
void buildSelectedFragsMappingForUnion(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:3773
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
RUNTIME_EXPORT void agg_min_double_skip_val(int64_t *agg, const double val, const double skip_val)
const std::unordered_map< PhysicalInput, ExpressionRange > & asMap() const
std::string QuerySessionId
Definition: Execute.h:86
ResultSetPtr reduceMultiDeviceResultSets(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1639
constexpr unsigned N
Definition: Utm.h:110
const std::unordered_map< shared::TableKey, TableGeneration > & asMap() const
def error_code
Definition: report.py:244
RegisteredQueryHint query_hint
void set_notnull(bool n)
Definition: sqltypes.h:475
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > getUniqueThreadSharedResultSets(const std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device) const
Definition: Execute.cpp:1599
constexpr char const * EMPTY_QUERY_PLAN
#define CHECK(condition)
Definition: Logger.h:291
QueryPlanDagCache & getQueryPlanDagCache()
Definition: Execute.cpp:4940
#define DEBUG_TIMER(name)
Definition: Logger.h:412
RUNTIME_EXPORT void agg_max_float_skip_val(int32_t *agg, const float val, const float skip_val)
std::map< shared::ColumnKey, size_t > getColumnByteWidthMap(const std::set< shared::TableKey > &table_ids_to_fetch, const bool include_lazy_fetched_cols) const
Definition: Execute.cpp:794
constexpr int64_t get_timestamp_precision_scale(const int32_t dimen)
Definition: DateTimeUtils.h:51
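Assumed behavior sketch (verify against DateTimeUtils.h): a timestamp precision dimension of d fractional digits corresponds to a scale factor of 10^d relative to seconds, e.g. TIMESTAMP(3) values are stored in milliseconds:
#include <cstdint>

constexpr int64_t timestamp_precision_scale_sketch(int32_t dimen) {
  int64_t scale = 1;
  for (int32_t i = 0; i < dimen; ++i) {
    scale *= 10;  // one decimal digit of sub-second precision per dimension
  }
  return scale;
}
static_assert(timestamp_precision_scale_sketch(3) == 1000, "milliseconds");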
bool gpusPresent() const
Definition: DataMgr.h:228
static const int32_t ERR_OUT_OF_SLOTS
Definition: Execute.h:1617
uint64_t exp_to_scale(const unsigned exp)
double gpu_input_mem_limit_percent
static std::pair< std::vector< InnerOuter >, std::vector< InnerOuterStringOpInfos > > normalizeColumnPairs(const Analyzer::BinOper *condition, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:1015
void resetBlockSize()
Definition: Execute.cpp:4354
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
bool g_enable_cpu_sub_tasks
Definition: Execute.cpp:85
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
CgenStateManager(Executor &executor)
Definition: Execute.cpp:434
bool g_cluster
std::mutex compilation_mutex_
Definition: Execute.h:1635
RUNTIME_EXPORT ALWAYS_INLINE int64_t agg_sum_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
heavyai::shared_mutex & getDataRecyclerLock()
Definition: Execute.cpp:4936
std::vector< std::vector< int64_t > > num_rows
Definition: ColumnFetcher.h:43
static void pause_executor_queue()
Definition: Execute.cpp:5386
const ColumnDescriptor * try_get_column_descriptor(const InputColDescriptor *col_desc)
Definition: Execute.cpp:3308
Definition: sqldefs.h:32
size_t g_approx_quantile_centroids
Definition: Execute.cpp:168
constexpr int8_t MAX_BYTE_WIDTH_SUPPORTED
static std::shared_ptr< QueryEngine > getInstance()
Definition: QueryEngine.h:89
void setColRange(const PhysicalInput &, const ExpressionRange &)
std::shared_ptr< CompilationContext > compile(const TableFunctionExecutionUnit &exe_unit, bool emit_only_preflight_fn)
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
const Expr * get_left_operand() const
Definition: Analyzer.h:455
static bool typeSupportsRange(const SQLTypeInfo &ti)
ExecutorDeviceType getDeviceTypeForTargets(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
Definition: Execute.cpp:2546
void invalidateRunningQuerySession(heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
Definition: Execute.cpp:4979
std::shared_ptr< const query_state::QueryState > query_state
void setNumAllocatedThreads(size_t num_threads)
ResultSetPtr reduce_estimator_results(const RelAlgExecutionUnit &ra_exe_unit, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
ExpressionRange getColRange(const PhysicalInput &) const
Definition: Execute.cpp:721
CurrentQueryStatus attachExecutorToQuerySession(const QuerySessionId &query_session_id, const std::string &query_str, const std::string &query_submitted_time)
Definition: Execute.cpp:4984
bool skipFragmentPair(const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< shared::TableKey, const Analyzer::BinOper * > &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
Definition: Execute.cpp:3246
std::vector< Analyzer::Expr * > target_exprs
SQLTypeInfo columnType
void launchKernelsViaResourceMgr(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels, const ExecutorDeviceType device_type, const std::vector< InputDescriptor > &input_descs, const QueryMemoryDescriptor &query_mem_desc)
Launches a vector of kernels for a given query step, gated/scheduled by ExecutorResourceMgr.
Definition: Execute.cpp:3103
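The brief above describes kernel launches gated by a resource manager. The following is not the ExecutorResourceMgr API, only a minimal sketch of the general pattern under that assumption: each kernel acquires a grant from a shared pool (here a counting semaphore over CPU slots) before running and releases it when it finishes:
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class SimpleResourcePool {
 public:
  explicit SimpleResourcePool(std::size_t slots) : free_slots_(slots) {}
  void acquire() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return free_slots_ > 0; });
    --free_slots_;
  }
  void release() {
    { std::lock_guard<std::mutex> lock(mutex_); ++free_slots_; }
    cv_.notify_one();
  }
 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t free_slots_;
};

void launch_kernels_gated(SimpleResourcePool& pool,
                          std::vector<std::function<void()>>& kernels) {
  std::vector<std::thread> workers;
  for (auto& kernel : kernels) {
    workers.emplace_back([&pool, &kernel] {
      pool.acquire();  // wait until a slot is granted
      kernel();        // run the execution kernel
      pool.release();  // return the slot to the pool
    });
  }
  for (auto& w : workers) {
    w.join();
  }
}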
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
static constexpr ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:423
debug_file_(debug_file)
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
bool is_string() const
Definition: sqltypes.h:559
const TableGeneration & getTableGeneration(const shared::TableKey &table_key) const
Definition: Execute.cpp:716
const std::vector< DeviceProperties > & getAllDeviceProperties() const
Definition: CudaMgr.h:134
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
RUNTIME_EXPORT void agg_min_float_skip_val(int32_t *agg, const float val, const float skip_val)
unsigned blockSize() const
Definition: Execute.cpp:4332
std::vector< std::vector< uint64_t > > frag_offsets
Definition: ColumnFetcher.h:44
constexpr double n
Definition: Utm.h:38
std::shared_timed_mutex shared_mutex
static std::mutex register_runtime_extension_functions_mutex_
Definition: Execute.h:1640
Specifies all DataMgr chunks needed for a query step/request, along with their sizes in bytes...
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
ExecutorId getExecutorId() const
Definition: Execute.h:1332
std::unique_ptr< ResultSet > estimator_result_set_
static size_t align(const size_t off_in, const size_t alignment)
Definition: Execute.h:1468
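A common way to implement an align-up helper with this signature is sketched below; this is an assumption for illustration, not the actual Execute.h implementation:
#include <cstddef>

// Round an offset up to the next multiple of `alignment`, as is typically
// done when packing values into a buffer with alignment constraints.
static std::size_t align_up_sketch(const std::size_t off_in,
                                   const std::size_t alignment) {
  const std::size_t rem = off_in % alignment;
  return rem == 0 ? off_in : off_in + (alignment - rem);
}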
static heavyai::shared_mutex executors_cache_mutex_
Definition: Execute.h:1602
Definition: sqldefs.h:76
bool operator==(const CardinalityCacheKey &other) const
Definition: Execute.cpp:1987
int cpu_threads()
Definition: thread_count.h:25
void clearQuerySessionStatus(const QuerySessionId &query_session, const std::string &submitted_time_str)
Definition: Execute.cpp:5027
RUNTIME_EXPORT void agg_max_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
bool is_decimal() const
Definition: sqltypes.h:568
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:1084
void setQuerySessionAsInterrupted(const QuerySessionId &query_session, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
Definition: Execute.cpp:5209
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:1305
static size_t get_executor_resource_pool_total_resource_quantity(const ExecutorResourceMgr_Namespace::ResourceType resource_type)
Definition: Execute.cpp:5404
temporary_tables_(nullptr)
std::list< std::shared_ptr< const InputColDescriptor > > get_selected_input_col_descs(const shared::TableKey &table_key, std::list< std::shared_ptr< InputColDescriptor const >> const &input_col_descs)
Definition: Execute.cpp:3582
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:2140
std::vector< QuerySessionStatus > getQuerySessionInfo(const QuerySessionId &query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
Definition: Execute.cpp:5295
Descriptor for the fragments required for an execution kernel.
Definition: sqldefs.h:74
size_t getColOffInBytes(const size_t col_idx) const
static size_t getArenaBlockSize()
Definition: Execute.cpp:558
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:2863
const StringDictionaryProxy::TranslationMap< Datum > * getStringProxyNumericTranslationMap(const shared::StringDictKey &source_dict_key, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool with_generation) const
Definition: Execute.cpp:636
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: HashJoin.cpp:285
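For illustration only (plain std:: containers, not HeavyDB's hash table layouts): the core of the one-to-many hash join that this factory constructs is a build phase over the inner join key followed by a probe phase over the outer key:
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

std::vector<std::pair<std::size_t, std::size_t>> hash_join(
    const std::vector<int64_t>& inner_keys,
    const std::vector<int64_t>& outer_keys) {
  // Build phase: key -> inner row ids.
  std::unordered_multimap<int64_t, std::size_t> build_side;
  for (std::size_t i = 0; i < inner_keys.size(); ++i) {
    build_side.emplace(inner_keys[i], i);
  }
  // Probe phase: for each outer row, emit (outer_row, inner_row) matches.
  std::vector<std::pair<std::size_t, std::size_t>> matches;
  for (std::size_t j = 0; j < outer_keys.size(); ++j) {
    auto range = build_side.equal_range(outer_keys[j]);
    for (auto it = range.first; it != range.second; ++it) {
      matches.emplace_back(j, it->second);
    }
  }
  return matches;
}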
int32_t getIdOfString(const std::string &str) const
HashType
Definition: HashTable.h:19
const InputDescriptor & getScanDesc() const
#define IS_GEO(T)
Definition: sqltypes.h:310
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:133
Definition: sqldefs.h:83
ThreadLocalIds thread_local_ids()
Definition: Logger.cpp:880
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
Definition: Execute.cpp:5086
bool is_array() const
Definition: sqltypes.h:583
#define VLOG(n)
Definition: Logger.h:388
Type timer_start()
Definition: measure.h:42
static QueryPlanDagCache query_plan_dag_cache_
Definition: Execute.h:1604
Functions to support array operations used by the executor.
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
static constexpr int32_t literalsDictId
Definition: DictRef.h:18
const Executor * getExecutor() const
static std::mutex gpu_active_modules_mutex_
Definition: Execute.h:1539
std::unique_ptr< llvm::Module > read_llvm_module_from_bc_file(const std::string &udf_ir_filename, llvm::LLVMContext &ctx)
std::string get_table_name(int32_t db_id, int32_t table_id)
void clearMetaInfoCache()
Definition: Execute.cpp:1029
FragmentSkipStatus
Definition: Execute.h:164
const TemporaryTables * temporary_tables_
Definition: Execute.h:1559
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CompilationContext *compilation_context, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const bool allow_runtime_interrupt, const std::vector< int8_t * > &join_hash_tables, RenderAllocatorMap *render_allocator_map, bool optimize_cuda_block_and_grid_sizes)
HashTableBuildDagMap hash_table_build_plan_dag
void update_extension_modules(bool update_runtime_modules_only=false)
Definition: Execute.cpp:346
FetchResult fetchChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< shared::TableKey, const TableFragments * > &, const FragmentsList &selected_fragments, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
Definition: Execute.cpp:3426
StringDictionaryProxy * getOrAddStringDictProxy(const shared::StringDictKey &dict_key, const bool with_generation)
Definition: Execute.cpp:572
size_t get_loop_join_size(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1880
ResultSetPtr executeExplain(const QueryCompilationDescriptor &)
Definition: Execute.cpp:2490
bool isFragmentFullyDeleted(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &fragment)
Definition: Execute.cpp:4527
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const