OmniSciDB  cde582ebc3
Execute.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 
23 #ifdef HAVE_CUDA
24 #include <cuda.h>
25 #endif // HAVE_CUDA
26 #include <chrono>
27 #include <ctime>
28 #include <future>
29 #include <iostream>
30 #include <memory>
31 #include <mutex>
32 #include <numeric>
33 #include <set>
34 #include <thread>
35 
36 #include "Catalog/Catalog.h"
37 #include "CudaMgr/CudaMgr.h"
41 #include "Parser/ParserNode.h"
72 #include "Shared/checked_alloc.h"
73 #include "Shared/measure.h"
74 #include "Shared/misc.h"
75 #include "Shared/scope.h"
76 #include "Shared/shard_key.h"
77 #include "Shared/threading.h"
78 
79 bool g_enable_watchdog{false};
83 size_t g_cpu_sub_task_size{500'000};
84 bool g_enable_filter_function{true};
85 unsigned g_dynamic_watchdog_time_limit{10000};
86 bool g_allow_cpu_retry{true};
87 bool g_allow_query_step_cpu_retry{true};
88 bool g_null_div_by_zero{false};
89 unsigned g_trivial_loop_join_threshold{1000};
90 bool g_from_table_reordering{true};
91 bool g_inner_join_fragment_skipping{true};
92 extern bool g_enable_smem_group_by;
93 extern std::unique_ptr<llvm::Module> udf_gpu_module;
94 extern std::unique_ptr<llvm::Module> udf_cpu_module;
95 bool g_enable_filter_push_down{false};
96 float g_filter_push_down_low_frac{-1.0f};
97 float g_filter_push_down_high_frac{-1.0f};
98 size_t g_filter_push_down_passing_row_ubound{0};
99 bool g_enable_columnar_output{false};
100 bool g_enable_left_join_filter_hoisting{true};
101 bool g_optimize_row_initialization{true};
102 bool g_enable_overlaps_hashjoin{true};
103 bool g_enable_distance_rangejoin{true};
104 bool g_enable_hashjoin_many_to_many{false};
105 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
106 double g_overlaps_target_entries_per_bin{1.3};
107 bool g_strip_join_covered_quals{false};
108 size_t g_constrained_by_in_threshold{10};
109 size_t g_default_max_groups_buffer_entry_guess{16384};
110 size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
111 bool g_enable_window_functions{true};
112 bool g_enable_table_functions{true};
113 bool g_enable_dev_table_functions{false};
114 bool g_enable_geo_ops_on_uncompressed_coords{true};
115 bool g_enable_rf_prop_table_functions{true};
116 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
117 size_t g_min_memory_allocation_size{
118  256}; // minimum memory allocation required for projection query output buffer
119  // without pre-flight count
120 bool g_enable_bump_allocator{false};
121 double g_bump_allocator_step_reduction{0.75};
122 bool g_enable_direct_columnarization{true};
123 extern bool g_enable_string_functions;
124 bool g_enable_lazy_fetch{true};
125 bool g_enable_runtime_query_interrupt{true};
126 bool g_enable_non_kernel_time_query_interrupt{true};
127 bool g_use_estimator_result_cache{true};
128 unsigned g_pending_query_interrupt_freq{1000};
129 double g_running_query_interrupt_freq{0.1};
130 size_t g_gpu_smem_threshold{
 131  4096}; // GPU shared memory threshold (in bytes); if larger
 132  // buffer sizes are required we do not use GPU shared
 133  // memory optimizations. Setting this to 0 means unlimited
 134  // (subject to other dynamically calculated caps)
135 bool g_enable_smem_grouped_non_count_agg{
136  true}; // enable use of shared memory when performing group-by with select non-count
137  // aggregates
138 bool g_enable_smem_non_grouped_agg{
139  true}; // enable optimizations for using GPU shared memory in implementation of
140  // non-grouped aggregates
141 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
142  // limits the allocation for the output buffer arena
143  // and data recycler test
144 size_t g_enable_parallel_linearization{
 145  10000}; // minimum # rows required to linearize a varlen column in parallel
146 bool g_enable_data_recycler{true};
147 bool g_use_hashtable_cache{true};
148 bool g_use_query_resultset_cache{true};
149 bool g_use_chunk_metadata_cache{true};
150 bool g_allow_auto_resultset_caching{false};
151 bool g_allow_query_step_skipping{true};
152 size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
153 size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
154 size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
155 size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
156 size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
157 
158 size_t g_approx_quantile_buffer{1000};
159 size_t g_approx_quantile_centroids{300};
160 
161 bool g_enable_automatic_ir_metadata{true};
162 
163 size_t g_max_log_length{500};
164 
165 extern bool g_cache_string_hash;
166 
167 int const Executor::max_gpu_count;
168 
169 const int32_t Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES;
170 
171 std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;
172 
173 extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
174  const std::string& udf_ir_filename,
175  llvm::LLVMContext& ctx);
176 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
177  const std::string& udf_ir_filename,
178  llvm::LLVMContext& ctx,
179  bool is_gpu = false);
180 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
181  const std::string& udf_ir_string,
182  llvm::LLVMContext& ctx,
183  bool is_gpu = false);
184 
185 namespace {
186 // This function is notably different from that in RelAlgExecutor because it already
187 // expects SPI values and therefore needs to avoid that transformation.
188 void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs,
189  const Catalog_Namespace::Catalog& catalog) {
190  for (const auto [col_id, table_id] : phys_inputs) {
191  foreign_storage::populate_string_dictionary(table_id, col_id, catalog);
192  }
193 }
194 
195 bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
196  const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
197  // The fragmenter always returns at least one fragment, even when the table is empty.
198  return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
199 }
200 } // namespace
201 
202 namespace foreign_storage {
203 // Foreign tables skip the population of dictionaries during metadata scan. This function
204 // will populate a dictionary's missing entries by fetching any unpopulated chunks.
205 void populate_string_dictionary(const int32_t table_id,
206  const int32_t col_id,
207  const Catalog_Namespace::Catalog& catalog) {
208  if (const auto foreign_table = dynamic_cast<const ForeignTable*>(
209  catalog.getMetadataForTable(table_id, false))) {
210  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
211  if (col_desc->columnType.is_dict_encoded_type()) {
212  auto& fragmenter = foreign_table->fragmenter;
213  CHECK(fragmenter != nullptr);
214  if (is_empty_table(fragmenter.get())) {
215  return;
216  }
217  for (const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
218  ChunkKey chunk_key = {catalog.getDatabaseId(), table_id, col_id, frag.fragmentId};
219  // If the key is sharded across leaves, only populate fragments that are sharded
220  // to this leaf.
221  if (key_does_not_shard_to_leaf(chunk_key)) {
222  continue;
223  }
224 
225  const ChunkMetadataMap& metadata_map = frag.getChunkMetadataMap();
226  CHECK(metadata_map.find(col_id) != metadata_map.end());
227  if (auto& meta = metadata_map.at(col_id); meta->isPlaceholder()) {
228  // When this goes out of scope it will stay in CPU cache but become
229  // evictable
230  auto chunk = Chunk_NS::Chunk::getChunk(col_desc,
231  &(catalog.getDataMgr()),
232  chunk_key,
234  0,
235  0,
236  0);
237  }
238  }
239  }
240  }
241 }
242 } // namespace foreign_storage
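
// Illustrative sketch (editor addition, not part of the original file): the chunk
// keys built in populate_string_dictionary() above are plain
// {db_id, table_id, col_id, fragment_id} vectors, and only fragments whose column
// metadata is still a placeholder need to be fetched. The FragmentStub type and
// chunks_to_populate helper below are hypothetical stand-ins used for illustration.
#include <map>
#include <vector>

namespace example_chunk_keys {
using ChunkKey = std::vector<int>;  // {db_id, table_id, col_id, fragment_id}

struct FragmentStub {
  int fragment_id;
  std::map<int, bool> col_is_placeholder;  // col_id -> metadata still a placeholder?
};

// Collect the chunk keys of a dictionary-encoded column that still need fetching.
inline std::vector<ChunkKey> chunks_to_populate(const int db_id,
                                                const int table_id,
                                                const int col_id,
                                                const std::vector<FragmentStub>& frags) {
  std::vector<ChunkKey> keys;
  for (const auto& frag : frags) {
    const auto it = frag.col_is_placeholder.find(col_id);
    if (it != frag.col_is_placeholder.end() && it->second) {
      keys.push_back({db_id, table_id, col_id, frag.fragment_id});
    }
  }
  return keys;
}
}  // namespace example_chunk_keys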
243 
244 Executor::Executor(const ExecutorId executor_id,
245  Data_Namespace::DataMgr* data_mgr,
246  const size_t block_size_x,
247  const size_t grid_size_x,
248  const size_t max_gpu_slab_size,
249  const std::string& debug_dir,
250  const std::string& debug_file)
251  : executor_id_(executor_id)
252  , context_(new llvm::LLVMContext())
253  , cgen_state_(new CgenState({}, false, this))
254  , block_size_x_(block_size_x)
255  , grid_size_x_(grid_size_x)
256  , max_gpu_slab_size_(max_gpu_slab_size)
257  , debug_dir_(debug_dir)
258  , debug_file_(debug_file)
259  , catalog_(nullptr)
260  , data_mgr_(data_mgr)
261  , temporary_tables_(nullptr)
265  update_extension_modules();
266 }
267 
 268 void Executor::initialize_extension_module_sources() {
 269  if (Executor::extension_module_sources.find(
 270  Executor::ExtModuleKinds::template_module) ==
 271  Executor::extension_module_sources.end()) {
 272  auto root_path = heavyai::get_root_abs_path();
 273  auto template_path = root_path + "/QueryEngine/RuntimeFunctions.bc";
 274  CHECK(boost::filesystem::exists(template_path));
 275  extension_module_sources[Executor::ExtModuleKinds::template_module] =
 276  template_path;
 277 #ifdef ENABLE_GEOS
 278  auto rt_geos_path = root_path + "/QueryEngine/GeosRuntime.bc";
 279  CHECK(boost::filesystem::exists(rt_geos_path));
 280  extension_module_sources[Executor::ExtModuleKinds::rt_geos_module] =
 281  rt_geos_path;
 282 #endif
 283 #ifdef HAVE_CUDA
 284  auto rt_libdevice_path = get_cuda_home() + "/nvvm/libdevice/libdevice.10.bc";
 285  if (boost::filesystem::exists(rt_libdevice_path)) {
 286  extension_module_sources[Executor::ExtModuleKinds::rt_libdevice_module] =
 287  rt_libdevice_path;
 288  } else {
 289  LOG(WARNING) << "File " << rt_libdevice_path
 290  << " does not exist; support for some UDF "
 291  "functions might not be available.";
 292  }
 293 #endif
 294  }
 295 }
296 
297 void Executor::reset(bool discard_runtime_modules_only) {
298  // TODO: keep cached results that do not depend on runtime UDF/UDTFs
299  auto qe = QueryEngine::getInstance();
300  qe->s_code_accessor->clear();
301  qe->s_stubs_accessor->clear();
302  qe->cpu_code_accessor->clear();
303  qe->gpu_code_accessor->clear();
304  qe->tf_code_accessor->clear();
305 
306  if (discard_runtime_modules_only) {
307  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_cpu_module);
308 #ifdef HAVE_CUDA
309  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_gpu_module);
310 #endif
311  cgen_state_->module_ = nullptr;
312  } else {
313  extension_modules_.clear();
314  cgen_state_.reset();
315  context_.reset(new llvm::LLVMContext());
316  cgen_state_.reset(new CgenState({}, false, this));
317  }
318 }
319 
320 void Executor::update_extension_modules(bool update_runtime_modules_only) {
321  auto read_module = [&](Executor::ExtModuleKinds module_kind,
322  const std::string& source) {
323  /*
 324  source can be either a filename of an LLVM IR
 325  or LLVM BC file, or a string containing
326  LLVM IR code.
327  */
328  CHECK(!source.empty());
 329  switch (module_kind) {
 330  case Executor::ExtModuleKinds::template_module:
 331  case Executor::ExtModuleKinds::rt_geos_module:
 332  case Executor::ExtModuleKinds::rt_libdevice_module: {
 333  return read_llvm_module_from_bc_file(source, getContext());
 334  }
 335  case Executor::ExtModuleKinds::udf_cpu_module: {
 336  return read_llvm_module_from_ir_file(source, getContext(), false);
 337  }
 338  case Executor::ExtModuleKinds::udf_gpu_module: {
 339  return read_llvm_module_from_ir_file(source, getContext(), true);
 340  }
 341  case Executor::ExtModuleKinds::rt_udf_cpu_module: {
 342  return read_llvm_module_from_ir_string(source, getContext(), false);
 343  }
 344  case Executor::ExtModuleKinds::rt_udf_gpu_module: {
 345  return read_llvm_module_from_ir_string(source, getContext(), true);
 346  }
347  default: {
348  UNREACHABLE();
349  return std::unique_ptr<llvm::Module>();
350  }
351  }
352  };
353  auto update_module = [&](Executor::ExtModuleKinds module_kind,
354  bool erase_not_found = false) {
355  auto it = Executor::extension_module_sources.find(module_kind);
356  if (it != Executor::extension_module_sources.end()) {
357  auto llvm_module = read_module(module_kind, it->second);
358  if (llvm_module) {
359  extension_modules_[module_kind] = std::move(llvm_module);
360  } else if (erase_not_found) {
361  extension_modules_.erase(module_kind);
362  } else {
363  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
364  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
365  << " LLVM module. The module will be unavailable.";
366  } else {
367  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
368  << " LLVM module. Using the existing module.";
369  }
370  }
371  } else {
372  if (erase_not_found) {
373  extension_modules_.erase(module_kind);
374  } else {
375  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
376  LOG(WARNING) << "Source of " << ::toString(module_kind)
377  << " LLVM module is unavailable. The module will be unavailable.";
378  } else {
379  LOG(WARNING) << "Source of " << ::toString(module_kind)
380  << " LLVM module is unavailable. Using the existing module.";
381  }
382  }
383  }
384  };
385 
386  if (!update_runtime_modules_only) {
 387  // required compile-time modules; their requirements are enforced
 388  // by Executor::initialize_extension_module_sources():
 389  update_module(Executor::ExtModuleKinds::template_module);
 390 #ifdef ENABLE_GEOS
 391  update_module(Executor::ExtModuleKinds::rt_geos_module);
 392 #endif
 393  // load-time modules; these are optional:
 394  update_module(Executor::ExtModuleKinds::udf_cpu_module, true);
 395 #ifdef HAVE_CUDA
 396  update_module(Executor::ExtModuleKinds::udf_gpu_module, true);
 397  update_module(Executor::ExtModuleKinds::rt_libdevice_module);
 398 #endif
399  }
400  // run-time modules, these are optional and erasable:
401  update_module(Executor::ExtModuleKinds::rt_udf_cpu_module, true);
402 #ifdef HAVE_CUDA
403  update_module(Executor::ExtModuleKinds::rt_udf_gpu_module, true);
404 #endif
405 }
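
// Illustrative sketch (editor addition, not part of the original file): the fallback
// logic of the update_module lambda above, reduced to standard containers. The
// ModuleKind enum, load_module callback, and update_entry helper are hypothetical
// stand-ins; only the "reload, else keep or erase the cached module" behavior is
// taken from the function above.
#include <functional>
#include <map>
#include <memory>
#include <string>

namespace example_module_cache {
enum class ModuleKind { cpu_udf, gpu_udf };

template <typename Module>
void update_entry(
    std::map<ModuleKind, std::unique_ptr<Module>>& modules,
    const std::map<ModuleKind, std::string>& sources,
    const ModuleKind kind,
    const std::function<std::unique_ptr<Module>(const std::string&)>& load_module,
    const bool erase_if_unavailable) {
  const auto src_it = sources.find(kind);
  if (src_it == sources.end()) {
    if (erase_if_unavailable) {
      modules.erase(kind);  // runtime modules disappear together with their source
    }
    return;  // otherwise keep whatever module (if any) is already cached
  }
  if (auto fresh = load_module(src_it->second)) {
    modules[kind] = std::move(fresh);  // a successful reload replaces the cached module
  } else if (erase_if_unavailable) {
    modules.erase(kind);
  }  // on failure, an already-cached module is left in place as a fallback
}
}  // namespace example_module_cache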
406 
407 // Used by StubGenerator::generateStub
 408 Executor::CgenStateManager::CgenStateManager(Executor& executor)
 409  : executor_(executor)
410  , lock_queue_clock_(timer_start())
411  , lock_(executor_.compilation_mutex_)
412  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
413 {
414  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
415  executor_.cgen_state_.reset(new CgenState(0, false, &executor));
416 }
417 
 418 Executor::CgenStateManager::CgenStateManager(
 419  Executor& executor,
420  const bool allow_lazy_fetch,
421  const std::vector<InputTableInfo>& query_infos,
422  const PlanState::DeletedColumnsMap& deleted_cols_map,
423  const RelAlgExecutionUnit* ra_exe_unit)
424  : executor_(executor)
425  , lock_queue_clock_(timer_start())
426  , lock_(executor_.compilation_mutex_)
427  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
428 {
429  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
430  // nukeOldState creates new CgenState and PlanState instances for
431  // the subsequent code generation. It also resets
432  // kernel_queue_time_ms_ and compilation_queue_time_ms_ that we do
 433  // not currently restore; should we accumulate these timings?
434  executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
435 }
436 
 437 Executor::CgenStateManager::~CgenStateManager() {
 438  // prevent memory leak from hoisted literals
439  for (auto& p : executor_.cgen_state_->row_func_hoisted_literals_) {
440  auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
441  if (inst && inst->getNumUses() == 0 && inst->getParent() == nullptr) {
442  // The llvm::Value instance stored in p.first is created by the
443  // CodeGenerator::codegenHoistedConstantsPlaceholders method.
444  p.first->deleteValue();
445  }
446  }
447  executor_.cgen_state_->row_func_hoisted_literals_.clear();
448 
449  // move generated StringDictionaryTranslationMgrs and InValueBitmaps
450  // to the old CgenState instance as the execution of the generated
451  // code uses these bitmaps
452 
453  for (auto& str_dict_translation_mgr :
454  executor_.cgen_state_->str_dict_translation_mgrs_) {
455  cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
456  }
457  executor_.cgen_state_->str_dict_translation_mgrs_.clear();
458 
459  for (auto& bm : executor_.cgen_state_->in_values_bitmaps_) {
460  cgen_state_->moveInValuesBitmap(bm);
461  }
462  executor_.cgen_state_->in_values_bitmaps_.clear();
463 
464  // restore the old CgenState instance
465  executor_.cgen_state_.reset(cgen_state_.release());
466 }
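
// Illustrative sketch (editor addition, not part of the original file): the
// save/restore pattern used by the code-generation state manager above, shown as a
// generic RAII guard. StateGuard and CodegenState are hypothetical names; the point
// is that the constructor locks, stashes the old state and installs a fresh one,
// and the destructor moves results back and restores the old state on scope exit.
#include <memory>
#include <mutex>
#include <utility>

namespace example_state_guard {
struct CodegenState {
  // per-compilation scratch state (stand-in for the real state object)
};

class StateGuard {
 public:
  StateGuard(std::unique_ptr<CodegenState>& slot, std::mutex& compilation_mutex)
      : slot_(slot), lock_(compilation_mutex), saved_(std::move(slot)) {
    slot_ = std::make_unique<CodegenState>();  // hand the owner a fresh state to fill
  }
  ~StateGuard() {
    // Anything produced in the fresh state that must outlive this compilation would
    // be moved into saved_ here before restoring.
    slot_ = std::move(saved_);  // restore the previous state on scope exit
  }
  StateGuard(const StateGuard&) = delete;
  StateGuard& operator=(const StateGuard&) = delete;

 private:
  std::unique_ptr<CodegenState>& slot_;
  std::lock_guard<std::mutex> lock_;
  std::unique_ptr<CodegenState> saved_;
};
}  // namespace example_state_guard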
467 
468 std::shared_ptr<Executor> Executor::getExecutor(
469  const ExecutorId executor_id,
470  const std::string& debug_dir,
471  const std::string& debug_file,
472  const SystemParameters& system_parameters) {
474 
476  auto it = executors_.find(executor_id);
477  if (it != executors_.end()) {
478  return it->second;
479  }
481  auto executor = std::make_shared<Executor>(executor_id,
482  &data_mgr,
483  system_parameters.cuda_block_size,
484  system_parameters.cuda_grid_size,
485  system_parameters.max_gpu_slab_size,
486  debug_dir,
487  debug_file);
488  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
489  return executor;
490 }
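
// Illustrative sketch (editor addition, not part of the original file): the
// find-or-create pattern getExecutor() relies on, shown with a mutex-guarded
// std::map. The Registry type and make_fn parameter are hypothetical.
#include <functional>
#include <map>
#include <memory>
#include <mutex>

namespace example_registry {
template <typename Key, typename Value>
class Registry {
 public:
  std::shared_ptr<Value> get_or_create(
      const Key& key, const std::function<std::shared_ptr<Value>()>& make_fn) {
    std::lock_guard<std::mutex> guard(mutex_);
    auto it = entries_.find(key);
    if (it != entries_.end()) {
      return it->second;  // reuse the cached instance
    }
    auto value = make_fn();  // construct once, under the lock
    entries_.emplace(key, value);
    return value;
  }

 private:
  std::mutex mutex_;
  std::map<Key, std::shared_ptr<Value>> entries_;
};
}  // namespace example_registry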
491 
493  switch (memory_level) {
497  execute_mutex_); // Don't flush memory while queries are running
498 
499  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
500  // The hash table cache uses CPU memory not managed by the buffer manager. In the
501  // future, we should manage these allocations with the buffer manager directly.
502  // For now, assume the user wants to purge the hash table cache when they clear
503  // CPU memory (currently used in ExecuteTest to lower memory pressure)
505  }
508  break;
509  }
510  default: {
511  throw std::runtime_error(
512  "Clearing memory levels other than the CPU level or GPU level is not "
513  "supported.");
514  }
515  }
516 }
517 
519  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
520 }
521 
523  const int dict_id_in,
524  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
525  const bool with_generation) const {
526  CHECK(row_set_mem_owner);
527  std::lock_guard<std::mutex> lock(
528  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
529  return row_set_mem_owner->getOrAddStringDictProxy(
530  dict_id_in, with_generation, catalog_);
531 }
532 
534  const int dict_id_in,
535  const bool with_generation,
536  const Catalog_Namespace::Catalog* catalog) {
537  const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
538  CHECK(catalog);
539  const auto dd = catalog->getMetadataForDict(dict_id);
540  if (dd) {
541  CHECK(dd->stringDict);
542  CHECK_LE(dd->dictNBits, 32);
543  const int64_t generation =
544  with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
545  return addStringDict(dd->stringDict, dict_id, generation);
546  }
548  if (!lit_str_dict_proxy_) {
549  DictRef literal_dict_ref(catalog->getDatabaseId(), DictRef::literalsDictId);
550  std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
551  literal_dict_ref, "", false, true, g_cache_string_hash);
552  lit_str_dict_proxy_ =
553  std::make_shared<StringDictionaryProxy>(tsd, literal_dict_ref.dictId, 0);
554  }
555  return lit_str_dict_proxy_.get();
556 }
557 
559  const int source_dict_id,
560  const int dest_dict_id,
561  const RowSetMemoryOwner::StringTranslationType translation_type,
562  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
563  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
564  const bool with_generation) const {
565  CHECK(row_set_mem_owner);
566  std::lock_guard<std::mutex> lock(
567  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
568  return row_set_mem_owner->getOrAddStringProxyTranslationMap(source_dict_id,
569  dest_dict_id,
570  with_generation,
571  translation_type,
572  string_op_infos,
573  catalog_);
574 }
575 
578  const StringDictionaryProxy* source_proxy,
579  StringDictionaryProxy* dest_proxy,
580  const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
581  const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
582  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) const {
583  CHECK(row_set_mem_owner);
584  std::lock_guard<std::mutex> lock(
585  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
586  // First translate lhs onto itself if there are string ops
587  if (!dest_string_op_infos.empty()) {
588  row_set_mem_owner->addStringProxyUnionTranslationMap(
589  dest_proxy, dest_proxy, dest_string_op_infos);
590  }
591  return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
592  source_proxy, dest_proxy, source_string_op_infos);
593 }
594 
596  const int source_dict_id_in,
597  const int dest_dict_id_in,
598  const bool with_generation,
599  const RowSetMemoryOwner::StringTranslationType translation_type,
600  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
601  const Catalog_Namespace::Catalog* catalog) {
602  const auto source_proxy =
603  getOrAddStringDictProxy(source_dict_id_in, with_generation, catalog);
604  auto dest_proxy = getOrAddStringDictProxy(dest_dict_id_in, with_generation, catalog);
606  return addStringProxyIntersectionTranslationMap(
607  source_proxy, dest_proxy, string_op_infos);
608  } else {
609  return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
610  }
611 }
612 
614  std::lock_guard<std::mutex> lock(state_mutex_);
615  return t_digests_
616  .emplace_back(std::make_unique<quantile::TDigest>(
618  .get();
619 }
620 
621 bool Executor::isCPUOnly() const {
622  CHECK(data_mgr_);
623  return !data_mgr_->getCudaMgr();
624 }
625 
627  const Analyzer::ColumnVar* col_var) const {
629  col_var->get_column_id(), col_var->get_table_id(), *catalog_);
630 }
631 
633  const Analyzer::ColumnVar* col_var,
634  int n) const {
635  const auto cd = getColumnDescriptor(col_var);
636  if (!cd || n > cd->columnType.get_physical_cols()) {
637  return nullptr;
638  }
640  col_var->get_column_id() + n, col_var->get_table_id(), *catalog_);
641 }
642 
644  return catalog_;
645 }
646 
648  catalog_ = catalog;
649 }
650 
651 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
652  return row_set_mem_owner_;
653 }
654 
656  return temporary_tables_;
657 }
658 
660  return input_table_info_cache_.getTableInfo(table_id);
661 }
662 
663 const TableGeneration& Executor::getTableGeneration(const int table_id) const {
664  return table_generations_.getGeneration(table_id);
665 }
666 
668  return agg_col_range_cache_.getColRange(phys_input);
669 }
670 
671 size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const {
672  size_t num_bytes = 0;
673  if (!plan_state_) {
674  return 0;
675  }
676  for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
677  if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
678  continue;
679  }
680 
681  if (fetched_col_pair.first < 0) {
682  num_bytes += 8;
683  } else {
684  const auto cd =
685  catalog_->getMetadataForColumn(fetched_col_pair.first, fetched_col_pair.second);
686  const auto& ti = cd->columnType;
687  const auto sz = ti.get_size();
688  if (sz < 0) {
689  // for varlen types, only account for the pointer/size for each row, for now
690  if (!ti.is_logical_geo_type()) {
691  // Don't count size for logical geo types, as they are
692  // backed by physical columns
693  num_bytes += 16;
694  }
695  } else {
696  num_bytes += sz;
697  }
698  }
699  }
700  return num_bytes;
701 }
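
// Illustrative sketch (editor addition, not part of the original file): the per-row
// byte accounting of getNumBytesForFetchedRow() restated over a plain column list.
// ColumnStub and its fields are hypothetical; the 8-byte and 16-byte figures mirror
// the constants used in the function above.
#include <cstddef>
#include <vector>

namespace example_row_width {
struct ColumnStub {
  int fixed_size_bytes;       // negative means variable-length
  bool is_logical_geo;        // logical geo columns are backed by physical columns
  bool from_temporary_table;  // corresponds to the negative table id case above
};

inline size_t bytes_per_fetched_row(const std::vector<ColumnStub>& columns) {
  size_t num_bytes = 0;
  for (const auto& col : columns) {
    if (col.from_temporary_table) {
      num_bytes += 8;  // fixed-width reference into the temporary result
    } else if (col.fixed_size_bytes < 0) {
      if (!col.is_logical_geo) {
        num_bytes += 16;  // varlen: count only the pointer + size per row
      }
    } else {
      num_bytes += static_cast<size_t>(col.fixed_size_bytes);
    }
  }
  return num_bytes;
}
}  // namespace example_row_width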
702 
704  const std::vector<Analyzer::Expr*>& target_exprs) const {
706  for (const auto target_expr : target_exprs) {
707  if (plan_state_->isLazyFetchColumn(target_expr)) {
708  return true;
709  }
710  }
711  return false;
712 }
713 
714 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
715  const std::vector<Analyzer::Expr*>& target_exprs) const {
717  CHECK(catalog_);
718  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
719  for (const auto target_expr : target_exprs) {
720  if (!plan_state_->isLazyFetchColumn(target_expr)) {
721  col_lazy_fetch_info.emplace_back(
722  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
723  } else {
724  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
725  CHECK(col_var);
726  auto col_id = col_var->get_column_id();
727  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
728  auto cd = (col_var->get_table_id() > 0)
729  ? get_column_descriptor(col_id, col_var->get_table_id(), *catalog_)
730  : nullptr;
731  if (cd && IS_GEO(cd->columnType.get_type())) {
732  // Geo coords cols will be processed in sequence. So we only need to track the
733  // first coords col in lazy fetch info.
734  {
735  auto cd0 =
736  get_column_descriptor(col_id + 1, col_var->get_table_id(), *catalog_);
737  auto col0_ti = cd0->columnType;
738  CHECK(!cd0->isVirtualCol);
739  auto col0_var = makeExpr<Analyzer::ColumnVar>(
740  col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
741  auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
742  col_lazy_fetch_info.emplace_back(
743  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
744  }
745  } else {
746  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
747  const auto& col_ti = col_var->get_type_info();
748  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
749  }
750  }
751  }
752  return col_lazy_fetch_info;
753 }
754 
759 }
760 
761 std::vector<int8_t> Executor::serializeLiterals(
762  const std::unordered_map<int, CgenState::LiteralValues>& literals,
763  const int device_id) {
764  if (literals.empty()) {
765  return {};
766  }
767  const auto dev_literals_it = literals.find(device_id);
768  CHECK(dev_literals_it != literals.end());
769  const auto& dev_literals = dev_literals_it->second;
770  size_t lit_buf_size{0};
771  std::vector<std::string> real_strings;
772  std::vector<std::vector<double>> double_array_literals;
773  std::vector<std::vector<int8_t>> align64_int8_array_literals;
774  std::vector<std::vector<int32_t>> int32_array_literals;
775  std::vector<std::vector<int8_t>> align32_int8_array_literals;
776  std::vector<std::vector<int8_t>> int8_array_literals;
777  for (const auto& lit : dev_literals) {
778  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
779  if (lit.which() == 7) {
780  const auto p = boost::get<std::string>(&lit);
781  CHECK(p);
782  real_strings.push_back(*p);
783  } else if (lit.which() == 8) {
784  const auto p = boost::get<std::vector<double>>(&lit);
785  CHECK(p);
786  double_array_literals.push_back(*p);
787  } else if (lit.which() == 9) {
788  const auto p = boost::get<std::vector<int32_t>>(&lit);
789  CHECK(p);
790  int32_array_literals.push_back(*p);
791  } else if (lit.which() == 10) {
792  const auto p = boost::get<std::vector<int8_t>>(&lit);
793  CHECK(p);
794  int8_array_literals.push_back(*p);
795  } else if (lit.which() == 11) {
796  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
797  CHECK(p);
798  if (p->second == 64) {
799  align64_int8_array_literals.push_back(p->first);
800  } else if (p->second == 32) {
801  align32_int8_array_literals.push_back(p->first);
802  } else {
803  CHECK(false);
804  }
805  }
806  }
807  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
808  throw TooManyLiterals();
809  }
810  int16_t crt_real_str_off = lit_buf_size;
811  for (const auto& real_str : real_strings) {
812  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
813  lit_buf_size += real_str.size();
814  }
815  if (double_array_literals.size() > 0) {
816  lit_buf_size = align(lit_buf_size, sizeof(double));
817  }
818  int16_t crt_double_arr_lit_off = lit_buf_size;
819  for (const auto& double_array_literal : double_array_literals) {
820  CHECK_LE(double_array_literal.size(),
821  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
822  lit_buf_size += double_array_literal.size() * sizeof(double);
823  }
824  if (align64_int8_array_literals.size() > 0) {
825  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
826  }
827  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
828  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
 829  CHECK_LE(align64_int8_array_literal.size(),
830  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
831  lit_buf_size += align64_int8_array_literal.size();
832  }
833  if (int32_array_literals.size() > 0) {
834  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
835  }
836  int16_t crt_int32_arr_lit_off = lit_buf_size;
837  for (const auto& int32_array_literal : int32_array_literals) {
838  CHECK_LE(int32_array_literal.size(),
839  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
840  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
841  }
842  if (align32_int8_array_literals.size() > 0) {
843  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
844  }
845  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
846  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
 847  CHECK_LE(align32_int8_array_literal.size(),
848  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
849  lit_buf_size += align32_int8_array_literal.size();
850  }
851  int16_t crt_int8_arr_lit_off = lit_buf_size;
852  for (const auto& int8_array_literal : int8_array_literals) {
853  CHECK_LE(int8_array_literal.size(),
854  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
855  lit_buf_size += int8_array_literal.size();
856  }
857  unsigned crt_real_str_idx = 0;
858  unsigned crt_double_arr_lit_idx = 0;
859  unsigned crt_align64_int8_arr_lit_idx = 0;
860  unsigned crt_int32_arr_lit_idx = 0;
861  unsigned crt_align32_int8_arr_lit_idx = 0;
862  unsigned crt_int8_arr_lit_idx = 0;
863  std::vector<int8_t> serialized(lit_buf_size);
864  size_t off{0};
865  for (const auto& lit : dev_literals) {
866  const auto lit_bytes = CgenState::literalBytes(lit);
867  off = CgenState::addAligned(off, lit_bytes);
868  switch (lit.which()) {
869  case 0: {
870  const auto p = boost::get<int8_t>(&lit);
871  CHECK(p);
872  serialized[off - lit_bytes] = *p;
873  break;
874  }
875  case 1: {
876  const auto p = boost::get<int16_t>(&lit);
877  CHECK(p);
878  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
879  break;
880  }
881  case 2: {
882  const auto p = boost::get<int32_t>(&lit);
883  CHECK(p);
884  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
885  break;
886  }
887  case 3: {
888  const auto p = boost::get<int64_t>(&lit);
889  CHECK(p);
890  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
891  break;
892  }
893  case 4: {
894  const auto p = boost::get<float>(&lit);
895  CHECK(p);
896  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
897  break;
898  }
899  case 5: {
900  const auto p = boost::get<double>(&lit);
901  CHECK(p);
902  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
903  break;
904  }
905  case 6: {
906  const auto p = boost::get<std::pair<std::string, int>>(&lit);
907  CHECK(p);
908  const auto str_id =
910  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
911  ->getOrAddTransient(p->first)
912  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
913  ->getIdOfString(p->first);
914  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
915  break;
916  }
917  case 7: {
918  const auto p = boost::get<std::string>(&lit);
919  CHECK(p);
920  int32_t off_and_len = crt_real_str_off << 16;
921  const auto& crt_real_str = real_strings[crt_real_str_idx];
922  off_and_len |= static_cast<int16_t>(crt_real_str.size());
923  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
924  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
925  ++crt_real_str_idx;
926  crt_real_str_off += crt_real_str.size();
927  break;
928  }
929  case 8: {
930  const auto p = boost::get<std::vector<double>>(&lit);
931  CHECK(p);
932  int32_t off_and_len = crt_double_arr_lit_off << 16;
933  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
934  int32_t len = crt_double_arr_lit.size();
935  CHECK_EQ((len >> 16), 0);
936  off_and_len |= static_cast<int16_t>(len);
937  int32_t double_array_bytesize = len * sizeof(double);
938  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
939  memcpy(&serialized[crt_double_arr_lit_off],
940  crt_double_arr_lit.data(),
941  double_array_bytesize);
942  ++crt_double_arr_lit_idx;
943  crt_double_arr_lit_off += double_array_bytesize;
944  break;
945  }
946  case 9: {
947  const auto p = boost::get<std::vector<int32_t>>(&lit);
948  CHECK(p);
949  int32_t off_and_len = crt_int32_arr_lit_off << 16;
950  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
951  int32_t len = crt_int32_arr_lit.size();
952  CHECK_EQ((len >> 16), 0);
953  off_and_len |= static_cast<int16_t>(len);
954  int32_t int32_array_bytesize = len * sizeof(int32_t);
955  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
956  memcpy(&serialized[crt_int32_arr_lit_off],
957  crt_int32_arr_lit.data(),
958  int32_array_bytesize);
959  ++crt_int32_arr_lit_idx;
960  crt_int32_arr_lit_off += int32_array_bytesize;
961  break;
962  }
963  case 10: {
964  const auto p = boost::get<std::vector<int8_t>>(&lit);
965  CHECK(p);
966  int32_t off_and_len = crt_int8_arr_lit_off << 16;
967  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
968  int32_t len = crt_int8_arr_lit.size();
969  CHECK_EQ((len >> 16), 0);
970  off_and_len |= static_cast<int16_t>(len);
971  int32_t int8_array_bytesize = len;
972  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
973  memcpy(&serialized[crt_int8_arr_lit_off],
974  crt_int8_arr_lit.data(),
975  int8_array_bytesize);
976  ++crt_int8_arr_lit_idx;
977  crt_int8_arr_lit_off += int8_array_bytesize;
978  break;
979  }
980  case 11: {
981  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
982  CHECK(p);
983  if (p->second == 64) {
984  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
985  const auto& crt_align64_int8_arr_lit =
986  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
987  int32_t len = crt_align64_int8_arr_lit.size();
988  CHECK_EQ((len >> 16), 0);
989  off_and_len |= static_cast<int16_t>(len);
990  int32_t align64_int8_array_bytesize = len;
991  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
992  memcpy(&serialized[crt_align64_int8_arr_lit_off],
993  crt_align64_int8_arr_lit.data(),
994  align64_int8_array_bytesize);
995  ++crt_align64_int8_arr_lit_idx;
996  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
997  } else if (p->second == 32) {
998  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
999  const auto& crt_align32_int8_arr_lit =
1000  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
1001  int32_t len = crt_align32_int8_arr_lit.size();
1002  CHECK_EQ((len >> 16), 0);
1003  off_and_len |= static_cast<int16_t>(len);
1004  int32_t align32_int8_array_bytesize = len;
1005  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1006  memcpy(&serialized[crt_align32_int8_arr_lit_off],
1007  crt_align32_int8_arr_lit.data(),
1008  align32_int8_array_bytesize);
1009  ++crt_align32_int8_arr_lit_idx;
1010  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
1011  } else {
1012  CHECK(false);
1013  }
1014  break;
1015  }
1016  default:
1017  CHECK(false);
1018  }
1019  }
1020  return serialized;
1021 }
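
// Illustrative sketch (editor addition, not part of the original file): the
// offset/length packing used by serializeLiterals(), where each variable-length
// literal is referenced by a single int32 holding a 16-bit buffer offset in the
// high half and a 16-bit length in the low half. Helper names are hypothetical.
#include <cassert>
#include <cstdint>

namespace example_literal_packing {
inline int32_t pack_off_and_len(const int16_t offset, const int16_t length) {
  // Cast the length through uint16_t so the OR cannot disturb the offset bits.
  return (static_cast<int32_t>(offset) << 16) | static_cast<uint16_t>(length);
}

inline int16_t unpack_offset(const int32_t off_and_len) {
  return static_cast<int16_t>(off_and_len >> 16);
}

inline int16_t unpack_length(const int32_t off_and_len) {
  return static_cast<int16_t>(off_and_len & 0xFFFF);
}

inline void self_check() {
  // A string of length 11 placed at buffer offset 40:
  const int32_t packed = pack_off_and_len(40, 11);
  assert(unpack_offset(packed) == 40);
  assert(unpack_length(packed) == 11);
}
}  // namespace example_literal_packing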
1022 
1023 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
1024  if (device_type == ExecutorDeviceType::GPU) {
1025  return cudaMgr()->getDeviceCount();
1026  } else {
1027  return 1;
1028  }
1029 }
1030 
 1031 int Executor::deviceCountForMemoryLevel(
 1032  const Data_Namespace::MemoryLevel memory_level) const {
1033  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
 1034  : deviceCount(ExecutorDeviceType::CPU);
 1035 }
1036 
1037 // TODO(alex): remove or split
1038 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
1039  const SQLTypeInfo& ti,
1040  const int64_t agg_init_val,
1041  const int8_t out_byte_width,
1042  const int64_t* out_vec,
1043  const size_t out_vec_sz,
1044  const bool is_group_by,
1045  const bool float_argument_input) {
1046  switch (agg) {
1047  case kAVG:
1048  case kSUM:
1049  if (0 != agg_init_val) {
1050  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1051  int64_t agg_result = agg_init_val;
1052  for (size_t i = 0; i < out_vec_sz; ++i) {
1053  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
1054  }
1055  return {agg_result, 0};
1056  } else {
1057  CHECK(ti.is_fp());
1058  switch (out_byte_width) {
1059  case 4: {
1060  int agg_result = static_cast<int32_t>(agg_init_val);
1061  for (size_t i = 0; i < out_vec_sz; ++i) {
 1062  agg_sum_float_skip_val(
 1063  &agg_result,
1064  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1065  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1066  }
1067  const int64_t converted_bin =
1068  float_argument_input
1069  ? static_cast<int64_t>(agg_result)
1070  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1071  return {converted_bin, 0};
1072  break;
1073  }
1074  case 8: {
1075  int64_t agg_result = agg_init_val;
1076  for (size_t i = 0; i < out_vec_sz; ++i) {
 1077  agg_sum_double_skip_val(
 1078  &agg_result,
1079  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1080  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1081  }
1082  return {agg_result, 0};
1083  break;
1084  }
1085  default:
1086  CHECK(false);
1087  }
1088  }
1089  }
1090  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
1091  int64_t agg_result = 0;
1092  for (size_t i = 0; i < out_vec_sz; ++i) {
1093  agg_result += out_vec[i];
1094  }
1095  return {agg_result, 0};
1096  } else {
1097  CHECK(ti.is_fp());
1098  switch (out_byte_width) {
1099  case 4: {
1100  float r = 0.;
1101  for (size_t i = 0; i < out_vec_sz; ++i) {
1102  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
1103  }
1104  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
1105  const int64_t converted_bin =
1106  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
1107  return {converted_bin, 0};
1108  }
1109  case 8: {
1110  double r = 0.;
1111  for (size_t i = 0; i < out_vec_sz; ++i) {
1112  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
1113  }
1114  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
1115  }
1116  default:
1117  CHECK(false);
1118  }
1119  }
1120  break;
1121  case kCOUNT: {
1122  uint64_t agg_result = 0;
1123  for (size_t i = 0; i < out_vec_sz; ++i) {
1124  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
1125  agg_result += out;
1126  }
1127  return {static_cast<int64_t>(agg_result), 0};
1128  }
1129  case kMIN: {
1130  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1131  int64_t agg_result = agg_init_val;
1132  for (size_t i = 0; i < out_vec_sz; ++i) {
1133  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
1134  }
1135  return {agg_result, 0};
1136  } else {
1137  switch (out_byte_width) {
1138  case 4: {
1139  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1140  for (size_t i = 0; i < out_vec_sz; ++i) {
 1141  agg_min_float_skip_val(
 1142  &agg_result,
1143  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1144  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1145  }
1146  const int64_t converted_bin =
1147  float_argument_input
1148  ? static_cast<int64_t>(agg_result)
1149  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1150  return {converted_bin, 0};
1151  }
1152  case 8: {
1153  int64_t agg_result = agg_init_val;
1154  for (size_t i = 0; i < out_vec_sz; ++i) {
 1155  agg_min_double_skip_val(
 1156  &agg_result,
1157  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1158  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1159  }
1160  return {agg_result, 0};
1161  }
1162  default:
1163  CHECK(false);
1164  }
1165  }
1166  }
1167  case kMAX:
1168  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1169  int64_t agg_result = agg_init_val;
1170  for (size_t i = 0; i < out_vec_sz; ++i) {
1171  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
1172  }
1173  return {agg_result, 0};
1174  } else {
1175  switch (out_byte_width) {
1176  case 4: {
1177  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1178  for (size_t i = 0; i < out_vec_sz; ++i) {
 1179  agg_max_float_skip_val(
 1180  &agg_result,
1181  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1182  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1183  }
1184  const int64_t converted_bin =
1185  float_argument_input ? static_cast<int64_t>(agg_result)
1186  : float_to_double_bin(agg_result, !ti.get_notnull());
1187  return {converted_bin, 0};
1188  }
1189  case 8: {
1190  int64_t agg_result = agg_init_val;
1191  for (size_t i = 0; i < out_vec_sz; ++i) {
 1192  agg_max_double_skip_val(
 1193  &agg_result,
1194  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1195  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1196  }
1197  return {agg_result, 0};
1198  }
1199  default:
1200  CHECK(false);
1201  }
1202  }
1203  case kSINGLE_VALUE: {
1204  int64_t agg_result = agg_init_val;
1205  for (size_t i = 0; i < out_vec_sz; ++i) {
1206  if (out_vec[i] != agg_init_val) {
1207  if (agg_result == agg_init_val) {
1208  agg_result = out_vec[i];
1209  } else if (out_vec[i] != agg_result) {
 1210  return {agg_result, Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES};
 1211  }
1212  }
1213  }
1214  return {agg_result, 0};
1215  }
1216  case kSAMPLE: {
1217  int64_t agg_result = agg_init_val;
1218  for (size_t i = 0; i < out_vec_sz; ++i) {
1219  if (out_vec[i] != agg_init_val) {
1220  agg_result = out_vec[i];
1221  break;
1222  }
1223  }
1224  return {agg_result, 0};
1225  }
1226  default:
1227  CHECK(false);
1228  }
1229  abort();
1230 }
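
// Illustrative sketch (editor addition, not part of the original file): the
// "skip value" reduction idiom used throughout reduceResults(), where per-device
// partial aggregates still equal to the initial/sentinel value are treated as empty
// and ignored. sum_skip_val is a hypothetical helper for illustration only.
#include <cstdint>
#include <vector>

namespace example_skip_val_reduce {
// Sum partial results, ignoring entries that still hold the sentinel (NULL) value.
// Returns the sentinel itself when every partial result was empty, e.g.
// sum_skip_val({5, skip, 7}, skip) yields 12.
inline int64_t sum_skip_val(const std::vector<int64_t>& partials,
                            const int64_t skip_val) {
  int64_t acc = skip_val;
  for (const int64_t v : partials) {
    if (v == skip_val) {
      continue;  // this device produced no value for the aggregate
    }
    acc = (acc == skip_val) ? v : acc + v;
  }
  return acc;
}
}  // namespace example_skip_val_reduce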
1231 
1232 namespace {
1233 
1235  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1236  std::vector<TargetInfo> const& targets) {
1237  auto& first = results_per_device.front().first;
1238  CHECK(first);
1239  auto const first_target_idx = result_set::first_dict_encoded_idx(targets);
1240  if (first_target_idx) {
1241  first->translateDictEncodedColumns(targets, *first_target_idx);
1242  }
1243  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
1244  const auto& next = results_per_device[dev_idx].first;
1245  CHECK(next);
1246  if (first_target_idx) {
1247  next->translateDictEncodedColumns(targets, *first_target_idx);
1248  }
1249  first->append(*next);
1250  }
1251  return std::move(first);
1252 }
1253 
1255  TargetInfo operator()(Analyzer::Expr const* const target_expr) const {
1256  return get_target_info(target_expr, g_bigint_count);
1257  }
1258 };
1259 
1260 } // namespace
1261 
1263  const RelAlgExecutionUnit& ra_exe_unit) {
1264  auto timer = DEBUG_TIMER(__func__);
1265  auto& results_per_device = shared_context.getFragmentResults();
1266  auto const targets = shared::transform<std::vector<TargetInfo>>(
1267  ra_exe_unit.target_exprs, GetTargetInfo{});
1268  if (results_per_device.empty()) {
1269  return std::make_shared<ResultSet>(targets,
1273  catalog_,
1274  blockSize(),
1275  gridSize());
1276  }
1277  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
1278  std::sort(results_per_device.begin(),
1279  results_per_device.end(),
1280  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
1281  CHECK_GE(lhs.second.size(), size_t(1));
1282  CHECK_GE(rhs.second.size(), size_t(1));
1283  return lhs.second.front() < rhs.second.front();
1284  });
1285 
1286  return get_merged_result(results_per_device, targets);
1287 }
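
// Illustrative sketch (editor addition, not part of the original file): the
// per-device ordering used by the result-union routine above, which sorts each
// device's result by the first fragment index it covers before appending, so the
// merged rows come out in fragment order. ResultStub is a hypothetical stand-in.
#include <algorithm>
#include <cstddef>
#include <vector>

namespace example_result_order {
struct ResultStub {
  std::vector<size_t> fragment_ids;  // fragments this device processed; never empty
};

inline void order_by_first_fragment(std::vector<ResultStub>& per_device) {
  std::sort(per_device.begin(),
            per_device.end(),
            [](const ResultStub& lhs, const ResultStub& rhs) {
              return lhs.fragment_ids.front() < rhs.fragment_ids.front();
            });
}
}  // namespace example_result_order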
1288 
1290  const RelAlgExecutionUnit& ra_exe_unit,
1291  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1292  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1293  const QueryMemoryDescriptor& query_mem_desc) const {
1294  auto timer = DEBUG_TIMER(__func__);
1295  if (ra_exe_unit.estimator) {
1296  return reduce_estimator_results(ra_exe_unit, results_per_device);
1297  }
1298 
1299  if (results_per_device.empty()) {
1300  auto const targets = shared::transform<std::vector<TargetInfo>>(
1301  ra_exe_unit.target_exprs, GetTargetInfo{});
1302  return std::make_shared<ResultSet>(targets,
1305  nullptr,
1306  catalog_,
1307  blockSize(),
1308  gridSize());
1309  }
1310 
 1311  return reduceMultiDeviceResultSets(
 1312  results_per_device,
1313  row_set_mem_owner,
1314  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
1315 }
1316 
1317 namespace {
1318 
1320  const size_t executor_id,
1321  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1322  int64_t* compilation_queue_time) {
1323  auto clock_begin = timer_start();
1324  // ResultSetReductionJIT::codegen compilation-locks if new code will be generated
1325  *compilation_queue_time = timer_stop(clock_begin);
1326  const auto& this_result_set = results_per_device[0].first;
1327  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
1328  this_result_set->getTargetInfos(),
1329  this_result_set->getTargetInitVals(),
1330  executor_id);
1331  return reduction_jit.codegen();
1332 };
1333 
1334 } // namespace
1335 
 1336 ResultSetPtr Executor::reduceMultiDeviceResultSets(
 1337  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1338  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1339  const QueryMemoryDescriptor& query_mem_desc) const {
1340  auto timer = DEBUG_TIMER(__func__);
1341  std::shared_ptr<ResultSet> reduced_results;
1342 
1343  const auto& first = results_per_device.front().first;
1344 
1345  if (query_mem_desc.getQueryDescriptionType() ==
1347  results_per_device.size() > 1) {
1348  const auto total_entry_count = std::accumulate(
1349  results_per_device.begin(),
1350  results_per_device.end(),
1351  size_t(0),
1352  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
1353  const auto& r = rs.first;
1354  return init + r->getQueryMemDesc().getEntryCount();
1355  });
1356  CHECK(total_entry_count);
1357  auto query_mem_desc = first->getQueryMemDesc();
1358  query_mem_desc.setEntryCount(total_entry_count);
1359  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
1362  row_set_mem_owner,
1363  catalog_,
1364  blockSize(),
1365  gridSize());
1366  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
1367  reduced_results->initializeStorage();
1368  switch (query_mem_desc.getEffectiveKeyWidth()) {
1369  case 4:
1370  first->getStorage()->moveEntriesToBuffer<int32_t>(
1371  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1372  break;
1373  case 8:
1374  first->getStorage()->moveEntriesToBuffer<int64_t>(
1375  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1376  break;
1377  default:
1378  CHECK(false);
1379  }
1380  } else {
1381  reduced_results = first;
1382  }
1383 
1384  int64_t compilation_queue_time = 0;
1385  const auto reduction_code =
1386  get_reduction_code(executor_id_, results_per_device, &compilation_queue_time);
1387 
1388  for (size_t i = 1; i < results_per_device.size(); ++i) {
1389  reduced_results->getStorage()->reduce(
1390  *(results_per_device[i].first->getStorage()), {}, reduction_code, executor_id_);
1391  }
1392  reduced_results->addCompilationQueueTime(compilation_queue_time);
1393  return reduced_results;
1394 }
1395 
1397  const RelAlgExecutionUnit& ra_exe_unit,
1398  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1399  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1400  const QueryMemoryDescriptor& query_mem_desc) const {
1401  if (results_per_device.size() == 1) {
1402  return std::move(results_per_device.front().first);
1403  }
1404  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
 1405  SpeculativeTopNMap m;
 1406  for (const auto& result : results_per_device) {
1407  auto rows = result.first;
1408  CHECK(rows);
1409  if (!rows) {
1410  continue;
1411  }
1412  SpeculativeTopNMap that(
1413  *rows,
1414  ra_exe_unit.target_exprs,
1415  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1416  m.reduce(that);
1417  }
1418  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1419  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1420  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1421 }
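
// Illustrative sketch (editor addition, not part of the original file): the map
// sizing expression used in the speculative top-N reduction above. For top_n = 100,
// static_cast<int>(log(100.0)) == 4, so the candidate map is sized to
// max(10000 * 4, 100) == 40000 entries. The helper name is hypothetical.
#include <algorithm>
#include <cmath>
#include <cstddef>

namespace example_topn_sizing {
inline size_t speculative_map_size(const size_t top_n) {
  return std::max(
      size_t(10000 *
             std::max(1, static_cast<int>(std::log(static_cast<double>(top_n))))),
      top_n);
}
}  // namespace example_topn_sizing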
1422 
1423 std::unordered_set<int> get_available_gpus(const Data_Namespace::DataMgr* data_mgr) {
1424  CHECK(data_mgr);
1425  std::unordered_set<int> available_gpus;
1426  if (data_mgr->gpusPresent()) {
1427  CHECK(data_mgr->getCudaMgr());
1428  const int gpu_count = data_mgr->getCudaMgr()->getDeviceCount();
1429  CHECK_GT(gpu_count, 0);
1430  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1431  available_gpus.insert(gpu_id);
1432  }
1433  }
1434  return available_gpus;
1435 }
1436 
1437 size_t get_context_count(const ExecutorDeviceType device_type,
1438  const size_t cpu_count,
1439  const size_t gpu_count) {
1440  return device_type == ExecutorDeviceType::GPU ? gpu_count
1441  : static_cast<size_t>(cpu_count);
1442 }
1443 
1444 namespace {
1445 
1446 // Compute a very conservative entry count for the output buffer entry count using no
1447 // other information than the number of tuples in each table and multiplying them
1448 // together.
1449 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos) {
1451  // Check for overflows since we're multiplying potentially big table sizes.
1452  using checked_size_t = boost::multiprecision::number<
1453  boost::multiprecision::cpp_int_backend<64,
1454  64,
1455  boost::multiprecision::unsigned_magnitude,
1456  boost::multiprecision::checked,
1457  void>>;
1458  checked_size_t max_groups_buffer_entry_guess = 1;
1459  for (const auto& query_info : query_infos) {
1460  CHECK(!query_info.info.fragments.empty());
1461  auto it = std::max_element(query_info.info.fragments.begin(),
1462  query_info.info.fragments.end(),
1463  [](const FragmentInfo& f1, const FragmentInfo& f2) {
1464  return f1.getNumTuples() < f2.getNumTuples();
1465  });
1466  max_groups_buffer_entry_guess *= it->getNumTuples();
1467  }
1468  // Cap the rough approximation to 100M entries, it's unlikely we can do a great job for
1469  // baseline group layout with that many entries anyway.
1470  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1471  try {
1472  return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1473  max_groups_buffer_entry_guess_cap);
1474  } catch (...) {
1475  return max_groups_buffer_entry_guess_cap;
1476  }
1477 }
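
// Illustrative sketch (editor addition, not part of the original file): how the
// checked multiprecision type used in compute_buffer_entry_guess() turns a
// tuple-count overflow into an exception, which the surrounding try/catch converts
// into the conservative 100M-entry cap. guess_entries is a hypothetical helper.
#include <algorithm>
#include <cstddef>
#include <vector>
#include <boost/multiprecision/cpp_int.hpp>

namespace example_entry_guess {
inline size_t guess_entries(const std::vector<size_t>& max_tuples_per_table) {
  using checked_size_t = boost::multiprecision::number<
      boost::multiprecision::cpp_int_backend<64,
                                             64,
                                             boost::multiprecision::unsigned_magnitude,
                                             boost::multiprecision::checked,
                                             void>>;
  constexpr size_t cap = 100000000;
  try {
    checked_size_t guess = 1;
    for (const size_t n : max_tuples_per_table) {
      guess *= n;  // throws if the running product overflows 64 bits
    }
    return std::min(static_cast<size_t>(guess), cap);
  } catch (...) {
    return cap;  // overflow: fall back to the conservative cap
  }
}
}  // namespace example_entry_guess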
1478 
1479 std::string get_table_name(const InputDescriptor& input_desc,
1481  const auto source_type = input_desc.getSourceType();
1482  if (source_type == InputSourceType::TABLE) {
1483  const auto td = cat.getMetadataForTable(input_desc.getTableId());
1484  CHECK(td);
1485  return td->tableName;
1486  } else {
1487  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableId());
1488  }
1489 }
1490 
1491 inline size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type,
1492  const int device_count) {
1493  if (device_type == ExecutorDeviceType::GPU) {
1494  return device_count * Executor::high_scan_limit;
1495  }
 1496  return Executor::high_scan_limit;
 1497 }
1498 
1500  const std::vector<InputTableInfo>& table_infos,
1502  const ExecutorDeviceType device_type,
1503  const int device_count) {
1504  for (const auto target_expr : ra_exe_unit.target_exprs) {
1505  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1506  return;
1507  }
1508  }
1509  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1510  table_infos.front().info.getPhysicalNumTuples() < Executor::high_scan_limit) {
1511  // Allow a query with no scan limit to run on small tables
1512  return;
1513  }
1514  if (ra_exe_unit.use_bump_allocator) {
1515  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1516  // relative to the size of the input), so we bypass this check for now
1517  return;
1518  }
1519  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1520  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1521  (!ra_exe_unit.scan_limit ||
1522  ra_exe_unit.scan_limit > getDeviceBasedScanLimit(device_type, device_count))) {
1523  std::vector<std::string> table_names;
1524  const auto& input_descs = ra_exe_unit.input_descs;
1525  for (const auto& input_desc : input_descs) {
1526  table_names.push_back(get_table_name(input_desc, cat));
1527  }
1528  if (!ra_exe_unit.scan_limit) {
1529  throw WatchdogException(
1530  "Projection query would require a scan without a limit on table(s): " +
1531  boost::algorithm::join(table_names, ", "));
1532  } else {
1533  throw WatchdogException(
1534  "Projection query output result set on table(s): " +
1535  boost::algorithm::join(table_names, ", ") + " would contain " +
1536  std::to_string(ra_exe_unit.scan_limit) +
1537  " rows, which is more than the current system limit of " +
1538  std::to_string(getDeviceBasedScanLimit(device_type, device_count)));
1539  }
1540  }
1541 }
1542 
1543 } // namespace
1544 
1545 bool is_trivial_loop_join(const std::vector<InputTableInfo>& query_infos,
1546  const RelAlgExecutionUnit& ra_exe_unit) {
1547  if (ra_exe_unit.input_descs.size() < 2) {
1548  return false;
1549  }
1550 
1551  // We only support loop join at the end of folded joins
1552  // where ra_exe_unit.input_descs.size() > 2 for now.
1553  const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1554 
1555  std::optional<size_t> inner_table_idx;
1556  for (size_t i = 0; i < query_infos.size(); ++i) {
1557  if (query_infos[i].table_id == inner_table_id) {
1558  inner_table_idx = i;
1559  break;
1560  }
1561  }
1562  CHECK(inner_table_idx);
1563  return query_infos[*inner_table_idx].info.getNumTuples() <=
 1564  g_trivial_loop_join_threshold;
 1565 }
1566 
1567 namespace {
1568 
1569 template <typename T>
1570 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1571  std::vector<std::string> expr_strs;
1572  for (const auto& expr : expr_container) {
1573  if (!expr) {
1574  expr_strs.emplace_back("NULL");
1575  } else {
1576  expr_strs.emplace_back(expr->toString());
1577  }
1578  }
1579  return expr_strs;
1580 }
1581 
1582 template <>
1583 std::vector<std::string> expr_container_to_string(
1584  const std::list<Analyzer::OrderEntry>& expr_container) {
1585  std::vector<std::string> expr_strs;
1586  for (const auto& expr : expr_container) {
1587  expr_strs.emplace_back(expr.toString());
1588  }
1589  return expr_strs;
1590 }
1591 
1592 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1593  switch (algorithm) {
1595  return "ResultSet";
1597  return "Speculative Top N";
1599  return "Streaming Top N";
1600  }
1601  UNREACHABLE();
1602  return "";
1603 }
1604 
1605 } // namespace
1606 
1607 std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit) {
1608  // todo(yoonmin): replace a cache key as a DAG representation of a query plan
1609  // instead of ra_exec_unit description if possible
1610  std::ostringstream os;
1611  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1612  const auto& scan_desc = input_col_desc->getScanDesc();
1613  os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1614  << scan_desc.getNestLevel();
1615  }
1616  if (!ra_exe_unit.simple_quals.empty()) {
1617  for (const auto& qual : ra_exe_unit.simple_quals) {
1618  if (qual) {
1619  os << qual->toString() << ",";
1620  }
1621  }
1622  }
1623  if (!ra_exe_unit.quals.empty()) {
1624  for (const auto& qual : ra_exe_unit.quals) {
1625  if (qual) {
1626  os << qual->toString() << ",";
1627  }
1628  }
1629  }
1630  if (!ra_exe_unit.join_quals.empty()) {
1631  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1632  const auto& join_condition = ra_exe_unit.join_quals[i];
1633  os << std::to_string(i) << ::toString(join_condition.type);
1634  for (const auto& qual : join_condition.quals) {
1635  if (qual) {
1636  os << qual->toString() << ",";
1637  }
1638  }
1639  }
1640  }
1641  if (!ra_exe_unit.groupby_exprs.empty()) {
1642  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1643  if (qual) {
1644  os << qual->toString() << ",";
1645  }
1646  }
1647  }
1648  for (const auto& expr : ra_exe_unit.target_exprs) {
1649  if (expr) {
1650  os << expr->toString() << ",";
1651  }
1652  }
1653  os << ::toString(ra_exe_unit.estimator == nullptr);
1654  os << std::to_string(ra_exe_unit.scan_limit);
1655  return os.str();
1656 }
1657 
1658 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
1659  os << "\n\tExtracted Query Plan Dag Hash: " << ra_exe_unit.query_plan_dag_hash;
1660  os << "\n\tTable/Col/Levels: ";
1661  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1662  const auto& scan_desc = input_col_desc->getScanDesc();
1663  os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1664  << scan_desc.getNestLevel() << ") ";
1665  }
1666  if (!ra_exe_unit.simple_quals.empty()) {
1667  os << "\n\tSimple Quals: "
1668  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.simple_quals),
1669  ", ");
1670  }
1671  if (!ra_exe_unit.quals.empty()) {
1672  os << "\n\tQuals: "
1673  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
1674  }
1675  if (!ra_exe_unit.join_quals.empty()) {
1676  os << "\n\tJoin Quals: ";
1677  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1678  const auto& join_condition = ra_exe_unit.join_quals[i];
1679  os << "\t\t" << std::to_string(i) << " " << ::toString(join_condition.type);
1680  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
1681  }
1682  }
1683  if (!ra_exe_unit.groupby_exprs.empty()) {
1684  os << "\n\tGroup By: "
1685  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.groupby_exprs),
1686  ", ");
1687  }
1688  os << "\n\tProjected targets: "
1689  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.target_exprs), ", ");
1690  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
1691  os << "\n\tSort Info: ";
1692  const auto& sort_info = ra_exe_unit.sort_info;
1693  os << "\n\t Order Entries: "
1694  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
1695  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
1696  os << "\n\t Limit: " << std::to_string(sort_info.limit);
1697  os << "\n\t Offset: " << std::to_string(sort_info.offset);
1698  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
1699  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
1700  if (ra_exe_unit.union_all) {
1701  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1702  }
1703  return os;
1704 }
1705 
1706 namespace {
1707 
1708 RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in,
1709  const size_t new_scan_limit) {
1710  return {ra_exe_unit_in.input_descs,
1711  ra_exe_unit_in.input_col_descs,
1712  ra_exe_unit_in.simple_quals,
1713  ra_exe_unit_in.quals,
1714  ra_exe_unit_in.join_quals,
1715  ra_exe_unit_in.groupby_exprs,
1716  ra_exe_unit_in.target_exprs,
1717  ra_exe_unit_in.estimator,
1718  ra_exe_unit_in.sort_info,
1719  new_scan_limit,
1720  ra_exe_unit_in.query_hint,
1721  ra_exe_unit_in.query_plan_dag_hash,
1722  ra_exe_unit_in.hash_table_build_plan_dag,
1723  ra_exe_unit_in.table_id_to_node_map,
1724  ra_exe_unit_in.use_bump_allocator,
1725  ra_exe_unit_in.union_all,
1726  ra_exe_unit_in.query_state};
1727 }
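// replace_scan_limit() is used by executeWorkUnit() below: when compilation throws
// CompilationRetryNewScanLimit, the work unit is re-executed with every field copied
// verbatim except scan_limit, which is replaced by the new limit from the exception.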
1728 
1729 } // namespace
1730 
1731 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
1732  const bool is_agg,
1733  const std::vector<InputTableInfo>& query_infos,
1734  const RelAlgExecutionUnit& ra_exe_unit_in,
1735  const CompilationOptions& co,
1736  const ExecutionOptions& eo,
1737  const Catalog_Namespace::Catalog& cat,
1738  RenderInfo* render_info,
1739  const bool has_cardinality_estimation,
1740  ColumnCacheMap& column_cache) {
1741  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1742 
1743  ScopeGuard cleanup_post_execution = [this] {
1744  // cleanup/unpin GPU buffer allocations
1745  // TODO: separate out this state into a single object
1746  plan_state_.reset(nullptr);
1747  if (cgen_state_) {
1748  cgen_state_->in_values_bitmaps_.clear();
1749  cgen_state_->str_dict_translation_mgrs_.clear();
1750  }
1751  };
1752 
1753  try {
1754  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1755  is_agg,
1756  true,
1757  query_infos,
1758  ra_exe_unit_in,
1759  co,
1760  eo,
1761  cat,
1762  row_set_mem_owner_,
1763  render_info,
1764  has_cardinality_estimation,
1765  column_cache);
1766  if (result) {
1767  result->setKernelQueueTime(kernel_queue_time_ms_);
1768  result->addCompilationQueueTime(compilation_queue_time_ms_);
1769  if (eo.just_validate) {
1770  result->setValidationOnlyRes();
1771  }
1772  }
1773  return result;
1774  } catch (const CompilationRetryNewScanLimit& e) {
1775  auto result =
1776  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1777  is_agg,
1778  false,
1779  query_infos,
1780  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1781  co,
1782  eo,
1783  cat,
1784  row_set_mem_owner_,
1785  render_info,
1786  has_cardinality_estimation,
1787  column_cache);
1788  if (result) {
1789  result->setKernelQueueTime(kernel_queue_time_ms_);
1790  result->addCompilationQueueTime(compilation_queue_time_ms_);
1791  if (eo.just_validate) {
1792  result->setValidationOnlyRes();
1793  }
1794  }
1795  return result;
1796  }
1797 }
1798 
1799 ResultSetPtr Executor::executeWorkUnitImpl(
1800  size_t& max_groups_buffer_entry_guess,
1801  const bool is_agg,
1802  const bool allow_single_frag_table_opt,
1803  const std::vector<InputTableInfo>& query_infos,
1804  const RelAlgExecutionUnit& ra_exe_unit_in,
1805  const CompilationOptions& co,
1806  const ExecutionOptions& eo,
1807  const Catalog_Namespace::Catalog& cat,
1808  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1809  RenderInfo* render_info,
1810  const bool has_cardinality_estimation,
1811  ColumnCacheMap& column_cache) {
1812  INJECT_TIMER(Exec_executeWorkUnit);
1813  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1814  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1815  CHECK(!query_infos.empty());
1816  if (!max_groups_buffer_entry_guess) {
1817  // The query has failed the first execution attempt because of running out
1818  // of group by slots. Make the conservative choice: allocate fragment size
1819  // slots and run on the CPU.
1820  CHECK(device_type == ExecutorDeviceType::CPU);
1821  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1822  }
1823 
1824  int8_t crt_min_byte_width{MAX_BYTE_WIDTH_SUPPORTED};
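  // The do/while loop below retries execution with adjusted parameters: on
  // CompilationRetryNoCompaction the minimum byte width is reset to the maximum, and on
  // overflow/underflow the width is doubled until it would exceed sizeof(int64_t), at
  // which point an empty result set is returned.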
1825  do {
1826  SharedKernelContext shared_context(query_infos);
1827  ColumnFetcher column_fetcher(this, column_cache);
1828  ScopeGuard scope_guard = [&column_fetcher] {
1829  column_fetcher.freeLinearizedBuf();
1830  column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
1831  };
1832  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1833  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1834 
1835  if (eo.executor_type == ExecutorType::Native) {
1836  try {
1837  INJECT_TIMER(query_step_compilation);
1838  query_mem_desc_owned =
1839  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1840  crt_min_byte_width,
1841  has_cardinality_estimation,
1842  ra_exe_unit,
1843  query_infos,
1844  deleted_cols_map,
1845  column_fetcher,
1846  {device_type,
1847  co.hoist_literals,
1848  co.opt_level,
1849  co.with_dynamic_watchdog,
1850  co.allow_lazy_fetch,
1851  co.filter_on_deleted_column,
1852  co.explain_type,
1853  co.register_intel_jit_listener},
1854  eo,
1855  render_info,
1856  this);
1857  CHECK(query_mem_desc_owned);
1858  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1859  } catch (CompilationRetryNoCompaction&) {
1860  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1861  continue;
1862  }
1863  } else {
1864  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
1865  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1866  CHECK(!query_mem_desc_owned);
1867  query_mem_desc_owned.reset(
1868  new QueryMemoryDescriptor(this, 0, QueryDescriptionType::Projection, false));
1869  }
1870  if (eo.just_explain) {
1871  return executeExplain(*query_comp_desc_owned);
1872  }
1873 
1874  for (const auto target_expr : ra_exe_unit.target_exprs) {
1875  plan_state_->target_exprs_.push_back(target_expr);
1876  }
1877 
1878  if (!eo.just_validate) {
1879  int available_cpus = cpu_threads();
1880  auto available_gpus = get_available_gpus(data_mgr_);
1881 
1882  const auto context_count =
1883  get_context_count(device_type, available_cpus, available_gpus.size());
1884  try {
1885  auto kernels = createKernels(shared_context,
1886  ra_exe_unit,
1887  column_fetcher,
1888  query_infos,
1889  eo,
1890  is_agg,
1891  allow_single_frag_table_opt,
1892  context_count,
1893  *query_comp_desc_owned,
1894  *query_mem_desc_owned,
1895  render_info,
1896  available_gpus,
1897  available_cpus);
1898  launchKernels(
1899  shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
1900  } catch (QueryExecutionError& e) {
1901  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1902  e.getErrorCode() == ERR_OUT_OF_TIME) {
1903  throw QueryExecutionError(ERR_INTERRUPTED);
1904  }
1905  if (e.getErrorCode() == ERR_INTERRUPTED) {
1906  throw QueryExecutionError(ERR_INTERRUPTED);
1907  }
1908  if (e.getErrorCode() == ERR_OVERFLOW_OR_UNDERFLOW &&
1909  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1910  crt_min_byte_width <<= 1;
1911  continue;
1912  }
1913  throw;
1914  }
1915  }
1916  if (is_agg) {
1917  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
1918  // update query status to let user know we are now in the reduction phase
1919  std::string curRunningSession{""};
1920  std::string curRunningQuerySubmittedTime{""};
1921  bool sessionEnrolled = false;
1922  {
1923  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
1924  executor_session_mutex_);
1925  curRunningSession = getCurrentQuerySession(session_read_lock);
1926  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
1927  sessionEnrolled =
1928  checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
1929  }
1930  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
1931  sessionEnrolled) {
1932  updateQuerySessionStatus(curRunningSession,
1933  curRunningQuerySubmittedTime,
1934  QuerySessionStatus::QueryStatus::RUNNING_REDUCTION);
1935  }
1936  }
1937  try {
1938  return collectAllDeviceResults(shared_context,
1939  ra_exe_unit,
1940  *query_mem_desc_owned,
1941  query_comp_desc_owned->getDeviceType(),
1942  row_set_mem_owner);
1943  } catch (ReductionRanOutOfSlots&) {
1944  throw QueryExecutionError(ERR_OUT_OF_SLOTS);
1945  } catch (OverflowOrUnderflow&) {
1946  crt_min_byte_width <<= 1;
1947  continue;
1948  } catch (QueryExecutionError& e) {
1949  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
1950  << ", what(): " << e.what();
1951  throw QueryExecutionError(e.getErrorCode());
1952  }
1953  }
1954  return resultsUnion(shared_context, ra_exe_unit);
1955 
1956  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1957 
1958  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1959  ExecutorDeviceType::CPU,
1960  QueryMemoryDescriptor(),
1961  nullptr,
1962  catalog_,
1963  blockSize(),
1964  gridSize());
1965 }
1966 
1967 void Executor::executeWorkUnitPerFragment(
1968  const RelAlgExecutionUnit& ra_exe_unit_in,
1969  const InputTableInfo& table_info,
1970  const CompilationOptions& co,
1971  const ExecutionOptions& eo,
1972  const Catalog_Namespace::Catalog& cat,
1973  PerFragmentCallBack& cb,
1974  const std::set<size_t>& fragment_indexes_param) {
1975  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1976  ColumnCacheMap column_cache;
1977 
1978  std::vector<InputTableInfo> table_infos{table_info};
1979  SharedKernelContext kernel_context(table_infos);
1980 
1981  ColumnFetcher column_fetcher(this, column_cache);
1982  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1983  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1984  {
1985  query_mem_desc_owned =
1986  query_comp_desc_owned->compile(0,
1987  8,
1988  /*has_cardinality_estimation=*/false,
1989  ra_exe_unit,
1990  table_infos,
1991  deleted_cols_map,
1992  column_fetcher,
1993  co,
1994  eo,
1995  nullptr,
1996  this);
1997  }
1998  CHECK(query_mem_desc_owned);
1999  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
2000  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
2001  const auto& outer_fragments = table_info.info.fragments;
2002 
2003  std::set<size_t> fragment_indexes;
2004  if (fragment_indexes_param.empty()) {
2005  // An empty `fragment_indexes_param` set implies executing
2006  // the query for all fragments in the table. In this
2007  // case, populate `fragment_indexes` with all fragment indexes.
2008  for (size_t i = 0; i < outer_fragments.size(); i++) {
2009  fragment_indexes.emplace(i);
2010  }
2011  } else {
2012  fragment_indexes = fragment_indexes_param;
2013  }
2014 
2015  {
2016  auto clock_begin = timer_start();
2017  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2018  kernel_queue_time_ms_ += timer_stop(clock_begin);
2019 
2020  for (auto fragment_index : fragment_indexes) {
2021  // We may want to consider in the future allowing this to execute on devices other
2022  // than CPU
2023  FragmentsList fragments_list{{table_id, {fragment_index}}};
2024  ExecutionKernel kernel(ra_exe_unit,
2025  co.device_type,
2026  /*device_id=*/0,
2027  eo,
2028  column_fetcher,
2029  *query_comp_desc_owned,
2030  *query_mem_desc_owned,
2031  fragments_list,
2032  ExecutorDispatchMode::KernelPerFragment,
2033  /*render_info=*/nullptr,
2034  /*rowid_lookup_key=*/-1);
2035  kernel.run(this, 0, kernel_context);
2036  }
2037  }
2038 
2039  const auto& all_fragment_results = kernel_context.getFragmentResults();
2040 
2041  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
2042  CHECK_EQ(result_fragment_indexes.size(), 1);
2043  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
2044  }
2045 }
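// executeWorkUnitPerFragment() runs one CPU ExecutionKernel per selected fragment,
// serialized under kernel_mutex_, and invokes the callback once per fragment with that
// fragment's ResultSet and FragmentInfo; an empty fragment_indexes_param means "all
// fragments of the outer table".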
2046 
2047 ResultSetPtr Executor::executeTableFunction(
2048  const TableFunctionExecutionUnit exe_unit,
2049  const std::vector<InputTableInfo>& table_infos,
2050  const CompilationOptions& co,
2051  const ExecutionOptions& eo,
2052  const Catalog_Namespace::Catalog& cat) {
2053  INJECT_TIMER(Exec_executeTableFunction);
2054  if (eo.just_validate) {
2055  QueryMemoryDescriptor query_mem_desc(this,
2056  /*entry_count=*/0,
2057  QueryDescriptionType::TableFunction,
2058  /*is_table_function=*/true);
2059  query_mem_desc.setOutputColumnar(true);
2060  return std::make_shared<ResultSet>(
2061  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
2062  co.device_type,
2063  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
2064  this->getRowSetMemoryOwner(),
2065  this->getCatalog(),
2066  this->blockSize(),
2067  this->gridSize());
2068  }
2069 
2070  // Avoid compile functions that set the sizer at runtime if the device is GPU
2071  // This should be fixed in the python script as well to minimize the number of
2072  // QueryMustRunOnCpu exceptions
2073  if (co.device_type == ExecutorDeviceType::GPU &&
2074  exe_unit.table_func.hasTableFunctionSpecifiedParameter()) {
2075  throw QueryMustRunOnCpu();
2076  }
2077 
2078  ColumnCacheMap column_cache; // Note: if we add retries to the table function
2079  // framework, we may want to move this up a level
2080 
2081  ColumnFetcher column_fetcher(this, column_cache);
2082  TableFunctionExecutionContext exe_context(getRowSetMemoryOwner());
2083 
2084  if (exe_unit.table_func.containsPreFlightFn()) {
2085  std::shared_ptr<CompilationContext> compilation_context;
2086  {
2087  Executor::CgenStateManager cgenstate_manager(*this,
2088  false,
2089  table_infos,
2090  PlanState::DeletedColumnsMap{},
2091  nullptr); // locks compilation_mutex
2092  auto pre_flight_co = CompilationOptions::makeCpuOnly(co);
2093  TableFunctionCompilationContext tf_compilation_context(this, pre_flight_co);
2094  compilation_context =
2095  tf_compilation_context.compile(exe_unit, true /* emit_only_preflight_fn*/);
2096  }
2097  exe_context.execute(exe_unit,
2098  table_infos,
2099  compilation_context,
2100  column_fetcher,
2101  ExecutorDeviceType::CPU,
2102  this,
2103  true /* is_pre_launch_udtf */);
2104  }
2105  std::shared_ptr<CompilationContext> compilation_context;
2106  {
2107  Executor::CgenStateManager cgenstate_manager(*this,
2108  false,
2109  table_infos,
2110  PlanState::DeletedColumnsMap{},
2111  nullptr); // locks compilation_mutex
2112  TableFunctionCompilationContext tf_compilation_context(this, co);
2113  compilation_context =
2114  tf_compilation_context.compile(exe_unit, false /* emit_only_preflight_fn */);
2115  }
2116  return exe_context.execute(exe_unit,
2117  table_infos,
2118  compilation_context,
2119  column_fetcher,
2120  co.device_type,
2121  this,
2122  false /* is_pre_launch_udtf */);
2123 }
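// When the table function declares a pre-flight function, the code above compiles and
// executes it first as a separate pass (is_pre_launch_udtf == true) under its own
// CgenStateManager, and only then compiles and runs the table function proper with the
// requested device type.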
2124 
2125 ResultSetPtr Executor::executeExplain(const QueryCompilationDescriptor& query_comp_desc) {
2126  return std::make_shared<ResultSet>(query_comp_desc.getIR());
2127 }
2128 
2129 void Executor::addTransientStringLiterals(
2130  const RelAlgExecutionUnit& ra_exe_unit,
2131  const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
2132  TransientDictIdVisitor dict_id_visitor;
2133 
2134  auto visit_expr =
2135  [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
2136  if (!expr) {
2137  return;
2138  }
2139  const auto dict_id = dict_id_visitor.visit(expr);
2140  if (dict_id >= 0) {
2141  auto sdp = getStringDictionaryProxy(dict_id, row_set_mem_owner, true);
2142  CHECK(sdp);
2143  TransientStringLiteralsVisitor visitor(sdp, this);
2144  visitor.visit(expr);
2145  }
2146  };
2147 
2148  for (const auto& group_expr : ra_exe_unit.groupby_exprs) {
2149  visit_expr(group_expr.get());
2150  }
2151 
2152  for (const auto& group_expr : ra_exe_unit.quals) {
2153  visit_expr(group_expr.get());
2154  }
2155 
2156  for (const auto& group_expr : ra_exe_unit.simple_quals) {
2157  visit_expr(group_expr.get());
2158  }
2159 
2160  const auto visit_target_expr = [&](const Analyzer::Expr* target_expr) {
2161  const auto& target_type = target_expr->get_type_info();
2162  if (!target_type.is_string() || target_type.get_compression() == kENCODING_DICT) {
2163  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
2164  if (agg_expr) {
2165  if (agg_expr->get_aggtype() == kSINGLE_VALUE ||
2166  agg_expr->get_aggtype() == kSAMPLE) {
2167  visit_expr(agg_expr->get_arg());
2168  }
2169  } else {
2170  visit_expr(target_expr);
2171  }
2172  }
2173  };
2174  const auto& target_exprs = ra_exe_unit.target_exprs;
2175  std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
2176  const auto& target_exprs_union = ra_exe_unit.target_exprs_union;
2177  std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
2178 }
2179 
2180 ExecutorDeviceType Executor::getDeviceTypeForTargets(
2181  const RelAlgExecutionUnit& ra_exe_unit,
2182  const ExecutorDeviceType requested_device_type) {
2183  for (const auto target_expr : ra_exe_unit.target_exprs) {
2184  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2185  if (!ra_exe_unit.groupby_exprs.empty() &&
2186  !isArchPascalOrLater(requested_device_type)) {
2187  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
2188  agg_info.agg_arg_type.get_type() == kDOUBLE) {
2189  return ExecutorDeviceType::CPU;
2190  }
2191  }
2192  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
2193  return ExecutorDeviceType::CPU;
2194  }
2195  }
2196  return requested_device_type;
2197 }
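// getDeviceTypeForTargets() falls back to CPU for grouped SUM/AVG over DOUBLE when the
// GPU is older than Pascal (presumably for lack of fast 64-bit floating-point atomics)
// and whenever a target expression is a REGEXP operator.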
2198 
2199 namespace {
2200 
2201 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
2202  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
2203  if (ti.is_fp()) {
2204  if (float_argument_input && ti.get_type() == kFLOAT) {
2205  int64_t float_null_val = 0;
2206  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
2207  static_cast<float>(inline_fp_null_val(ti));
2208  return float_null_val;
2209  }
2210  const auto double_null_val = inline_fp_null_val(ti);
2211  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
2212  }
2213  return inline_int_null_val(ti);
2214 }
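// Note: when float_argument_input is set for a FLOAT type, the 32-bit NULL pattern is
// bit-copied into the low bits of the returned int64_t slot (via may_alias_ptr);
// otherwise the double NULL value is reinterpreted, and non-fp types use their integer
// NULL values.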
2215 
2216 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
2217  std::vector<int64_t>& entry,
2218  const std::vector<Analyzer::Expr*>& target_exprs,
2219  const QueryMemoryDescriptor& query_mem_desc) {
2220  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
2221  const auto target_expr = target_exprs[target_idx];
2222  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2223  CHECK(agg_info.is_agg);
2224  target_infos.push_back(agg_info);
2225  if (g_cluster) {
2226  const auto executor = query_mem_desc.getExecutor();
2227  CHECK(executor);
2228  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2229  CHECK(row_set_mem_owner);
2230  const auto& count_distinct_desc =
2231  query_mem_desc.getCountDistinctDescriptor(target_idx);
2232  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
2233  CHECK(row_set_mem_owner);
2234  auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
2235  count_distinct_desc.bitmapPaddedSizeBytes(),
2236  /*thread_idx=*/0); // TODO: can we detect thread idx here?
2237  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
2238  continue;
2239  }
2240  if (count_distinct_desc.impl_type_ == CountDistinctImplType::UnorderedSet) {
2241  auto count_distinct_set = new CountDistinctSet();
2242  CHECK(row_set_mem_owner);
2243  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
2244  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
2245  continue;
2246  }
2247  }
2248  const bool float_argument_input = takes_float_argument(agg_info);
2249  if (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
2250  entry.push_back(0);
2251  } else if (agg_info.agg_kind == kAVG) {
2252  entry.push_back(0);
2253  entry.push_back(0);
2254  } else if (agg_info.agg_kind == kSINGLE_VALUE || agg_info.agg_kind == kSAMPLE) {
2255  if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
2256  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2257  entry.push_back(0);
2258  }
2259  } else if (agg_info.sql_type.is_varlen()) {
2260  entry.push_back(0);
2261  entry.push_back(0);
2262  } else {
2263  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2264  }
2265  } else {
2266  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2267  }
2268  }
2269 }
2270 
2271 ResultSetPtr build_row_for_empty_input(
2272  const std::vector<Analyzer::Expr*>& target_exprs_in,
2273  const QueryMemoryDescriptor& query_mem_desc,
2274  const ExecutorDeviceType device_type) {
2275  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
2276  std::vector<Analyzer::Expr*> target_exprs;
2277  for (const auto target_expr : target_exprs_in) {
2278  const auto target_expr_copy =
2279  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
2280  CHECK(target_expr_copy);
2281  auto ti = target_expr->get_type_info();
2282  ti.set_notnull(false);
2283  target_expr_copy->set_type_info(ti);
2284  if (target_expr_copy->get_arg()) {
2285  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
2286  arg_ti.set_notnull(false);
2287  target_expr_copy->get_arg()->set_type_info(arg_ti);
2288  }
2289  target_exprs_owned_copies.push_back(target_expr_copy);
2290  target_exprs.push_back(target_expr_copy.get());
2291  }
2292  std::vector<TargetInfo> target_infos;
2293  std::vector<int64_t> entry;
2294  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
2295  const auto executor = query_mem_desc.getExecutor();
2296  CHECK(executor);
2297  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2298  CHECK(row_set_mem_owner);
2299  auto rs = std::make_shared<ResultSet>(target_infos,
2300  device_type,
2301  query_mem_desc,
2302  row_set_mem_owner,
2303  executor->getCatalog(),
2304  executor->blockSize(),
2305  executor->gridSize());
2306  rs->allocateStorage();
2307  rs->fillOneEntry(entry);
2308  return rs;
2309 }
2310 
2311 } // namespace
2312 
2313 ResultSetPtr Executor::collectAllDeviceResults(
2314  SharedKernelContext& shared_context,
2315  const RelAlgExecutionUnit& ra_exe_unit,
2316  const QueryMemoryDescriptor& query_mem_desc,
2317  const ExecutorDeviceType device_type,
2318  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2319  auto timer = DEBUG_TIMER(__func__);
2320  auto& result_per_device = shared_context.getFragmentResults();
2321  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
2322  QueryDescriptionType::NonGroupedAggregate) {
2323  return build_row_for_empty_input(
2324  ra_exe_unit.target_exprs, query_mem_desc, device_type);
2325  }
2326  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
2327  try {
2328  return reduceSpeculativeTopN(
2329  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2330  } catch (const std::bad_alloc&) {
2331  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
2332  }
2333  }
2334  const auto shard_count =
2335  device_type == ExecutorDeviceType::GPU
2336  ? shard_count_for_top_groups(ra_exe_unit, *catalog_)
2337  : 0;
2338 
2339  if (shard_count && !result_per_device.empty()) {
2340  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
2341  }
2342  return reduceMultiDeviceResults(
2343  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2344 }
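// Reduction dispatch above: an empty result for a non-grouped aggregate synthesizes a
// single row of NULL/zero entries; speculative top-N queries go through
// reduceSpeculativeTopN; sharded GPU top-N results are stitched per device by
// collectAllDeviceShardedTopResults; everything else is folded by
// reduceMultiDeviceResults.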
2345 
2346 namespace {
2357 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
2358  const QueryMemoryDescriptor& input_query_mem_desc,
2359  const ResultSetStorage* output_storage,
2360  size_t output_row_index,
2361  const QueryMemoryDescriptor& output_query_mem_desc,
2362  const std::vector<uint32_t>& top_permutation) {
2363  const auto output_buffer = output_storage->getUnderlyingBuffer();
2364  const auto input_buffer = input_storage->getUnderlyingBuffer();
2365  for (const auto sorted_idx : top_permutation) {
2366  // permuting all group-columns in this result set into the final buffer:
2367  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
2368  group_idx++) {
2369  const auto input_column_ptr =
2370  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2371  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
2372  const auto output_column_ptr =
2373  output_buffer +
2374  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2375  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
2376  memcpy(output_column_ptr,
2377  input_column_ptr,
2378  output_query_mem_desc.groupColWidth(group_idx));
2379  }
2380  // permuting all agg-columns in this result set into the final buffer:
2381  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
2382  slot_idx++) {
2383  const auto input_column_ptr =
2384  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
2385  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2386  const auto output_column_ptr =
2387  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
2388  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2389  memcpy(output_column_ptr,
2390  input_column_ptr,
2391  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
2392  }
2393  ++output_row_index;
2394  }
2395  return output_row_index;
2396 }
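// For the columnar case above, the value for (row, slot) lives at
//   buffer + col_offset_in_bytes(slot) + row * padded_slot_width(slot),
// so copying one logical row takes one memcpy per group-key column plus one per
// aggregate slot, in contrast to the single memcpy per row in the row-wise variant
// below.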
2397 
2407 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
2408  const ResultSetStorage* output_storage,
2409  size_t output_row_index,
2410  const QueryMemoryDescriptor& output_query_mem_desc,
2411  const std::vector<uint32_t>& top_permutation) {
2412  const auto output_buffer = output_storage->getUnderlyingBuffer();
2413  const auto input_buffer = input_storage->getUnderlyingBuffer();
2414  for (const auto sorted_idx : top_permutation) {
2415  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
2416  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
2417  row_ptr,
2418  output_query_mem_desc.getRowSize());
2419  ++output_row_index;
2420  }
2421  return output_row_index;
2422 }
2423 } // namespace
2424 
2425 // Collect top results from each device, stitch them together and sort. Partial
2426 // results from each device are guaranteed to be disjoint because we only go on
2427 // this path when one of the columns involved is a shard key.
2428 ResultSetPtr Executor::collectAllDeviceShardedTopResults(
2429  SharedKernelContext& shared_context,
2430  const RelAlgExecutionUnit& ra_exe_unit) const {
2431  auto& result_per_device = shared_context.getFragmentResults();
2432  const auto first_result_set = result_per_device.front().first;
2433  CHECK(first_result_set);
2434  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2435  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2436  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
2437  top_query_mem_desc.setEntryCount(0);
2438  for (auto& result : result_per_device) {
2439  const auto result_set = result.first;
2440  CHECK(result_set);
2441  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
2442  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2443  top_query_mem_desc.setEntryCount(new_entry_cnt);
2444  }
2445  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2446  first_result_set->getDeviceType(),
2447  top_query_mem_desc,
2448  first_result_set->getRowSetMemOwner(),
2449  catalog_,
2450  blockSize(),
2451  gridSize());
2452  auto top_storage = top_result_set->allocateStorage();
2453  size_t top_output_row_idx{0};
2454  for (auto& result : result_per_device) {
2455  const auto result_set = result.first;
2456  CHECK(result_set);
2457  const auto& top_permutation = result_set->getPermutationBuffer();
2458  CHECK_LE(top_permutation.size(), top_n);
2459  if (top_query_mem_desc.didOutputColumnar()) {
2460  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2461  result_set->getQueryMemDesc(),
2462  top_storage,
2463  top_output_row_idx,
2464  top_query_mem_desc,
2465  top_permutation);
2466  } else {
2467  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2468  top_storage,
2469  top_output_row_idx,
2470  top_query_mem_desc,
2471  top_permutation);
2472  }
2473  }
2474  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2475  return top_result_set;
2476 }
2477 
2478 std::unordered_map<int, const Analyzer::BinOper*> Executor::getInnerTabIdToJoinCond()
2479  const {
2480  std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
2481  const auto& join_info = plan_state_->join_info_;
2482  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2483  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2484  int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
2485  id_to_cond.insert(
2486  std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
2487  }
2488  return id_to_cond;
2489 }
2490 
2491 namespace {
2492 
2493 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2494  for (const auto& col : fetched_cols) {
2495  if (col.is_lazily_fetched) {
2496  return true;
2497  }
2498  }
2499  return false;
2500 }
2501 
2502 } // namespace
2503 
2504 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2505  SharedKernelContext& shared_context,
2506  const RelAlgExecutionUnit& ra_exe_unit,
2507  ColumnFetcher& column_fetcher,
2508  const std::vector<InputTableInfo>& table_infos,
2509  const ExecutionOptions& eo,
2510  const bool is_agg,
2511  const bool allow_single_frag_table_opt,
2512  const size_t context_count,
2513  const QueryCompilationDescriptor& query_comp_desc,
2514  const QueryMemoryDescriptor& query_mem_desc,
2515  RenderInfo* render_info,
2516  std::unordered_set<int>& available_gpus,
2517  int& available_cpus) {
2518  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2519 
2520  QueryFragmentDescriptor fragment_descriptor(
2521  ra_exe_unit,
2522  table_infos,
2523  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2524  ? data_mgr_->getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
2525  : std::vector<Data_Namespace::MemoryInfo>{},
2526  eo.gpu_input_mem_limit_percent,
2527  eo.outer_fragment_indices);
2528  CHECK(!ra_exe_unit.input_descs.empty());
2529 
2530  const auto device_type = query_comp_desc.getDeviceType();
2531  const bool uses_lazy_fetch =
2532  plan_state_->allow_lazy_fetch_ &&
2533  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2534  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2535  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2536  const auto device_count = deviceCount(device_type);
2537  CHECK_GT(device_count, 0);
2538 
2539  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2540  shared_context.getFragOffsets(),
2541  device_count,
2542  device_type,
2543  use_multifrag_kernel,
2544  g_inner_join_fragment_skipping,
2545  this);
2546  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2547  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2548  }
2549 
2550  if (use_multifrag_kernel) {
2551  VLOG(1) << "Creating multifrag execution kernels";
2552  VLOG(1) << query_mem_desc.toString();
2553 
2554  // NB: We should never be on this path when the query is retried because of running
2555  // out of group by slots; also, for scan only queries on CPU we want the
2556  // high-granularity, fragment by fragment execution instead. For scan only queries on
2557  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2558  // buffer per fragment.
2559  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2560  &execution_kernels,
2561  &column_fetcher,
2562  &eo,
2563  &query_comp_desc,
2564  &query_mem_desc,
2565  render_info](const int device_id,
2566  const FragmentsList& frag_list,
2567  const int64_t rowid_lookup_key) {
2568  execution_kernels.emplace_back(
2569  std::make_unique<ExecutionKernel>(ra_exe_unit,
2570  ExecutorDeviceType::GPU,
2571  device_id,
2572  eo,
2573  column_fetcher,
2574  query_comp_desc,
2575  query_mem_desc,
2576  frag_list,
2577  ExecutorDispatchMode::MultifragmentKernel,
2578  render_info,
2579  rowid_lookup_key));
2580  };
2581  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2582  } else {
2583  VLOG(1) << "Creating one execution kernel per fragment";
2584  VLOG(1) << query_mem_desc.toString();
2585 
2586  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2587  (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
2588  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2589  const auto max_frag_size =
2590  table_infos.front().info.getFragmentNumTuplesUpperBound();
2591  if (max_frag_size < query_mem_desc.getEntryCount()) {
2592  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2593  << " to match max fragment size " << max_frag_size
2594  << " for kernel per fragment execution path.";
2595  throw CompilationRetryNewScanLimit(max_frag_size);
2596  }
2597  }
2598 
2599  size_t frag_list_idx{0};
2600  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2601  &execution_kernels,
2602  &column_fetcher,
2603  &eo,
2604  &frag_list_idx,
2605  &device_type,
2606  &query_comp_desc,
2607  &query_mem_desc,
2608  render_info](const int device_id,
2609  const FragmentsList& frag_list,
2610  const int64_t rowid_lookup_key) {
2611  if (!frag_list.size()) {
2612  return;
2613  }
2614  CHECK_GE(device_id, 0);
2615 
2616  execution_kernels.emplace_back(
2617  std::make_unique<ExecutionKernel>(ra_exe_unit,
2618  device_type,
2619  device_id,
2620  eo,
2621  column_fetcher,
2622  query_comp_desc,
2623  query_mem_desc,
2624  frag_list,
2625  ExecutorDispatchMode::KernelPerFragment,
2626  render_info,
2627  rowid_lookup_key));
2628  ++frag_list_idx;
2629  };
2630 
2631  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2632  ra_exe_unit);
2633  }
2634 
2635  return execution_kernels;
2636 }
2637 
2638 void Executor::launchKernels(SharedKernelContext& shared_context,
2639  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
2640  const ExecutorDeviceType device_type) {
2641  auto clock_begin = timer_start();
2642  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2643  kernel_queue_time_ms_ += timer_stop(clock_begin);
2644 
2645  threading::task_group tg;
2646  // A hack to have an unused unit for results collection.
2647  const RelAlgExecutionUnit* ra_exe_unit =
2648  kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
2649 
2650 #ifdef HAVE_TBB
2651  if (g_enable_cpu_sub_tasks && device_type == ExecutorDeviceType::CPU) {
2652  shared_context.setThreadPool(&tg);
2653  }
2654  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
2655 #endif // HAVE_TBB
2656 
2657  VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
2658  << (device_type == ExecutorDeviceType::CPU ? "CPU"s : "GPU"s) << ".";
2659  size_t kernel_idx = 1;
2660  for (auto& kernel : kernels) {
2661  CHECK(kernel.get());
2662  tg.run([this,
2663  &kernel,
2664  &shared_context,
2665  parent_thread_id = logger::thread_id(),
2666  crt_kernel_idx = kernel_idx++] {
2667  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
2668  const size_t thread_i = crt_kernel_idx % cpu_threads();
2669  kernel->run(this, thread_i, shared_context);
2670  });
2671  }
2672  tg.wait();
2673 
2674  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
2675  // The first arg is used for GPU only, it's not our case.
2676  // TODO: add QueryExecutionContext::getRowSet() interface
2677  // for our case.
2678  if (exec_ctx) {
2679  ResultSetPtr results;
2680  if (ra_exe_unit->estimator) {
2681  results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
2682  } else {
2683  results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
2684  }
2685  shared_context.addDeviceResults(std::move(results), {});
2686  }
2687  }
2688 }
2689 
2691  const RelAlgExecutionUnit& ra_exe_unit,
2692  const ExecutorDeviceType device_type,
2693  const size_t table_idx,
2694  const size_t outer_frag_idx,
2695  std::map<int, const TableFragments*>& selected_tables_fragments,
2696  const std::unordered_map<int, const Analyzer::BinOper*>&
2697  inner_table_id_to_join_condition) {
2698  const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2699  auto table_frags_it = selected_tables_fragments.find(table_id);
2700  CHECK(table_frags_it != selected_tables_fragments.end());
2701  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2702  const auto outer_table_fragments_it =
2703  selected_tables_fragments.find(outer_input_desc.getTableId());
2704  const auto outer_table_fragments = outer_table_fragments_it->second;
2705  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2706  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2707  if (!table_idx) {
2708  return {outer_frag_idx};
2709  }
2710  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2711  auto& inner_frags = table_frags_it->second;
2712  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
2713  std::vector<size_t> all_frag_ids;
2714  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2715  ++inner_frag_idx) {
2716  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2717  if (skipFragmentPair(outer_fragment_info,
2718  inner_frag_info,
2719  table_idx,
2720  inner_table_id_to_join_condition,
2721  ra_exe_unit,
2722  device_type)) {
2723  continue;
2724  }
2725  all_frag_ids.push_back(inner_frag_idx);
2726  }
2727  return all_frag_ids;
2728 }
2729 
2730 // Returns true iff the join between two fragments cannot yield any results, per
2731 // shard information. The pair can be skipped to avoid full broadcast.
2732 bool Executor::skipFragmentPair(
2733  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
2734  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
2735  const int table_idx,
2736  const std::unordered_map<int, const Analyzer::BinOper*>&
2737  inner_table_id_to_join_condition,
2738  const RelAlgExecutionUnit& ra_exe_unit,
2739  const ExecutorDeviceType device_type) {
2740  if (device_type != ExecutorDeviceType::GPU) {
2741  return false;
2742  }
2743  CHECK(table_idx >= 0 &&
2744  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2745  const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2746  // Both tables need to be sharded the same way.
2747  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2748  outer_fragment_info.shard == inner_fragment_info.shard) {
2749  return false;
2750  }
2751  const Analyzer::BinOper* join_condition{nullptr};
2752  if (ra_exe_unit.join_quals.empty()) {
2753  CHECK(!inner_table_id_to_join_condition.empty());
2754  auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2755  CHECK(condition_it != inner_table_id_to_join_condition.end());
2756  join_condition = condition_it->second;
2757  CHECK(join_condition);
2758  } else {
2759  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
2760  plan_state_->join_info_.join_hash_tables_.size());
2761  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2762  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2763  table_idx) {
2764  CHECK(!join_condition);
2765  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2766  }
2767  }
2768  }
2769  if (!join_condition) {
2770  return false;
2771  }
2772  // TODO(adb): support fragment skipping based on the overlaps operator
2773  if (join_condition->is_overlaps_oper()) {
2774  return false;
2775  }
2776  size_t shard_count{0};
2777  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2778  join_condition->get_left_operand())) {
2779  auto inner_outer_pairs = HashJoin::normalizeColumnPairs(
2780  join_condition, *getCatalog(), getTemporaryTables())
2781  .first;
2782  shard_count = BaselineJoinHashTable::getShardCountForCondition(
2783  join_condition, this, inner_outer_pairs);
2784  } else {
2785  shard_count = get_shard_count(join_condition, this);
2786  }
2787  if (shard_count && !ra_exe_unit.join_quals.empty()) {
2788  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2789  }
2790  return shard_count;
2791 }
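// Example of the skip rule above: with both sides sharded on the join key, an outer
// fragment holding shard 0 can never match an inner fragment holding shard 1, so that
// fragment pair is skipped; pairs where either side reports shard == -1 (unsharded) or
// where the join condition is an overlaps join are always kept.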
2792 
2793 namespace {
2794 
2795 const ColumnDescriptor* try_get_column_descriptor(
2796  const InputColDescriptor* col_desc, const Catalog_Namespace::Catalog& cat) {
2797  const int table_id = col_desc->getScanDesc().getTableId();
2798  const int col_id = col_desc->getColId();
2799  return get_column_descriptor_maybe(col_id, table_id, cat);
2800 }
2801 
2802 } // namespace
2803 
2804 std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2805  const std::vector<InputDescriptor>& input_descs,
2806  const std::map<int, const TableFragments*>& all_tables_fragments) {
2807  std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2808  for (auto& desc : input_descs) {
2809  const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2810  CHECK(fragments_it != all_tables_fragments.end());
2811  const auto& fragments = *fragments_it->second;
2812  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2813  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2814  frag_offsets[i] = off;
2815  off += fragments[i].getNumTuples();
2816  }
2817  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2818  }
2819  return tab_id_to_frag_offsets;
2820 }
2821 
2822 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2824  const RelAlgExecutionUnit& ra_exe_unit,
2825  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2826  const std::vector<InputDescriptor>& input_descs,
2827  const std::map<int, const TableFragments*>& all_tables_fragments) {
2828  std::vector<std::vector<int64_t>> all_num_rows;
2829  std::vector<std::vector<uint64_t>> all_frag_offsets;
2830  const auto tab_id_to_frag_offsets =
2831  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
2832  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2833  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2834  std::vector<int64_t> num_rows;
2835  std::vector<uint64_t> frag_offsets;
2836  if (!ra_exe_unit.union_all) {
2837  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2838  }
2839  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2840  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2841  const auto fragments_it =
2842  all_tables_fragments.find(input_descs[tab_idx].getTableId());
2843  CHECK(fragments_it != all_tables_fragments.end());
2844  const auto& fragments = *fragments_it->second;
2845  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2846  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2847  const auto& fragment = fragments[frag_id];
2848  num_rows.push_back(fragment.getNumTuples());
2849  } else {
2850  size_t total_row_count{0};
2851  for (const auto& fragment : fragments) {
2852  total_row_count += fragment.getNumTuples();
2853  }
2854  num_rows.push_back(total_row_count);
2855  }
2856  const auto frag_offsets_it =
2857  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2858  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2859  const auto& offsets = frag_offsets_it->second;
2860  CHECK_LT(frag_id, offsets.size());
2861  frag_offsets.push_back(offsets[frag_id]);
2862  }
2863  all_num_rows.push_back(num_rows);
2864  // Fragment offsets of the outer table should be used ONLY by rowid for now.
2865  all_frag_offsets.push_back(frag_offsets);
2866  }
2867  return {all_num_rows, all_frag_offsets};
2868 }
2869 
2870 // Only fetch, from all the table fragments, those columns of a hash-joined inner fact
2871 // table whose fetch is not deferred.
2872 bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
2873  const RelAlgExecutionUnit& ra_exe_unit,
2874  const FragmentsList& selected_fragments) const {
2875  const auto& input_descs = ra_exe_unit.input_descs;
2876  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2877  if (nest_level < 1 ||
2878  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
2879  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2880  (ra_exe_unit.join_quals.empty() &&
2881  plan_state_->isLazyFetchColumn(inner_col_desc))) {
2882  return false;
2883  }
2884  const int table_id = inner_col_desc.getScanDesc().getTableId();
2885  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2886  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2887  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2888  return fragments.size() > 1;
2889 }
2890 
2891 bool Executor::needLinearizeAllFragments(
2892  const ColumnDescriptor* cd,
2893  const InputColDescriptor& inner_col_desc,
2894  const RelAlgExecutionUnit& ra_exe_unit,
2895  const FragmentsList& selected_fragments,
2896  const Data_Namespace::MemoryLevel memory_level) const {
2897  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
2898  const int table_id = inner_col_desc.getScanDesc().getTableId();
2899  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2900  CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2901  const auto& fragments = selected_fragments[nest_level].fragment_ids;
2902  auto need_linearize =
2903  cd->columnType.is_array() ||
2904  (cd->columnType.is_string() && cd->columnType.get_compression() == kENCODING_NONE);
2905  return table_id > 0 && need_linearize && fragments.size() > 1;
2906 }
2907 
2908 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
2909  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
2910  << " num_rows" << shared::printContainer(fetch_result.num_rows)
2911  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
2912 }
2913 
2914 FetchResult Executor::fetchChunks(
2915  const ColumnFetcher& column_fetcher,
2916  const RelAlgExecutionUnit& ra_exe_unit,
2917  const int device_id,
2918  const Data_Namespace::MemoryLevel memory_level,
2919  const std::map<int, const TableFragments*>& all_tables_fragments,
2920  const FragmentsList& selected_fragments,
2921  const Catalog_Namespace::Catalog& cat,
2922  std::list<ChunkIter>& chunk_iterators,
2923  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2924  DeviceAllocator* device_allocator,
2925  const size_t thread_idx,
2926  const bool allow_runtime_interrupt) {
2927  auto timer = DEBUG_TIMER(__func__);
2928  INJECT_TIMER(fetchChunks);
2929  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2930  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2931  std::vector<size_t> local_col_to_frag_pos;
2932  buildSelectedFragsMapping(selected_fragments_crossjoin,
2933  local_col_to_frag_pos,
2934  col_global_ids,
2935  selected_fragments,
2936  ra_exe_unit);
2937 
2937 
2938  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2939  selected_fragments_crossjoin);
2940  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2941  std::vector<std::vector<int64_t>> all_num_rows;
2942  std::vector<std::vector<uint64_t>> all_frag_offsets;
2943  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2944  std::vector<const int8_t*> frag_col_buffers(
2945  plan_state_->global_to_local_col_ids_.size());
2946  for (const auto& col_id : col_global_ids) {
2947  if (allow_runtime_interrupt) {
2948  bool isInterrupted = false;
2949  {
2950  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
2951  executor_session_mutex_);
2952  const auto query_session = getCurrentQuerySession(session_read_lock);
2953  isInterrupted =
2954  checkIsQuerySessionInterrupted(query_session, session_read_lock);
2955  }
2956  if (isInterrupted) {
2957  throw QueryExecutionError(ERR_INTERRUPTED);
2958  }
2959  }
2960  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2961  throw QueryExecutionError(ERR_OUT_OF_TIME);
2962  }
2963  CHECK(col_id);
2964  const int table_id = col_id->getScanDesc().getTableId();
2965  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2966  if (cd && cd->isVirtualCol) {
2967  CHECK_EQ("rowid", cd->columnName);
2968  continue;
2969  }
2970  const auto fragments_it = all_tables_fragments.find(table_id);
2971  CHECK(fragments_it != all_tables_fragments.end());
2972  const auto fragments = fragments_it->second;
2973  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2974  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2975  CHECK_LT(static_cast<size_t>(it->second),
2976  plan_state_->global_to_local_col_ids_.size());
2977  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2978  if (!fragments->size()) {
2979  return {};
2980  }
2981  CHECK_LT(frag_id, fragments->size());
2982  auto memory_level_for_column = memory_level;
2983  auto tbl_col_ids =
2984  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId());
2985  if (plan_state_->columns_to_fetch_.find(tbl_col_ids) ==
2986  plan_state_->columns_to_fetch_.end()) {
2987  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2988  }
2989  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2990  frag_col_buffers[it->second] =
2991  column_fetcher.getResultSetColumn(col_id.get(),
2992  memory_level_for_column,
2993  device_id,
2994  device_allocator,
2995  thread_idx);
2996  } else {
2997  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2998  // determine if we need special treatment to linearize a multi-frag table
2999  // i.e., a column that is classified as a varlen type, i.e., array
3000  // for now, we only support fixed-length arrays that contain
3001  // geo point coordinates, but we can support more types in this way
3002  if (needLinearizeAllFragments(
3003  cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3004  bool for_lazy_fetch = false;
3005  if (plan_state_->columns_to_not_fetch_.find(tbl_col_ids) !=
3006  plan_state_->columns_to_not_fetch_.end()) {
3007  for_lazy_fetch = true;
3008  VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
3009  << ", col_name: " << cd->columnName << ")";
3010  }
3011  frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
3012  table_id,
3013  col_id->getColId(),
3014  all_tables_fragments,
3015  chunks,
3016  chunk_iterators,
3017  for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3018  for_lazy_fetch ? 0 : device_id,
3019  device_allocator,
3020  thread_idx);
3021  } else {
3022  frag_col_buffers[it->second] =
3023  column_fetcher.getAllTableColumnFragments(table_id,
3024  col_id->getColId(),
3025  all_tables_fragments,
3026  memory_level_for_column,
3027  device_id,
3028  device_allocator,
3029  thread_idx);
3030  }
3031  } else {
3032  frag_col_buffers[it->second] =
3033  column_fetcher.getOneTableColumnFragment(table_id,
3034  frag_id,
3035  col_id->getColId(),
3036  all_tables_fragments,
3037  chunks,
3038  chunk_iterators,
3039  memory_level_for_column,
3040  device_id,
3041  device_allocator);
3042  }
3043  }
3044  }
3045  all_frag_col_buffers.push_back(frag_col_buffers);
3046  }
3047  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
3048  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
3049  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3050 }
3051 
3052 namespace {
3053 size_t get_selected_input_descs_index(int const table_id,
3054  std::vector<InputDescriptor> const& input_descs) {
3055  auto const has_table_id = [table_id](InputDescriptor const& input_desc) {
3056  return table_id == input_desc.getTableId();
3057  };
3058  return std::find_if(input_descs.begin(), input_descs.end(), has_table_id) -
3059  input_descs.begin();
3060 }
3061 
3063  int const table_id,
3064  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3065  auto const has_table_id = [table_id](auto const& input_desc) {
3066  return table_id == input_desc->getScanDesc().getTableId();
3067  };
3068  return std::distance(
3069  input_col_descs.begin(),
3070  std::find_if(input_col_descs.begin(), input_col_descs.end(), has_table_id));
3071 }
3072 
3073 std::list<std::shared_ptr<const InputColDescriptor>> get_selected_input_col_descs(
3074  int const table_id,
3075  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3076  std::list<std::shared_ptr<const InputColDescriptor>> selected;
3077  for (auto const& input_col_desc : input_col_descs) {
3078  if (table_id == input_col_desc->getScanDesc().getTableId()) {
3079  selected.push_back(input_col_desc);
3080  }
3081  }
3082  return selected;
3083 }
3084 
3085 // Set N consecutive elements of frag_col_buffers to ptr in the range of local_col_id.
3086 void set_mod_range(std::vector<int8_t const*>& frag_col_buffers,
3087  int8_t const* const ptr,
3088  size_t const local_col_id,
3089  size_t const N) {
3090  size_t const begin = local_col_id - local_col_id % N; // N divides begin
3091  size_t const end = begin + N;
3092  CHECK_LE(end, frag_col_buffers.size()) << (void*)ptr << ' ' << local_col_id << ' ' << N;
3093  for (size_t i = begin; i < end; ++i) {
3094  frag_col_buffers[i] = ptr;
3095  }
3096 }
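// Worked example for set_mod_range(): with N = 2 (two input descriptors) and
// local_col_id = 3, begin = 3 - 3 % 2 = 2 and end = 4, so frag_col_buffers[2] and
// frag_col_buffers[3] both end up pointing at ptr.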
3097 } // namespace
3098 
3099 // fetchChunks() assumes that multiple inputs imply a JOIN.
3100 // fetchUnionChunks() assumes that multiple inputs imply a UNION ALL.
3101 FetchResult Executor::fetchUnionChunks(
3102  const ColumnFetcher& column_fetcher,
3103  const RelAlgExecutionUnit& ra_exe_unit,
3104  const int device_id,
3105  const Data_Namespace::MemoryLevel memory_level,
3106  const std::map<int, const TableFragments*>& all_tables_fragments,
3107  const FragmentsList& selected_fragments,
3108  const Catalog_Namespace::Catalog& cat,
3109  std::list<ChunkIter>& chunk_iterators,
3110  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3111  DeviceAllocator* device_allocator,
3112  const size_t thread_idx,
3113  const bool allow_runtime_interrupt) {
3114  auto timer = DEBUG_TIMER(__func__);
3115  INJECT_TIMER(fetchUnionChunks);
3116 
3117  CHECK_EQ(1u, selected_fragments.size());
3118  CHECK_LE(2u, ra_exe_unit.input_descs.size());
3119  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
3120  auto const& input_descs = ra_exe_unit.input_descs;
3121  using TableId = int;
3122  TableId const selected_table_id = selected_fragments.front().table_id;
3123  size_t const input_descs_index =
3124  get_selected_input_descs_index(selected_table_id, input_descs);
3125  CHECK_LT(input_descs_index, input_descs.size());
3126  size_t const input_col_descs_index =
3127  get_selected_input_col_descs_index(selected_table_id, ra_exe_unit.input_col_descs);
3128  CHECK_LT(input_col_descs_index, ra_exe_unit.input_col_descs.size());
3129  VLOG(2) << "selected_table_id=" << selected_table_id
3130  << " input_descs_index=" << input_descs_index
3131  << " input_col_descs_index=" << input_col_descs_index
3132  << " input_descs=" << shared::printContainer(input_descs)
3133  << " ra_exe_unit.input_col_descs="
3134  << shared::printContainer(ra_exe_unit.input_col_descs);
3135 
3136  std::list<std::shared_ptr<const InputColDescriptor>> selected_input_col_descs =
3137  get_selected_input_col_descs(selected_table_id, ra_exe_unit.input_col_descs);
3138  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3139 
3141  selected_fragments_crossjoin, selected_fragments, ra_exe_unit);
3142 
3144  selected_fragments_crossjoin);
3145 
3146  if (allow_runtime_interrupt) {
3147  bool isInterrupted = false;
3148  {
3149  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
3150  executor_session_mutex_);
3151  const auto query_session = getCurrentQuerySession(session_read_lock);
3152  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3153  }
3154  if (isInterrupted) {
3155  throw QueryExecutionError(ERR_INTERRUPTED);
3156  }
3157  }
3158  std::vector<const int8_t*> frag_col_buffers(
3159  plan_state_->global_to_local_col_ids_.size());
3160  for (const auto& col_id : selected_input_col_descs) {
3161  CHECK(col_id);
3162  const auto cd = try_get_column_descriptor(col_id.get(), cat);
3163  if (cd && cd->isVirtualCol) {
3164  CHECK_EQ("rowid", cd->columnName);
3165  continue;
3166  }
3167  const auto fragments_it = all_tables_fragments.find(selected_table_id);
3168  CHECK(fragments_it != all_tables_fragments.end());
3169  const auto fragments = fragments_it->second;
3170  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3171  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3172  size_t const local_col_id = it->second;
3173  CHECK_LT(local_col_id, plan_state_->global_to_local_col_ids_.size());
3174  constexpr size_t frag_id = 0;
3175  if (fragments->empty()) {
3176  return {};
3177  }
3178  MemoryLevel const memory_level_for_column =
3179  plan_state_->columns_to_fetch_.count({selected_table_id, col_id->getColId()})
3180  ? memory_level
3181  : Data_Namespace::CPU_LEVEL;
3182  int8_t const* ptr;
3183  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3184  ptr = column_fetcher.getResultSetColumn(
3185  col_id.get(), memory_level_for_column, device_id, device_allocator, thread_idx);
3186  } else if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3187  ptr = column_fetcher.getAllTableColumnFragments(selected_table_id,
3188  col_id->getColId(),
3189  all_tables_fragments,
3190  memory_level_for_column,
3191  device_id,
3192  device_allocator,
3193  thread_idx);
3194  } else {
3195  ptr = column_fetcher.getOneTableColumnFragment(selected_table_id,
3196  frag_id,
3197  col_id->getColId(),
3198  all_tables_fragments,
3199  chunks,
3200  chunk_iterators,
3201  memory_level_for_column,
3202  device_id,
3203  device_allocator);
3204  }
3205  // Set frag_col_buffers[i]=ptr for i in mod input_descs.size() range of local_col_id.
3206  set_mod_range(frag_col_buffers, ptr, local_col_id, input_descs.size());
3207  }
3208  auto const [num_rows, frag_offsets] = getRowCountAndOffsetForAllFrags(
3209  ra_exe_unit, frag_ids_crossjoin, input_descs, all_tables_fragments);
3210 
3211  VLOG(2) << "frag_col_buffers=" << shared::printContainer(frag_col_buffers)
3212  << " num_rows=" << shared::printContainer(num_rows)
3213  << " frag_offsets=" << shared::printContainer(frag_offsets)
3214  << " input_descs_index=" << input_descs_index
3215  << " input_col_descs_index=" << input_col_descs_index;
3216  return {{std::move(frag_col_buffers)},
3217  {{num_rows[0][input_descs_index]}},
3218  {{frag_offsets[0][input_descs_index]}}};
3219 }
3220 
3221 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
3222  const size_t scan_idx,
3223  const RelAlgExecutionUnit& ra_exe_unit) {
3224  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
3225  scan_idx > 0 &&
3226  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
3227  !selected_fragments[scan_idx].fragment_ids.empty()) {
3228  // Fetch all fragments
3229  return {size_t(0)};
3230  }
3231 
3232  return selected_fragments[scan_idx].fragment_ids;
3233 }
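// A minimal, self-contained sketch (not part of Execute.cpp) of the fragment-count
// decision above, with simplified stand-in parameters: in a join (more than two input
// descriptors or non-empty join quals), every non-sharded inner table (scan_idx > 0)
// collapses to the single pseudo-fragment id 0 so all of its fragments are fetched as
// one unit, while the outer table keeps its selected fragment ids.
#include <cstddef>
#include <vector>

std::vector<size_t> fragment_ids_for_scan(bool is_join,
                                          size_t scan_idx,
                                          bool is_sharded_range_table,
                                          std::vector<size_t> const& selected_frag_ids) {
  if (is_join && scan_idx > 0 && !is_sharded_range_table && !selected_frag_ids.empty()) {
    return {size_t(0)};  // inner table: fetch all fragments together
  }
  return selected_frag_ids;  // outer table: keep the per-kernel selection
}
// e.g. fragment_ids_for_scan(true, 1, false, {3, 7}) yields {0},
//      fragment_ids_for_scan(true, 0, false, {3, 7}) yields {3, 7}.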
3234 
3236  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3237  std::vector<size_t>& local_col_to_frag_pos,
3238  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3239  const FragmentsList& selected_fragments,
3240  const RelAlgExecutionUnit& ra_exe_unit) {
3241  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3242  size_t frag_pos{0};
3243  const auto& input_descs = ra_exe_unit.input_descs;
3244  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3245  const int table_id = input_descs[scan_idx].getTableId();
3246  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
3247  selected_fragments_crossjoin.push_back(
3248  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
3249  for (const auto& col_id : col_global_ids) {
3250  CHECK(col_id);
3251  const auto& input_desc = col_id->getScanDesc();
3252  if (input_desc.getTableId() != table_id ||
3253  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3254  continue;
3255  }
3256  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3257  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3258  CHECK_LT(static_cast<size_t>(it->second),
3259  plan_state_->global_to_local_col_ids_.size());
3260  local_col_to_frag_pos[it->second] = frag_pos;
3261  }
3262  ++frag_pos;
3263  }
3264 }
3265 
3267  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3268  const FragmentsList& selected_fragments,
3269  const RelAlgExecutionUnit& ra_exe_unit) {
3270  const auto& input_descs = ra_exe_unit.input_descs;
3271  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3272  // selected_fragments comes from execution_kernel.fragments, set in assignFragsToKernelDispatch
3273  if (selected_fragments[0].table_id == input_descs[scan_idx].getTableId()) {
3274  selected_fragments_crossjoin.push_back({size_t(1)});
3275  }
3276  }
3277 }
3278 
3279 namespace {
3280 
3282  public:
3283  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
3285  for (auto out : out_vec_) {
3286  delete[] out;
3287  }
3288  }
3289 
3290  private:
3291  std::vector<int64_t*> out_vec_;
3292 };
3293 } // namespace
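// OutVecOwner above is a small RAII guard over the raw int64_t* buffers returned by
// launchCpuCode/launchGpuCode, so they are freed on every exit path of
// executePlanWithoutGroupBy. A self-contained sketch of the same pattern (illustration
// only, not the class used below):
#include <cstdint>
#include <memory>
#include <vector>

class RawBufferOwner {
 public:
  explicit RawBufferOwner(std::vector<int64_t*> bufs) : bufs_(std::move(bufs)) {}
  ~RawBufferOwner() {
    for (auto* buf : bufs_) {
      delete[] buf;  // released even when the caller returns early with an error code
    }
  }

 private:
  std::vector<int64_t*> bufs_;
};

void owner_example() {
  std::vector<int64_t*> out_vec{new int64_t[4]{}, new int64_t[4]{}};
  auto scope = std::make_unique<RawBufferOwner>(out_vec);  // mirrors output_memory_scope
  // ... reduce out_vec; any early return still frees the buffers ...
}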
3294 
3296  const RelAlgExecutionUnit& ra_exe_unit,
3297  const CompilationResult& compilation_result,
3298  const bool hoist_literals,
3299  ResultSetPtr* results,
3300  const std::vector<Analyzer::Expr*>& target_exprs,
3301  const ExecutorDeviceType device_type,
3302  std::vector<std::vector<const int8_t*>>& col_buffers,
3303  QueryExecutionContext* query_exe_context,
3304  const std::vector<std::vector<int64_t>>& num_rows,
3305  const std::vector<std::vector<uint64_t>>& frag_offsets,
3306  Data_Namespace::DataMgr* data_mgr,
3307  const int device_id,
3308  const uint32_t start_rowid,
3309  const uint32_t num_tables,
3310  const bool allow_runtime_interrupt,
3311  RenderInfo* render_info,
3312  const int64_t rows_to_process) {
3314  auto timer = DEBUG_TIMER(__func__);
3315  CHECK(!results || !(*results));
3316  if (col_buffers.empty()) {
3317  return 0;
3318  }
3319 
3320  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3321  if (render_info) {
3322  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
3323  // here, we are in non-insitu mode.
3324  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
3325  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
3326  "currently unsupported.";
3327  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3328  }
3329 
3330  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3331  std::vector<int64_t*> out_vec;
3332  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3333  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3334  std::unique_ptr<OutVecOwner> output_memory_scope;
3335  if (allow_runtime_interrupt) {
3336  bool isInterrupted = false;
3337  {
3340  const auto query_session = getCurrentQuerySession(session_read_lock);
3341  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3342  }
3343  if (isInterrupted) {
3345  }
3346  }
3347  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3349  }
3350  if (device_type == ExecutorDeviceType::CPU) {
3351  CpuCompilationContext* cpu_generated_code =
3352  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3353  CHECK(cpu_generated_code);
3354  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
3355  cpu_generated_code,
3356  hoist_literals,
3357  hoist_buf,
3358  col_buffers,
3359  num_rows,
3360  frag_offsets,
3361  0,
3362  &error_code,
3363  num_tables,
3364  join_hash_table_ptrs,
3365  rows_to_process);
3366  output_memory_scope.reset(new OutVecOwner(out_vec));
3367  } else {
3368  GpuCompilationContext* gpu_generated_code =
3369  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3370  CHECK(gpu_generated_code);
3371  try {
3372  out_vec = query_exe_context->launchGpuCode(
3373  ra_exe_unit,
3374  gpu_generated_code,
3375  hoist_literals,
3376  hoist_buf,
3377  col_buffers,
3378  num_rows,
3379  frag_offsets,
3380  0,
3381  data_mgr,
3382  blockSize(),
3383  gridSize(),
3384  device_id,
3385  compilation_result.gpu_smem_context.getSharedMemorySize(),
3386  &error_code,
3387  num_tables,
3388  allow_runtime_interrupt,
3389  join_hash_table_ptrs,
3390  render_allocator_map_ptr);
3391  output_memory_scope.reset(new OutVecOwner(out_vec));
3392  } catch (const OutOfMemory&) {
3393  return ERR_OUT_OF_GPU_MEM;
3394  } catch (const std::exception& e) {
3395  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3396  }
3397  }
3398  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3399  error_code == Executor::ERR_DIV_BY_ZERO ||
3400  error_code == Executor::ERR_OUT_OF_TIME ||
3401  error_code == Executor::ERR_INTERRUPTED ||
3403  error_code == Executor::ERR_GEOS ||
3405  return error_code;
3406  }
3407  if (ra_exe_unit.estimator) {
3408  CHECK(!error_code);
3409  if (results) {
3410  *results =
3411  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
3412  }
3413  return 0;
3414  }
3415  // Delayed results extraction (used for sub-fragments) is expected for the estimator only.
3416  CHECK(results);
3417  std::vector<int64_t> reduced_outs;
3418  const auto num_frags = col_buffers.size();
3419  const size_t entry_count =
3420  device_type == ExecutorDeviceType::GPU
3421  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
3422  ? 1
3423  : blockSize() * gridSize() * num_frags)
3424  : num_frags;
3425  if (size_t(1) == entry_count) {
3426  for (auto out : out_vec) {
3427  CHECK(out);
3428  reduced_outs.push_back(*out);
3429  }
3430  } else {
3431  size_t out_vec_idx = 0;
3432 
3433  for (const auto target_expr : target_exprs) {
3434  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3435  CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3436  << target_expr->toString();
3437 
3438  const int num_iterations = agg_info.sql_type.is_geometry()
3439  ? agg_info.sql_type.get_physical_coord_cols()
3440  : 1;
3441 
3442  for (int i = 0; i < num_iterations; i++) {
3443  int64_t val1;
3444  const bool float_argument_input = takes_float_argument(agg_info);
3445  if (is_distinct_target(agg_info) || agg_info.agg_kind == kAPPROX_QUANTILE) {
3446  CHECK(agg_info.agg_kind == kCOUNT ||
3447  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT ||
3448  agg_info.agg_kind == kAPPROX_QUANTILE);
3449  val1 = out_vec[out_vec_idx][0];
3450  error_code = 0;
3451  } else {
3452  const auto chosen_bytes = static_cast<size_t>(
3453  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
3454  std::tie(val1, error_code) = Executor::reduceResults(
3455  agg_info.agg_kind,
3456  agg_info.sql_type,
3457  query_exe_context->getAggInitValForIndex(out_vec_idx),
3458  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3459  out_vec[out_vec_idx],
3460  entry_count,
3461  false,
3462  float_argument_input);
3463  }
3464  if (error_code) {
3465  break;
3466  }
3467  reduced_outs.push_back(val1);
3468  if (agg_info.agg_kind == kAVG ||
3469  (agg_info.agg_kind == kSAMPLE &&
3470  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3471  const auto chosen_bytes = static_cast<size_t>(
3472  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3473  1));
3474  int64_t val2;
3475  std::tie(val2, error_code) = Executor::reduceResults(
3476  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3477  agg_info.sql_type,
3478  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3479  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3480  out_vec[out_vec_idx + 1],
3481  entry_count,
3482  false,
3483  false);
3484  if (error_code) {
3485  break;
3486  }
3487  reduced_outs.push_back(val2);
3488  ++out_vec_idx;
3489  }
3490  ++out_vec_idx;
3491  }
3492  }
3493  }
3494 
3495  if (error_code) {
3496  return error_code;
3497  }
3498 
3499  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3500  auto rows_ptr = std::shared_ptr<ResultSet>(
3501  query_exe_context->query_buffers_->result_sets_[0].release());
3502  rows_ptr->fillOneEntry(reduced_outs);
3503  *results = std::move(rows_ptr);
3504  return error_code;
3505 }
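// Illustration only, not Executor::reduceResults: in the no-group-by path above, each of
// the entry_count entries of an output slot holds a partial aggregate (one per fragment
// on CPU; one per GPU thread when shared-memory reduction is not used). A minimal sketch
// of the final host-side fold for a COUNT-style target:
#include <cstdint>
#include <numeric>
#include <vector>

int64_t reduce_count_slot(std::vector<int64_t> const& partial_counts) {
  // Sum the per-fragment / per-thread partial counts into a single value.
  return std::accumulate(partial_counts.begin(), partial_counts.end(), int64_t{0});
}
// e.g. partial counts {10, 0, 32, 0} reduce to 42; the real path additionally handles
// init values, slot byte widths, and float-typed aggregates via Executor::reduceResults.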
3506 
3507 namespace {
3508 
3509 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
3510  CHECK(scan_limit);
3511  return results && results->rowCount() < scan_limit;
3512 }
3513 
3514 } // namespace
3515 
3517  const RelAlgExecutionUnit& ra_exe_unit,
3518  const CompilationResult& compilation_result,
3519  const bool hoist_literals,
3520  ResultSetPtr* results,
3521  const ExecutorDeviceType device_type,
3522  std::vector<std::vector<const int8_t*>>& col_buffers,
3523  const std::vector<size_t> outer_tab_frag_ids,
3524  QueryExecutionContext* query_exe_context,
3525  const std::vector<std::vector<int64_t>>& num_rows,
3526  const std::vector<std::vector<uint64_t>>& frag_offsets,
3527  Data_Namespace::DataMgr* data_mgr,
3528  const int device_id,
3529  const int outer_table_id,
3530  const int64_t scan_limit,
3531  const uint32_t start_rowid,
3532  const uint32_t num_tables,
3533  const bool allow_runtime_interrupt,
3534  RenderInfo* render_info,
3535  const int64_t rows_to_process) {
3536  auto timer = DEBUG_TIMER(__func__);
3538  // TODO: get results via a separate method, but need to do something with literals.
3539  CHECK(!results || !(*results));
3540  if (col_buffers.empty()) {
3541  return 0;
3542  }
3543  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3544  // TODO(alex):
3545  // 1. Optimize size (make keys more compact).
3546  // 2. Resize on overflow.
3547  // 3. Optimize runtime.
3548  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3549  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3550  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3551  if (allow_runtime_interrupt) {
3552  bool isInterrupted = false;
3553  {
3556  const auto query_session = getCurrentQuerySession(session_read_lock);
3557  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3558  }
3559  if (isInterrupted) {
3561  }
3562  }
3563  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3564  return ERR_INTERRUPTED;
3565  }
3566 
3567  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3568  if (render_info && render_info->useCudaBuffers()) {
3569  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3570  }
3571 
3572  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3573  << " ra_exe_unit.input_descs="
3574  << shared::printContainer(ra_exe_unit.input_descs)
3575  << " ra_exe_unit.input_col_descs="
3576  << shared::printContainer(ra_exe_unit.input_col_descs)
3577  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3578  << " num_rows=" << shared::printContainer(num_rows)
3579  << " frag_offsets=" << shared::printContainer(frag_offsets)
3580  << " query_exe_context->query_buffers_->num_rows_="
3581  << query_exe_context->query_buffers_->num_rows_
3582  << " query_exe_context->query_mem_desc_.getEntryCount()="
3583  << query_exe_context->query_mem_desc_.getEntryCount()
3584  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3585  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3586  << " num_tables=" << num_tables;
3587 
3588  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3589  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3590  // with outer_table_id.
3591  if (ra_exe_unit_copy.union_all) {
3592  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
3593  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3594  ra_exe_unit_copy.input_descs.end(),
3595  [outer_table_id](auto const& a, auto const& b) {
3596  return a.getTableId() == outer_table_id &&
3597  b.getTableId() != outer_table_id;
3598  });
3599  while (!ra_exe_unit_copy.input_descs.empty() &&
3600  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3601  ra_exe_unit_copy.input_descs.pop_back();
3602  }
3603  // Filter ra_exe_unit_copy.input_col_descs.
3604  ra_exe_unit_copy.input_col_descs.remove_if(
3605  [outer_table_id](auto const& input_col_desc) {
3606  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3607  });
3608  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3609  }
3610 
3611  if (device_type == ExecutorDeviceType::CPU) {
3612  const int32_t scan_limit_for_query =
3613  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3614  const int32_t max_matched = scan_limit_for_query == 0
3615  ? query_exe_context->query_mem_desc_.getEntryCount()
3616  : scan_limit_for_query;
3617  CpuCompilationContext* cpu_generated_code =
3618  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3619  CHECK(cpu_generated_code);
3620  query_exe_context->launchCpuCode(ra_exe_unit_copy,
3621  cpu_generated_code,
3622  hoist_literals,
3623  hoist_buf,
3624  col_buffers,
3625  num_rows,
3626  frag_offsets,
3627  max_matched,
3628  &error_code,
3629  num_tables,
3630  join_hash_table_ptrs,
3631  rows_to_process);
3632  } else {
3633  try {
3634  GpuCompilationContext* gpu_generated_code =
3635  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3636  CHECK(gpu_generated_code);
3637  query_exe_context->launchGpuCode(
3638  ra_exe_unit_copy,
3639  gpu_generated_code,
3640  hoist_literals,
3641  hoist_buf,
3642  col_buffers,
3643  num_rows,
3644  frag_offsets,
3645  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3646  data_mgr,
3647  blockSize(),
3648  gridSize(),
3649  device_id,
3650  compilation_result.gpu_smem_context.getSharedMemorySize(),
3651  &error_code,
3652  num_tables,
3653  allow_runtime_interrupt,
3654  join_hash_table_ptrs,
3655  render_allocator_map_ptr);
3656  } catch (const OutOfMemory&) {
3657  return ERR_OUT_OF_GPU_MEM;
3658  } catch (const OutOfRenderMemory&) {
3659  return ERR_OUT_OF_RENDER_MEM;
3660  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3662  } catch (const std::exception& e) {
3663  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3664  }
3665  }
3666 
3667  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3668  error_code == Executor::ERR_DIV_BY_ZERO ||
3669  error_code == Executor::ERR_OUT_OF_TIME ||
3670  error_code == Executor::ERR_INTERRUPTED ||
3672  error_code == Executor::ERR_GEOS ||
3674  return error_code;
3675  }
3676 
3677  if (results && error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3678  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3679  *results = query_exe_context->getRowSet(ra_exe_unit_copy,
3680  query_exe_context->query_mem_desc_);
3681  CHECK(*results);
3682  VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
3683  (*results)->holdLiterals(hoist_buf);
3684  }
3685  if (error_code < 0 && render_allocator_map_ptr) {
3686  auto const adjusted_scan_limit =
3687  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3688  // More rows passed the filter than available slots. We don't have a count to check,
3689  // so assume we met the limit if a scan limit is set
3690  if (adjusted_scan_limit != 0) {
3691  return 0;
3692  } else {
3693  return error_code;
3694  }
3695  }
3696  if (results && error_code &&
3697  (!scan_limit || check_rows_less_than_needed(*results, scan_limit))) {
3698  return error_code; // unlucky, not enough results and we ran out of slots
3699  }
3700 
3701  return 0;
3702 }
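// A self-contained sketch (illustration only) of the UNION ALL input filtering performed
// above: move descriptors of outer_table_id to the front with a stable sort, pop the
// rest off the back, and drop the column descriptors of the other inputs. `Desc` is a
// simplified stand-in for the real input descriptor types.
#include <algorithm>
#include <list>
#include <vector>

struct Desc {
  int table_id;
};

void keep_only_outer(std::vector<Desc>& input_descs,
                     std::list<Desc>& input_col_descs,
                     int const outer_table_id) {
  std::stable_sort(input_descs.begin(), input_descs.end(),
                   [outer_table_id](Desc const& a, Desc const& b) {
                     return a.table_id == outer_table_id && b.table_id != outer_table_id;
                   });
  while (!input_descs.empty() && input_descs.back().table_id != outer_table_id) {
    input_descs.pop_back();
  }
  input_col_descs.remove_if(
      [outer_table_id](Desc const& d) { return d.table_id != outer_table_id; });
}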
3703 
3704 std::vector<int8_t*> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
3705  const int device_id) {
3706  std::vector<int8_t*> table_ptrs;
3707  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3708  for (auto hash_table : join_hash_tables) {
3709  if (!hash_table) {
3710  CHECK(table_ptrs.empty());
3711  return {};
3712  }
3713  table_ptrs.push_back(hash_table->getJoinHashBuffer(
3714  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
3715  }
3716  return table_ptrs;
3717 }
3718 
3719 void Executor::nukeOldState(const bool allow_lazy_fetch,
3720  const std::vector<InputTableInfo>& query_infos,
3721  const PlanState::DeletedColumnsMap& deleted_cols_map,
3722  const RelAlgExecutionUnit* ra_exe_unit) {
3725  const bool contains_left_deep_outer_join =
3726  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3727  ra_exe_unit->join_quals.end(),
3728  [](const JoinCondition& join_condition) {
3729  return join_condition.type == JoinType::LEFT;
3730  }) != ra_exe_unit->join_quals.end();
3731  cgen_state_.reset(
3732  new CgenState(query_infos.size(), contains_left_deep_outer_join, this));
3733  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
3734  query_infos,
3735  deleted_cols_map,
3736  this));
3737 }
3738 
3739 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
3740  const std::vector<InputTableInfo>& query_infos) {
3742  const auto ld_count = input_descs.size();
3743  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
3744  for (size_t i = 0; i < ld_count; ++i) {
3745  CHECK_LT(i, query_infos.size());
3746  const auto frag_count = query_infos[i].info.fragments.size();
3747  if (i > 0) {
3748  cgen_state_->frag_offsets_.push_back(nullptr);
3749  } else {
3750  if (frag_count > 1) {
3751  cgen_state_->frag_offsets_.push_back(cgen_state_->ir_builder_.CreateLoad(
3752  frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));
3753  } else {
3754  cgen_state_->frag_offsets_.push_back(nullptr);
3755  }
3756  }
3757  }
3758 }
3759 
3761  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3762  const std::vector<InputTableInfo>& query_infos,
3763  const MemoryLevel memory_level,
3764  const JoinType join_type,
3765  const HashType preferred_hash_type,
3766  ColumnCacheMap& column_cache,
3767  const HashTableBuildDagMap& hashtable_build_dag_map,
3768  const RegisteredQueryHint& query_hint,
3769  const TableIdToNodeMap& table_id_to_node_map) {
3770  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3771  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3772  }
3773  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3775  }
3776  try {
3777  auto tbl = HashJoin::getInstance(qual_bin_oper,
3778  query_infos,
3779  memory_level,
3780  join_type,
3781  preferred_hash_type,
3782  deviceCountForMemoryLevel(memory_level),
3783  column_cache,
3784  this,
3785  hashtable_build_dag_map,
3786  query_hint,
3787  table_id_to_node_map);
3788  return {tbl, ""};
3789  } catch (const HashJoinFail& e) {
3790  return {nullptr, e.what()};
3791  }
3792 }
3793 
3794 int8_t Executor::warpSize() const {
3795  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3796  CHECK(!dev_props.empty());
3797  return dev_props.front().warpSize;
3798 }
3799 
3800 // TODO(adb): should these three functions have consistent semantics if cuda mgr does not
3801 // exist?
3802 unsigned Executor::gridSize() const {
3803  CHECK(data_mgr_);
3804  const auto cuda_mgr = data_mgr_->getCudaMgr();
3805  if (!cuda_mgr) {
3806  return 0;
3807  }
3808  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
3809 }
3810 
3811 unsigned Executor::numBlocksPerMP() const {
3812  return grid_size_x_ ? std::ceil(grid_size_x_ / cudaMgr()->getMinNumMPsForAllDevices())
3813  : 2;
3814 }
3815 
3816 unsigned Executor::blockSize() const {
3817  CHECK(data_mgr_);
3818  const auto cuda_mgr = data_mgr_->getCudaMgr();
3819  if (!cuda_mgr) {
3820  return 0;
3821  }
3822  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3823  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3824 }
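// Worked example (hypothetical device properties): with no explicit overrides,
// gridSize() is 2 * getMinNumMPsForAllDevices() and blockSize() is the device's
// maxThreadsPerBlock, so a no-group-by GPU query without shared-memory reduction
// produces gridSize() * blockSize() partial entries per fragment.
#include <cassert>

void launch_dims_example() {
  unsigned const min_num_mps = 80;              // hypothetical smallest SM count
  unsigned const max_threads_per_block = 1024;  // hypothetical device limit
  unsigned const grid_size = 2 * min_num_mps;         // as in gridSize() -> 160
  unsigned const block_size = max_threads_per_block;  // as in blockSize() -> 1024
  assert(grid_size * block_size == 163'840);  // partial-result entries per fragment
}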
3825 
3827  return max_gpu_slab_size_;
3828 }
3829 
3830 int64_t Executor::deviceCycles(int milliseconds) const {
3831  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
3832  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3833 }
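// Worked example (hypothetical clock rate): deviceCycles() converts a millisecond budget
// into GPU clock cycles using clockKhz of the first device; kHz * ms = cycles.
#include <cstdint>

int64_t device_cycles_example() {
  int const clock_khz = 1'410'000;  // hypothetical 1.41 GHz device
  int const milliseconds = 10;
  return static_cast<int64_t>(clock_khz) * milliseconds;  // 14'100'000 cycles
}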
3834 
3835 llvm::Value* Executor::castToFP(llvm::Value* value,
3836  SQLTypeInfo const& from_ti,
3837  SQLTypeInfo const& to_ti) {
3839  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3840  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3841  llvm::Type* fp_type{nullptr};
3842  switch (to_ti.get_size()) {
3843  case 4:
3844  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3845  break;
3846  case 8:
3847  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3848  break;
3849  default:
3850  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3851  }
3852  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3853  if (from_ti.get_scale()) {
3854  value = cgen_state_->ir_builder_.CreateFDiv(
3855  value,
3856  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3857  }
3858  }
3859  return value;
3860 }
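// Worked example (illustration only): castToFP turns a scaled integer such as a DECIMAL
// into a float/double by a signed int-to-FP conversion followed by a divide by 10^scale
// (exp_to_scale above). The same arithmetic in plain C++:
#include <cmath>
#include <cstdint>

double decimal_to_fp(int64_t const stored_value, int const scale) {
  return static_cast<double>(stored_value) / std::pow(10.0, scale);
}
// e.g. DECIMAL(10,2) stores 123.45 as 12345, and decimal_to_fp(12345, 2) gives 123.45.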
3861 
3862 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
3864  CHECK(val->getType()->isPointerTy());
3865 
3866  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3867  const auto val_type = val_ptr_type->getElementType();
3868  size_t val_width = 0;
3869  if (val_type->isIntegerTy()) {
3870  val_width = val_type->getIntegerBitWidth();
3871  } else {
3872  if (val_type->isFloatTy()) {
3873  val_width = 32;
3874  } else {
3875  CHECK(val_type->isDoubleTy());
3876  val_width = 64;
3877  }
3878  }
3879  CHECK_LT(size_t(0), val_width);
3880  if (bitWidth == val_width) {
3881  return val;
3882  }
3883  return cgen_state_->ir_builder_.CreateBitCast(
3884  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3885 }
3886 
3887 #define EXECUTE_INCLUDE
3888 #include "ArrayOps.cpp"
3889 #include "DateAdd.cpp"
3890 #include "GeoOps.cpp"
3891 #include "StringFunctions.cpp"
3893 #undef EXECUTE_INCLUDE
3894 
3895 namespace {
3897  const ColumnDescriptor* deleted_cd) {
3898  auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3899  if (deleted_cols_it == deleted_cols_map.end()) {
3900  CHECK(
3901  deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd)).second);
3902  } else {
3903  CHECK_EQ(deleted_cd, deleted_cols_it->second);
3904  }
3905 }
3906 } // namespace
3907 
3908 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
3909  const RelAlgExecutionUnit& ra_exe_unit,
3910  const CompilationOptions& co) {
3911  if (!co.filter_on_deleted_column) {
3912  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3913  }
3914  auto ra_exe_unit_with_deleted = ra_exe_unit;
3915  PlanState::DeletedColumnsMap deleted_cols_map;
3916  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3917  if (input_table.getSourceType() != InputSourceType::TABLE) {
3918  continue;
3919  }
3920  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3921  CHECK(td);
3922  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3923  if (!deleted_cd) {
3924  continue;
3925  }
3926  CHECK(deleted_cd->columnType.is_boolean());
3927  // check deleted column is not already present
3928  bool found = false;
3929  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3930  if (input_col.get()->getColId() == deleted_cd->columnId &&
3931  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3932  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3933  found = true;
3934  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3935  break;
3936  }
3937  }
3938  if (!found) {
3939  // add deleted column
3940  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3941  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3942  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3943  }
3944  }
3945  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3946 }
3947 
3948 namespace {
3949 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` returns `true` as the
3950 // first element of the returned tuple for a safely scaled epoch value, and `false` for
3951 // overflow/underflow values.
3952 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
3953  const int64_t chunk_min,
3954  const int64_t chunk_max,
3955  const SQLTypeInfo& lhs_type,
3956  const SQLTypeInfo& rhs_type) {
3957  const int32_t ldim = lhs_type.get_dimension();
3958  const int32_t rdim = rhs_type.get_dimension();
3959  CHECK(ldim != rdim);
3960  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
3961  if (ldim > rdim) {
3962  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
3963  return {true, chunk_min / scale, chunk_max / scale};
3964  }
3965 
3966  using checked_int64_t = boost::multiprecision::number<
3967  boost::multiprecision::cpp_int_backend<64,
3968  64,
3969  boost::multiprecision::signed_magnitude,
3970  boost::multiprecision::checked,
3971  void>>;
3972 
3973  try {
3974  auto ret =
3975  std::make_tuple(true,
3976  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
3977  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
3978  return ret;
3979  } catch (const std::overflow_error& e) {
3980  // noop
3981  }
3982  return std::make_tuple(false, chunk_min, chunk_max);
3983 }
3984 
3985 } // namespace
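// Worked example (illustration only): for a hypothetical TIMESTAMP(9) column compared
// against a TIMESTAMP(0) constant, ldim=9 and rdim=0, so the scale factor is
// 10^|ldim-rdim| = 10^9. With ldim > rdim the chunk stats are divided down (no overflow
// possible); in the opposite direction they are multiplied using checked 64-bit
// arithmetic, and an overflow makes the function report "not safe", disabling the skip.
#include <cstdint>

void timestamp_rescale_example() {
  int64_t const scale = 1'000'000'000;                     // 10^|9 - 0|
  int64_t const chunk_min_ns = 1'600'000'000'123'456'789;  // nanosecond-precision stat
  int64_t const chunk_min_s = chunk_min_ns / scale;        // 1'600'000'000 seconds
  (void)chunk_min_s;
}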
3986 
3988  const int table_id,
3989  const Fragmenter_Namespace::FragmentInfo& fragment) {
3990  // Skip temporary tables
3991  if (table_id < 0) {
3992  return false;
3993  }
3994 
3995  const auto td = catalog_->getMetadataForTable(fragment.physicalTableId);
3996  CHECK(td);
3997  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3998  if (!deleted_cd) {
3999  return false;
4000  }
4001 
4002  const auto& chunk_type = deleted_cd->columnType;
4003  CHECK(chunk_type.is_boolean());
4004 
4005  const auto deleted_col_id = deleted_cd->columnId;
4006  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
4007  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
4008  const int64_t chunk_min =
4009  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4010  const int64_t chunk_max =
4011  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4012  if (chunk_min == 1 && chunk_max == 1) {  // Deleted-column metadata says the full bytemap
4013  // is true (signifying all rows deleted)
4014  return true;
4015  }
4016  }
4017  return false;
4018 }
4019 
4021  const Analyzer::BinOper* comp_expr,
4022  const Analyzer::ColumnVar* lhs_col,
4023  const Fragmenter_Namespace::FragmentInfo& fragment,
4024  const Analyzer::Constant* rhs_const) const {
4025  const int col_id = lhs_col->get_column_id();
4026  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4027  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4029  }
4030  double chunk_min{0.};
4031  double chunk_max{0.};
4032  const auto& chunk_type = lhs_col->get_type_info();
4033  chunk_min = extract_min_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4034  chunk_max = extract_max_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4035  if (chunk_min > chunk_max) {
4037  }
4038 
4039  const auto datum_fp = rhs_const->get_constval();
4040  const auto rhs_type = rhs_const->get_type_info().get_type();
4041  CHECK(rhs_type == kFLOAT || rhs_type == kDOUBLE);
4042 
4043  // Do we need to codegen the constant like the integer path does?
4044  const auto rhs_val = rhs_type == kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
4045 
4046  // Todo: dedup the following comparison code with the integer/timestamp path, it is
4047  // slightly tricky to do cleanly as we do not have rowid on this path
4048  switch (comp_expr->get_optype()) {
4049  case kGE:
4050  if (chunk_max < rhs_val) {
4052  }
4053  break;
4054  case kGT:
4055  if (chunk_max <= rhs_val) {
4057  }
4058  break;
4059  case kLE:
4060  if (chunk_min > rhs_val) {
4062  }
4063  break;
4064  case kLT:
4065  if (chunk_min >= rhs_val) {
4067  }
4068  break;
4069  case kEQ:
4070  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4072  }
4073  break;
4074  default:
4075  break;
4076  }
4078 }
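// A self-contained sketch (illustration only) of the min/max pruning decision above:
// a fragment can be skipped when its chunk statistics prove that no row can satisfy the
// comparison against the constant. `Op` stands in for the SQL comparison operator enum.
enum class Op { GE, GT, LE, LT, EQ };

bool can_skip_fp_fragment(Op const op,
                          double const chunk_min,
                          double const chunk_max,
                          double const rhs_val) {
  switch (op) {
    case Op::GE:
      return chunk_max < rhs_val;
    case Op::GT:
      return chunk_max <= rhs_val;
    case Op::LE:
      return chunk_min > rhs_val;
    case Op::LT:
      return chunk_min >= rhs_val;
    case Op::EQ:
      return chunk_min > rhs_val || chunk_max < rhs_val;
  }
  return false;
}
// e.g. a fragment with stats [1.5, 9.0] is skippable for `x > 10.0` but not for `x < 2.0`.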
4079 
4080 std::pair<bool, int64_t> Executor::skipFragment(
4081  const InputDescriptor& table_desc,
4082  const Fragmenter_Namespace::FragmentInfo& fragment,
4083  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
4084  const std::vector<uint64_t>& frag_offsets,
4085  const size_t frag_idx) {
4086  const int table_id = table_desc.getTableId();
4087 
4088  // First check to see if all of fragment is deleted, in which case we know we can skip
4089  if (isFragmentFullyDeleted(table_id, fragment)) {
4090  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
4091  << ", fragment id: " << frag_idx;
4092  return {true, -1};
4093  }
4094 
4095  for (const auto& simple_qual : simple_quals) {
4096  const auto comp_expr =
4097  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4098  if (!comp_expr) {
4099  // is this possible?
4100  return {false, -1};
4101  }
4102  const auto lhs = comp_expr->get_left_operand();
4103  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4104  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
4105  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
4106  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
4107  if (lhs_uexpr) {
4108  CHECK(lhs_uexpr->get_optype() ==
4109  kCAST); // We should have only been passed a cast expression
4110  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
4111  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
4112  continue;
4113  }
4114  } else {
4115  continue;
4116  }
4117  }
4118  const auto rhs = comp_expr->get_right_operand();
4119  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4120  if (!rhs_const) {
4121  // is this possible?
4122  return {false, -1};
4123  }
4124  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
4125  !lhs->get_type_info().is_fp()) {
4126  continue;
4127  }
4128 
4129  if (lhs->get_type_info().is_fp()) {
4130  const auto fragment_skip_status =
4131  canSkipFragmentForFpQual(comp_expr.get(), lhs_col, fragment, rhs_const);
4132  switch (fragment_skip_status) {
4134  return {true, -1};
4136  return {false, -1};
4138  continue;
4139  default:
4140  UNREACHABLE();
4141  }
4142  }
4143 
4144  // Everything below is logic for integer and integer-backed timestamps
4145  // TODO: Factor out into separate function per canSkipFragmentForFpQual above
4146 
4147  const int col_id = lhs_col->get_column_id();
4148  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4149  int64_t chunk_min{0};
4150  int64_t chunk_max{0};
4151  bool is_rowid{false};
4152  size_t start_rowid{0};
4153  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4154  auto cd = get_column_descriptor(col_id, table_id, *catalog_);
4155  if (cd->isVirtualCol) {
4156  CHECK(cd->columnName == "rowid");
4157  const auto& table_generation = getTableGeneration(table_id);
4158  start_rowid = table_generation.start_rowid;
4159  chunk_min = frag_offsets[frag_idx] + start_rowid;
4160  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
4161  is_rowid = true;
4162  }
4163  } else {
4164  const auto& chunk_type = lhs_col->get_type_info();
4165  chunk_min =
4166  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4167  chunk_max =
4168  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4169  }
4170  if (chunk_min > chunk_max) {
4171  // invalid metadata range, do not skip fragment
4172  return {false, -1};
4173  }
4174  if (lhs->get_type_info().is_timestamp() &&
4175  (lhs_col->get_type_info().get_dimension() !=
4176  rhs_const->get_type_info().get_dimension()) &&
4177  (lhs_col->get_type_info().is_high_precision_timestamp() ||
4178  rhs_const->get_type_info().is_high_precision_timestamp())) {
4179  // If the original timestamp lhs col has a different precision,
4180  // the column metadata holds values in the original precision,
4181  // so adjust the rhs value to match the lhs precision.
4182 
4183  // Note(Wamsi): We adjust the rhs const value instead of the lhs value so as not to
4184  // artificially limit the lhs column range. RHS overflow/underflow has already
4185  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
4186  bool is_valid;
4187  std::tie(is_valid, chunk_min, chunk_max) =
4189  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
4190  if (!is_valid) {
4191  VLOG(4) << "Overflow/underflow detected in fragment skipping logic.\nChunk min "
4192  "value: "
4193  << std::to_string(chunk_min)
4194  << "\nChunk max value: " << std::to_string(chunk_max)
4195  << "\nLHS col precision is: "
4196  << std::to_string(lhs_col->get_type_info().get_dimension())
4197  << "\nRHS precision is: "
4198  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
4199  return {false, -1};
4200  }
4201  }
4202  if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
4203  // It is obvious that a cast from timestamp to date is happening here,
4204  // so we have to correct the chunk min and max values by lowering their precision to
4205  // that of the date
4206  chunk_min = DateTruncateHighPrecisionToDate(
4207  chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
4208  chunk_max = DateTruncateHighPrecisionToDate(
4209  chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
4210  }
4211  llvm::LLVMContext local_context;
4212  CgenState local_cgen_state(local_context);
4213  CodeGenerator code_generator(&local_cgen_state, nullptr);
4214 
4215  const auto rhs_val =
4216  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
4217 
4218  switch (comp_expr->get_optype()) {
4219  case kGE:
4220  if (chunk_max < rhs_val) {
4221  return {true, -1};
4222  }
4223  break;
4224  case kGT:
4225  if (chunk_max <= rhs_val) {
4226  return {true, -1};
4227  }
4228  break;
4229  case kLE:
4230  if (chunk_min > rhs_val) {
4231  return {true, -1};
4232  }
4233  break;
4234  case kLT:
4235  if (chunk_min >= rhs_val) {
4236  return {true, -1};
4237  }
4238  break;
4239  case kEQ:
4240  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4241  return {true, -1};
4242  } else if (is_rowid) {
4243  return {false, rhs_val - start_rowid};
4244  }
4245  break;
4246  default:
4247  break;
4248  }
4249  }
4250  return {false, -1};
4251 }
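// Worked example (illustration only, hypothetical offsets): when the qual column is the
// virtual `rowid`, there is no chunk metadata, so the bounds come from the fragment
// offsets, and an equality qual that cannot be pruned yields a rowid lookup key relative
// to the fragment, as in the kEQ branch above.
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

std::pair<bool, int64_t> skip_for_rowid_eq(std::vector<uint64_t> const& frag_offsets,
                                           size_t const frag_idx,
                                           int64_t const start_rowid,
                                           int64_t const rhs_val) {
  int64_t const chunk_min = frag_offsets[frag_idx] + start_rowid;
  int64_t const chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
  if (rhs_val < chunk_min || rhs_val > chunk_max) {
    return {true, -1};  // the requested rowid cannot live in this fragment: skip it
  }
  return {false, rhs_val - start_rowid};  // keep it and remember the rowid lookup key
}
// e.g. with frag_offsets {0, 32'000'000, 64'000'000} and start_rowid 0, rowid 40'000'000
// skips fragment 0 but keeps fragment 1 with lookup key 40'000'000.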
4252 
4253 /*
4254  * The skipFragmentInnerJoins function processes all quals stored in the execution unit's
4255  * join_quals and gather all the ones that meet the "simple_qual" characteristics
4256  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
4257  * to decide whether the fragment should be skipped or not. The fragment will be skipped
4258  * if at least one of these skipFragment calls returns true in its first value.
4259  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
4260  * second value only if its first value is "false".
4261  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
4262  * i.e., we only skip if none of the quals trigger the code to update the
4263  * rowid_lookup_key
4264  * - Only AND operations are valid and considered:
4265  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
4266  * the skip
4267  * - `select * from t1,t2 where (A or B) and C`: only C is considered
4268  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
4269  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
4270  * implemented differently, which causes the first one to skip correctly, but the second
4271  * one will not skip.
4272  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
4273  * possible
4274  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
4275  * projection, no skipping
4276  */
4277 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
4278  const InputDescriptor& table_desc,
4279  const RelAlgExecutionUnit& ra_exe_unit,
4280  const Fragmenter_Namespace::FragmentInfo& fragment,
4281  const std::vector<uint64_t>& frag_offsets,
4282  const size_t frag_idx) {
4283  std::pair<bool, int64_t> skip_frag{false, -1};
4284  for (auto& inner_join : ra_exe_unit.join_quals) {
4285  if (inner_join.type != JoinType::INNER) {
4286  continue;
4287  }
4288 
4289  // extracting all the conjunctive simple_quals from the quals stored for the inner
4290  // join
4291  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
4292  for (auto& qual : inner_join.quals) {
4293  auto temp_qual = qual_to_conjunctive_form(qual);
4294  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
4295  temp_qual.simple_quals.begin(),
4296  temp_qual.simple_quals.end());
4297  }
4298  auto temp_skip_frag = skipFragment(
4299  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
4300  if (temp_skip_frag.second != -1) {
4301  skip_frag.second = temp_skip_frag.second;
4302  return skip_frag;
4303  } else {
4304  skip_frag.first = skip_frag.first || temp_skip_frag.first;
4305  }
4306  }
4307  return skip_frag;
4308 }
4309 
4311  const std::unordered_set<PhysicalInput>& phys_inputs) {
4312  AggregatedColRange agg_col_range_cache;
4313  CHECK(catalog_);
4314  std::unordered_set<int> phys_table_ids;
4315  for (const auto& phys_input : phys_inputs) {
4316  phys_table_ids.insert(phys_input.table_id);
4317  }
4318  std::vector<InputTableInfo> query_infos;
4319  for (const int table_id : phys_table_ids) {
4320  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
4321  }
4322  for (const auto& phys_input : phys_inputs) {
4323  const auto cd =
4324  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4325  CHECK(cd);
4326  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
4327  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
4328  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
4329  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
4330  agg_col_range_cache.setColRange(phys_input, col_range);
4331  }
4332  }
4333  return agg_col_range_cache;
4334 }
4335 
4337  const std::unordered_set<PhysicalInput>& phys_inputs) {
4338  StringDictionaryGenerations string_dictionary_generations;
4339  CHECK(catalog_);
4340  // Foreign tables may not have populated dictionaries for encoded columns. If this is
4341  // the case then we need to populate them here to make sure that the generations are set
4342  // correctly.
4343  prepare_string_dictionaries(phys_inputs, *catalog_);
4344  for (const auto& phys_input : phys_inputs) {
4345  const auto cd =
4346  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4347  CHECK(cd);
4348  const auto& col_ti =
4349  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
4350  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
4351  const int dict_id = col_ti.get_comp_param();
4352  const auto dd = catalog_->getMetadataForDict(dict_id);
4353  CHECK(dd && dd->stringDict);
4354  string_dictionary_generations.setGeneration(dict_id,
4355  dd->stringDict->storageEntryCount());
4356  }
4357  }
4358  return string_dictionary_generations;
4359 }
4360 
4362  std::unordered_set<int> phys_table_ids) {
4363  TableGenerations table_generations;
4364  for (const int table_id : phys_table_ids) {
4365  const auto table_info = getTableInfo(table_id);
4366  table_generations.setGeneration(
4367  table_id,
4368  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
4369  }
4370  return table_generations;
4371 }
4372 
4373 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
4374  const std::unordered_set<int>& phys_table_ids) {
4375  CHECK(catalog_);
4377  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
4378  row_set_mem_owner_->setDictionaryGenerations(
4379  computeStringDictionaryGenerations(phys_inputs));
4381  table_generations_ = computeTableGenerations(phys_table_ids);
4382 }
4383 
4385  return recycler_mutex_;
4386 }
4387 
4389  return query_plan_dag_cache_;
4390 }
4391 
4394 }
4395 
4397  return executor_session_mutex_;
4398 }
4399 
4402  return current_query_session_;
4403 }
4404 
4406  const QuerySessionId& candidate_query_session,
4408  // the candidate_query_session is considered to be the current session only when it is
4409  // non-empty and equal to current_query_session_
4410  return !candidate_query_session.empty() &&
4411  (current_query_session_ == candidate_query_session);
4412 }
4413 
4414 // used only for testing
4416  const QuerySessionId& candidate_query_session,
4418  if (queries_session_map_.count(candidate_query_session) &&
4419  !queries_session_map_.at(candidate_query_session).empty()) {
4420  return queries_session_map_.at(candidate_query_session)
4421  .begin()
4422  ->second.getQueryStatus();
4423  }
4424  return QuerySessionStatus::QueryStatus::UNDEFINED;
4425 }
4426 
4430 }
4431 
4433  const QuerySessionId& query_session_id,
4434  const std::string& query_str,
4435  const std::string& query_submitted_time) {
4436  if (!query_session_id.empty()) {
4437  // if session is valid, do update 1) the exact executor id and 2) query status
4440  query_session_id, query_submitted_time, executor_id_, write_lock);
4441  updateQuerySessionStatusWithLock(query_session_id,
4442  query_submitted_time,
4443  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
4444  write_lock);
4445  }
4446  return {query_session_id, query_str};
4447 }
4448 
4450  // check whether we are okay to execute the "pending" query
4451  // i.e., before running the query check if this query session is "ALREADY" interrupted
4453  if (query_session.empty()) {
4454  return;
4455  }
4456  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
4457  // something went wrong, since we assume it is the caller's responsibility
4458  // to call this function only for an enrolled query session
4459  if (!queries_session_map_.count(query_session)) {
4460  VLOG(1) << "Interrupting pending query is not available since the query session is "
4461  "not enrolled";
4462  } else {
4463  // here the query session is enrolled but the interrupt flag is not registered
4464  VLOG(1)
4465  << "Interrupting pending query is not available since its interrupt flag is "
4466  "not registered";
4467  }
4468  return;
4469  }
4470  if (queries_interrupt_flag_[query_session]) {
4472  }
4473 }
4474 
4476  const std::string& submitted_time_str) {
4478  // clear the interrupt-related info for a finished query
4479  if (query_session.empty()) {
4480  return;
4481  }
4482  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
4483  if (query_session.compare(current_query_session_) == 0) {
4484  invalidateRunningQuerySession(session_write_lock);
4485  resetInterrupt();
4486  }
4487 }
4488 
4490  const QuerySessionId& query_session,
4491  const std::string& submitted_time_str,
4492  const QuerySessionStatus::QueryStatus new_query_status) {
4493  // update the running query session's the current status
4495  if (query_session.empty()) {
4496  return;
4497  }
4498  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4499  current_query_session_ = query_session;
4500  }
4502  query_session, submitted_time_str, new_query_status, session_write_lock);
4503 }
4504 
4506  const QuerySessionId& query_session,
4507  const std::string& query_str,
4508  const std::string& submitted_time_str,
4509  const size_t executor_id,
4510  const QuerySessionStatus::QueryStatus query_session_status) {
4511  // enroll the query session into the Executor's session map
4513  if (query_session.empty()) {
4514  return;
4515  }
4516 
4517  addToQuerySessionList(query_session,
4518  query_str,
4519  submitted_time_str,
4520  executor_id,
4521  query_session_status,
4522  session_write_lock);
4523 
4524  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4525  current_query_session_ = query_session;
4526  }
4527 }
4528 
4531  return queries_session_map_.size();
4532 }
4533 
4535  const QuerySessionId& query_session,
4536  const std::string& query_str,
4537  const std::string& submitted_time_str,
4538  const size_t executor_id,
4539  const QuerySessionStatus::QueryStatus query_status,
4541  // an internal API that enrolls the query session into the Executor's session map
4542  if (queries_session_map_.count(query_session)) {
4543  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
4544  queries_session_map_.at(query_session).erase(submitted_time_str);
4545  queries_session_map_.at(query_session)
4546  .emplace(submitted_time_str,
4547  QuerySessionStatus(query_session,
4548  executor_id,
4549  query_str,
4550  submitted_time_str,
4551  query_status));
4552  } else {
4553  queries_session_map_.at(query_session)
4554  .emplace(submitted_time_str,
4555  QuerySessionStatus(query_session,
4556  executor_id,
4557  query_str,
4558  submitted_time_str,
4559  query_status));
4560  }
4561  } else {
4562  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4563  executor_per_query_map.emplace(
4564  submitted_time_str,
4566  query_session, executor_id, query_str, submitted_time_str, query_status));
4567  queries_session_map_.emplace(query_session, executor_per_query_map);
4568  }
4569  return queries_interrupt_flag_.emplace(query_session, false).second;
4570 }
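// A simplified, self-contained sketch (illustration only) of the session bookkeeping
// above: a two-level map keyed by session id and query-submitted time plus a per-session
// interrupt flag, with QuerySessionStatus reduced to a plain string for brevity.
#include <map>
#include <string>

struct SessionBook {
  // session id -> (submitted time -> status)
  std::map<std::string, std::map<std::string, std::string>> queries_session_map;
  // session id -> interrupt flag
  std::map<std::string, bool> queries_interrupt_flag;

  bool enroll(std::string const& session,
              std::string const& submitted_time,
              std::string const& status) {
    queries_session_map[session][submitted_time] = status;  // insert or overwrite
    // the flag is created once per session; emplace() reports whether it was new
    return queries_interrupt_flag.emplace(session, false).second;
  }
};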
4571 
4573  const QuerySessionId& query_session,
4574  const std::string& submitted_time_str,
4575  const QuerySessionStatus::QueryStatus updated_query_status,
4577  // an internal API that updates query session status
4578  if (query_session.empty()) {
4579  return false;
4580  }
4581  if (queries_session_map_.count(query_session)) {
4582  for (auto& query_status : queries_session_map_.at(query_session)) {
4583  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4584  // no time difference --> found the target query status
4585  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4586  auto prev_status = query_status.second.getQueryStatus();
4587  if (prev_status == updated_query_status) {
4588  return false;
4589  }
4590  query_status.second.setQueryStatus(updated_query_status);
4591  return true;
4592  }
4593  }
4594  }
4595  return false;
4596 }
4597 
4599  const QuerySessionId& query_session,
4600  const std::string& submitted_time_str,
4601  const size_t executor_id,
4603  // update the executor id of the query session
4604  if (query_session.empty()) {
4605  return false;
4606  }
4607  if (queries_session_map_.count(query_session)) {
4608  auto storage = queries_session_map_.at(query_session);
4609  for (auto it = storage.begin(); it != storage.end(); it++) {
4610  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4611  // no time difference --> found the target query status
4612  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4613  queries_session_map_.at(query_session)
4614  .at(submitted_time_str)
4615  .setExecutorId(executor_id);
4616  return true;
4617  }
4618  }
4619  }
4620  return false;
4621 }
4622 
4624  const QuerySessionId& query_session,
4625  const std::string& submitted_time_str,
4627  if (query_session.empty()) {
4628  return false;
4629  }
4630  if (queries_session_map_.count(query_session)) {
4631  auto& storage = queries_session_map_.at(query_session);
4632  if (storage.size() > 1) {
4633  // in this case we only remove query executor info
4634  for (auto it = storage.begin(); it != storage.end(); it++) {
4635  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4636  // no time difference && same executor id --> found the target query
4637  if (it->second.getExecutorId() == executor_id_ &&
4638  submitted_time_str.compare(target_submitted_t_str) == 0) {
4639  storage.erase(it);
4640  return true;
4641  }
4642  }
4643  } else if (storage.size() == 1) {
4644  // here this session only has a single query executor
4645  // so we clear both executor info and its interrupt flag
4646  queries_session_map_.erase(query_session);
4647  queries_interrupt_flag_.erase(query_session);
4648  if (interrupted_.load()) {
4649  interrupted_.store(false);
4650  }
4651  return true;
4652  }
4653  }
4654  return false;
4655 }
4656 
4658  const QuerySessionId& query_session,
4660  if (query_session.empty()) {
4661  return;
4662  }
4663  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
4664  queries_interrupt_flag_[query_session] = true;
4665  }
4666 }
4667 
4669  const QuerySessionId& query_session,
4671  if (query_session.empty()) {
4672  return false;
4673  }
4674  auto flag_it = queries_interrupt_flag_.find(query_session);
4675  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4676  flag_it->second;
4677 }
4678 
4680  const QuerySessionId& query_session,
4682  if (query_session.empty()) {
4683  return false;
4684  }
4685  return !query_session.empty() && queries_session_map_.count(query_session);
4686 }
4687 
4689  const double runtime_query_check_freq,
4690  const unsigned pending_query_check_freq) const {
4691  // The only scenario in which we intentionally call this function is
4692  // to allow runtime query interrupt in QueryRunner for test cases.
4693  // Because the test machine's default setting does not allow runtime query interrupt,
4694  // we have to turn it on within test code when necessary.
4696  g_pending_query_interrupt_freq = pending_query_check_freq;
4697  g_running_query_interrupt_freq = runtime_query_check_freq;
4700  }
4701 }
4702 
4703 void Executor::addToCardinalityCache(const std::string& cache_key,
4704  const size_t cache_value) {
4707  cardinality_cache_[cache_key] = cache_value;
4708  VLOG(1) << "Put estimated cardinality to the cache";
4709  }
4710 }
4711 
4715  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
4716  VLOG(1) << "Reuse cached cardinality";
4717  return {true, cardinality_cache_[cache_key]};
4718  }
4719  return {false, -1};
4720 }
4721 
4722 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
4723  const QuerySessionId& query_session,
4725  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
4726  auto& query_infos = queries_session_map_.at(query_session);
4727  std::vector<QuerySessionStatus> ret;
4728  for (auto& info : query_infos) {
4729  ret.push_back(QuerySessionStatus(query_session,
4730  info.second.getExecutorId(),
4731  info.second.getQueryStr(),
4732  info.second.getQuerySubmittedTime(),
4733  info.second.getQueryStatus()));
4734  }
4735  return ret;
4736  }
4737  return {};
4738 }
4739 
4740 const std::vector<size_t> Executor::getExecutorIdsRunningQuery(
4741  const QuerySessionId& interrupt_session) const {
4742  std::vector<size_t> res;
4744  auto it = queries_session_map_.find(interrupt_session);
4745  if (it != queries_session_map_.end()) {
4746  for (auto& kv : it->second) {
4747  if (kv.second.getQueryStatus() ==
4748  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4749  res.push_back(kv.second.getExecutorId());
4750  }
4751  }
4752  }
4753  return res;
4754 }
4755 
4757  // this function should be called within an executor which is assigned
4758  // to the specific query thread (which indicates we have already enrolled the session)
4759  // check whether this is called from a non-unitary executor
4761  return false;
4762  };
4764  auto flag_it = queries_interrupt_flag_.find(current_query_session_);
4765  return !current_query_session_.empty() && flag_it != queries_interrupt_flag_.end() &&
4766  flag_it->second;
4767 }
4768 
4770  // this function is called under the recycler lock
4771  // e.g., QueryPlanDagExtractor::extractQueryPlanDagImpl()
4772  latest_query_plan_extracted_ = query_plan_dag;
4773 }
4774 
4778 }
4779 
4780 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
4781 
4782 // contains the interrupt flag's status per query session
4784 // contains a list of queries per query session
4786 // session lock
4788 
4791 
4795 
4797 std::mutex Executor::kernel_mutex_;
4798 
4801 std::unordered_map<std::string, size_t> Executor::cardinality_cache_;
4802 // Executor has a single global result set recycler holder
4803 // which contains two recyclers related to query resultset
4806 
4807 // Useful for debugging.
4808 std::string Executor::dumpCache() const {
4809  std::stringstream ss;
4810  ss << "colRangeCache: ";
4811  for (auto& [phys_input, exp_range] : agg_col_range_cache_.asMap()) {
4812  ss << "{" << phys_input.col_id << ", " << phys_input.table_id
4813  << "} = " << exp_range.toString() << ", ";
4814  }
4815  ss << "stringDictGenerations: ";
4816  for (auto& [key, val] : row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
4817  ss << "{" << key << "} = " << val << ", ";
4818  }
4819  ss << "tableGenerations: ";
4820  for (auto& [key, val] : table_generations_.asMap()) {
4821  ss << "{" << key << "} = {" << val.tuple_count << ", " << val.start_rowid << "}, ";
4822  }
4823  ss << "\n";
4824  return ss.str();
4825 }