19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
84 bool g_enable_filter_function{true};
85 unsigned g_dynamic_watchdog_time_limit{10000};
86 bool g_allow_cpu_retry{true};
87 bool g_allow_query_step_cpu_retry{true};
88 bool g_null_div_by_zero{false};
89 unsigned g_trivial_loop_join_threshold{1000};
90 bool g_from_table_reordering{true};
91 bool g_inner_join_fragment_skipping{true};
92 extern bool g_enable_smem_group_by;
93 extern std::unique_ptr<llvm::Module> udf_gpu_module;
94 extern std::unique_ptr<llvm::Module> udf_cpu_module;
95 bool g_enable_filter_push_down{false};
96 float g_filter_push_down_low_frac{-1.0f};
97 float g_filter_push_down_high_frac{-1.0f};
98 size_t g_filter_push_down_passing_row_ubound{0};
99 bool g_enable_columnar_output{false};
100 bool g_enable_left_join_filter_hoisting{true};
101 bool g_optimize_row_initialization{true};
102 bool g_enable_overlaps_hashjoin{true};
103 bool g_enable_distance_rangejoin{true};
104 bool g_enable_hashjoin_many_to_many{false};
105 size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
106 double g_overlaps_target_entries_per_bin{1.3};
107 bool g_strip_join_covered_quals{false};
108 size_t g_constrained_by_in_threshold{10};
109 size_t g_default_max_groups_buffer_entry_guess{16384};
110 size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
111 bool g_enable_window_functions{true};
112 bool g_enable_table_functions{true};
113 bool g_enable_dev_table_functions{false};
114 bool g_enable_geo_ops_on_uncompressed_coords{true};
115 bool g_enable_rf_prop_table_functions{true};
116 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
117 size_t g_min_memory_allocation_size{
118 256}; // minimum memory allocation required for projection query output buffer
119 // without pre-flight count
120 bool g_enable_bump_allocator{false};
121 double g_bump_allocator_step_reduction{0.75};
122 bool g_enable_direct_columnarization{true};
123 extern bool g_enable_string_functions;
124 bool g_enable_lazy_fetch{true};
125 bool g_enable_runtime_query_interrupt{true};
126 bool g_enable_non_kernel_time_query_interrupt{true};
127 bool g_use_estimator_result_cache{true};
128 unsigned g_pending_query_interrupt_freq{1000};
129 double g_running_query_interrupt_freq{0.1};
130 size_t g_gpu_smem_threshold{
131 4096}; // GPU shared memory threshold (in bytes), if larger
132 // buffer sizes are required we do not use GPU shared
133 // memory optimizations. Setting this to 0 means unlimited
134 // (subject to other dynamically calculated caps)
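// e.g. with the default of 4096 bytes, a query whose group-by output buffer would need 8 KB of shared memory per block falls back to the global-memory path.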
135 bool g_enable_smem_grouped_non_count_agg{
136 true}; // enable use of shared memory when performing group-by with select non-count
138 bool g_enable_smem_non_grouped_agg{
139 true}; // enable optimizations for using GPU shared memory in implementation of
140 // non-grouped aggregates
141 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
142 // limits the allocation for the output buffer arena
143 // and data recycler test
144 size_t g_enable_parallel_linearization{
145 10000}; // # rows that we are trying to linearize varlen col in parallel
146 bool g_enable_data_recycler{true};
147 bool g_use_hashtable_cache{true};
148 bool g_use_query_resultset_cache{true};
149 bool g_use_chunk_metadata_cache{true};
150 bool g_allow_auto_resultset_caching{false};
151 bool g_allow_query_step_skipping{true};
152 size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
153 size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
154 size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
155 size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
156 size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
158 size_t g_approx_quantile_buffer{1000};
159 size_t g_approx_quantile_centroids{300};
161 bool g_enable_automatic_ir_metadata{true};
163 size_t g_max_log_length{500};
165 extern bool g_cache_string_hash;
167 int const Executor::max_gpu_count;
169 const int32_t Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES;
171 std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;
173 extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
174 const std::string& udf_ir_filename,
175 llvm::LLVMContext& ctx);
176 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
177 const std::string& udf_ir_filename,
178 llvm::LLVMContext& ctx,
179 bool is_gpu = false);
180 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
181 const std::string& udf_ir_string,
182 llvm::LLVMContext& ctx,
183 bool is_gpu = false);
185 CodeCacheAccessor<CpuCompilationContext> Executor::s_stubs_accessor(
186 Executor::code_cache_size,
188 CodeCacheAccessor<CpuCompilationContext> Executor::s_code_accessor(
189 Executor::code_cache_size,
191 CodeCacheAccessor<CpuCompilationContext> Executor::cpu_code_accessor(
192 Executor::code_cache_size,
194 CodeCacheAccessor<GpuCompilationContext> Executor::gpu_code_accessor(
195 Executor::code_cache_size,
197 CodeCacheAccessor<CompilationContext> Executor::tf_code_accessor(
198 Executor::code_cache_size,
202 // This function is notably different from that in RelAlgExecutor because it already
203 // expects SPI values and therefore needs to avoid that transformation.
204 void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs,
205 const Catalog_Namespace::Catalog& catalog) {
206 for (const auto [col_id, table_id] : phys_inputs) {
207 foreign_storage::populate_string_dictionary(table_id, col_id, catalog);
211 bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
212 const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
213 // The fragmenter always returns at least one fragment, even when the table is empty.
214 return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
218 namespace foreign_storage {
219 // Foreign tables skip the population of dictionaries during metadata scan. This function
220 // will populate a dictionary's missing entries by fetching any unpopulated chunks.
222 const int32_t col_id,
224 if (const auto foreign_table = dynamic_cast<const ForeignTable*>(
227 if (col_desc->columnType.is_dict_encoded_type()) {
228 auto& fragmenter = foreign_table->fragmenter;
229 CHECK(fragmenter != nullptr);
233 for (const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
242 CHECK(metadata_map.find(col_id) != metadata_map.end());
262 const size_t block_size_x,
263 const size_t grid_size_x,
264 const size_t max_gpu_slab_size,
265 const std::string& debug_dir,
266 const std::string& debug_file)
267 : executor_id_(executor_id)
268 , context_(new llvm::LLVMContext())
281 update_extension_modules();
289 auto template_path = root_path + "/QueryEngine/RuntimeFunctions.bc";
290 CHECK(boost::filesystem::exists(template_path));
294 auto rt_geos_path = root_path + "/QueryEngine/GeosRuntime.bc";
295 CHECK(boost::filesystem::exists(rt_geos_path));
300 auto rt_libdevice_path = get_cuda_home() + "/nvvm/libdevice/libdevice.10.bc";
301 if (boost::filesystem::exists(rt_libdevice_path)) {
306 << " does not exist; support for some UDF "
307 "functions might not be available.";
315 s_code_accessor.clear();
316 s_stubs_accessor.clear();
317 cpu_code_accessor.clear();
318 gpu_code_accessor.clear();
319 tf_code_accessor.clear();
321 if (discard_runtime_modules_only) {
326 cgen_state_->module_ = nullptr;
328 extension_modules_.clear();
330 context_.reset(new llvm::LLVMContext());
331 cgen_state_.reset(new CgenState({}, false, this));
337 const std::string& source) {
343 CHECK(!source.empty());
344 switch (module_kind) {
364 return std::unique_ptr<llvm::Module>();
369 bool erase_not_found = false) {
372 auto llvm_module = read_module(module_kind, it->second);
374 extension_modules_[module_kind] = std::move(llvm_module);
375 } else if (erase_not_found) {
376 extension_modules_.erase(module_kind);
378 if (extension_modules_.find(module_kind) == extension_modules_.end()) {
380 << " LLVM module. The module will be unavailable.";
383 << " LLVM module. Using the existing module.";
387 if (erase_not_found) {
388 extension_modules_.erase(module_kind);
390 if (extension_modules_.find(module_kind) == extension_modules_.end()) {
392 << " LLVM module is unavailable. The module will be unavailable.";
395 << " LLVM module is unavailable. Using the existing module.";
401 if (!update_runtime_modules_only) {
424 : executor_(executor)
426 , lock_(executor_.compilation_mutex_)
427 , cgen_state_(std::move(executor_.cgen_state_))
435 const bool allow_lazy_fetch,
436 const std::vector<InputTableInfo>& query_infos,
439 : executor_(executor)
449 executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
454 for (auto& p : executor_.cgen_state_->row_func_hoisted_literals_) {
455 auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
456 if (inst && inst->getNumUses() == 0 && inst->getParent() == nullptr) {
459 p.first->deleteValue();
462 executor_.cgen_state_->row_func_hoisted_literals_.clear();
468 for (auto& str_dict_translation_mgr :
469 executor_.cgen_state_->str_dict_translation_mgrs_) {
470 cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
472 executor_.cgen_state_->str_dict_translation_mgrs_.clear();
474 for (auto& bm : executor_.cgen_state_->in_values_bitmaps_) {
477 executor_.cgen_state_->in_values_bitmaps_.clear();
480 executor_.cgen_state_.reset(cgen_state_.release());
485 const std::string& debug_dir,
486 const std::string& debug_file,
496 auto executor = std::make_shared<Executor>(executor_id,
503 CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
508 switch (memory_level) {
511 mapd_unique_lock<mapd_shared_mutex> flush_lock(
526 throw std::runtime_error(
527 "Clearing memory levels other than the CPU level or GPU level is not "
538 const int dict_id_in,
539 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
540 const bool with_generation) const {
541 CHECK(row_set_mem_owner);
542 std::lock_guard<std::mutex> lock(
544 return row_set_mem_owner->getOrAddStringDictProxy(
545 dict_id_in, with_generation, catalog_);
549 const int dict_id_in,
550 const bool with_generation,
552 const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
556 CHECK(dd->stringDict);
558 const int64_t generation =
559 with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
560 return addStringDict(dd->stringDict, dict_id, generation);
563 if (!lit_str_dict_proxy_) {
565 std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
567 lit_str_dict_proxy_ =
568 std::make_shared<StringDictionaryProxy>(tsd, literal_dict_ref.dictId, 0);
570 return lit_str_dict_proxy_.get();
574 const int source_dict_id,
575 const int dest_dict_id,
577 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
578 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
579 const bool with_generation) const {
580 CHECK(row_set_mem_owner);
581 std::lock_guard<std::mutex> lock(
583 return row_set_mem_owner->getOrAddStringProxyTranslationMap(source_dict_id,
595 const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
596 const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
597 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) const {
598 CHECK(row_set_mem_owner);
599 std::lock_guard<std::mutex> lock(
602 if (!dest_string_op_infos.empty()) {
603 row_set_mem_owner->addStringProxyUnionTranslationMap(
604 dest_proxy, dest_proxy, dest_string_op_infos);
606 return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
607 source_proxy, dest_proxy, source_string_op_infos);
611 const int source_dict_id_in,
612 const int dest_dict_id_in,
613 const bool with_generation,
615 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
617 const auto source_proxy =
618 getOrAddStringDictProxy(source_dict_id_in, with_generation, catalog);
619 auto dest_proxy = getOrAddStringDictProxy(dest_dict_id_in, with_generation, catalog);
621 return addStringProxyIntersectionTranslationMap(
622 source_proxy, dest_proxy, string_op_infos);
624 return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
629 std::lock_guard<std::mutex> lock(state_mutex_);
631 .emplace_back(std::make_unique<quantile::TDigest>(
651 if (!cd || n > cd->columnType.get_physical_cols()) {
687 size_t num_bytes = 0;
691 for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
692 if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
696 if (fetched_col_pair.first < 0) {
705 if (!ti.is_logical_geo_type()) {
719 const std::vector<Analyzer::Expr*>& target_exprs) const {
721 for (const auto target_expr : target_exprs) {
730 const std::vector<Analyzer::Expr*>& target_exprs) const {
733 std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
734 for (const auto target_expr : target_exprs) {
735 if (!plan_state_->isLazyFetchColumn(target_expr)) {
736 col_lazy_fetch_info.emplace_back(
741 auto col_id = col_var->get_column_id();
742 auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
743 auto cd = (col_var->get_table_id() > 0)
746 if (cd && IS_GEO(cd->columnType.get_type())) {
753 CHECK(!cd0->isVirtualCol);
754 auto col0_var = makeExpr<Analyzer::ColumnVar>(
755 col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
756 auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
757 col_lazy_fetch_info.emplace_back(
761 auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
762 const auto& col_ti = col_var->get_type_info();
767 return col_lazy_fetch_info;
777 const std::unordered_map<int, CgenState::LiteralValues>& literals,
778 const int device_id) {
779 if (literals.empty()) {
782 const auto dev_literals_it = literals.find(device_id);
783 CHECK(dev_literals_it != literals.end());
784 const auto& dev_literals = dev_literals_it->second;
785 size_t lit_buf_size{0};
786 std::vector<std::string> real_strings;
787 std::vector<std::vector<double>> double_array_literals;
788 std::vector<std::vector<int8_t>> align64_int8_array_literals;
789 std::vector<std::vector<int32_t>> int32_array_literals;
790 std::vector<std::vector<int8_t>> align32_int8_array_literals;
791 std::vector<std::vector<int8_t>> int8_array_literals;
792 for (const auto& lit : dev_literals) {
794 if (lit.which() == 7) {
795 const auto p = boost::get<std::string>(&lit);
797 real_strings.push_back(*p);
798 } else if (lit.which() == 8) {
799 const auto p = boost::get<std::vector<double>>(&lit);
801 double_array_literals.push_back(*p);
802 } else if (lit.which() == 9) {
803 const auto p = boost::get<std::vector<int32_t>>(&lit);
805 int32_array_literals.push_back(*p);
806 } else if (lit.which() == 10) {
807 const auto p = boost::get<std::vector<int8_t>>(&lit);
809 int8_array_literals.push_back(*p);
810 } else if (lit.which() == 11) {
811 const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
813 if (p->second == 64) {
814 align64_int8_array_literals.push_back(p->first);
815 } else if (p->second == 32) {
816 align32_int8_array_literals.push_back(p->first);
822 if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
825 int16_t crt_real_str_off = lit_buf_size;
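// The running offsets below are int16_t on purpose: each variable-length literal is later described by a packed int32 whose upper 16 bits hold its byte offset into the literal buffer and whose lower 16 bits hold its length (see the off_and_len values further down).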
826 for (const auto& real_str : real_strings) {
827 CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
828 lit_buf_size += real_str.size();
830 if (double_array_literals.size() > 0) {
831 lit_buf_size = align(lit_buf_size, sizeof(double));
833 int16_t crt_double_arr_lit_off = lit_buf_size;
834 for (const auto& double_array_literal : double_array_literals) {
835 CHECK_LE(double_array_literal.size(),
836 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
837 lit_buf_size += double_array_literal.size() * sizeof(double);
839 if (align64_int8_array_literals.size() > 0) {
840 lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
842 int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
843 for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
844 CHECK_LE(align64_int8_array_literal.size(),
845 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
846 lit_buf_size += align64_int8_array_literal.size();
848 if (int32_array_literals.size() > 0) {
849 lit_buf_size = align(lit_buf_size, sizeof(int32_t));
851 int16_t crt_int32_arr_lit_off = lit_buf_size;
852 for (const auto& int32_array_literal : int32_array_literals) {
853 CHECK_LE(int32_array_literal.size(),
854 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
855 lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
857 if (align32_int8_array_literals.size() > 0) {
858 lit_buf_size = align(lit_buf_size, sizeof(int32_t));
860 int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
861 for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
862 CHECK_LE(align32_int8_array_literal.size(),
863 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
864 lit_buf_size += align32_int8_array_literal.size();
866 int16_t crt_int8_arr_lit_off = lit_buf_size;
867 for (const auto& int8_array_literal : int8_array_literals) {
869 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
870 lit_buf_size += int8_array_literal.size();
872 unsigned crt_real_str_idx = 0;
873 unsigned crt_double_arr_lit_idx = 0;
874 unsigned crt_align64_int8_arr_lit_idx = 0;
875 unsigned crt_int32_arr_lit_idx = 0;
876 unsigned crt_align32_int8_arr_lit_idx = 0;
877 unsigned crt_int8_arr_lit_idx = 0;
878 std::vector<int8_t> serialized(lit_buf_size);
880 for (const auto& lit : dev_literals) {
883 switch (lit.which()) {
885 const auto p = boost::get<int8_t>(&lit);
887 serialized[off - lit_bytes] = *p;
891 const auto p = boost::get<int16_t>(&lit);
893 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
897 const auto p = boost::get<int32_t>(&lit);
899 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
903 const auto p = boost::get<int64_t>(&lit);
905 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
909 const auto p = boost::get<float>(&lit);
911 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
915 const auto p = boost::get<double>(&lit);
917 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
921 const auto p = boost::get<std::pair<std::string, int>>(&lit);
929 memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
933 const auto p = boost::get<std::string>(&lit);
935 int32_t off_and_len = crt_real_str_off << 16;
936 const auto& crt_real_str = real_strings[crt_real_str_idx];
937 off_and_len |= static_cast<int16_t>(crt_real_str.size());
938 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
939 memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
941 crt_real_str_off += crt_real_str.size();
945 const auto p = boost::get<std::vector<double>>(&lit);
947 int32_t off_and_len = crt_double_arr_lit_off << 16;
948 const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
949 int32_t len = crt_double_arr_lit.size();
951 off_and_len |= static_cast<int16_t>(len);
952 int32_t double_array_bytesize = len * sizeof(double);
953 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
954 memcpy(&serialized[crt_double_arr_lit_off],
955 crt_double_arr_lit.data(),
956 double_array_bytesize);
957 ++crt_double_arr_lit_idx;
958 crt_double_arr_lit_off += double_array_bytesize;
962 const auto p = boost::get<std::vector<int32_t>>(&lit);
964 int32_t off_and_len = crt_int32_arr_lit_off << 16;
965 const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
966 int32_t len = crt_int32_arr_lit.size();
968 off_and_len |= static_cast<int16_t>(len);
969 int32_t int32_array_bytesize = len * sizeof(int32_t);
970 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
971 memcpy(&serialized[crt_int32_arr_lit_off],
972 crt_int32_arr_lit.data(),
973 int32_array_bytesize);
974 ++crt_int32_arr_lit_idx;
975 crt_int32_arr_lit_off += int32_array_bytesize;
979 const auto p = boost::get<std::vector<int8_t>>(&lit);
981 int32_t off_and_len = crt_int8_arr_lit_off << 16;
982 const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
983 int32_t len = crt_int8_arr_lit.size();
985 off_and_len |= static_cast<int16_t>(len);
986 int32_t int8_array_bytesize = len;
987 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
988 memcpy(&serialized[crt_int8_arr_lit_off],
989 crt_int8_arr_lit.data(),
990 int8_array_bytesize);
991 ++crt_int8_arr_lit_idx;
992 crt_int8_arr_lit_off += int8_array_bytesize;
996 const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
998 if (p->second == 64) {
999 int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
1000 const auto& crt_align64_int8_arr_lit =
1001 align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
1002 int32_t len = crt_align64_int8_arr_lit.size();
1004 off_and_len |= static_cast<int16_t>(len);
1005 int32_t align64_int8_array_bytesize = len;
1006 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1007 memcpy(&serialized[crt_align64_int8_arr_lit_off],
1008 crt_align64_int8_arr_lit.data(),
1009 align64_int8_array_bytesize);
1010 ++crt_align64_int8_arr_lit_idx;
1011 crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
1012 } else if (p->second == 32) {
1013 int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
1014 const auto& crt_align32_int8_arr_lit =
1015 align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
1016 int32_t len = crt_align32_int8_arr_lit.size();
1018 off_and_len |= static_cast<int16_t>(len);
1019 int32_t align32_int8_array_bytesize = len;
1020 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1021 memcpy(&serialized[crt_align32_int8_arr_lit_off],
1022 crt_align32_int8_arr_lit.data(),
1023 align32_int8_array_bytesize);
1024 ++crt_align32_int8_arr_lit_idx;
1025 crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
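// Illustrative sketch (not part of this file): the off_and_len values above pack a
// 16-bit byte offset into the literal buffer in the upper half of an int32 and a
// 16-bit payload length in the lower half, which is why every offset and length is
// checked against std::numeric_limits<int16_t>::max(). A minimal standalone version
// of that packing, using hypothetical helper names:
#include <cstdint>
#include <utility>

inline int32_t pack_off_and_len(const int32_t off, const int32_t len) {
  // Both values are assumed to fit in 16 bits, as the CHECK_LE calls above enforce.
  return (off << 16) | static_cast<uint16_t>(len);
}

inline std::pair<int32_t, int32_t> unpack_off_and_len(const int32_t off_and_len) {
  return {off_and_len >> 16, off_and_len & 0xFFFF};
}
// Example: pack_off_and_len(100, 5) == (100 << 16) | 5 == 6553605.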
1055 const int64_t agg_init_val,
1056 const int8_t out_byte_width,
1057 const int64_t* out_vec,
1058 const size_t out_vec_sz,
1059 const bool is_group_by,
1060 const bool float_argument_input) {
1064 if (0 != agg_init_val) {
1066 int64_t agg_result = agg_init_val;
1067 for (size_t i = 0; i < out_vec_sz; ++i) {
1070 return {agg_result, 0};
1073 switch (out_byte_width) {
1075 int agg_result = static_cast<int32_t>(agg_init_val);
1076 for (size_t i = 0; i < out_vec_sz; ++i) {
1079 *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1080 *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1082 const int64_t converted_bin =
1083 float_argument_input
1084 ? static_cast<int64_t>(agg_result)
1086 return {converted_bin, 0};
1090 int64_t agg_result = agg_init_val;
1091 for (size_t i = 0; i < out_vec_sz; ++i) {
1094 *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1095 *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1097 return {agg_result, 0};
1106 int64_t agg_result = 0;
1107 for (size_t i = 0; i < out_vec_sz; ++i) {
1108 agg_result += out_vec[i];
1110 return {agg_result, 0};
1113 switch (out_byte_width) {
1116 for (size_t i = 0; i < out_vec_sz; ++i) {
1117 r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
1119 const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
1120 const int64_t converted_bin =
1122 return {converted_bin, 0};
1126 for (size_t i = 0; i < out_vec_sz; ++i) {
1127 r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
1129 return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
1137 uint64_t agg_result = 0;
1138 for (size_t i = 0; i < out_vec_sz; ++i) {
1139 const uint64_t out = static_cast<uint64_t>(out_vec[i]);
1142 return {static_cast<int64_t>(agg_result), 0};
1146 int64_t agg_result = agg_init_val;
1147 for (size_t i = 0; i < out_vec_sz; ++i) {
1150 return {agg_result, 0};
1152 switch (out_byte_width) {
1154 int32_t agg_result = static_cast<int32_t>(agg_init_val);
1155 for (size_t i = 0; i < out_vec_sz; ++i) {
1158 *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1159 *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1161 const int64_t converted_bin =
1162 float_argument_input
1163 ? static_cast<int64_t>(agg_result)
1165 return {converted_bin, 0};
1168 int64_t agg_result = agg_init_val;
1169 for (size_t i = 0; i < out_vec_sz; ++i) {
1172 *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1173 *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1175 return {agg_result, 0};
1184 int64_t agg_result = agg_init_val;
1185 for (size_t i = 0; i < out_vec_sz; ++i) {
1188 return {agg_result, 0};
1190 switch (out_byte_width) {
1192 int32_t agg_result = static_cast<int32_t>(agg_init_val);
1193 for (size_t i = 0; i < out_vec_sz; ++i) {
1196 *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1197 *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1199 const int64_t converted_bin =
1200 float_argument_input ? static_cast<int64_t>(agg_result)
1202 return {converted_bin, 0};
1205 int64_t agg_result = agg_init_val;
1206 for (size_t i = 0; i < out_vec_sz; ++i) {
1209 *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1210 *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1212 return {agg_result, 0};
1219 int64_t agg_result = agg_init_val;
1220 for (size_t i = 0; i < out_vec_sz; ++i) {
1221 if (out_vec[i] != agg_init_val) {
1222 if (agg_result == agg_init_val) {
1223 agg_result = out_vec[i];
1224 } else if (out_vec[i] != agg_result) {
1229 return {agg_result, 0};
1232 int64_t agg_result = agg_init_val;
1233 for (size_t i = 0; i < out_vec_sz; ++i) {
1234 if (out_vec[i] != agg_init_val) {
1235 agg_result = out_vec[i];
1239 return {agg_result, 0};
1250 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1251 std::vector<TargetInfo> const& targets) {
1252 auto& first = results_per_device.front().first;
1255 if (first_target_idx) {
1256 first->translateDictEncodedColumns(targets, *first_target_idx);
1258 for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
1259 const auto& next = results_per_device[dev_idx].first;
1261 if (first_target_idx) {
1262 next->translateDictEncodedColumns(targets, *first_target_idx);
1264 first->append(*next);
1266 return std::move(first);
1281 auto const targets = shared::transform<std::vector<TargetInfo>>(
1283 if (results_per_device.empty()) {
1284 return std::make_shared<ResultSet>(targets,
1292 using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
1294 results_per_device.end(),
1295 [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
1296 CHECK_GE(lhs.second.size(), size_t(1));
1297 CHECK_GE(rhs.second.size(), size_t(1));
1298 return lhs.second.front() < rhs.second.front();
1306 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1307 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1314 if (results_per_device.empty()) {
1315 auto const targets = shared::transform<std::vector<TargetInfo>>(
1317 return std::make_shared<ResultSet>(targets,
1335 const size_t executor_id,
1336 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1337 int64_t* compilation_queue_time) {
1340 *compilation_queue_time = timer_stop(clock_begin);
1341 const auto& this_result_set = results_per_device[0].first;
1343 this_result_set->getTargetInfos(),
1344 this_result_set->getTargetInitVals(),
1346 return reduction_jit.codegen();
1352 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1353 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1356 std::shared_ptr<ResultSet> reduced_results;
1358 const auto& first = results_per_device.front().first;
1362 results_per_device.size() > 1) {
1364 results_per_device.begin(),
1365 results_per_device.end(),
1367 [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
1368 const auto& r = rs.first;
1369 return init + r->getQueryMemDesc().getEntryCount();
1371 CHECK(total_entry_count);
1372 auto query_mem_desc = first->getQueryMemDesc();
1374 reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
1381 auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
1382 reduced_results->initializeStorage();
1383 switch (query_mem_desc.getEffectiveKeyWidth()) {
1385 first->getStorage()->moveEntriesToBuffer<int32_t>(
1386 result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1389 first->getStorage()->moveEntriesToBuffer<int64_t>(
1390 result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1396 reduced_results = first;
1399 int64_t compilation_queue_time = 0;
1400 const auto reduction_code =
1403 for (size_t i = 1; i < results_per_device.size(); ++i) {
1404 reduced_results->getStorage()->reduce(
1405 *(results_per_device[i].first->getStorage()), {}, reduction_code, executor_id_);
1407 reduced_results->addCompilationQueueTime(compilation_queue_time);
1408 return reduced_results;
1413 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1414 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1416 if (results_per_device.size() == 1) {
1417 return std::move(results_per_device.front().first);
1421 for (const auto& result : results_per_device) {
1422 auto rows = result.first;
1430 std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1435 return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1440 std::unordered_set<int> available_gpus;
1445 for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1446 available_gpus.insert(gpu_id);
1449 return available_gpus;
1453 const size_t cpu_count,
1454 const size_t gpu_count) {
1456 : static_cast<size_t>(cpu_count);
1467 using checked_size_t = boost::multiprecision::number<
1468 boost::multiprecision::cpp_int_backend<64,
1470 boost::multiprecision::unsigned_magnitude,
1471 boost::multiprecision::checked,
1473 checked_size_t max_groups_buffer_entry_guess = 1;
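// checked_size_t is an overflow-checked boost::multiprecision integer, so multiplying the largest-fragment row counts of every input table below cannot silently wrap; an overflow (like a guess above the cap) presumably falls back to max_groups_buffer_entry_guess_cap in the elided error path.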
1474 for (const auto& query_info : query_infos) {
1475 CHECK(!query_info.info.fragments.empty());
1476 auto it = std::max_element(query_info.info.fragments.begin(),
1477 query_info.info.fragments.end(),
1478 [](const FragmentInfo& f1, const FragmentInfo& f2) {
1479 return f1.getNumTuples() < f2.getNumTuples();
1481 max_groups_buffer_entry_guess *= it->getNumTuples();
1485 constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1487 return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1488 max_groups_buffer_entry_guess_cap);
1490 return max_groups_buffer_entry_guess_cap;
1500 return td->tableName;
1507 const int device_count) {
1515 const std::vector<InputTableInfo>& table_infos,
1518 const int device_count) {
1519 for (const auto target_expr : ra_exe_unit.target_exprs) {
1520 if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1524 if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1538 std::vector<std::string> table_names;
1539 const auto& input_descs = ra_exe_unit.input_descs;
1540 for (const auto& input_desc : input_descs) {
1545 "Projection query would require a scan without a limit on table(s): " +
1549 "Projection query output result set on table(s): " +
1552 " rows, which is more than the current system limit of " +
1568 const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1570 std::optional<size_t> inner_table_idx;
1571 for (size_t i = 0; i < query_infos.size(); ++i) {
1572 if (query_infos[i].table_id == inner_table_id) {
1573 inner_table_idx = i;
1577 CHECK(inner_table_idx);
1578 return query_infos[*inner_table_idx].info.getNumTuples() <=
1584 template <typename T>
1586 std::vector<std::string> expr_strs;
1587 for (const auto& expr : expr_container) {
1589 expr_strs.emplace_back("NULL");
1591 expr_strs.emplace_back(expr->toString());
1599 const std::list<Analyzer::OrderEntry>& expr_container) {
1600 std::vector<std::string> expr_strs;
1601 for (const auto& expr : expr_container) {
1602 expr_strs.emplace_back(expr.toString());
1608 switch (algorithm) {
1612 return "Speculative Top N";
1614 return "Streaming Top N";
1625 std::ostringstream os;
1627 const auto& scan_desc = input_col_desc->getScanDesc();
1628 os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1629 << scan_desc.getNestLevel();
1634 os << qual->toString() << ",";
1638 if (!ra_exe_unit.quals.empty()) {
1639 for (const auto& qual : ra_exe_unit.quals) {
1641 os << qual->toString() << ",";
1646 for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1647 const auto& join_condition = ra_exe_unit.join_quals[i];
1649 for (const auto& qual : join_condition.quals) {
1651 os << qual->toString() << ",";
1659 os << qual->toString() << ",";
1665 os << expr->toString() << ",";
1675 os << "\n\tTable/Col/Levels: ";
1677 const auto& scan_desc = input_col_desc->getScanDesc();
1678 os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1679 << scan_desc.getNestLevel() << ") ";
1682 os << "\n\tSimple Quals: "
1686 if (!ra_exe_unit.quals.empty()) {
1691 os << "\n\tJoin Quals: ";
1692 for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1693 const auto& join_condition = ra_exe_unit.join_quals[i];
1699 os << "\n\tGroup By: "
1703 os << "\n\tProjected targets: "
1706 os << "\n\tSort Info: ";
1707 const auto& sort_info = ra_exe_unit.sort_info;
1708 os << "\n\t Order Entries: "
1716 os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1724 const size_t new_scan_limit) {
1728 ra_exe_unit_in.quals,
1748 const std::vector<InputTableInfo>& query_infos,
1754 const bool has_cardinality_estimation,
1756 VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1779 has_cardinality_estimation,
1785 result->setValidationOnlyRes();
1801 has_cardinality_estimation,
1807 result->setValidationOnlyRes();
1815 size_t& max_groups_buffer_entry_guess,
1817 const bool allow_single_frag_table_opt,
1818 const std::vector<InputTableInfo>& query_infos,
1823 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1825 const bool has_cardinality_estimation,
1828 const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1830 CHECK(!query_infos.empty());
1831 if (!max_groups_buffer_entry_guess) {
1847 auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1848 std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1853 query_mem_desc_owned =
1854 query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1856 has_cardinality_estimation,
1872 CHECK(query_mem_desc_owned);
1873 crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1880 plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1881 CHECK(!query_mem_desc_owned);
1882 query_mem_desc_owned.reset(
1889 for (const auto target_expr : ra_exe_unit.target_exprs) {
1890 plan_state_->target_exprs_.push_back(target_expr);
1897 const auto context_count =
1906 allow_single_frag_table_opt,
1908 *query_comp_desc_owned,
1909 *query_mem_desc_owned,
1914 shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
1924 static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1925 crt_min_byte_width <<= 1;
1934 std::string curRunningSession{""};
1935 std::string curRunningQuerySubmittedTime{""};
1936 bool sessionEnrolled = false;
1940 curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
1944 if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
1947 curRunningQuerySubmittedTime,
1954 *query_mem_desc_owned,
1955 query_comp_desc_owned->getDeviceType(),
1960 crt_min_byte_width <<= 1;
1964 << ", what(): " << e.what();
1970 } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1972 return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1988 const std::set<size_t>& fragment_indexes_param) {
1989 const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1992 std::vector<InputTableInfo> table_infos{table_info};
1996 auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1997 std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1999 query_mem_desc_owned =
2000 query_comp_desc_owned->compile(0,
2012 CHECK(query_mem_desc_owned);
2013 CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
2014 const auto table_id = ra_exe_unit.input_descs[0].getTableId();
2017 std::set<size_t> fragment_indexes;
2018 if (fragment_indexes_param.empty()) {
2022 for (size_t i = 0; i < outer_fragments.size(); i++) {
2023 fragment_indexes.emplace(i);
2026 fragment_indexes = fragment_indexes_param;
2034 for (auto fragment_index : fragment_indexes) {
2043 *query_comp_desc_owned,
2044 *query_mem_desc_owned,
2049 kernel.run(this, 0, kernel_context);
2055 for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
2056 CHECK_EQ(result_fragment_indexes.size(), 1);
2057 cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
2063 const std::vector<InputTableInfo>& table_infos,
2074 return std::make_shared<ResultSet>(
2099 std::shared_ptr<CompilationContext> compilation_context;
2108 compilation_context =
2109 tf_compilation_context.compile(exe_unit, true);
2113 compilation_context,
2119 std::shared_ptr<CompilationContext> compilation_context;
2127 compilation_context =
2128 tf_compilation_context.compile(exe_unit, false);
2130 return exe_context.execute(exe_unit,
2132 compilation_context,
2140 return std::make_shared<ResultSet>(query_comp_desc.getIR());
2145 const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
2149 [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
2153 const auto dict_id = dict_id_visitor.visit(expr);
2158 visitor.visit(expr);
2163 visit_expr(group_expr.get());
2166 for (const auto& group_expr : ra_exe_unit.quals) {
2167 visit_expr(group_expr.get());
2170 for (const auto& group_expr : ra_exe_unit.simple_quals) {
2171 visit_expr(group_expr.get());
2174 const auto visit_target_expr = [&](const Analyzer::Expr* target_expr) {
2175 const auto& target_type = target_expr->get_type_info();
2176 if (!target_type.is_string() || target_type.get_compression() == kENCODING_DICT) {
2180 agg_expr->get_aggtype() == kSAMPLE) {
2181 visit_expr(agg_expr->get_arg());
2184 visit_expr(target_expr);
2189 std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
2191 std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
2197 for (const auto target_expr : ra_exe_unit.target_exprs) {
2201 if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
2202 agg_info.agg_arg_type.get_type() == kDOUBLE) {
2206 if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
2210 return requested_device_type;
2219 int64_t float_null_val = 0;
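// The float null sentinel's bit pattern is written into the 64-bit slot through a type-punned pointer (may_alias_ptr) so it can be returned as an int64_t.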
2220 *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
2222 return float_null_val;
2225 return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
2231 std::vector<int64_t>& entry,
2232 const std::vector<Analyzer::Expr*>& target_exprs,
2234 for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
2235 const auto target_expr = target_exprs[target_idx];
2237 CHECK(agg_info.is_agg);
2238 target_infos.push_back(agg_info);
2240 const auto executor = query_mem_desc.getExecutor();
2242 auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2243 CHECK(row_set_mem_owner);
2244 const auto& count_distinct_desc =
2247 CHECK(row_set_mem_owner);
2248 auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
2249 count_distinct_desc.bitmapPaddedSizeBytes(),
2251 entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
2256 CHECK(row_set_mem_owner);
2257 row_set_mem_owner->addCountDistinctSet(count_distinct_set);
2258 entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
2265 } else if (agg_info.agg_kind == kAVG) {
2269 if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
2270 for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2273 } else if (agg_info.sql_type.is_varlen()) {
2277 entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2280 entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2286 const std::vector<Analyzer::Expr*>& target_exprs_in,
2289 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
2290 std::vector<Analyzer::Expr*> target_exprs;
2291 for (const auto target_expr : target_exprs_in) {
2292 const auto target_expr_copy =
2294 CHECK(target_expr_copy);
2295 auto ti = target_expr->get_type_info();
2297 target_expr_copy->set_type_info(ti);
2298 if (target_expr_copy->get_arg()) {
2299 auto arg_ti = target_expr_copy->get_arg()->get_type_info();
2300 arg_ti.set_notnull(false);
2301 target_expr_copy->get_arg()->set_type_info(arg_ti);
2303 target_exprs_owned_copies.push_back(target_expr_copy);
2304 target_exprs.push_back(target_expr_copy.get());
2306 std::vector<TargetInfo> target_infos;
2307 std::vector<int64_t> entry;
2309 const auto executor = query_mem_desc.getExecutor();
2311 auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2312 CHECK(row_set_mem_owner);
2313 auto rs = std::make_shared<ResultSet>(target_infos,
2317 executor->getCatalog(),
2318 executor->blockSize(),
2319 executor->gridSize());
2320 rs->allocateStorage();
2321 rs->fillOneEntry(entry);
2332 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2338 ra_exe_unit.target_exprs, query_mem_desc, device_type);
2343 ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2344 } catch (const std::bad_alloc&) {
2348 const auto shard_count =
2353 if (shard_count && !result_per_device.empty()) {
2357 ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2374 size_t output_row_index,
2376 const std::vector<uint32_t>& top_permutation) {
2379 for (const auto sorted_idx : top_permutation) {
2381 for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
2383 const auto input_column_ptr =
2386 const auto output_column_ptr =
2389 output_row_index * output_query_mem_desc.groupColWidth(group_idx);
2390 memcpy(output_column_ptr,
2395 for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
2397 const auto input_column_ptr =
2400 const auto output_column_ptr =
2403 memcpy(output_column_ptr,
2409 return output_row_index;
2423 size_t output_row_index,
2425 const std::vector<uint32_t>& top_permutation) {
2428 for (const auto sorted_idx : top_permutation) {
2429 const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
2430 memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
2435 return output_row_index;
2446 const auto first_result_set = result_per_device.front().first;
2447 CHECK(first_result_set);
2448 auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2449 CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2451 top_query_mem_desc.setEntryCount(0);
2452 for (auto& result : result_per_device) {
2453 const auto result_set = result.first;
2456 size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2457 top_query_mem_desc.setEntryCount(new_entry_cnt);
2459 auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2460 first_result_set->getDeviceType(),
2462 first_result_set->getRowSetMemOwner(),
2466 auto top_storage = top_result_set->allocateStorage();
2467 size_t top_output_row_idx{0};
2468 for (auto& result : result_per_device) {
2469 const auto result_set = result.first;
2471 const auto& top_permutation = result_set->getPermutationBuffer();
2472 CHECK_LE(top_permutation.size(), top_n);
2473 if (top_query_mem_desc.didOutputColumnar()) {
2475 result_set->getQueryMemDesc(),
2488 CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2489 return top_result_set;
2494 std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
2496 CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2497 for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2498 int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
2500 std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
2508 for (const auto& col : fetched_cols) {
2509 if (col.is_lazily_fetched) {
2522 const std::vector<InputTableInfo>& table_infos,
2525 const bool allow_single_frag_table_opt,
2526 const size_t context_count,
2530 std::unordered_set<int>& available_gpus,
2531 int& available_cpus) {
2532 std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2539 : std::vector<Data_Namespace::MemoryInfo>{},
2545 const bool uses_lazy_fetch =
2550 const auto device_count = deviceCount(device_type);
2553 fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2557 use_multifrag_kernel,
2560 if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2564 if (use_multifrag_kernel) {
2565 VLOG(1) << "Creating multifrag execution kernels";
2573 auto multifrag_kernel_dispatch = [&ra_exe_unit,
2579 render_info](const int device_id,
2581 const int64_t rowid_lookup_key) {
2582 execution_kernels.emplace_back(
2583 std::make_unique<ExecutionKernel>(ra_exe_unit,
2595 fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2597 VLOG(1) << "Creating one execution kernel per fragment";
2602 table_infos.size() == 1 && table_infos.front().table_id > 0) {
2603 const auto max_frag_size =
2604 table_infos.front().info.getFragmentNumTuplesUpperBound();
2607 << " to match max fragment size " << max_frag_size
2608 << " for kernel per fragment execution path.";
2613 size_t frag_list_idx{0};
2614 auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2622 render_info](const int device_id,
2624 const int64_t rowid_lookup_key) {
2625 if (!frag_list.size()) {
2630 execution_kernels.emplace_back(
2631 std::make_unique<ExecutionKernel>(ra_exe_unit,
2645 fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2649 return execution_kernels;
2653 std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
2662 kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
2666 shared_context.setThreadPool(&tg);
2668 ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
2671 VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
2673 size_t kernel_idx = 1;
2674 for (auto& kernel : kernels) {
2675 CHECK(kernel.get());
2680 crt_kernel_idx = kernel_idx++] {
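// Round-robin the kernel onto one of the CPU worker threads (crt_kernel_idx % cpu_threads() below).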
2682 const size_t thread_i = crt_kernel_idx % cpu_threads();
2683 kernel->run(this, thread_i, shared_context);
2688 for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
2695 results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
2697 results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
2707 const size_t table_idx,
2708 const size_t outer_frag_idx,
2709 std::map<int, const TableFragments*>& selected_tables_fragments,
2710 const std::unordered_map<int, const Analyzer::BinOper*>&
2711 inner_table_id_to_join_condition) {
2712 const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2713 auto table_frags_it = selected_tables_fragments.find(table_id);
2714 CHECK(table_frags_it != selected_tables_fragments.end());
2715 const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2716 const auto outer_table_fragments_it =
2717 selected_tables_fragments.find(outer_input_desc.getTableId());
2718 const auto outer_table_fragments = outer_table_fragments_it->second;
2719 CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2720 CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2722 return {outer_frag_idx};
2724 const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2725 auto& inner_frags = table_frags_it->second;
2727 std::vector<size_t> all_frag_ids;
2728 for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2730 const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2734 inner_table_id_to_join_condition,
2739 all_frag_ids.push_back(inner_frag_idx);
2741 return all_frag_ids;
2749 const int table_idx,
2750 const std::unordered_map<int, const Analyzer::BinOper*>&
2751 inner_table_id_to_join_condition,
2757 CHECK(table_idx >= 0 &&
2758 static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2759 const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2761 if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2762 outer_fragment_info.shard == inner_fragment_info.shard) {
2767 CHECK(!inner_table_id_to_join_condition.empty());
2768 auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2769 CHECK(condition_it != inner_table_id_to_join_condition.end());
2770 join_condition = condition_it->second;
2771 CHECK(join_condition);
2774 plan_state_->join_info_.join_hash_tables_.size());
2775 for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2776 if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2778 CHECK(!join_condition);
2779 join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2783 if (!join_condition) {
2787 if (join_condition->is_overlaps_oper()) {
2790 size_t shard_count{0};
2791 if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2792 join_condition->get_left_operand())) {
2797 join_condition, this, inner_outer_pairs);
2801 if (shard_count && !ra_exe_unit.join_quals.empty()) {
2802 plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2812 const int col_id = col_desc->getColId();
2819 const std::vector<InputDescriptor>& input_descs,
2820 const std::map<int, const TableFragments*>& all_tables_fragments) {
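// For each input table, build a prefix sum of fragment tuple counts: frag_offsets[i] is the global row offset at which fragment i begins.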
2821 std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2822 for (auto& desc : input_descs) {
2823 const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2824 CHECK(fragments_it != all_tables_fragments.end());
2825 const auto& fragments = *fragments_it->second;
2826 std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2827 for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2828 frag_offsets[i] = off;
2829 off += fragments[i].getNumTuples();
2831 tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2833 return tab_id_to_frag_offsets;
2836 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2839 const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2840 const std::vector<InputDescriptor>& input_descs,
2841 const std::map<int, const TableFragments*>& all_tables_fragments) {
2842 std::vector<std::vector<int64_t>> all_num_rows;
2843 std::vector<std::vector<uint64_t>> all_frag_offsets;
2844 const auto tab_id_to_frag_offsets =
2846 std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2847 for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2848 std::vector<int64_t> num_rows;
2849 std::vector<uint64_t> frag_offsets;
2851 CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2853 for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2854 const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2855 const auto fragments_it =
2856 all_tables_fragments.find(input_descs[tab_idx].getTableId());
2857 CHECK(fragments_it != all_tables_fragments.end());
2858 const auto& fragments = *fragments_it->second;
2859 if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2860 plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2861 const auto& fragment = fragments[frag_id];
2862 num_rows.push_back(fragment.getNumTuples());
2864 size_t total_row_count{0};
2865 for (const auto& fragment : fragments) {
2866 total_row_count += fragment.getNumTuples();
2868 num_rows.push_back(total_row_count);
2870 const auto frag_offsets_it =
2871 tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2872 CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2873 const auto& offsets = frag_offsets_it->second;
2875 frag_offsets.push_back(offsets[frag_id]);
2877 all_num_rows.push_back(num_rows);
2879 all_frag_offsets.push_back(frag_offsets);
2881 return {all_num_rows, all_frag_offsets};
2889 const auto& input_descs = ra_exe_unit.input_descs;
2891 if (nest_level < 1 ||
2893 ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2895 plan_state_->isLazyFetchColumn(inner_col_desc))) {
2899 CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2900 CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2901 const auto& fragments = selected_fragments[nest_level].fragment_ids;
2902 return fragments.size() > 1;
2913 CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2914 CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2915 const auto& fragments = selected_fragments[nest_level].fragment_ids;
2916 auto need_linearize =
2919 return table_id > 0 && need_linearize && fragments.size() > 1;
2931 const int device_id,
2933 const std::map<int, const TableFragments*>& all_tables_fragments,
2936 std::list<ChunkIter>& chunk_iterators,
2937 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2939 const size_t thread_idx,
2940 const bool allow_runtime_interrupt) {
2944 std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2945 std::vector<size_t> local_col_to_frag_pos;
2947 local_col_to_frag_pos,
2953 selected_fragments_crossjoin);
2954 std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2955 std::vector<std::vector<int64_t>> all_num_rows;
2956 std::vector<std::vector<uint64_t>> all_frag_offsets;
2957 for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2958 std::vector<const int8_t*> frag_col_buffers(
2960 for (const auto& col_id : col_global_ids) {
2961 if (allow_runtime_interrupt) {
2962 bool isInterrupted = false;
2969 if (isInterrupted) {
2977 const int table_id = col_id->getScanDesc().getTableId();
2979 if (cd && cd->isVirtualCol) {
2983 const auto fragments_it = all_tables_fragments.find(table_id);
2984 CHECK(fragments_it != all_tables_fragments.end());
2985 const auto fragments = fragments_it->second;
2986 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2988 CHECK_LT(static_cast<size_t>(it->second),
2990 const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2991 if (!fragments->size()) {
2994 CHECK_LT(frag_id, fragments->size());
2995 auto memory_level_for_column = memory_level;
2997 std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId());
2998 if (plan_state_->columns_to_fetch_.find(tbl_col_ids) ==
3003 frag_col_buffers[it->second] =
3005 memory_level_for_column,
3016 cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3017 bool for_lazy_fetch = false;
3018 if (plan_state_->columns_to_not_fetch_.find(tbl_col_ids) !=
3020 for_lazy_fetch = true;
3021 VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
3022 << ", col_name: " << cd->columnName << ")";
3027 all_tables_fragments,
3031 for_lazy_fetch ? 0 : device_id,
3035 frag_col_buffers[it->second] =
3038 all_tables_fragments,
3039 memory_level_for_column,
3045 frag_col_buffers[it->second] =
3049 all_tables_fragments,
3052 memory_level_for_column,
3058 all_frag_col_buffers.push_back(frag_col_buffers);
3061 ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
3062 return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3070 const int device_id,
3072 const std::map<int, const TableFragments*>& all_tables_fragments,
3075 std::list<ChunkIter>& chunk_iterators,
3076 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3078 const size_t thread_idx,
3079 const bool allow_runtime_interrupt) {
3083 std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
3084 std::vector<std::vector<int64_t>> all_num_rows;
3085 std::vector<std::vector<uint64_t>> all_frag_offsets;
3087 CHECK(!selected_fragments.empty());
3090 using TableId = int;
3091 TableId const selected_table_id = selected_fragments.front().table_id;
3092 bool const input_descs_index =
3093 selected_table_id == ra_exe_unit.input_descs[1].getTableId();
3094 if (!input_descs_index) {
3097 bool const input_col_descs_index =
3098 selected_table_id ==
3099 (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
3100 if (!input_col_descs_index) {
3104 VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
3105 << " selected_table_id=" << selected_table_id
3106 << " input_descs_index=" << int(input_descs_index)
3107 << " input_col_descs_index=" << int(input_col_descs_index)
3108 << " ra_exe_unit.input_descs="
3110 << " ra_exe_unit.input_col_descs="
3114 std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
3115 table_id_to_input_col_descs;
3117 TableId const table_id = input_col_desc->getScanDesc().getTableId();
3118 table_id_to_input_col_descs[table_id].push_back(input_col_desc);
3120 for (auto const& pair : table_id_to_input_col_descs) {
3121 std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3122 std::vector<size_t> local_col_to_frag_pos;
3125 local_col_to_frag_pos,
3131 selected_fragments_crossjoin);
3133 for (const auto& selected_frag_ids : frag_ids_crossjoin) {
3134 if (allow_runtime_interrupt) {
3135 bool isInterrupted = false;
3142 if (isInterrupted) {
3146 std::vector<const int8_t*> frag_col_buffers(
3148 for (const auto& col_id : pair.second) {
3150 const int table_id = col_id->getScanDesc().getTableId();
3153 if (cd && cd->isVirtualCol) {
3157 const auto fragments_it = all_tables_fragments.find(table_id);
3158 CHECK(fragments_it != all_tables_fragments.end());
3159 const auto fragments = fragments_it->second;
3160 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3162 CHECK_LT(static_cast<size_t>(it->second),
3164 const size_t frag_id = ra_exe_unit.union_all
3166 : selected_frag_ids[local_col_to_frag_pos[it->second]];
3167 if (!fragments->size()) {
3170 CHECK_LT(frag_id, fragments->size());
3171 auto memory_level_for_column = memory_level;
3173 std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
3178 frag_col_buffers[it->second] =
3180 memory_level_for_column,
3186 frag_col_buffers[it->second] =
3189 all_tables_fragments,
3190 memory_level_for_column,
3195 frag_col_buffers[it->second] =
3199 all_tables_fragments,
3202 memory_level_for_column,
3208 all_frag_col_buffers.push_back(frag_col_buffers);
3210 std::vector<std::vector<int64_t>> num_rows;
3211 std::vector<std::vector<uint64_t>> frag_offsets;
3213 ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
3214 all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
3215 all_frag_offsets.insert(
3216 all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
3223 if (all_frag_col_buffers[0].size() > 1 && all_frag_col_buffers[0][0] &&
3224 !all_frag_col_buffers[0][1]) {
3225 std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
3229 for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
3230 all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
3232 if (input_descs_index == input_col_descs_index) {
3233 std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
3239 << " input_col_descs_index=" << input_col_descs_index;
3240 return {{all_frag_col_buffers[input_descs_index]},
3241 {{all_num_rows[0][input_descs_index]}},
3242 {{all_frag_offsets[0][input_descs_index]}}};
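// Editor's note (not part of the original file): as far as can be read from
// this fragmentary listing, fetchUnionChunks() services UNION ALL plans that
// read from exactly two inputs. It groups the input column descriptors by
// table id, fetches each side's fragments independently, swaps the two buffer
// sets into a consistent order, and returns only the buffers, row counts and
// fragment offsets of the side selected by input_descs_index.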
3246 const size_t scan_idx,
3250 !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
3251 !selected_fragments[scan_idx].fragment_ids.empty()) {
3256 return selected_fragments[scan_idx].fragment_ids;
3260 std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3261 std::vector<size_t>& local_col_to_frag_pos,
3262 const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3265 local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3267 const auto& input_descs = ra_exe_unit.input_descs;
3268 for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3269 const int table_id = input_descs[scan_idx].getTableId();
3270 CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
3271 selected_fragments_crossjoin.push_back(
3273 for (const auto& col_id : col_global_ids) {
3275 const auto& input_desc = col_id->getScanDesc();
3276 if (input_desc.getTableId() != table_id ||
3277 input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3280 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3282 CHECK_LT(static_cast<size_t>(it->second),
3284 local_col_to_frag_pos[it->second] = frag_pos;
3291 std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3292 std::vector<size_t>& local_col_to_frag_pos,
3293 const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3296 local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3298 const auto& input_descs = ra_exe_unit.input_descs;
3299 for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3300 const int table_id = input_descs[scan_idx].getTableId();
3303 if (selected_fragments[0].table_id != table_id) {
3307 selected_fragments_crossjoin.push_back(
3310 for (const auto& col_id : col_global_ids) {
3312 const auto& input_desc = col_id->getScanDesc();
3313 if (input_desc.getTableId() != table_id ||
3314 input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3317 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3319 CHECK_LT(static_cast<size_t>(it->second),
3321 local_col_to_frag_pos[it->second] = frag_pos;
3331 OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
3333 for (auto out : out_vec_) {
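// ---------------------------------------------------------------------------
// Editor's note (not part of the original file): OutVecOwner is a small RAII
// guard over the raw per-target output vectors returned by the CPU/GPU launch
// functions, so they are released even when reduction throws. A minimal
// standalone equivalent, assuming the buffers were allocated with new[]:
#include <cstdint>
#include <vector>

class ScopedOutputVectors {  // hypothetical name
 public:
  explicit ScopedOutputVectors(const std::vector<int64_t*>& out_vec)
      : out_vec_(out_vec) {}
  ~ScopedOutputVectors() {
    for (int64_t* out : out_vec_) {
      delete[] out;  // release each per-target output buffer exactly once
    }
  }
  // Non-copyable: copying would lead to double deletion.
  ScopedOutputVectors(const ScopedOutputVectors&) = delete;
  ScopedOutputVectors& operator=(const ScopedOutputVectors&) = delete;

 private:
  std::vector<int64_t*> out_vec_;
};
// ---------------------------------------------------------------------------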
3346 const bool hoist_literals,
3348 const std::vector<Analyzer::Expr*>& target_exprs,
3350 std::vector<std::vector<const int8_t*>>& col_buffers,
3352 const std::vector<std::vector<int64_t>>& num_rows,
3353 const std::vector<std::vector<uint64_t>>& frag_offsets,
3355 const int device_id,
3356 const uint32_t start_rowid,
3357 const uint32_t num_tables,
3358 const bool allow_runtime_interrupt,
3360 const int64_t rows_to_process) {
3363 CHECK(!results || !(*results));
3364 if (col_buffers.empty()) {
3373 << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
3374 "currently unsupported.";
3379 std::vector<int64_t*> out_vec;
3382 std::unique_ptr<OutVecOwner> output_memory_scope;
3383 if (allow_runtime_interrupt) {
3384 bool isInterrupted = false;
3390 if (isInterrupted) {
3400 CHECK(cpu_generated_code);
3411 join_hash_table_ptrs,
3413 output_memory_scope.reset(new OutVecOwner(out_vec));
3417 CHECK(gpu_generated_code);
3435 allow_runtime_interrupt,
3436 join_hash_table_ptrs,
3437 render_allocator_map_ptr);
3438 output_memory_scope.reset(new OutVecOwner(out_vec));
3441 } catch (const std::exception& e) {
3442 LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3464 std::vector<int64_t> reduced_outs;
3465 const auto num_frags = col_buffers.size();
3466 const size_t entry_count =
3472 if (size_t(1) == entry_count) {
3473 for (auto out : out_vec) {
3475 reduced_outs.push_back(*out);
3478 size_t out_vec_idx = 0;
3480 for (const auto target_expr : target_exprs) {
3482 CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3483 << target_expr->toString();
3485 const int num_iterations = agg_info.sql_type.is_geometry()
3486 ? agg_info.sql_type.get_physical_coord_cols()
3489 for (int i = 0; i < num_iterations; i++) {
3496 val1 = out_vec[out_vec_idx][0];
3499 const auto chosen_bytes = static_cast<size_t>(
3505 float_argument_input ? sizeof(int32_t) : chosen_bytes,
3506 out_vec[out_vec_idx],
3509 float_argument_input);
3514 reduced_outs.push_back(val1);
3515 if (agg_info.agg_kind == kAVG ||
3516 (agg_info.agg_kind == kSAMPLE &&
3517 (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3518 const auto chosen_bytes = static_cast<size_t>(
3523 agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3526 float_argument_input ? sizeof(int32_t) : chosen_bytes,
3527 out_vec[out_vec_idx + 1],
3534 reduced_outs.push_back(val2);
3547 auto rows_ptr = std::shared_ptr<ResultSet>(
3549 rows_ptr->fillOneEntry(reduced_outs);
3550 *results = std::move(rows_ptr);
3558 return results && results->rowCount() < scan_limit;
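// ---------------------------------------------------------------------------
// Editor's note (not part of the original file): the loop above collapses one
// partial value per fragment/entry (out_vec[i]) into a single value per target
// expression via Executor::reduceResults(). For a plain COUNT the reduction is
// just a sum of the per-entry partials; a minimal, hypothetical stand-in:
#include <cstddef>
#include <cstdint>

int64_t reduce_count_partials(const int64_t* partials, const size_t entry_count) {
  int64_t total = 0;
  for (size_t i = 0; i < entry_count; ++i) {
    total += partials[i];  // each entry holds one fragment's/thread's count
  }
  return total;
}
// ---------------------------------------------------------------------------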
3566 const bool hoist_literals,
3569 std::vector<std::vector<const int8_t*>>& col_buffers,
3570 const std::vector<size_t> outer_tab_frag_ids,
3572 const std::vector<std::vector<int64_t>>& num_rows,
3573 const std::vector<std::vector<uint64_t>>& frag_offsets,
3575 const int device_id,
3576 const int outer_table_id,
3577 const int64_t scan_limit,
3578 const uint32_t start_rowid,
3579 const uint32_t num_tables,
3580 const bool allow_runtime_interrupt,
3582 const int64_t rows_to_process) {
3586 CHECK(!results || !(*results));
3587 if (col_buffers.empty()) {
3598 if (allow_runtime_interrupt) {
3599 bool isInterrupted = false;
3605 if (isInterrupted) {
3618 VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3619 << " ra_exe_unit.input_descs="
3621 << " ra_exe_unit.input_col_descs="
3623 << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3626 << " query_exe_context->query_buffers_->num_rows_="
3628 << " query_exe_context->query_mem_desc_.getEntryCount()="
3630 << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3631 << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3632 << " num_tables=" << num_tables;
3639 std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3641 [outer_table_id](auto const& a, auto const& b) {
3642 return a.getTableId() == outer_table_id &&
3643 b.getTableId() != outer_table_id;
3646 ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3651 [outer_table_id](auto const& input_col_desc) {
3652 return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3658 const int32_t scan_limit_for_query =
3660 const int32_t max_matched = scan_limit_for_query == 0
3662 : scan_limit_for_query;
3665 CHECK(cpu_generated_code);
3676 join_hash_table_ptrs,
3682 CHECK(gpu_generated_code);
3699 allow_runtime_interrupt,
3700 join_hash_table_ptrs,
3701 render_allocator_map_ptr);
3708 }
catch (
const std::exception& e) {
3709 LOG(
FATAL) <<
"Error launching the GPU kernel: " << e.what();
3725 *results = query_exe_context->
getRowSet(ra_exe_unit_copy,
3728 VLOG(2) <<
"results->rowCount()=" << (*results)->rowCount();
3729 (*results)->holdLiterals(hoist_buf);
3731 if (error_code < 0 && render_allocator_map_ptr) {
3732 auto const adjusted_scan_limit =
3736 if (adjusted_scan_limit != 0) {
3742 if (results && error_code &&
3751 const int device_id) {
3752 std::vector<int8_t*> table_ptrs;
3753 const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3754 for (auto hash_table : join_hash_tables) {
3756 CHECK(table_ptrs.empty());
3759 table_ptrs.push_back(hash_table->getJoinHashBuffer(
3766 const std::vector<InputTableInfo>& query_infos,
3771 const bool contains_left_deep_outer_join =
3772 ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3778 new CgenState(query_infos.size(), contains_left_deep_outer_join, this));
3786 const std::vector<InputTableInfo>& query_infos) {
3788 const auto ld_count = input_descs.size();
3790 for (size_t i = 0; i < ld_count; ++i) {
3792 const auto frag_count = query_infos[i].info.fragments.size();
3796 if (frag_count > 1) {
3798 frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));
3807 const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3808 const std::vector<InputTableInfo>& query_infos,
3811 const HashType preferred_hash_type,
3817 return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3827 preferred_hash_type,
3831 hashtable_build_dag_map,
3833 table_id_to_node_map);
3836 return {nullptr, e.what()};
3842 CHECK(!dev_props.empty());
3843 return dev_props.front().warpSize;
3878 return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3885 if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3890 fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3893 fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3898 value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3910 CHECK(val->getType()->isPointerTy());
3912 const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3913 const auto val_type = val_ptr_type->getElementType();
3914 size_t val_width = 0;
3915 if (val_type->isIntegerTy()) {
3916 val_width = val_type->getIntegerBitWidth();
3918 if (val_type->isFloatTy()) {
3921 CHECK(val_type->isDoubleTy());
3926 if (bitWidth == val_width) {
3933 #define EXECUTE_INCLUDE
3939 #undef EXECUTE_INCLUDE
3944 auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3945 if (deleted_cols_it == deleted_cols_map.end()) {
3947 deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd)).second);
3949 CHECK_EQ(deleted_cd, deleted_cols_it->second);
3960 auto ra_exe_unit_with_deleted = ra_exe_unit;
3962 for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3972 CHECK(deleted_cd->columnType.is_boolean());
3975 for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3976 if (input_col.get()->getColId() == deleted_cd->columnId &&
3977 input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3978 input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3987 deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3991 return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
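// Editor's note (not part of the original file): addDeletedColumn() above
// augments the execution unit so that, for every input table that has a
// delete-marker column (a boolean column descriptor), that column is present
// in input_col_descs; the generated kernels can then filter out soft-deleted
// rows (see the filter_on_deleted_column compilation option). The returned
// DeletedColumnsMap records the delete-column descriptor per table id.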
3999 const int64_t chunk_min,
4000 const int64_t chunk_max,
4005 CHECK(ldim != rdim);
4009 return {true, chunk_min / scale, chunk_max / scale};
4013 boost::multiprecision::cpp_int_backend<64,
4015 boost::multiprecision::signed_magnitude,
4016 boost::multiprecision::checked,
4021 std::make_tuple(true,
4025 } catch (const std::overflow_error& e) {
4028 return std::make_tuple(false, chunk_min, chunk_max);
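// ---------------------------------------------------------------------------
// Editor's note (not part of the original file): the checked_int64_t used
// above (boost::multiprecision with a checked 64-bit backend) turns signed
// overflow into a thrown std::overflow_error instead of undefined behaviour,
// letting the fragment-skipping logic fall back safely when scaling
// high-precision timestamp bounds. A standalone sketch of the same idea, with
// hypothetical names:
#include <boost/multiprecision/cpp_int.hpp>
#include <cstdint>
#include <stdexcept>
#include <tuple>

using checked_i64 = boost::multiprecision::number<
    boost::multiprecision::cpp_int_backend<64,
                                           64,
                                           boost::multiprecision::signed_magnitude,
                                           boost::multiprecision::checked,
                                           void>>;

// Returns {ok, scaled_min, scaled_max}; ok == false means the scaled bounds
// overflowed and must not be used for fragment skipping.
std::tuple<bool, int64_t, int64_t> scale_bounds_up_safely(const int64_t chunk_min,
                                                          const int64_t chunk_max,
                                                          const int64_t scale) {
  try {
    const checked_i64 min_scaled = checked_i64(chunk_min) * scale;
    const checked_i64 max_scaled = checked_i64(chunk_max) * scale;
    return {true, min_scaled.convert_to<int64_t>(), max_scaled.convert_to<int64_t>()};
  } catch (const std::overflow_error&) {
    return {false, chunk_min, chunk_max};
  }
}
// ---------------------------------------------------------------------------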
4048 const auto& chunk_type = deleted_cd->columnType;
4049 CHECK(chunk_type.is_boolean());
4051 const auto deleted_col_id = deleted_cd->columnId;
4054 const int64_t chunk_min =
4056 const int64_t chunk_max =
4058 if (chunk_min == 1 && chunk_max == 1) {
4076 double chunk_min{0.};
4077 double chunk_max{0.};
4081 if (chunk_min > chunk_max) {
4090 const auto rhs_val = rhs_type == kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
4096 if (chunk_max < rhs_val) {
4101 if (chunk_max <= rhs_val) {
4106 if (chunk_min > rhs_val) {
4111 if (chunk_min >= rhs_val) {
4116 if (chunk_min > rhs_val || chunk_max < rhs_val) {
4129 const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
4130 const std::vector<uint64_t>& frag_offsets,
4131 const size_t frag_idx) {
4132 const int table_id = table_desc.getTableId();
4137 << ", fragment id: " << frag_idx;
4141 for (const auto& simple_qual : simple_quals) {
4142 const auto comp_expr =
4150 if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
4154 CHECK(lhs_uexpr->get_optype() ==
4157 if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
4164 const auto rhs = comp_expr->get_right_operand();
4170 if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
4171 !lhs->get_type_info().is_fp()) {
4175 if (lhs->get_type_info().is_fp()) {
4176 const auto fragment_skip_status =
4178 switch (fragment_skip_status) {
4193 const int col_id = lhs_col->get_column_id();
4195 int64_t chunk_min{0};
4196 int64_t chunk_max{0};
4197 bool is_rowid{false};
4198 size_t start_rowid{0};
4201 if (cd->isVirtualCol) {
4202 CHECK(cd->columnName == "rowid");
4204 start_rowid = table_generation.start_rowid;
4205 chunk_min = frag_offsets[frag_idx] + start_rowid;
4206 chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
4210 const auto& chunk_type = lhs_col->get_type_info();
4216 if (chunk_min > chunk_max) {
4220 if (lhs->get_type_info().is_timestamp() &&
4221 (lhs_col->get_type_info().get_dimension() !=
4222 rhs_const->get_type_info().get_dimension()) &&
4223 (lhs_col->get_type_info().is_high_precision_timestamp() ||
4224 rhs_const->get_type_info().is_high_precision_timestamp())) {
4233 std::tie(is_valid, chunk_min, chunk_max) =
4235 chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
4237 VLOG(4) << "Overflow/Underflow detecting in fragments skipping logic.\nChunk min "
4241 << "\nLHS col precision is: "
4243 << "\nRHS precision is: "
4244 << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
4248 if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
4253 chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
4255 chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
4257 llvm::LLVMContext local_context;
4258 CgenState local_cgen_state(local_context);
4261 const auto rhs_val =
4264 switch (comp_expr->get_optype()) {
4266 if (chunk_max < rhs_val) {
4271 if (chunk_max <= rhs_val) {
4276 if (chunk_min > rhs_val) {
4281 if (chunk_min >= rhs_val) {
4286 if (chunk_min > rhs_val || chunk_max < rhs_val) {
4288 }
else if (is_rowid) {
4289 return {
false, rhs_val - start_rowid};
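// ---------------------------------------------------------------------------
// Editor's note (not part of the original file): skipFragment() compares the
// constant on the right-hand side of a simple qual against the fragment's
// chunk-level min/max metadata; the fragment can only be skipped when the
// predicate is provably false for every row in it. A standalone sketch of the
// decision; the mapping of operators to checks below is the editor's reading
// of the elided case labels in the switch above:
#include <cstdint>

enum class CompareOp { LT, LE, GT, GE, EQ };  // stand-in for the SQL comparison ops

// True when `col <op> rhs` cannot hold for any row of a fragment whose column
// values all lie in [chunk_min, chunk_max].
bool can_skip_fragment(const CompareOp op,
                       const int64_t chunk_min,
                       const int64_t chunk_max,
                       const int64_t rhs) {
  switch (op) {
    case CompareOp::GE:
      return chunk_max < rhs;
    case CompareOp::GT:
      return chunk_max <= rhs;
    case CompareOp::LE:
      return chunk_min > rhs;
    case CompareOp::LT:
      return chunk_min >= rhs;
    case CompareOp::EQ:
      return chunk_min > rhs || chunk_max < rhs;
  }
  return false;
}
// ---------------------------------------------------------------------------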
4327 const std::vector<uint64_t>& frag_offsets,
4328 const size_t frag_idx) {
4329 std::pair<bool, int64_t> skip_frag{false, -1};
4330 for (auto& inner_join : ra_exe_unit.join_quals) {
4337 std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
4338 for (auto& qual : inner_join.quals) {
4340 inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
4341 temp_qual.simple_quals.begin(),
4342 temp_qual.simple_quals.end());
4345 table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
4346 if (temp_skip_frag.second != -1) {
4347 skip_frag.second = temp_skip_frag.second;
4350 skip_frag.first = skip_frag.first || temp_skip_frag.first;
4357 const std::unordered_set<PhysicalInput>& phys_inputs) {
4360 std::unordered_set<int> phys_table_ids;
4361 for (const auto& phys_input : phys_inputs) {
4362 phys_table_ids.insert(phys_input.table_id);
4364 std::vector<InputTableInfo> query_infos;
4365 for (const int table_id : phys_table_ids) {
4368 for (const auto& phys_input : phys_inputs) {
4373 const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
4374 cd->columnType, phys_input.table_id, phys_input.col_id, 0);
4376 agg_col_range_cache.setColRange(phys_input, col_range);
4379 return agg_col_range_cache;
4383 const std::unordered_set<PhysicalInput>& phys_inputs) {
4390 for (const auto& phys_input : phys_inputs) {
4394 const auto& col_ti =
4395 cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
4396 if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
4397 const int dict_id = col_ti.get_comp_param();
4399 CHECK(dd && dd->stringDict);
4401 dd->stringDict->storageEntryCount());
4404 return string_dictionary_generations;
4408 std::unordered_set<int> phys_table_ids) {
4410 for (const int table_id : phys_table_ids) {
4414 TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
4416 return table_generations;
4420 const std::unordered_set<int>& phys_table_ids) {
4447 mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4452 mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4455 return !candidate_query_session.empty() &&
4462 mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4467 ->second.getQueryStatus();
4469 return QuerySessionStatus::QueryStatus::UNDEFINED;
4473 mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4479 const std::string& query_str,
4480 const std::string& query_submitted_time) {
4481 if (!query_session_id.empty()) {
4485 query_session_id, query_submitted_time, executor_id_, write_lock);
4487 query_submitted_time,
4488 QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
4491 return {query_session_id, query_str};
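// Editor's note (not part of the original file): the helpers that follow keep
// per-session bookkeeping in queries_session_map_, guarded by
// executor_session_mutex_. A session is enrolled as PENDING_EXECUTOR when it
// is attached to an executor, promoted to RUNNING_QUERY_KERNEL while kernels
// execute, and removed from the map (or marked interrupted) when the query
// finishes; the explicit mapd_shared_lock / mapd_unique_lock parameters make
// the required lock mode visible at every call site.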
4498 if (query_session.empty()) {
4505 VLOG(1) << "Interrupting pending query is not available since the query session is "
4510 << "Interrupting pending query is not available since its interrupt flag is "
4521 const std::string& submitted_time_str) {
4524 if (query_session.empty()) {
4536 const std::string& submitted_time_str,
4540 if (query_session.empty()) {
4543 if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4547 query_session, submitted_time_str, new_query_status, session_write_lock);
4552 const std::string& query_str,
4553 const std::string& submitted_time_str,
4554 const size_t executor_id,
4558 if (query_session.empty()) {
4566 query_session_status,
4567 session_write_lock);
4569 if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4580 const std::string& query_str,
4581 const std::string& submitted_time_str,
4582 const size_t executor_id,
4584 mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4590 .emplace(submitted_time_str,
4598 .emplace(submitted_time_str,
4606 std::map<std::string, QuerySessionStatus> executor_per_query_map;
4607 executor_per_query_map.emplace(
4610 query_session, executor_id, query_str, submitted_time_str, query_status));
4618 const std::string& submitted_time_str,
4620 mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4622 if (query_session.empty()) {
4627 auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4629 if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4630 auto prev_status = query_status.second.getQueryStatus();
4631 if (prev_status == updated_query_status) {
4634 query_status.second.setQueryStatus(updated_query_status);
4644 const std::string& submitted_time_str,
4645 const size_t executor_id,
4646 mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4648 if (query_session.empty()) {
4653 for (auto it = storage.begin(); it != storage.end(); it++) {
4654 auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4656 if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4658 .at(submitted_time_str)
4659 .setExecutorId(executor_id);
4669 const std::string& submitted_time_str,
4670 mapd_unique_lock<mapd_shared_mutex>& write_lock) {
4671 if (query_session.empty()) {
4676 if (storage.size() > 1) {
4678 for (auto it = storage.begin(); it != storage.end(); it++) {
4679 auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4682 submitted_time_str.compare(target_submitted_t_str) == 0) {
4687 }
else if (storage.size() == 1) {
4703 mapd_unique_lock<mapd_shared_mutex>&
write_lock) {
4704 if (query_session.empty()) {
4714 mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4715 if (query_session.empty()) {
4725 mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4726 if (query_session.empty()) {
4733 const double runtime_query_check_freq,
4734 const unsigned pending_query_check_freq) const {
4748 const size_t cache_value) {
4752 VLOG(1) << "Put estimated cardinality to the cache";
4760 VLOG(1) << "Reuse cached cardinality";
4768 mapd_shared_lock<mapd_shared_mutex>& read_lock) {
4771 std::vector<QuerySessionStatus> ret;
4772 for (auto& info : query_infos) {
4774 info.second.getExecutorId(),
4775 info.second.getQueryStr(),
4776 info.second.getQuerySubmittedTime(),
4777 info.second.getQueryStatus()));
4786 std::vector<size_t> res;
4790 for (auto& kv : it->second) {
4791 if (kv.second.getQueryStatus() ==
4792 QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4793 res.push_back(kv.second.getExecutorId());
4853 std::stringstream ss;
4854 ss << "colRangeCache: ";
4856 ss << "{" << phys_input.col_id << ", " << phys_input.table_id
4857 << "} = " << exp_range.toString() << ", ";
4859 ss << "stringDictGenerations: ";
4860 for (auto& [key, val] : row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
4861 ss << "{" << key << "} = " << val << ", ";
4863 ss << "tableGenerations: ";
4865 ss << "{" << key << "} = {" << val.tuple_count << ", " << val.start_rowid << "}, ";