#include <llvm/Transforms/Utils/BasicBlockUtils.h>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>
bool g_enable_filter_function{true};
unsigned g_dynamic_watchdog_time_limit{10000};
bool g_allow_cpu_retry{true};
bool g_allow_query_step_cpu_retry{true};
bool g_null_div_by_zero{false};
unsigned g_trivial_loop_join_threshold{1000};
bool g_from_table_reordering{true};
bool g_inner_join_fragment_skipping{true};
extern bool g_enable_smem_group_by;
extern std::unique_ptr<llvm::Module> udf_gpu_module;
extern std::unique_ptr<llvm::Module> udf_cpu_module;
bool g_enable_filter_push_down{false};
float g_filter_push_down_low_frac{-1.0f};
float g_filter_push_down_high_frac{-1.0f};
size_t g_filter_push_down_passing_row_ubound{0};
bool g_enable_columnar_output{false};
bool g_enable_left_join_filter_hoisting{true};
bool g_optimize_row_initialization{true};
bool g_enable_overlaps_hashjoin{true};
bool g_enable_distance_rangejoin{true};
bool g_enable_hashjoin_many_to_many{false};
size_t g_overlaps_max_table_size_bytes{1024 * 1024 * 1024};
double g_overlaps_target_entries_per_bin{1.3};
bool g_strip_join_covered_quals{false};
size_t g_constrained_by_in_threshold{10};
size_t g_default_max_groups_buffer_entry_guess{16384};
size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
bool g_enable_window_functions{true};
bool g_enable_table_functions{true};
bool g_enable_dev_table_functions{false};
bool g_enable_geo_ops_on_uncompressed_coords{true};
bool g_enable_rf_prop_table_functions{true};
size_t g_max_memory_allocation_size{2000000000};  // set to max slab size
size_t g_min_memory_allocation_size{
    256};  // minimum memory allocation required for projection query output buffer
           // without pre-flight count
bool g_enable_bump_allocator{false};
double g_bump_allocator_step_reduction{0.75};
bool g_enable_direct_columnarization{true};
extern bool g_enable_string_functions;
bool g_enable_lazy_fetch{true};
bool g_enable_runtime_query_interrupt{true};
bool g_enable_non_kernel_time_query_interrupt{true};
bool g_use_estimator_result_cache{true};
unsigned g_pending_query_interrupt_freq{1000};
double g_running_query_interrupt_freq{0.1};
size_t g_gpu_smem_threshold{
    4096};  // GPU shared memory threshold (in bytes): if larger buffer sizes are
            // required, we do not use GPU shared memory optimizations. Setting this to
            // 0 means unlimited (subject to other dynamically calculated caps).
bool g_enable_smem_grouped_non_count_agg{
    true};  // enable use of shared memory when performing group-by with select
            // non-count aggregates
bool g_enable_smem_non_grouped_agg{
    true};  // enable optimizations for using GPU shared memory in implementation of
            // non-grouped aggregates
bool g_is_test_env{false};  // operating under a unit test environment. Currently only
                            // limits the allocation for the output buffer arena
                            // and data recycler test
size_t g_enable_parallel_linearization{
    10000};  // # rows that we are trying to linearize varlen col in parallel
bool g_enable_data_recycler{true};
bool g_use_hashtable_cache{true};
bool g_use_query_resultset_cache{true};
bool g_use_chunk_metadata_cache{true};
bool g_allow_auto_resultset_caching{false};
bool g_allow_query_step_skipping{true};
size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
bool g_optimize_cuda_block_and_grid_sizes{false};

size_t g_approx_quantile_buffer{1000};
size_t g_approx_quantile_centroids{300};

bool g_enable_automatic_ir_metadata{true};

size_t g_max_log_length{500};

extern bool g_cache_string_hash;
int const Executor::max_gpu_count;

const int32_t Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES;

std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;
extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
    const std::string& udf_ir_filename,
    llvm::LLVMContext& ctx);
extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
    const std::string& udf_ir_filename,
    llvm::LLVMContext& ctx,
    bool is_gpu = false);
extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
    const std::string& udf_ir_string,
    llvm::LLVMContext& ctx,
    bool is_gpu = false);
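// Sketch of how these readers are typically invoked when loading UDF extension
// modules (the exact call site and variable names here are illustrative assumptions):
//   auto llvm_module =
//       read_llvm_module_from_ir_file(udf_ir_filename, *context_, /*is_gpu=*/true);
// The returned module is then stored in extension_modules_ keyed by ExtModuleKinds,
// as done in update_extension_module() further below.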
// This function is notably different from that in RelAlgExecutor because it already
// expects SPI values and therefore needs to avoid that transformation.
void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs) {
  for (const auto [col_id, table_id, db_id] : phys_inputs) {
    foreign_storage::populate_string_dictionary(table_id, col_id, db_id);
  }
}

bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
  const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
  // The fragmenter always returns at least one fragment, even when the table is empty.
  return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
}
namespace foreign_storage {
// Foreign tables skip the population of dictionaries during metadata scan. This function
// will populate a dictionary's missing entries by fetching any unpopulated chunks.
  if (const auto foreign_table = dynamic_cast<const ForeignTable*>(
          catalog->getMetadataForTable(table_id, false))) {
    const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
    if (col_desc->columnType.is_dict_encoded_type()) {
      auto& fragmenter = foreign_table->fragmenter;
      CHECK(fragmenter != nullptr);
      for (const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
        ChunkKey chunk_key = {db_id, table_id, col_id, frag.fragmentId};
        CHECK(metadata_map.find(col_id) != metadata_map.end());
        if (auto& meta = metadata_map.at(col_id); meta->isPlaceholder()) {
              &(catalog->getDataMgr()),
    const size_t block_size_x,
    const size_t grid_size_x,
    const size_t max_gpu_slab_size,
    const std::string& debug_dir,
    const std::string& debug_file)
    : executor_id_(executor_id)
    , context_(new llvm::LLVMContext())
  update_extension_modules();
  auto template_path = root_path + "/QueryEngine/RuntimeFunctions.bc";
  CHECK(boost::filesystem::exists(template_path));
  auto rt_geos_path = root_path + "/QueryEngine/GeosRuntime.bc";
  CHECK(boost::filesystem::exists(rt_geos_path));
  if (boost::filesystem::exists(rt_libdevice_path)) {
        << " does not exist; support for some UDF "
           "functions might not be available.";
  qe->s_code_accessor->clear();
  qe->s_stubs_accessor->clear();
  qe->cpu_code_accessor->clear();
  qe->gpu_code_accessor->clear();
  qe->tf_code_accessor->clear();
  if (discard_runtime_modules_only) {
    cgen_state_->module_ = nullptr;
    extension_modules_.clear();
  context_.reset(new llvm::LLVMContext());
  cgen_state_.reset(new CgenState({}, false, this));
    const std::string& source) {
  CHECK(!source.empty());
  switch (module_kind) {
  return std::unique_ptr<llvm::Module>();
    bool erase_not_found = false) {
      auto llvm_module = read_module(module_kind, it->second);
        extension_modules_[module_kind] = std::move(llvm_module);
      } else if (erase_not_found) {
        extension_modules_.erase(module_kind);
        if (extension_modules_.find(module_kind) == extension_modules_.end()) {
              << " LLVM module. The module will be unavailable.";
              << " LLVM module. Using the existing module.";
    if (erase_not_found) {
      extension_modules_.erase(module_kind);
      if (extension_modules_.find(module_kind) == extension_modules_.end()) {
            << " LLVM module is unavailable. The module will be unavailable.";
            << " LLVM module is unavailable. Using the existing module.";
  if (!update_runtime_modules_only) {
    , cgen_state_(std::move(executor_.cgen_state_))
    const bool allow_lazy_fetch,
    const std::vector<InputTableInfo>& query_infos,
  executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
  for (auto& p : executor_.cgen_state_->row_func_hoisted_literals_) {
    auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
    if (inst && inst->getNumUses() == 0 && inst->getParent() == nullptr) {
      p.first->deleteValue();
  executor_.cgen_state_->row_func_hoisted_literals_.clear();
  for (auto& str_dict_translation_mgr :
       executor_.cgen_state_->str_dict_translation_mgrs_) {
    cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
  executor_.cgen_state_->str_dict_translation_mgrs_.clear();
  for (auto& bm : executor_.cgen_state_->in_values_bitmaps_) {
  executor_.cgen_state_->in_values_bitmaps_.clear();
    const std::string& debug_dir,
    const std::string& debug_file,
  auto executor = std::make_shared<Executor>(executor_id,
  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
  switch (memory_level) {
      throw std::runtime_error(
          "Clearing memory levels other than the CPU level or GPU level is not "
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool with_generation) const {
  CHECK(row_set_mem_owner);
  std::lock_guard<std::mutex> lock(
  return row_set_mem_owner->getOrAddStringDictProxy(dict_id_in, with_generation);
    const bool with_generation) {
  const auto dd = catalog->getMetadataForDict(dict_id);
  auto dict_key = dict_key_in;
  CHECK(dd->stringDict);
  const int64_t generation =
      with_generation ? string_dictionary_generations_.getGeneration(dict_key) : -1;
  return addStringDict(dd->stringDict, dict_key, generation);
  if (!lit_str_dict_proxy_) {
    std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
    lit_str_dict_proxy_ = std::make_shared<StringDictionaryProxy>(
  return lit_str_dict_proxy_.get();
    const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool with_generation) const {
  CHECK(row_set_mem_owner);
  std::lock_guard<std::mutex> lock(
  return row_set_mem_owner->getOrAddStringProxyTranslationMap(
      source_dict_key, dest_dict_key, with_generation, translation_type, string_op_infos);
    const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
    const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) const {
  CHECK(row_set_mem_owner);
  std::lock_guard<std::mutex> lock(
  if (!dest_string_op_infos.empty()) {
    row_set_mem_owner->addStringProxyUnionTranslationMap(
        dest_proxy, dest_proxy, dest_string_op_infos);
  return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
      source_proxy, dest_proxy, source_string_op_infos);
    const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool with_generation) const {
  CHECK(row_set_mem_owner);
  std::lock_guard<std::mutex> lock(
  return row_set_mem_owner->getOrAddStringProxyNumericTranslationMap(
      source_dict_key, with_generation, string_op_infos);
    const bool with_generation,
    const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
  const auto dest_proxy = getOrAddStringDictProxy(dest_dict_key_in, with_generation);
    return addStringProxyIntersectionTranslationMap(
        source_proxy, dest_proxy, string_op_infos);
  return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
    const bool with_generation,
    const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
  return addStringProxyNumericTranslationMap(source_proxy, string_op_infos);
  std::lock_guard<std::mutex> lock(state_mutex_);
      .emplace_back(std::make_unique<quantile::TDigest>(
  if (!cd || n > cd->columnType.get_physical_cols()) {
    const std::set<shared::TableKey>& table_ids_to_fetch) const {
  size_t num_bytes = 0;
  for (const auto& fetched_col : plan_state_->columns_to_fetch_) {
    if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
    if (fetched_col.table_id < 0) {
        {fetched_col.db_id, fetched_col.table_id, fetched_col.column_id});
    if (!ti.is_logical_geo_type()) {
    const std::vector<Analyzer::Expr*>& target_exprs) const {
  for (const auto target_expr : target_exprs) {
    const std::vector<Analyzer::Expr*>& target_exprs) const {
  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
  for (const auto target_expr : target_exprs) {
    if (!plan_state_->isLazyFetchColumn(target_expr)) {
      col_lazy_fetch_info.emplace_back(
      auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
      if (cd && IS_GEO(cd->columnType.get_type())) {
        auto col_key = col_var->getColumnKey();
        col_key.column_id += 1;
        const auto col0_ti = cd0->columnType;
        CHECK(!cd0->isVirtualCol);
        const auto col0_var = makeExpr<Analyzer::ColumnVar>(col0_ti, col_key, rte_idx);
        const auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
        col_lazy_fetch_info.emplace_back(
        auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
        const auto& col_ti = col_var->get_type_info();
  return col_lazy_fetch_info;
    const std::unordered_map<int, CgenState::LiteralValues>& literals,
    const int device_id) {
  if (literals.empty()) {
  const auto dev_literals_it = literals.find(device_id);
  CHECK(dev_literals_it != literals.end());
  const auto& dev_literals = dev_literals_it->second;
  size_t lit_buf_size{0};
  std::vector<std::string> real_strings;
  std::vector<std::vector<double>> double_array_literals;
  std::vector<std::vector<int8_t>> align64_int8_array_literals;
  std::vector<std::vector<int32_t>> int32_array_literals;
  std::vector<std::vector<int8_t>> align32_int8_array_literals;
  std::vector<std::vector<int8_t>> int8_array_literals;
  for (const auto& lit : dev_literals) {
    if (lit.which() == 7) {
      const auto p = boost::get<std::string>(&lit);
      real_strings.push_back(*p);
    } else if (lit.which() == 8) {
      const auto p = boost::get<std::vector<double>>(&lit);
      double_array_literals.push_back(*p);
    } else if (lit.which() == 9) {
      const auto p = boost::get<std::vector<int32_t>>(&lit);
      int32_array_literals.push_back(*p);
    } else if (lit.which() == 10) {
      const auto p = boost::get<std::vector<int8_t>>(&lit);
      int8_array_literals.push_back(*p);
    } else if (lit.which() == 11) {
      const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
      if (p->second == 64) {
        align64_int8_array_literals.push_back(p->first);
      } else if (p->second == 32) {
        align32_int8_array_literals.push_back(p->first);
  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
  int16_t crt_real_str_off = lit_buf_size;
  for (const auto& real_str : real_strings) {
    CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += real_str.size();
  if (double_array_literals.size() > 0) {
    lit_buf_size = align(lit_buf_size, sizeof(double));
  int16_t crt_double_arr_lit_off = lit_buf_size;
  for (const auto& double_array_literal : double_array_literals) {
    CHECK_LE(double_array_literal.size(),
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += double_array_literal.size() * sizeof(double);
  if (align64_int8_array_literals.size() > 0) {
    lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
    CHECK_LE(align64_int8_array_literals.size(),
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += align64_int8_array_literal.size();
  if (int32_array_literals.size() > 0) {
    lit_buf_size = align(lit_buf_size, sizeof(int32_t));
  int16_t crt_int32_arr_lit_off = lit_buf_size;
  for (const auto& int32_array_literal : int32_array_literals) {
    CHECK_LE(int32_array_literal.size(),
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
  if (align32_int8_array_literals.size() > 0) {
    lit_buf_size = align(lit_buf_size, sizeof(int32_t));
  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
    CHECK_LE(align32_int8_array_literals.size(),
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += align32_int8_array_literal.size();
  int16_t crt_int8_arr_lit_off = lit_buf_size;
  for (const auto& int8_array_literal : int8_array_literals) {
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += int8_array_literal.size();
  unsigned crt_real_str_idx = 0;
  unsigned crt_double_arr_lit_idx = 0;
  unsigned crt_align64_int8_arr_lit_idx = 0;
  unsigned crt_int32_arr_lit_idx = 0;
  unsigned crt_align32_int8_arr_lit_idx = 0;
  unsigned crt_int8_arr_lit_idx = 0;
  std::vector<int8_t> serialized(lit_buf_size);
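  // The loop below packs, for each variable-length literal, a 32-bit "off_and_len"
  // word: the buffer offset in the upper 16 bits and the payload length in the lower
  // 16 bits (hence the int16_t range checks above). Worked example with hypothetical
  // values: a string literal placed at offset 48 with length 11 is encoded as
  // (48 << 16) | 11 == 0x0030000B; a consumer can recover offset = off_and_len >> 16
  // and length = off_and_len & 0xFFFF.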
  for (const auto& lit : dev_literals) {
    switch (lit.which()) {
        const auto p = boost::get<int8_t>(&lit);
        serialized[off - lit_bytes] = *p;
        const auto p = boost::get<int16_t>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<int32_t>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<int64_t>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<float>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<double>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<std::pair<std::string, shared::StringDictKey>>(&lit);
        memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
        const auto p = boost::get<std::string>(&lit);
        int32_t off_and_len = crt_real_str_off << 16;
        const auto& crt_real_str = real_strings[crt_real_str_idx];
        off_and_len |= static_cast<int16_t>(crt_real_str.size());
        memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
        memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
        crt_real_str_off += crt_real_str.size();
        const auto p = boost::get<std::vector<double>>(&lit);
        int32_t off_and_len = crt_double_arr_lit_off << 16;
        const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
        int32_t len = crt_double_arr_lit.size();
        off_and_len |= static_cast<int16_t>(len);
        int32_t double_array_bytesize = len * sizeof(double);
        memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
        memcpy(&serialized[crt_double_arr_lit_off],
               crt_double_arr_lit.data(),
               double_array_bytesize);
        ++crt_double_arr_lit_idx;
        crt_double_arr_lit_off += double_array_bytesize;
        const auto p = boost::get<std::vector<int32_t>>(&lit);
        int32_t off_and_len = crt_int32_arr_lit_off << 16;
        const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
        int32_t len = crt_int32_arr_lit.size();
        off_and_len |= static_cast<int16_t>(len);
        int32_t int32_array_bytesize = len * sizeof(int32_t);
        memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
        memcpy(&serialized[crt_int32_arr_lit_off],
               crt_int32_arr_lit.data(),
               int32_array_bytesize);
        ++crt_int32_arr_lit_idx;
        crt_int32_arr_lit_off += int32_array_bytesize;
        const auto p = boost::get<std::vector<int8_t>>(&lit);
        int32_t off_and_len = crt_int8_arr_lit_off << 16;
        const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
        int32_t len = crt_int8_arr_lit.size();
        off_and_len |= static_cast<int16_t>(len);
        int32_t int8_array_bytesize = len;
        memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
        memcpy(&serialized[crt_int8_arr_lit_off],
               crt_int8_arr_lit.data(),
               int8_array_bytesize);
        ++crt_int8_arr_lit_idx;
        crt_int8_arr_lit_off += int8_array_bytesize;
        const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
        if (p->second == 64) {
          int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
          const auto& crt_align64_int8_arr_lit =
              align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
          int32_t len = crt_align64_int8_arr_lit.size();
          off_and_len |= static_cast<int16_t>(len);
          int32_t align64_int8_array_bytesize = len;
          memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
          memcpy(&serialized[crt_align64_int8_arr_lit_off],
                 crt_align64_int8_arr_lit.data(),
                 align64_int8_array_bytesize);
          ++crt_align64_int8_arr_lit_idx;
          crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
        } else if (p->second == 32) {
          int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
          const auto& crt_align32_int8_arr_lit =
              align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
          int32_t len = crt_align32_int8_arr_lit.size();
          off_and_len |= static_cast<int16_t>(len);
          int32_t align32_int8_array_bytesize = len;
          memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
          memcpy(&serialized[crt_align32_int8_arr_lit_off],
                 crt_align32_int8_arr_lit.data(),
                 align32_int8_array_bytesize);
          ++crt_align32_int8_arr_lit_idx;
          crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
    const int64_t agg_init_val,
    const int8_t out_byte_width,
    const int64_t* out_vec,
    const size_t out_vec_sz,
    const bool is_group_by,
    const bool float_argument_input) {
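  // The body below reduces per-device partial aggregate values (one entry of out_vec
  // per output slot) into a single host-side value. Float and double partials are
  // stored bit-cast into the int64_t slots, which is why the reinterpret_cast /
  // may_alias_ptr pattern is used before combining them and, for floats, converting
  // the result back according to float_argument_input.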
  if (0 != agg_init_val) {
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
      return {agg_result, 0};
      switch (out_byte_width) {
          int agg_result = static_cast<int32_t>(agg_init_val);
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
          const int64_t converted_bin =
              float_argument_input
                  ? static_cast<int64_t>(agg_result)
          return {converted_bin, 0};
          int64_t agg_result = agg_init_val;
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
          return {agg_result, 0};
      int64_t agg_result = 0;
      for (size_t i = 0; i < out_vec_sz; ++i) {
        agg_result += out_vec[i];
      return {agg_result, 0};
      switch (out_byte_width) {
          for (size_t i = 0; i < out_vec_sz; ++i) {
            r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
          const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
          const int64_t converted_bin =
          return {converted_bin, 0};
          for (size_t i = 0; i < out_vec_sz; ++i) {
            r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
          return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
      uint64_t agg_result = 0;
      for (size_t i = 0; i < out_vec_sz; ++i) {
        const uint64_t out = static_cast<uint64_t>(out_vec[i]);
      return {static_cast<int64_t>(agg_result), 0};
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
      return {agg_result, 0};
      switch (out_byte_width) {
          int32_t agg_result = static_cast<int32_t>(agg_init_val);
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
          const int64_t converted_bin =
              float_argument_input
                  ? static_cast<int64_t>(agg_result)
          return {converted_bin, 0};
          int64_t agg_result = agg_init_val;
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
          return {agg_result, 0};
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
      return {agg_result, 0};
      switch (out_byte_width) {
          int32_t agg_result = static_cast<int32_t>(agg_init_val);
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
          const int64_t converted_bin =
              float_argument_input ? static_cast<int64_t>(agg_result)
          return {converted_bin, 0};
          int64_t agg_result = agg_init_val;
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
          return {agg_result, 0};
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
        if (out_vec[i] != agg_init_val) {
          if (agg_result == agg_init_val) {
            agg_result = out_vec[i];
          } else if (out_vec[i] != agg_result) {
      return {agg_result, 0};
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
        if (out_vec[i] != agg_init_val) {
          agg_result = out_vec[i];
      return {agg_result, 0};
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    std::vector<TargetInfo> const& targets) {
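  // Results computed on different devices are merged by appending every subsequent
  // ResultSet onto the first one; dictionary-encoded columns are translated against
  // the first result set's dictionaries beforehand so that the appended string ids
  // stay consistent.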
  auto& first = results_per_device.front().first;
  if (first_target_idx) {
    first->translateDictEncodedColumns(targets, *first_target_idx);
  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
    const auto& next = results_per_device[dev_idx].first;
    if (first_target_idx) {
      next->translateDictEncodedColumns(targets, *first_target_idx);
    first->append(*next);
  return std::move(first);
  auto const targets = shared::transform<std::vector<TargetInfo>>(
  if (results_per_device.empty()) {
    return std::make_shared<ResultSet>(targets,
  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
            results_per_device.end(),
            [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
              CHECK_GE(lhs.second.size(), size_t(1));
              CHECK_GE(rhs.second.size(), size_t(1));
              return lhs.second.front() < rhs.second.front();
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
  if (results_per_device.empty()) {
    auto const targets = shared::transform<std::vector<TargetInfo>>(
    return std::make_shared<ResultSet>(targets,
    const size_t executor_id,
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    int64_t* compilation_queue_time) {
  *compilation_queue_time = timer_stop(clock_begin);
  const auto& this_result_set = results_per_device[0].first;
                                         this_result_set->getTargetInfos(),
                                         this_result_set->getTargetInitVals(),
  return reduction_jit.codegen();
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
  std::shared_ptr<ResultSet> reduced_results;
  const auto& first = results_per_device.front().first;
      results_per_device.size() > 1) {
        results_per_device.begin(),
        results_per_device.end(),
        [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
          const auto& r = rs.first;
          return init + r->getQueryMemDesc().getEntryCount();
    CHECK(total_entry_count);
    auto query_mem_desc = first->getQueryMemDesc();
    reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
    auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
    reduced_results->initializeStorage();
    switch (query_mem_desc.getEffectiveKeyWidth()) {
        first->getStorage()->moveEntriesToBuffer<int32_t>(
            result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
        first->getStorage()->moveEntriesToBuffer<int64_t>(
            result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
    reduced_results = first;
  int64_t compilation_queue_time = 0;
  const auto reduction_code =
  for (size_t i = 1; i < results_per_device.size(); ++i) {
    reduced_results->getStorage()->reduce(
        *(results_per_device[i].first->getStorage()), {}, reduction_code, executor_id_);
  reduced_results->addCompilationQueueTime(compilation_queue_time);
  reduced_results->invalidateCachedRowCount();
  return reduced_results;
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
  if (results_per_device.size() == 1) {
    return std::move(results_per_device.front().first);
  for (const auto& result : results_per_device) {
    auto rows = result.first;
      std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
  std::unordered_set<int> available_gpus;
    for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
      available_gpus.insert(gpu_id);
  return available_gpus;
    const size_t cpu_count,
    const size_t gpu_count) {
                        : static_cast<size_t>(cpu_count);
  using checked_size_t = boost::multiprecision::number<
      boost::multiprecision::cpp_int_backend<64,
                                             boost::multiprecision::unsigned_magnitude,
                                             boost::multiprecision::checked,
  checked_size_t max_groups_buffer_entry_guess = 1;
  for (const auto& query_info : query_infos) {
    CHECK(!query_info.info.fragments.empty());
    auto it = std::max_element(query_info.info.fragments.begin(),
                               query_info.info.fragments.end(),
                               [](const FragmentInfo& f1, const FragmentInfo& f2) {
                                 return f1.getNumTuples() < f2.getNumTuples();
    max_groups_buffer_entry_guess *= it->getNumTuples();
  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
    return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
                    max_groups_buffer_entry_guess_cap);
  return max_groups_buffer_entry_guess_cap;
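// The entry-count guess above is the product of the largest fragment's row count from
// each input table, computed with overflow-checked multiprecision arithmetic and
// clamped to max_groups_buffer_entry_guess_cap; the trailing bare return of the cap is
// the fallback path, presumably reached when the checked multiplication overflows.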
  return td->tableName;
    const int device_count) {
    const std::vector<InputTableInfo>& table_infos,
    const int device_count) {
  for (const auto target_expr : ra_exe_unit.target_exprs) {
    if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
    std::vector<std::string> table_names;
    const auto& input_descs = ra_exe_unit.input_descs;
    for (const auto& input_desc : input_descs) {
          "Projection query would require a scan without a limit on table(s): " +
          "Projection query output result set on table(s): " +
          " rows, which is more than the current system limit of " +
  const auto inner_table_key = ra_exe_unit.input_descs.back().getTableKey();
  std::optional<size_t> inner_table_idx;
  for (size_t i = 0; i < query_infos.size(); ++i) {
    if (query_infos[i].table_key == inner_table_key) {
      inner_table_idx = i;
  CHECK(inner_table_idx);
  return query_infos[*inner_table_idx].info.getNumTuples();
template <typename T>
  std::vector<std::string> expr_strs;
  for (const auto& expr : expr_container) {
      expr_strs.emplace_back("NULL");
      expr_strs.emplace_back(expr->toString());
    const std::list<Analyzer::OrderEntry>& expr_container) {
  std::vector<std::string> expr_strs;
  for (const auto& expr : expr_container) {
    expr_strs.emplace_back(expr.toString());
  switch (algorithm) {
      return "Speculative Top N";
      return "Streaming Top N";
  std::ostringstream os;
    const auto& scan_desc = input_col_desc->getScanDesc();
    os << scan_desc.getTableKey() << "," << input_col_desc->getColId() << ","
       << scan_desc.getNestLevel();
      os << qual->toString() << ",";
  if (!ra_exe_unit.quals.empty()) {
    for (const auto& qual : ra_exe_unit.quals) {
      os << qual->toString() << ",";
  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
    const auto& join_condition = ra_exe_unit.join_quals[i];
    for (const auto& qual : join_condition.quals) {
      os << qual->toString() << ",";
      os << qual->toString() << ",";
    os << expr->toString() << ",";
  os << "\n\tTable/Col/Levels: ";
    const auto& scan_desc = input_col_desc->getScanDesc();
    os << "(" << scan_desc.getTableKey() << ", " << input_col_desc->getColId() << ", "
       << scan_desc.getNestLevel() << ") ";
  os << "\n\tSimple Quals: "
  if (!ra_exe_unit.quals.empty()) {
  os << "\n\tJoin Quals: ";
  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
    const auto& join_condition = ra_exe_unit.join_quals[i];
  os << "\n\tGroup By: "
  os << "\n\tProjected targets: "
  os << "\n\tSort Info: ";
  const auto& sort_info = ra_exe_unit.sort_info;
  os << "\n\t Order Entries: "
  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
    const size_t new_scan_limit) {
          ra_exe_unit_in.quals,
    const std::vector<InputTableInfo>& query_infos,
    const bool has_cardinality_estimation,
  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
                                     has_cardinality_estimation,
    result->setValidationOnlyRes();
                                     has_cardinality_estimation,
    result->setValidationOnlyRes();
    size_t& max_groups_buffer_entry_guess,
    const bool allow_single_frag_table_opt,
    const std::vector<InputTableInfo>& query_infos,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool has_cardinality_estimation,
  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
  CHECK(!query_infos.empty());
  if (!max_groups_buffer_entry_guess) {
  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
      query_mem_desc_owned =
          query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
                                         has_cardinality_estimation,
    CHECK(query_mem_desc_owned);
    crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
      plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
      CHECK(!query_mem_desc_owned);
      query_mem_desc_owned.reset(
    for (const auto target_expr : ra_exe_unit.target_exprs) {
      plan_state_->target_exprs_.push_back(target_expr);
    const auto context_count =
                                            allow_single_frag_table_opt,
                                            *query_comp_desc_owned,
                                            *query_mem_desc_owned,
          shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
          static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
        crt_min_byte_width <<= 1;
      std::string curRunningSession{""};
      std::string curRunningQuerySubmittedTime{""};
      bool sessionEnrolled = false;
        curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
      if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
                                curRunningQuerySubmittedTime,
                              *query_mem_desc_owned,
                              query_comp_desc_owned->getDeviceType(),
      crt_min_byte_width <<= 1;
                 << ", what(): " << e.what();
  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
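  // Note on the loop above: when the chosen aggregate slot width proves too narrow
  // during execution, crt_min_byte_width is doubled and the work unit is retried; the
  // do/while only gives up once the width exceeds sizeof(int64_t).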
  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
    const std::set<size_t>& fragment_indexes_param) {
  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
  std::vector<InputTableInfo> table_infos{table_info};
  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
    query_mem_desc_owned =
        query_comp_desc_owned->compile(0,
  CHECK(query_mem_desc_owned);
  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
  const auto table_key = ra_exe_unit.input_descs[0].getTableKey();
  std::set<size_t> fragment_indexes;
  if (fragment_indexes_param.empty()) {
    for (size_t i = 0; i < outer_fragments.size(); i++) {
      fragment_indexes.emplace(i);
    fragment_indexes = fragment_indexes_param;
  for (auto fragment_index : fragment_indexes) {
    FragmentsList fragments_list{{table_key, {fragment_index}}};
                           *query_comp_desc_owned,
                           *query_mem_desc_owned,
    kernel.run(this, 0, kernel_context);
  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
    CHECK_EQ(result_fragment_indexes.size(), 1);
    cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
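  // Per-fragment execution path: each selected outer fragment gets its own
  // ExecutionKernel and single-fragment FragmentsList, and the caller-supplied
  // callback `cb` receives the per-fragment ResultSet together with the corresponding
  // outer fragment.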
    const std::vector<InputTableInfo>& table_infos,
    return std::make_shared<ResultSet>(
    std::shared_ptr<CompilationContext> compilation_context;
      compilation_context =
          tf_compilation_context.compile(exe_unit, true);
                               compilation_context,
  std::shared_ptr<CompilationContext> compilation_context;
    compilation_context =
        tf_compilation_context.compile(exe_unit, false);
  return exe_context.execute(exe_unit,
                             compilation_context,
  return std::make_shared<ResultSet>(query_comp_desc.getIR());
    const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
      [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
        const auto& dict_key = dict_id_visitor.visit(expr);
        if (dict_key.dict_id >= 0) {
          visitor.visit(expr);
    visit_expr(group_expr.get());
  for (const auto& group_expr : ra_exe_unit.quals) {
    visit_expr(group_expr.get());
  for (const auto& group_expr : ra_exe_unit.simple_quals) {
    visit_expr(group_expr.get());
  const auto visit_target_expr = [&](const Analyzer::Expr* target_expr) {
    const auto& target_type = target_expr->get_type_info();
    if (!target_type.is_string() || target_type.get_compression() == kENCODING_DICT) {
      if (agg_expr->get_is_distinct() || agg_expr->get_aggtype() == kSINGLE_VALUE ||
          agg_expr->get_aggtype() == kSAMPLE || agg_expr->get_aggtype() == kMODE) {
        visit_expr(agg_expr->get_arg());
      visit_expr(target_expr);
  std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
  std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
  for (const auto target_expr : ra_exe_unit.target_exprs) {
    if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM ||
         agg_info.agg_kind == kSUM_IF) &&
        agg_info.agg_arg_type.get_type() == kDOUBLE) {
    if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
  return requested_device_type;
  int64_t float_null_val = 0;
  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
  return float_null_val;
  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
    std::vector<int64_t>& entry,
    const std::vector<Analyzer::Expr*>& target_exprs,
  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
    const auto target_expr = target_exprs[target_idx];
    CHECK(agg_info.is_agg);
    target_infos.push_back(agg_info);
      const auto executor = query_mem_desc.getExecutor();
      auto row_set_mem_owner = executor->getRowSetMemoryOwner();
      CHECK(row_set_mem_owner);
      const auto& count_distinct_desc =
        CHECK(row_set_mem_owner);
        auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
            count_distinct_desc.bitmapPaddedSizeBytes(),
        entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
        CHECK(row_set_mem_owner);
        row_set_mem_owner->addCountDistinctSet(count_distinct_set);
        entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
    if (shared::is_any<kCOUNT, kCOUNT_IF, kAPPROX_COUNT_DISTINCT>(agg_info.agg_kind)) {
    } else if (shared::is_any<kAVG>(agg_info.agg_kind)) {
    } else if (shared::is_any<kSINGLE_VALUE, kSAMPLE>(agg_info.agg_kind)) {
      if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
        for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
      } else if (agg_info.sql_type.is_varlen()) {
        entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
      entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
    const std::vector<Analyzer::Expr*>& target_exprs_in,
  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
  std::vector<Analyzer::Expr*> target_exprs;
  for (const auto target_expr : target_exprs_in) {
    const auto target_expr_copy =
    CHECK(target_expr_copy);
    auto ti = target_expr->get_type_info();
    target_expr_copy->set_type_info(ti);
    if (target_expr_copy->get_arg()) {
      auto arg_ti = target_expr_copy->get_arg()->get_type_info();
      arg_ti.set_notnull(false);
      target_expr_copy->get_arg()->set_type_info(arg_ti);
    target_exprs_owned_copies.push_back(target_expr_copy);
    target_exprs.push_back(target_expr_copy.get());
  std::vector<TargetInfo> target_infos;
  std::vector<int64_t> entry;
  const auto executor = query_mem_desc.getExecutor();
  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
  CHECK(row_set_mem_owner);
  auto rs = std::make_shared<ResultSet>(target_infos,
                                        executor->blockSize(),
                                        executor->gridSize());
  rs->allocateStorage();
  rs->fillOneEntry(entry);
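  // For an empty input, a single synthetic row is materialized: each aggregate target
  // gets its inline null (or initial) value pushed into `entry` above, and the
  // one-entry ResultSet is returned so that aggregate queries over empty tables still
  // yield a row.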
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
        ra_exe_unit.target_exprs, query_mem_desc, device_type);
          ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
    } catch (const std::bad_alloc&) {
  const auto shard_count =
  if (shard_count && !result_per_device.empty()) {
      ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
    size_t output_row_index,
    const std::vector<uint32_t>& top_permutation) {
  for (const auto sorted_idx : top_permutation) {
    for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
      const auto input_column_ptr =
      const auto output_column_ptr =
          output_row_index * output_query_mem_desc.groupColWidth(group_idx);
      memcpy(output_column_ptr,
    for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
      const auto input_column_ptr =
      const auto output_column_ptr =
      memcpy(output_column_ptr,
  return output_row_index;
    size_t output_row_index,
    const std::vector<uint32_t>& top_permutation) {
  for (const auto sorted_idx : top_permutation) {
    const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
    memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
  return output_row_index;
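// Both helpers above copy rows in the order given by a result set's top-n permutation
// buffer: the columnar variant copies each group key column and slot column separately,
// while the row-wise variant memcpy's whole rows of getRowSize() bytes.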
  const auto first_result_set = result_per_device.front().first;
  CHECK(first_result_set);
  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
  top_query_mem_desc.setEntryCount(0);
  for (auto& result : result_per_device) {
    const auto result_set = result.first;
    size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
    top_query_mem_desc.setEntryCount(new_entry_cnt);
  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
                                                    first_result_set->getDeviceType(),
                                                    first_result_set->getRowSetMemOwner(),
  auto top_storage = top_result_set->allocateStorage();
  size_t top_output_row_idx{0};
  for (auto& result : result_per_device) {
    const auto result_set = result.first;
    const auto& top_permutation = result_set->getPermutationBuffer();
    CHECK_LE(top_permutation.size(), top_n);
    if (top_query_mem_desc.didOutputColumnar()) {
                                              result_set->getQueryMemDesc(),
  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
  return top_result_set;
std::unordered_map<shared::TableKey, const Analyzer::BinOper*>
  std::unordered_map<shared::TableKey, const Analyzer::BinOper*> id_to_cond;
  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
    const auto& inner_table_key = join_info.join_hash_tables_[i]->getInnerTableId();
        std::make_pair(inner_table_key, join_info.equi_join_tautologies_[i].get()));
  for (const auto& col : fetched_cols) {
    if (col.is_lazily_fetched) {
    const std::vector<InputTableInfo>& table_infos,
    const bool allow_single_frag_table_opt,
    const size_t context_count,
    std::unordered_set<int>& available_gpus,
    int& available_cpus) {
  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
          : std::vector<Data_Namespace::MemoryInfo>{},
  const bool uses_lazy_fetch =
  const auto device_count = deviceCount(device_type);
  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
                                             use_multifrag_kernel,
  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
  if (use_multifrag_kernel) {
    VLOG(1) << "Creating multifrag execution kernels";
    auto multifrag_kernel_dispatch = [&ra_exe_unit,
                                      render_info](const int device_id,
                                                   const int64_t rowid_lookup_key) {
      execution_kernels.emplace_back(
          std::make_unique<ExecutionKernel>(ra_exe_unit,
    fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
    VLOG(1) << "Creating one execution kernel per fragment";
        table_infos.size() == 1 && table_infos.front().table_key.table_id > 0) {
      const auto max_frag_size =
          table_infos.front().info.getFragmentNumTuplesUpperBound();
                << " to match max fragment size " << max_frag_size
                << " for kernel per fragment execution path.";
    size_t frag_list_idx{0};
    auto fragment_per_kernel_dispatch = [&ra_exe_unit,
                                         render_info](const int device_id,
                                                      const int64_t rowid_lookup_key) {
      if (!frag_list.size()) {
      execution_kernels.emplace_back(
          std::make_unique<ExecutionKernel>(ra_exe_unit,
    fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
  return execution_kernels;
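  // Kernel creation strategy, as logged above: either multi-fragment kernels built via
  // assignFragsToMultiDispatch, or one ExecutionKernel per fragment assigned through
  // assignFragsToKernelDispatch when the multifrag path is not used.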
    std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
      kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
  shared_context.setThreadPool(&tg);
  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
  VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
  size_t kernel_idx = 1;
  for (auto& kernel : kernels) {
    CHECK(kernel.get());
                crt_kernel_idx = kernel_idx++] {
      const size_t thread_i = crt_kernel_idx % cpu_threads();
      kernel->run(this, thread_i, shared_context);
  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
      results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
      results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
    const size_t table_idx,
    const size_t outer_frag_idx,
    std::map<shared::TableKey, const TableFragments*>& selected_tables_fragments,
    const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
        inner_table_id_to_join_condition) {
  const auto& table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
  auto table_frags_it = selected_tables_fragments.find(table_key);
  CHECK(table_frags_it != selected_tables_fragments.end());
  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
  const auto outer_table_fragments_it =
      selected_tables_fragments.find(outer_input_desc.getTableKey());
  const auto outer_table_fragments = outer_table_fragments_it->second;
  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
    return {outer_frag_idx};
  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
  auto& inner_frags = table_frags_it->second;
  std::vector<size_t> all_frag_ids;
  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
    const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
                          inner_table_id_to_join_condition,
    all_frag_ids.push_back(inner_frag_idx);
  return all_frag_ids;
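  // For joins, an inner fragment is paired with the given outer fragment only when it
  // cannot be skipped; the shard comparison in the pair-skipping logic below means
  // fragments living on different shards are never joined against each other.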
    const int table_idx,
    const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
        inner_table_id_to_join_condition,
  CHECK(table_idx >= 0 &&
        static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
  const auto& inner_table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
      outer_fragment_info.shard == inner_fragment_info.shard) {
    CHECK(!inner_table_id_to_join_condition.empty());
    auto condition_it = inner_table_id_to_join_condition.find(inner_table_key);
    CHECK(condition_it != inner_table_id_to_join_condition.end());
    join_condition = condition_it->second;
    CHECK(join_condition);
             plan_state_->join_info_.join_hash_tables_.size());
    for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
      if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
        CHECK(!join_condition);
        join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
  if (!join_condition) {
  if (join_condition->is_overlaps_oper()) {
  size_t shard_count{0};
  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
          join_condition->get_left_operand())) {
    auto inner_outer_pairs =
        join_condition, this, inner_outer_pairs);
  if (shard_count && !ra_exe_unit.join_quals.empty()) {
    plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
  const auto col_id = col_desc->getColId();
    const std::vector<InputDescriptor>& input_descs,
    const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
  std::map<shared::TableKey, std::vector<uint64_t>> tab_id_to_frag_offsets;
  for (auto& desc : input_descs) {
    const auto fragments_it = all_tables_fragments.find(desc.getTableKey());
    CHECK(fragments_it != all_tables_fragments.end());
    const auto& fragments = *fragments_it->second;
    std::vector<uint64_t> frag_offsets(fragments.size(), 0);
    for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
      frag_offsets[i] = off;
      off += fragments[i].getNumTuples();
    tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableKey(), frag_offsets));
  return tab_id_to_frag_offsets;
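// The offsets computed above are cumulative row counts: frag_offsets[i] is the number
// of rows in all preceding fragments of the same table, so fragment-local row ids can
// be translated to table-global positions. Example with hypothetical fragment sizes
// {100, 50, 75}: the resulting offsets are {0, 100, 150}.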
std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
    const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
    const std::vector<InputDescriptor>& input_descs,
    const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
  std::vector<std::vector<int64_t>> all_num_rows;
  std::vector<std::vector<uint64_t>> all_frag_offsets;
  const auto tab_id_to_frag_offsets =
  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
    std::vector<int64_t> num_rows;
    std::vector<uint64_t> frag_offsets;
    CHECK_EQ(selected_frag_ids.size(), input_descs.size());
    for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
      const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
      const auto fragments_it =
          all_tables_fragments.find(input_descs[tab_idx].getTableKey());
      CHECK(fragments_it != all_tables_fragments.end());
      const auto& fragments = *fragments_it->second;
      if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
          plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
        const auto& fragment = fragments[frag_id];
        num_rows.push_back(fragment.getNumTuples());
        size_t total_row_count{0};
        for (const auto& fragment : fragments) {
          total_row_count += fragment.getNumTuples();
        num_rows.push_back(total_row_count);
      const auto frag_offsets_it =
          tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableKey());
      CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
      const auto& offsets = frag_offsets_it->second;
      frag_offsets.push_back(offsets[frag_id]);
    all_num_rows.push_back(num_rows);
    all_frag_offsets.push_back(frag_offsets);
  return {all_num_rows, all_frag_offsets};
  const auto& input_descs = ra_exe_unit.input_descs;
  if (nest_level < 1 ||
      ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
       plan_state_->isLazyFetchColumn(inner_col_desc))) {
  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
  const auto& fragments = selected_fragments[nest_level].fragment_ids;
  return fragments.size() > 1;
  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
  const auto& fragments = selected_fragments[nest_level].fragment_ids;
  auto need_linearize =
  return table_key.table_id > 0 && need_linearize && fragments.size() > 1;
    const int device_id,
    const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
    std::list<ChunkIter>& chunk_iterators,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
    const size_t thread_idx,
    const bool allow_runtime_interrupt) {
  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
  std::vector<size_t> local_col_to_frag_pos;
                                      local_col_to_frag_pos,
      selected_fragments_crossjoin);
  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
  std::vector<std::vector<int64_t>> all_num_rows;
  std::vector<std::vector<uint64_t>> all_frag_offsets;
  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
    std::vector<const int8_t*> frag_col_buffers(
    for (const auto& col_id : col_global_ids) {
      if (allow_runtime_interrupt) {
        bool isInterrupted = false;
        if (isInterrupted) {
      if (cd && cd->isVirtualCol) {
      const auto& table_key = col_id->getScanDesc().getTableKey();
      const auto fragments_it = all_tables_fragments.find(table_key);
      CHECK(fragments_it != all_tables_fragments.end());
      const auto fragments = fragments_it->second;
      auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
      CHECK_LT(static_cast<size_t>(it->second),
      const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
      if (!fragments->size()) {
      CHECK_LT(frag_id, fragments->size());
      auto memory_level_for_column = memory_level;
                                           col_id->getColId()};
      if (plan_state_->columns_to_fetch_.find(tbl_col_key) ==
        frag_col_buffers[it->second] =
                                     memory_level_for_column,
                     cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
          bool for_lazy_fetch = false;
          if (plan_state_->columns_to_not_fetch_.find(tbl_col_key) !=
            for_lazy_fetch = true;
            VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
                    << ", col_name: " << cd->columnName << ")";
              col_id->getScanDesc().getTableKey(),
              all_tables_fragments,
              for_lazy_fetch ? 0 : device_id,
              col_id->getScanDesc().getTableKey(),
              all_tables_fragments,
              memory_level_for_column,
              col_id->getScanDesc().getTableKey(),
              all_tables_fragments,
              memory_level_for_column,
    all_frag_col_buffers.push_back(frag_col_buffers);
      ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3055 std::vector<InputDescriptor>
const& input_descs) {
3056 auto const has_table_key = [&table_key](
InputDescriptor const& input_desc) {
3057 return table_key == input_desc.getTableKey();
3059 return std::find_if(input_descs.begin(), input_descs.end(), has_table_key) -
3060 input_descs.begin();
3065 std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3066 auto const has_table_key = [&table_key](auto const& input_desc) {
3067 return table_key == input_desc->getScanDesc().getTableKey();
3069 return std::distance(
3070 input_col_descs.begin(),
3071 std::find_if(input_col_descs.begin(), input_col_descs.end(), has_table_key));
3076 std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3077 std::list<std::shared_ptr<const InputColDescriptor>> selected;
3078 for (auto const& input_col_desc : input_col_descs) {
3079 if (table_key == input_col_desc->getScanDesc().getTableKey()) {
3080 selected.push_back(input_col_desc);
3088 int8_t const* const ptr,
3089 size_t const local_col_id,
3091 size_t const begin = local_col_id - local_col_id % N;
3092 size_t const end = begin + N;
3093 CHECK_LE(end, frag_col_buffers.size()) << (void*)ptr << ' ' << local_col_id << ' ' << N;
3094 for (size_t i = begin; i < end; ++i) {
3095 frag_col_buffers[i] = ptr;
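// set_mod_range above seems to broadcast one chunk pointer into every slot of the
// N-aligned group that contains local_col_id. A small worked example with hypothetical
// values: for N = 2 and local_col_id = 3, begin = 3 - 3 % 2 = 2 and end = 4, so
// frag_col_buffers[2] and frag_col_buffers[3] both receive ptr. The union fetch path
// below relies on this so that each input's columns occupy a consistent slot group.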
3105 const int device_id,
3107 const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
3109 std::list<ChunkIter>& chunk_iterators,
3110 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3112 const size_t thread_idx,
3113 const bool allow_runtime_interrupt) {
3117 CHECK_EQ(1u, selected_fragments.size());
3120 auto const& input_descs = ra_exe_unit.input_descs;
3121 const auto& selected_table_key = selected_fragments.front().table_key;
3122 size_t const input_descs_index =
3124 CHECK_LT(input_descs_index, input_descs.size());
3125 size_t const input_col_descs_index =
3128 VLOG(2) << "selected_table_key=" << selected_table_key
3129 << " input_descs_index=" << input_descs_index
3130 << " input_col_descs_index=" << input_col_descs_index
3132 << " ra_exe_unit.input_col_descs="
3135 std::list<std::shared_ptr<const InputColDescriptor>> selected_input_col_descs =
3137 std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3140 selected_fragments_crossjoin, selected_fragments, ra_exe_unit);
3143 selected_fragments_crossjoin);
3145 if (allow_runtime_interrupt) {
3146 bool isInterrupted = false;
3153 if (isInterrupted) {
3157 std::vector<const int8_t*> frag_col_buffers(
3159 for (const auto& col_id : selected_input_col_descs) {
3162 if (cd && cd->isVirtualCol) {
3166 const auto fragments_it = all_tables_fragments.find(selected_table_key);
3167 CHECK(fragments_it != all_tables_fragments.end());
3168 const auto fragments = fragments_it->second;
3169 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3171 size_t const local_col_id = it->second;
3173 constexpr size_t frag_id = 0;
3174 if (fragments->empty()) {
3178 plan_state_->columns_to_fetch_.count({selected_table_key, col_id->getColId()})
3184 col_id.get(), memory_level_for_column, device_id, device_allocator, thread_idx);
3188 all_tables_fragments,
3189 memory_level_for_column,
3197 all_tables_fragments,
3200 memory_level_for_column,
3205 set_mod_range(frag_col_buffers, ptr, local_col_id, input_descs.size());
3208 ra_exe_unit, frag_ids_crossjoin, input_descs, all_tables_fragments);
3213 << " input_descs_index=" << input_descs_index
3214 << " input_col_descs_index=" << input_col_descs_index;
3215 return {{std::move(frag_col_buffers)},
3216 {{num_rows[0][input_descs_index]}},
3217 {{frag_offsets[0][input_descs_index]}}};
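// fetchUnionChunks above appears to materialize chunks for only the single table that
// owns the selected fragments (the UNION ALL path dispatches one input at a time), and
// returns the row counts and fragment offsets indexed by that input's position among
// the execution unit's input descriptors.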
3221 const size_t scan_idx,
3225 !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
3226 !selected_fragments[scan_idx].fragment_ids.empty()) {
3231 return selected_fragments[scan_idx].fragment_ids;
3235 std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3236 std::vector<size_t>& local_col_to_frag_pos,
3237 const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3240 local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3242 const auto& input_descs = ra_exe_unit.input_descs;
3243 for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3244 const auto& table_key = input_descs[scan_idx].getTableKey();
3245 CHECK_EQ(selected_fragments[scan_idx].table_key, table_key);
3246 selected_fragments_crossjoin.push_back(
3248 for (const auto& col_id : col_global_ids) {
3250 const auto& input_desc = col_id->getScanDesc();
3251 if (input_desc.getTableKey() != table_key ||
3252 input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3255 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3257 CHECK_LT(static_cast<size_t>(it->second),
3259 local_col_to_frag_pos[it->second] = frag_pos;
3266 std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3269 const auto& input_descs = ra_exe_unit.input_descs;
3270 for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3272 if (selected_fragments[0].table_key == input_descs[scan_idx].getTableKey()) {
3273 selected_fragments_crossjoin.push_back({size_t(1)});
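// The two mapping helpers above appear to assemble, per input table, the list of
// selected fragment ids; the Cartesian product of those lists drives the fetch loops
// earlier in this file. A hypothetical example: if table 0 selects fragments {0, 1}
// and table 1 selects {7}, the cross join yields the combinations {0, 7} and {1, 7},
// and each combination is fetched and executed as one unit.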
3282 OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
3284 for (auto out : out_vec_) {
3297 const bool hoist_literals,
3299 const std::vector<Analyzer::Expr*>& target_exprs,
3301 std::vector<std::vector<const int8_t*>>& col_buffers,
3303 const std::vector<std::vector<int64_t>>& num_rows,
3304 const std::vector<std::vector<uint64_t>>& frag_offsets,
3306 const int device_id,
3307 const uint32_t start_rowid,
3308 const uint32_t num_tables,
3309 const bool allow_runtime_interrupt,
3311 const bool optimize_cuda_block_and_grid_sizes,
3312 const int64_t rows_to_process) {
3315 CHECK(!results || !(*results));
3316 if (col_buffers.empty()) {
3325 << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
3326 "currently unsupported.";
3331 std::vector<int64_t*> out_vec;
3334 std::unique_ptr<OutVecOwner> output_memory_scope;
3335 if (allow_runtime_interrupt) {
3336 bool isInterrupted = false;
3343 if (isInterrupted) {
3353 CHECK(cpu_generated_code);
3364 join_hash_table_ptrs,
3366 output_memory_scope.reset(new OutVecOwner(out_vec));
3370 CHECK(gpu_generated_code);
3388 allow_runtime_interrupt,
3389 join_hash_table_ptrs,
3390 render_allocator_map_ptr,
3391 optimize_cuda_block_and_grid_sizes);
3392 output_memory_scope.reset(new OutVecOwner(out_vec));
3395 } catch (const std::exception& e) {
3396 LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3418 std::vector<int64_t> reduced_outs;
3419 const auto num_frags = col_buffers.size();
3420 const size_t entry_count =
3426 if (size_t(1) == entry_count) {
3427 for (auto out : out_vec) {
3429 reduced_outs.push_back(*out);
3432 size_t out_vec_idx = 0;
3434 for (const auto target_expr : target_exprs) {
3436 CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3437 << target_expr->toString();
3439 const int num_iterations = agg_info.sql_type.is_geometry()
3440 ? agg_info.sql_type.get_physical_coord_cols()
3443 for (int i = 0; i < num_iterations; i++) {
3447 shared::is_any<kAPPROX_QUANTILE, kMODE>(agg_info.agg_kind)) {
3448 bool const check = shared::
3449 is_any<kCOUNT, kAPPROX_COUNT_DISTINCT, kAPPROX_QUANTILE, kMODE, kCOUNT_IF>(
3451 CHECK(check) << agg_info.agg_kind;
3452 val1 = out_vec[out_vec_idx][0];
3455 const auto chosen_bytes = static_cast<size_t>(
3461 float_argument_input ? sizeof(int32_t) : chosen_bytes,
3462 out_vec[out_vec_idx],
3465 float_argument_input);
3470 reduced_outs.push_back(val1);
3471 if (agg_info.agg_kind == kAVG ||
3472 (agg_info.agg_kind == kSAMPLE &&
3473 (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3474 const auto chosen_bytes = static_cast<size_t>(
3479 agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3482 float_argument_input ? sizeof(int32_t) : chosen_bytes,
3483 out_vec[out_vec_idx + 1],
3490 reduced_outs.push_back(val2);
3503 auto rows_ptr = std::shared_ptr<ResultSet>(
3505 rows_ptr->fillOneEntry(reduced_outs);
3506 *results = std::move(rows_ptr);
3514 return results && results->rowCount() < scan_limit;
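// In the no-group-by path above, out_vec appears to hold one partial aggregate per
// target (two for AVG: sum and count) and per fragment; the per-fragment partials are
// then reduced on the host into reduced_outs, and the reduced row is written into a
// one-entry ResultSet via fillOneEntry. For example, an AVG over two fragments would
// reduce the two sums and the two counts separately, with the final division happening
// when the result set is read out.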
3522 const bool hoist_literals,
3525 std::vector<std::vector<const int8_t*>>& col_buffers,
3526 const std::vector<size_t> outer_tab_frag_ids,
3528 const std::vector<std::vector<int64_t>>& num_rows,
3529 const std::vector<std::vector<uint64_t>>& frag_offsets,
3531 const int device_id,
3533 const int64_t scan_limit,
3534 const uint32_t start_rowid,
3535 const uint32_t num_tables,
3536 const bool allow_runtime_interrupt,
3538 const bool optimize_cuda_block_and_grid_sizes,
3539 const int64_t rows_to_process) {
3543 CHECK(!results || !(*results));
3544 if (col_buffers.empty()) {
3555 if (allow_runtime_interrupt) {
3556 bool isInterrupted = false;
3563 if (isInterrupted) {
3576 VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3577 << " ra_exe_unit.input_descs="
3579 << " ra_exe_unit.input_col_descs="
3581 << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3584 << " query_exe_context->query_buffers_->num_rows_="
3586 << " query_exe_context->query_mem_desc_.getEntryCount()="
3588 << " device_id=" << device_id << " outer_table_key=" << outer_table_key
3589 << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3590 << " num_tables=" << num_tables;
3597 std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3599 [outer_table_key](auto const& a, auto const& b) {
3600 return a.getTableKey() == outer_table_key &&
3601 b.getTableKey() != outer_table_key;
3604 ra_exe_unit_copy.input_descs.back().getTableKey() != outer_table_key) {
3609 [outer_table_key](auto const& input_col_desc) {
3610 return input_col_desc->getScanDesc().getTableKey() != outer_table_key;
3616 const int32_t scan_limit_for_query =
3618 const int32_t max_matched = scan_limit_for_query == 0
3620 : scan_limit_for_query;
3623 CHECK(cpu_generated_code);
3634 join_hash_table_ptrs,
3640 CHECK(gpu_generated_code);
3657 allow_runtime_interrupt,
3658 join_hash_table_ptrs,
3659 render_allocator_map_ptr,
3660 optimize_cuda_block_and_grid_sizes);
3667 } catch (const std::exception& e) {
3668 LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3684 *results = query_exe_context->getRowSet(ra_exe_unit_copy,
3687 VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
3688 (*results)->holdLiterals(hoist_buf);
3690 if (error_code < 0 && render_allocator_map_ptr) {
3691 auto const adjusted_scan_limit =
3695 if (adjusted_scan_limit != 0) {
3701 if (results && error_code &&
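// getJoinHashTablePtrs below appears to collect, for the chosen device, the raw join
// hash table buffers that were built during compilation and recorded in plan_state_,
// so they can be passed to the generated query kernel.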
3710 const int device_id) {
3711 std::vector<int8_t*> table_ptrs;
3712 const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3713 for (auto hash_table : join_hash_tables) {
3715 CHECK(table_ptrs.empty());
3718 table_ptrs.push_back(hash_table->getJoinHashBuffer(
3725 const std::vector<InputTableInfo>& query_infos,
3730 const bool contains_left_deep_outer_join =
3731 ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3737 new CgenState(query_infos.size(), contains_left_deep_outer_join, this));
3745 const std::vector<InputTableInfo>& query_infos) {
3747 const auto ld_count = input_descs.size();
3749 for (size_t i = 0; i < ld_count; ++i) {
3751 const auto frag_count = query_infos[i].info.fragments.size();
3755 if (frag_count > 1) {
3757 frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));
3766 const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3767 const std::vector<InputTableInfo>& query_infos,
3770 const HashType preferred_hash_type,
3776 return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3786 preferred_hash_type,
3790 hashtable_build_dag_map,
3792 table_id_to_node_map);
3795 return {nullptr, e.what()};
3801 CHECK(!dev_props.empty());
3802 return dev_props.front().warpSize;
3817 return std::max((unsigned)2,
3853 return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3860 if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3865 fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3868 fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3873 value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3885 CHECK(val->getType()->isPointerTy());
3887 const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3888 const auto val_type = val_ptr_type->getPointerElementType();
3889 size_t val_width = 0;
3890 if (val_type->isIntegerTy()) {
3891 val_width = val_type->getIntegerBitWidth();
3893 if (val_type->isFloatTy()) {
3896 CHECK(val_type->isDoubleTy());
3901 if (bitWidth == val_width) {
3908 #define EXECUTE_INCLUDE
3915 #undef EXECUTE_INCLUDE
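// The #define EXECUTE_INCLUDE / #undef EXECUTE_INCLUDE pair above brackets a set of
// includes lost in extraction; the usual pattern is to textually include runtime
// function .cpp files here so their definitions are compiled into this translation
// unit, roughly:
//
//   #define EXECUTE_INCLUDE
//   #include "SomeRuntimeFunctions.cpp"  // hypothetical file name
//   #undef EXECUTE_INCLUDE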
3921 auto deleted_cols_it = deleted_cols_map.find(table_key);
3922 if (deleted_cols_it == deleted_cols_map.end()) {
3923 CHECK(deleted_cols_map.insert(std::make_pair(table_key, deleted_cd)).second);
3925 CHECK_EQ(deleted_cd, deleted_cols_it->second);
3936 auto ra_exe_unit_with_deleted = ra_exe_unit;
3938 for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3942 const auto& table_key = input_table.getTableKey();
3943 const auto catalog =
3946 const auto td = catalog->getMetadataForTable(table_key.table_id);
3948 const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
3952 CHECK(deleted_cd->columnType.is_boolean());
3955 for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3956 if (input_col.get()->getColId() == deleted_cd->columnId &&
3957 input_col.get()->getScanDesc().getTableKey() == table_key &&
3958 input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3966 ra_exe_unit_with_deleted.input_col_descs.emplace_back(
3968 deleted_cd->tableId,
3970 input_table.getNestLevel()));
3974 return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3982 const int64_t chunk_min,
3983 const int64_t chunk_max,
3988 CHECK(ldim != rdim);
3992 return {true, chunk_min / scale, chunk_max / scale};
3996 boost::multiprecision::cpp_int_backend<64,
3998 boost::multiprecision::signed_magnitude,
3999 boost::multiprecision::checked,
4004 std::make_tuple(true,
4008 } catch (const std::overflow_error& e) {
4011 return std::make_tuple(false, chunk_min, chunk_max);
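// The helper above appears to rescale a fragment's chunk_min/chunk_max between
// timestamp precisions using a checked 64-bit integer, so that an overflow falls back
// to {false, chunk_min, chunk_max} (i.e. "do not skip"). A hypothetical example:
// comparing a TIMESTAMP(0) column against a TIMESTAMP(3) constant scales the metadata
// by 10^3, e.g. 1'600'000'000 -> 1'600'000'000'000, which still fits in 64 bits;
// values near the int64 limits would instead hit the overflow_error path.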
4021 if (table_key.table_id < 0) {
4025 const auto catalog =
4028 const auto td = catalog->getMetadataForTable(fragment.physicalTableId);
4030 const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
4035 const auto& chunk_type = deleted_cd->columnType;
4036 CHECK(chunk_type.is_boolean());
4038 const auto deleted_col_id = deleted_cd->columnId;
4041 const int64_t chunk_min =
4043 const int64_t chunk_max =
4045 if (chunk_min == 1 && chunk_max == 1) {
4063 double chunk_min{0.};
4064 double chunk_max{0.};
4068 if (chunk_min > chunk_max) {
4077 const auto rhs_val = rhs_type == kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
4083 if (chunk_max < rhs_val) {
4088 if (chunk_max <= rhs_val) {
4093 if (chunk_min > rhs_val) {
4098 if (chunk_min >= rhs_val) {
4103 if (chunk_min > rhs_val || chunk_max < rhs_val) {
4116 const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
4117 const std::vector<uint64_t>& frag_offsets,
4118 const size_t frag_idx) {
4122 << ", fragment id: " << frag_idx;
4126 for (const auto& simple_qual : simple_quals) {
4127 const auto comp_expr =
4135 if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4139 CHECK(lhs_uexpr->get_optype() ==
4142 if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4149 const auto rhs = comp_expr->get_right_operand();
4155 if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
4156 !lhs->get_type_info().is_fp()) {
4159 if (lhs->get_type_info().is_fp()) {
4160 const auto fragment_skip_status =
4162 switch (fragment_skip_status) {
4177 if (lhs_col->get_type_info().is_timestamp() &&
4178 rhs_const->get_type_info().is_any<kTIME>()) {
4185 const int col_id = lhs_col->getColumnKey().column_id;
4187 int64_t chunk_min{0};
4188 int64_t chunk_max{0};
4189 bool is_rowid{false};
4190 size_t start_rowid{0};
4194 if (cd->isVirtualCol) {
4195 CHECK(cd->columnName == "rowid");
4197 start_rowid = table_generation.start_rowid;
4198 chunk_min = frag_offsets[frag_idx] + start_rowid;
4199 chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
4203 const auto& chunk_type = lhs_col->get_type_info();
4209 if (chunk_min > chunk_max) {
4213 if (lhs->get_type_info().is_timestamp() &&
4214 (lhs_col->get_type_info().get_dimension() !=
4215 rhs_const->get_type_info().get_dimension()) &&
4216 (lhs_col->get_type_info().is_high_precision_timestamp() ||
4217 rhs_const->get_type_info().is_high_precision_timestamp())) {
4226 std::tie(is_valid, chunk_min, chunk_max) =
4228 chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
4230 VLOG(4) << "Overflow/underflow detected in fragment skipping logic.\nChunk min "
4234 << "\nLHS col precision is: "
4236 << "\nRHS precision is: "
4237 << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
4241 if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
4246 chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
4248 chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
4250 llvm::LLVMContext local_context;
4251 CgenState local_cgen_state(local_context);
4254 const auto rhs_val =
4257 switch (comp_expr->get_optype()) {
4259 if (chunk_max < rhs_val) {
4264 if (chunk_max <= rhs_val) {
4269 if (chunk_min > rhs_val) {
4274 if (chunk_min >= rhs_val) {
4279 if (chunk_min > rhs_val || chunk_max < rhs_val) {
4281 } else if (is_rowid) {
4282 return {false, rhs_val - start_rowid};
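// The switch above appears to compare the fragment's chunk-level metadata
// (chunk_min/chunk_max) against the constant on the right-hand side of a simple
// qualifier and to skip the fragment when the ranges cannot intersect. For example,
// with a qualifier of the form col > 100 and a fragment whose chunk_max is 90, the
// whole fragment can be skipped; for rowid qualifiers the local offset of the matching
// row is returned instead so the kernel can start there.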
4320 const std::vector<uint64_t>& frag_offsets,
4321 const size_t frag_idx) {
4322 std::pair<bool, int64_t> skip_frag{false, -1};
4323 for (auto& inner_join : ra_exe_unit.join_quals) {
4330 std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
4331 for (auto& qual : inner_join.quals) {
4333 inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
4334 temp_qual.simple_quals.begin(),
4335 temp_qual.simple_quals.end());
4338 table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
4339 if (temp_skip_frag.second != -1) {
4340 skip_frag.second = temp_skip_frag.second;
4343 skip_frag.first = skip_frag.first || temp_skip_frag.first;
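// skipFragmentInnerJoins above appears to extend the same idea to inner-join
// qualifiers: each join qualifier is rewritten into conjunctive form and its simple
// qualifiers are fed through the per-fragment skipping logic, so a fragment can be
// skipped when an inner-join condition provably matches no rows in it.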
4350 const std::unordered_set<PhysicalInput>& phys_inputs) {
4352 std::unordered_set<shared::TableKey> phys_table_keys;
4353 for (const auto& phys_input : phys_inputs) {
4354 phys_table_keys.emplace(phys_input.db_id, phys_input.table_id);
4356 std::vector<InputTableInfo> query_infos;
4357 for (const auto& table_key : phys_table_keys) {
4360 for (const auto& phys_input : phys_inputs) {
4361 auto db_id = phys_input.db_id;
4362 auto table_id = phys_input.table_id;
4363 auto column_id = phys_input.col_id;
4368 const auto col_var = std::make_unique<Analyzer::ColumnVar>(
4371 agg_col_range_cache.setColRange(phys_input, col_range);
4374 return agg_col_range_cache;
4378 const std::unordered_set<PhysicalInput>& phys_inputs) {
4384 for (const auto& phys_input : phys_inputs) {
4385 const auto catalog =
4388 const auto cd = catalog->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4390 const auto& col_ti =
4391 cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
4392 if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
4393 const auto& dict_key = col_ti.getStringDictKey();
4394 const auto dd = catalog->getMetadataForDict(dict_key.dict_id);
4395 CHECK(dd && dd->stringDict);
4397 dd->stringDict->storageEntryCount());
4400 return string_dictionary_generations;
4404 const std::unordered_set<shared::TableKey>& phys_table_keys) {
4406 for (const auto& table_key : phys_table_keys) {
4410 TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
4412 return table_generations;
4416 const std::unordered_set<shared::TableKey>& phys_table_ids) {
4451 return !candidate_query_session.empty() &&
4463 ->second.getQueryStatus();
4465 return QuerySessionStatus::QueryStatus::UNDEFINED;
4475 const std::string& query_str,
4476 const std::string& query_submitted_time) {
4477 if (!query_session_id.empty()) {
4481 query_session_id, query_submitted_time, executor_id_, write_lock);
4483 query_submitted_time,
4484 QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
4487 return {query_session_id, query_str};
4494 if (query_session.empty()) {
4501 VLOG(1) << "Interrupting pending query is not available since the query session is "
4506 << "Interrupting pending query is not available since its interrupt flag is "
4517 const std::string& submitted_time_str) {
4520 if (query_session.empty()) {
4532 const std::string& submitted_time_str,
4536 if (query_session.empty()) {
4539 if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4543 query_session, submitted_time_str, new_query_status, session_write_lock);
4548 const std::string& query_str,
4549 const std::string& submitted_time_str,
4550 const size_t executor_id,
4554 if (query_session.empty()) {
4562 query_session_status,
4563 session_write_lock);
4565 if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4577 const std::string& query_str,
4578 const std::string& submitted_time_str,
4579 const size_t executor_id,
4587 .emplace(submitted_time_str,
4595 .emplace(submitted_time_str,
4603 std::map<std::string, QuerySessionStatus> executor_per_query_map;
4604 executor_per_query_map.emplace(
4607 query_session, executor_id, query_str, submitted_time_str, query_status));
4615 const std::string& submitted_time_str,
4619 if (query_session.empty()) {
4624 auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
4626 if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4627 auto prev_status = query_status.second.getQueryStatus();
4628 if (prev_status == updated_query_status) {
4631 query_status.second.setQueryStatus(updated_query_status);
4641 const std::string& submitted_time_str,
4642 const size_t executor_id,
4645 if (query_session.empty()) {
4650 for (auto it = storage.begin(); it != storage.end(); it++) {
4651 auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4653 if (submitted_time_str.compare(target_submitted_t_str) == 0) {
4655 .at(submitted_time_str)
4656 .setExecutorId(executor_id);
4666 const std::string& submitted_time_str,
4668 if (query_session.empty()) {
4673 if (storage.size() > 1) {
4675 for (auto it = storage.begin(); it != storage.end(); it++) {
4676 auto target_submitted_t_str = it->second.getQuerySubmittedTime();
4679 submitted_time_str.compare(target_submitted_t_str) == 0) {
4684 } else if (storage.size() == 1) {
4701 if (query_session.empty()) {
4712 if (query_session.empty()) {
4723 if (query_session.empty()) {
4730 const double runtime_query_check_freq,
4731 const unsigned pending_query_check_freq) const {
4745 const size_t cache_value) {
4749 VLOG(1) << "Put estimated cardinality into the cache";
4757 VLOG(1) << "Reuse cached cardinality";
4768 std::vector<QuerySessionStatus> ret;
4769 for (auto& info : query_infos) {
4771 info.second.getExecutorId(),
4772 info.second.getQueryStr(),
4773 info.second.getQuerySubmittedTime(),
4774 info.second.getQueryStatus()));
4783 std::vector<size_t> res;
4787 for (auto& kv : it->second) {
4788 if (kv.second.getQueryStatus() ==
4789 QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
4790 res.push_back(kv.second.getExecutorId());
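// The query-session bookkeeping above appears to track each session in a nested map
// keyed first by session id and then by query submission time, guarded by a shared
// mutex; entries record the query string, the owning executor id, and a status that
// moves from PENDING_EXECUTOR to RUNNING_QUERY_KERNEL, which is what the interrupt
// checks earlier in this file consult.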
4850 std::stringstream ss;
4851 ss << "colRangeCache: ";
4853 ss << "{" << phys_input.col_id << ", " << phys_input.table_id
4854 << "} = " << exp_range.toString() << ", ";
4856 ss << "stringDictGenerations: ";
4857 for (auto& [key, val] : row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
4858 ss << key << " = " << val << ", ";
4860 ss << "tableGenerations: ";
4862 ss << key << " = {" << val.tuple_count << ", " << val.start_rowid << "}, ";