19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
// ---------------------------------------------------------------------------
// Global runtime-configuration flags and thresholds.
// Defaults are given in-line; presumably these are overridden at startup via
// command-line/config plumbing elsewhere in the project — TODO confirm.
// ---------------------------------------------------------------------------
87 bool g_enable_filter_function{true};
88 unsigned g_dynamic_watchdog_time_limit{10000};
89 bool g_allow_cpu_retry{true};
90 bool g_allow_query_step_cpu_retry{true};
91 bool g_null_div_by_zero{false};
92 unsigned g_trivial_loop_join_threshold{1000};
93 bool g_from_table_reordering{true};
94 bool g_inner_join_fragment_skipping{true};
// Declared/defined in another translation unit.
95 extern bool g_enable_smem_group_by;
96 extern std::unique_ptr<llvm::Module> udf_gpu_module;
97 extern std::unique_ptr<llvm::Module> udf_cpu_module;
// Filter push-down tuning: fraction bounds of -1.0 appear to mean "unset" —
// NOTE(review): confirm against the code that consumes these.
98 bool g_enable_filter_push_down{false};
99 float g_filter_push_down_low_frac{-1.0f};
100 float g_filter_push_down_high_frac{-1.0f};
101 size_t g_filter_push_down_passing_row_ubound{0};
102 bool g_enable_columnar_output{false};
103 bool g_enable_left_join_filter_hoisting{true};
104 bool g_optimize_row_initialization{true};
// Hash-join / bounding-box-intersect join knobs.
105 bool g_enable_bbox_intersect_hashjoin{true};
106 size_t g_num_tuple_threshold_switch_to_baseline{100000};
107 size_t g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline{100};
108 bool g_enable_distance_rangejoin{true};
109 bool g_enable_hashjoin_many_to_many{true};
110 size_t g_bbox_intersect_max_table_size_bytes{1024 * 1024 * 1024};
111 double g_bbox_intersect_target_entries_per_bin{1.3};
112 bool g_strip_join_covered_quals{false};
113 size_t g_constrained_by_in_threshold{10};
114 size_t g_default_max_groups_buffer_entry_guess{16384};
// Initialized from the guess above; order of these two definitions matters.
115 size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
116 bool g_enable_window_functions{true};
117 bool g_enable_table_functions{true};
118 bool g_enable_ml_functions{true};
119 bool g_restrict_ml_model_metadata_to_superusers{false};
120 bool g_enable_dev_table_functions{false};
121 bool g_enable_geo_ops_on_uncompressed_coords{true};
122 bool g_enable_rf_prop_table_functions{true};
123 bool g_allow_memory_status_log{true};
124 size_t g_max_memory_allocation_size{2000000000};  // set to max slab size
125 size_t g_min_memory_allocation_size{
126     256};  // minimum memory allocation required for projection query output buffer
127        // without pre-flight count
128 bool g_enable_bump_allocator{false};
129 double g_bump_allocator_step_reduction{0.75};
130 bool g_enable_direct_columnarization{true};
131 extern bool g_enable_string_functions;
132 bool g_enable_lazy_fetch{true};
// Query-interrupt controls: pending-queue check frequency (count) and
// running-kernel check frequency (seconds? fraction?) — TODO confirm units.
133 bool g_enable_runtime_query_interrupt{true};
134 bool g_enable_non_kernel_time_query_interrupt{true};
135 bool g_use_estimator_result_cache{true};
136 unsigned g_pending_query_interrupt_freq{1000};
137 double g_running_query_interrupt_freq{0.1};
138 size_t g_gpu_smem_threshold{
139     4096};  // GPU shared memory threshold (in bytes), if larger
140         // buffer sizes are required we do not use GPU shared
141         // memory optimizations Setting this to 0 means unlimited
142         // (subject to other dynamically calculated caps)
143 bool g_enable_smem_grouped_non_count_agg{
144     true};  // enable use of shared memory when performing group-by with select non-count
146 bool g_enable_smem_non_grouped_agg{
147     true};  // enable optimizations for using GPU shared memory in implementation of
148         // non-grouped aggregates
149 bool g_is_test_env{false};  // operating under a unit test environment. Currently only
150                 // limits the allocation for the output buffer arena
151                 // and data recycler test
152 size_t g_enable_parallel_linearization{
153     10000};  // # rows that we are trying to linearize varlen col in parallel
// Data-recycler / result-set caching knobs; byte budgets are powers of two.
154 bool g_enable_data_recycler{true};
155 bool g_use_hashtable_cache{true};
156 bool g_use_query_resultset_cache{true};
157 bool g_use_chunk_metadata_cache{true};
158 bool g_allow_auto_resultset_caching{false};
159 bool g_allow_query_step_skipping{true};
160 size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
161 size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
162 size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
163 size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
164 size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
165 bool g_optimize_cuda_block_and_grid_sizes{false};
167 size_t g_approx_quantile_buffer{1000};
168 size_t g_approx_quantile_centroids{300};
170 bool g_enable_automatic_ir_metadata{true};
172 size_t g_max_log_length{500};
// Executor resource manager: ratios are fractions of total capacity.
174 bool g_enable_executor_resource_mgr{true};
176 double g_executor_resource_mgr_cpu_result_mem_ratio{0.8};
177 size_t g_executor_resource_mgr_cpu_result_mem_bytes{Executor::auto_cpu_mem_bytes};
178 double g_executor_resource_mgr_per_query_max_cpu_slots_ratio{0.9};
179 double g_executor_resource_mgr_per_query_max_cpu_result_mem_ratio{0.8};
181 // Todo: rework ConcurrentResourceGrantPolicy and ExecutorResourcePool to allow
182 // thresholds for concurrent oversubscription, rather than just boolean allowed/disallowed
183 bool g_executor_resource_mgr_allow_cpu_kernel_concurrency{true};
184 bool g_executor_resource_mgr_allow_cpu_gpu_kernel_concurrency{true};
185 // Whether a single query can oversubscribe CPU slots should be controlled with
186 // g_executor_resource_mgr_per_query_max_cpu_slots_ratio
187 bool g_executor_resource_mgr_allow_cpu_slot_oversubscription_concurrency{false};
188 // Whether a single query can oversubscribe CPU memory should be controlled with
189 // g_executor_resource_mgr_per_query_max_cpu_slots_ratio
190 bool g_executor_resource_mgr_allow_cpu_result_mem_oversubscription_concurrency{false};
191 double g_executor_resource_mgr_max_available_resource_use_ratio{0.8};
193 extern bool g_cache_string_hash;
194 extern bool g_allow_memory_status_log;
// Out-of-class definitions for Executor static constants (values supplied
// in the class declaration elsewhere).
196 int const Executor::max_gpu_count;
198 const int32_t Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES;
200 std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;
// Forward declarations of LLVM-module loaders defined in another TU.
202 extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
203     const std::string& udf_ir_filename,
204     llvm::LLVMContext& ctx);
205 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
206     const std::string& udf_ir_filename,
207     llvm::LLVMContext& ctx,
208     bool is_gpu = false);
209 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
210     const std::string& udf_ir_string,
211     llvm::LLVMContext& ctx,
212     bool is_gpu = false);
215 // This function is notably different from that in RelAlgExecutor because it already
216 // expects SPI values and therefore needs to avoid that transformation.
// Eagerly populates the string dictionary for every physical input column.
217 void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs) {
218 for (const auto [col_id, table_id, db_id] : phys_inputs) {
// NOTE argument order: callee takes (table, col, db), while the binding
// destructures as (col, table, db) — intentional, but easy to misread.
219 foreign_storage::populate_string_dictionary(table_id, col_id, db_id);
// Returns true iff the table behind `fragmenter` holds no data: exactly one
// fragment whose chunk-metadata map is empty (see invariant comment below).
223 bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
224 const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
225 // The fragmenter always returns at least one fragment, even when the table is empty.
226 return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
230 namespace foreign_storage {
231 // Foreign tables skip the population of dictionaries during metadata scan. This function
232 // will populate a dictionary's missing entries by fetching any unpopulated chunks.
236 if (
const auto foreign_table = dynamic_cast<const ForeignTable*>(
237 catalog->getMetadataForTable(table_id,
false))) {
238 const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
239 if (col_desc->columnType.is_dict_encoded_type()) {
240 auto& fragmenter = foreign_table->fragmenter;
241 CHECK(fragmenter !=
nullptr);
245 for (
const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
246 ChunkKey chunk_key = {db_id, table_id, col_id, frag.fragmentId};
254 CHECK(metadata_map.find(col_id) != metadata_map.end());
255 if (
auto& meta = metadata_map.at(col_id); meta->isPlaceholder()) {
259 &(catalog->getDataMgr()),
274 const size_t block_size_x,
275 const size_t grid_size_x,
276 const size_t max_gpu_slab_size,
277 const std::string& debug_dir,
278 const std::string& debug_file)
279 : executor_id_(executor_id)
280 , context_(new llvm::LLVMContext())
291 update_extension_modules();
299 auto template_path = root_path +
"/QueryEngine/RuntimeFunctions.bc";
300 CHECK(boost::filesystem::exists(template_path));
304 auto rt_geos_path = root_path +
"/QueryEngine/GeosRuntime.bc";
305 CHECK(boost::filesystem::exists(rt_geos_path));
311 if (boost::filesystem::exists(rt_libdevice_path)) {
316 <<
" does not exist; support for some UDF "
317 "functions might not be available.";
326 qe->s_code_accessor->clear();
327 qe->s_stubs_accessor->clear();
328 qe->cpu_code_accessor->clear();
329 qe->gpu_code_accessor->clear();
330 qe->tf_code_accessor->clear();
332 if (discard_runtime_modules_only) {
337 cgen_state_->module_ =
nullptr;
339 extension_modules_.clear();
341 context_.reset(
new llvm::LLVMContext());
342 cgen_state_.reset(
new CgenState({},
false,
this));
348 const std::string& source) {
354 CHECK(!source.empty());
355 switch (module_kind) {
375 return std::unique_ptr<llvm::Module>();
380 bool erase_not_found =
false) {
383 auto llvm_module = read_module(module_kind, it->second);
385 extension_modules_[module_kind] = std::move(llvm_module);
386 }
else if (erase_not_found) {
387 extension_modules_.erase(module_kind);
389 if (extension_modules_.find(module_kind) == extension_modules_.end()) {
391 <<
" LLVM module. The module will be unavailable.";
394 <<
" LLVM module. Using the existing module.";
398 if (erase_not_found) {
399 extension_modules_.erase(module_kind);
401 if (extension_modules_.find(module_kind) == extension_modules_.end()) {
403 <<
" LLVM module is unavailable. The module will be unavailable.";
406 <<
" LLVM module is unavailable. Using the existing module.";
412 if (!update_runtime_modules_only) {
438 , cgen_state_(std::move(
executor_.cgen_state_))
446 const bool allow_lazy_fetch,
447 const std::vector<InputTableInfo>& query_infos,
460 executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
465 for (
auto& p :
executor_.cgen_state_->row_func_hoisted_literals_) {
466 auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
467 if (inst && inst->getNumUses() == 0 && inst->getParent() ==
nullptr) {
470 p.first->deleteValue();
473 executor_.cgen_state_->row_func_hoisted_literals_.clear();
479 for (
auto& bm :
executor_.cgen_state_->in_values_bitmaps_) {
482 executor_.cgen_state_->in_values_bitmaps_.clear();
484 for (
auto& str_dict_translation_mgr :
485 executor_.cgen_state_->str_dict_translation_mgrs_) {
486 cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
488 executor_.cgen_state_->str_dict_translation_mgrs_.clear();
490 for (
auto& tree_model_prediction_mgr :
491 executor_.cgen_state_->tree_model_prediction_mgrs_) {
492 cgen_state_->moveTreeModelPredictionMgr(std::move(tree_model_prediction_mgr));
494 executor_.cgen_state_->tree_model_prediction_mgrs_.clear();
511 const std::string& debug_dir,
512 const std::string& debug_file,
520 auto executor = std::make_shared<Executor>(executor_id,
527 CHECK(
executors_.insert(std::make_pair(executor_id, executor)).second);
532 switch (memory_level) {
551 throw std::runtime_error(
552 "Clearing memory levels other than the CPU level or GPU level is not "
564 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
565 const bool with_generation)
const {
566 CHECK(row_set_mem_owner);
567 std::lock_guard<std::mutex> lock(
569 return row_set_mem_owner->getOrAddStringDictProxy(dict_id_in, with_generation);
574 const bool with_generation) {
580 const auto dd = catalog->getMetadataForDict(dict_id);
582 auto dict_key = dict_key_in;
584 CHECK(dd->stringDict);
586 const int64_t generation =
587 with_generation ? string_dictionary_generations_.getGeneration(dict_key) : -1;
588 return addStringDict(dd->stringDict, dict_key, generation);
592 if (!lit_str_dict_proxy_) {
594 std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
596 lit_str_dict_proxy_ = std::make_shared<StringDictionaryProxy>(
599 return lit_str_dict_proxy_.get();
606 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
607 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
608 const bool with_generation)
const {
609 CHECK(row_set_mem_owner);
610 std::lock_guard<std::mutex> lock(
612 return row_set_mem_owner->getOrAddStringProxyTranslationMap(
613 source_dict_key, dest_dict_key, with_generation, translation_type, string_op_infos);
620 const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
621 const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
622 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
const {
623 CHECK(row_set_mem_owner);
624 std::lock_guard<std::mutex> lock(
627 if (!dest_string_op_infos.empty()) {
628 row_set_mem_owner->addStringProxyUnionTranslationMap(
629 dest_proxy, dest_proxy, dest_string_op_infos);
631 return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
632 source_proxy, dest_proxy, source_string_op_infos);
638 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
639 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
640 const bool with_generation)
const {
641 CHECK(row_set_mem_owner);
642 std::lock_guard<std::mutex> lock(
644 return row_set_mem_owner->getOrAddStringProxyNumericTranslationMap(
645 source_dict_key, with_generation, string_op_infos);
651 const bool with_generation,
653 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
654 const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
655 const auto dest_proxy = getOrAddStringDictProxy(dest_dict_key_in, with_generation);
657 return addStringProxyIntersectionTranslationMap(
658 source_proxy, dest_proxy, string_op_infos);
660 return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
667 const bool with_generation,
668 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
669 const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
670 return addStringProxyNumericTranslationMap(source_proxy, string_op_infos);
674 std::lock_guard<std::mutex> lock(state_mutex_);
676 .emplace_back(std::make_unique<quantile::TDigest>(
695 if (!cd || n > cd->columnType.get_physical_cols()) {
730 std::string
const& log_tag,
731 size_t const thread_idx) {
732 std::ostringstream oss;
734 oss <<
" (" << log_tag <<
", EXECUTOR-" << executor_id <<
", THREAD-" << thread_idx
735 <<
", TOOK: " << log_time_ms <<
" ms)";
736 VLOG(1) << oss.str();
741 size_t const thread_idx)
const {
744 std::ostringstream oss;
752 size_t const thread_idx)
const {
775 const auto& ti = cd->columnType;
776 const auto sz = ti.get_size();
779 if (ti.is_logical_geo_type()) {
795 const std::set<shared::TableKey>& table_ids_to_fetch,
796 const bool include_lazy_fetched_cols)
const {
797 std::map<shared::ColumnKey, size_t> col_byte_width_map;
799 for (
const auto& fetched_col :
plan_state_->getColumnsToFetch()) {
800 if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
804 CHECK(col_byte_width_map.insert({fetched_col, col_byte_width}).second);
806 if (include_lazy_fetched_cols) {
807 for (
const auto& lazy_fetched_col :
plan_state_->getColumnsToNotFetch()) {
808 if (table_ids_to_fetch.count({lazy_fetched_col.db_id, lazy_fetched_col.table_id}) ==
813 CHECK(col_byte_width_map.insert({lazy_fetched_col, col_byte_width}).second);
816 return col_byte_width_map;
820 const std::set<shared::TableKey>& table_ids_to_fetch)
const {
821 size_t num_bytes = 0;
825 for (
const auto& fetched_col :
plan_state_->getColumnsToFetch()) {
826 if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
830 if (fetched_col.table_id < 0) {
834 {fetched_col.db_id, fetched_col.table_id, fetched_col.column_id});
839 if (!ti.is_logical_geo_type()) {
854 const std::vector<InputDescriptor>& input_descs,
855 const std::vector<InputTableInfo>& query_infos,
856 const std::vector<std::pair<int32_t, FragmentsList>>& kernel_fragment_lists)
const {
857 using TableFragmentId = std::pair<shared::TableKey, int32_t>;
858 using TableFragmentSizeMap = std::map<TableFragmentId, size_t>;
867 std::set<shared::TableKey> lhs_table_keys;
868 for (
const auto& input_desc : input_descs) {
869 if (input_desc.getNestLevel() == 0) {
870 lhs_table_keys.insert(input_desc.getTableKey());
875 const auto column_byte_width_map =
882 size_t const byte_width_per_row =
884 column_byte_width_map.end(),
886 [](
size_t sum,
auto& col_entry) {
return sum + col_entry.second; });
890 TableFragmentSizeMap all_table_fragments_size_map;
892 for (
auto& query_info : query_infos) {
893 const auto& table_key = query_info.table_key;
894 for (
const auto& frag : query_info.info.fragments) {
895 const int32_t frag_id = frag.fragmentId;
896 const TableFragmentId table_frag_id = std::make_pair(table_key, frag_id);
897 const size_t fragment_num_tuples = frag.getNumTuples();
898 all_table_fragments_size_map.insert(
899 std::make_pair(table_frag_id, fragment_num_tuples));
906 TableFragmentSizeMap query_table_fragments_size_map;
907 std::vector<size_t> bytes_per_kernel;
908 bytes_per_kernel.reserve(kernel_fragment_lists.size());
910 size_t max_kernel_bytes{0};
912 for (
auto& kernel_frag_list : kernel_fragment_lists) {
913 size_t kernel_bytes{0};
914 const auto frag_list = kernel_frag_list.second;
915 for (
const auto& table_frags : frag_list) {
916 const auto& table_key = table_frags.table_key;
917 for (
const size_t frag_id : table_frags.fragment_ids) {
918 const TableFragmentId table_frag_id = std::make_pair(table_key, frag_id);
919 const size_t fragment_num_tuples = all_table_fragments_size_map[table_frag_id];
920 kernel_bytes += fragment_num_tuples * byte_width_per_row;
921 query_table_fragments_size_map.insert(
922 std::make_pair(table_frag_id, fragment_num_tuples));
925 bytes_per_kernel.emplace_back(kernel_bytes);
926 if (kernel_bytes > max_kernel_bytes) {
927 max_kernel_bytes = kernel_bytes;
933 std::map<ChunkKey, size_t> all_chunks_byte_sizes_map;
934 constexpr int32_t subkey_min = std::numeric_limits<int32_t>::min();
936 for (
const auto& col_byte_width_entry : column_byte_width_map) {
938 const int32_t db_id = col_byte_width_entry.first.db_id;
939 const int32_t table_id = col_byte_width_entry.first.table_id;
940 const int32_t col_id = col_byte_width_entry.first.column_id;
941 const size_t col_byte_width = col_byte_width_entry.second;
944 const auto frag_start =
945 query_table_fragments_size_map.lower_bound({table_key, subkey_min});
946 for (
auto frag_itr = frag_start; frag_itr != query_table_fragments_size_map.end() &&
947 frag_itr->first.first == table_key;
949 const ChunkKey chunk_key = {db_id, table_id, col_id, frag_itr->first.second};
950 const size_t chunk_byte_size = col_byte_width * frag_itr->second;
951 all_chunks_byte_sizes_map.insert({chunk_key, chunk_byte_size});
955 size_t total_chunk_bytes{0};
956 const size_t num_chunks = all_chunks_byte_sizes_map.size();
957 std::vector<std::pair<ChunkKey, size_t>> chunks_with_byte_sizes;
958 chunks_with_byte_sizes.reserve(num_chunks);
959 for (
const auto& chunk_byte_size_entry : all_chunks_byte_sizes_map) {
960 chunks_with_byte_sizes.emplace_back(
961 std::make_pair(chunk_byte_size_entry.first, chunk_byte_size_entry.second));
964 total_chunk_bytes += chunk_byte_size_entry.second;
973 chunks_with_byte_sizes,
978 bytes_scales_per_kernel};
982 const std::vector<Analyzer::Expr*>& target_exprs)
const {
984 for (
const auto target_expr : target_exprs) {
993 const std::vector<Analyzer::Expr*>& target_exprs)
const {
995 std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
996 for (
const auto target_expr : target_exprs) {
997 if (!
plan_state_->isLazyFetchColumn(target_expr)) {
998 col_lazy_fetch_info.emplace_back(
1003 auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
1005 if (cd &&
IS_GEO(cd->columnType.get_type())) {
1009 auto col_key = col_var->getColumnKey();
1010 col_key.column_id += 1;
1012 const auto col0_ti = cd0->columnType;
1013 CHECK(!cd0->isVirtualCol);
1014 const auto col0_var = makeExpr<Analyzer::ColumnVar>(col0_ti, col_key, rte_idx);
1015 const auto local_col0_id =
plan_state_->getLocalColumnId(col0_var.get(),
false);
1016 col_lazy_fetch_info.emplace_back(
1020 auto local_col_id =
plan_state_->getLocalColumnId(col_var,
false);
1021 const auto& col_ti = col_var->get_type_info();
1026 return col_lazy_fetch_info;
1036 const std::unordered_map<int, CgenState::LiteralValues>& literals,
1037 const int device_id) {
1038 if (literals.empty()) {
1041 const auto dev_literals_it = literals.find(device_id);
1042 CHECK(dev_literals_it != literals.end());
1043 const auto& dev_literals = dev_literals_it->second;
1044 size_t lit_buf_size{0};
1045 std::vector<std::string> real_strings;
1046 std::vector<std::vector<double>> double_array_literals;
1047 std::vector<std::vector<int8_t>> align64_int8_array_literals;
1048 std::vector<std::vector<int32_t>> int32_array_literals;
1049 std::vector<std::vector<int8_t>> align32_int8_array_literals;
1050 std::vector<std::vector<int8_t>> int8_array_literals;
1051 for (
const auto& lit : dev_literals) {
1053 if (lit.which() == 7) {
1054 const auto p = boost::get<std::string>(&lit);
1056 real_strings.push_back(*p);
1057 }
else if (lit.which() == 8) {
1058 const auto p = boost::get<std::vector<double>>(&lit);
1060 double_array_literals.push_back(*p);
1061 }
else if (lit.which() == 9) {
1062 const auto p = boost::get<std::vector<int32_t>>(&lit);
1064 int32_array_literals.push_back(*p);
1065 }
else if (lit.which() == 10) {
1066 const auto p = boost::get<std::vector<int8_t>>(&lit);
1068 int8_array_literals.push_back(*p);
1069 }
else if (lit.which() == 11) {
1070 const auto p = boost::get<std::pair<std::vector<int8_t>,
int>>(&lit);
1072 if (p->second == 64) {
1073 align64_int8_array_literals.push_back(p->first);
1074 }
else if (p->second == 32) {
1075 align32_int8_array_literals.push_back(p->first);
1081 if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
1084 int16_t crt_real_str_off = lit_buf_size;
1085 for (
const auto& real_str : real_strings) {
1086 CHECK_LE(real_str.size(),
static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1087 lit_buf_size += real_str.size();
1089 if (double_array_literals.size() > 0) {
1090 lit_buf_size =
align(lit_buf_size,
sizeof(
double));
1092 int16_t crt_double_arr_lit_off = lit_buf_size;
1093 for (
const auto& double_array_literal : double_array_literals) {
1094 CHECK_LE(double_array_literal.size(),
1095 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1096 lit_buf_size += double_array_literal.size() *
sizeof(double);
1098 if (align64_int8_array_literals.size() > 0) {
1099 lit_buf_size =
align(lit_buf_size,
sizeof(uint64_t));
1101 int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
1102 for (
const auto& align64_int8_array_literal : align64_int8_array_literals) {
1103 CHECK_LE(align64_int8_array_literals.size(),
1104 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1105 lit_buf_size += align64_int8_array_literal.size();
1107 if (int32_array_literals.size() > 0) {
1108 lit_buf_size =
align(lit_buf_size,
sizeof(int32_t));
1110 int16_t crt_int32_arr_lit_off = lit_buf_size;
1111 for (
const auto& int32_array_literal : int32_array_literals) {
1112 CHECK_LE(int32_array_literal.size(),
1113 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1114 lit_buf_size += int32_array_literal.size() *
sizeof(int32_t);
1116 if (align32_int8_array_literals.size() > 0) {
1117 lit_buf_size =
align(lit_buf_size,
sizeof(int32_t));
1119 int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
1120 for (
const auto& align32_int8_array_literal : align32_int8_array_literals) {
1121 CHECK_LE(align32_int8_array_literals.size(),
1122 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1123 lit_buf_size += align32_int8_array_literal.size();
1125 int16_t crt_int8_arr_lit_off = lit_buf_size;
1126 for (
const auto& int8_array_literal : int8_array_literals) {
1127 CHECK_LE(int8_array_literal.size(),
1128 static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1129 lit_buf_size += int8_array_literal.size();
1131 unsigned crt_real_str_idx = 0;
1132 unsigned crt_double_arr_lit_idx = 0;
1133 unsigned crt_align64_int8_arr_lit_idx = 0;
1134 unsigned crt_int32_arr_lit_idx = 0;
1135 unsigned crt_align32_int8_arr_lit_idx = 0;
1136 unsigned crt_int8_arr_lit_idx = 0;
1137 std::vector<int8_t> serialized(lit_buf_size);
1139 for (
const auto& lit : dev_literals) {
1142 switch (lit.which()) {
1144 const auto p = boost::get<int8_t>(&lit);
1146 serialized[off - lit_bytes] = *p;
1150 const auto p = boost::get<int16_t>(&lit);
1152 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1156 const auto p = boost::get<int32_t>(&lit);
1158 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1162 const auto p = boost::get<int64_t>(&lit);
1164 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1168 const auto p = boost::get<float>(&lit);
1170 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1174 const auto p = boost::get<double>(&lit);
1176 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1180 const auto p = boost::get<std::pair<std::string, shared::StringDictKey>>(&lit);
1188 memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
1192 const auto p = boost::get<std::string>(&lit);
1194 int32_t off_and_len = crt_real_str_off << 16;
1195 const auto& crt_real_str = real_strings[crt_real_str_idx];
1196 off_and_len |=
static_cast<int16_t
>(crt_real_str.size());
1197 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1198 memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
1200 crt_real_str_off += crt_real_str.size();
1204 const auto p = boost::get<std::vector<double>>(&lit);
1206 int32_t off_and_len = crt_double_arr_lit_off << 16;
1207 const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
1208 int32_t len = crt_double_arr_lit.size();
1210 off_and_len |=
static_cast<int16_t
>(len);
1211 int32_t double_array_bytesize = len *
sizeof(double);
1212 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1213 memcpy(&serialized[crt_double_arr_lit_off],
1214 crt_double_arr_lit.data(),
1215 double_array_bytesize);
1216 ++crt_double_arr_lit_idx;
1217 crt_double_arr_lit_off += double_array_bytesize;
1221 const auto p = boost::get<std::vector<int32_t>>(&lit);
1223 int32_t off_and_len = crt_int32_arr_lit_off << 16;
1224 const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
1225 int32_t len = crt_int32_arr_lit.size();
1227 off_and_len |=
static_cast<int16_t
>(len);
1228 int32_t int32_array_bytesize = len *
sizeof(int32_t);
1229 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1230 memcpy(&serialized[crt_int32_arr_lit_off],
1231 crt_int32_arr_lit.data(),
1232 int32_array_bytesize);
1233 ++crt_int32_arr_lit_idx;
1234 crt_int32_arr_lit_off += int32_array_bytesize;
1238 const auto p = boost::get<std::vector<int8_t>>(&lit);
1240 int32_t off_and_len = crt_int8_arr_lit_off << 16;
1241 const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
1242 int32_t len = crt_int8_arr_lit.size();
1244 off_and_len |=
static_cast<int16_t
>(len);
1245 int32_t int8_array_bytesize = len;
1246 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1247 memcpy(&serialized[crt_int8_arr_lit_off],
1248 crt_int8_arr_lit.data(),
1249 int8_array_bytesize);
1250 ++crt_int8_arr_lit_idx;
1251 crt_int8_arr_lit_off += int8_array_bytesize;
1255 const auto p = boost::get<std::pair<std::vector<int8_t>,
int>>(&lit);
1257 if (p->second == 64) {
1258 int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
1259 const auto& crt_align64_int8_arr_lit =
1260 align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
1261 int32_t len = crt_align64_int8_arr_lit.size();
1263 off_and_len |=
static_cast<int16_t
>(len);
1264 int32_t align64_int8_array_bytesize = len;
1265 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1266 memcpy(&serialized[crt_align64_int8_arr_lit_off],
1267 crt_align64_int8_arr_lit.data(),
1268 align64_int8_array_bytesize);
1269 ++crt_align64_int8_arr_lit_idx;
1270 crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
1271 }
else if (p->second == 32) {
1272 int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
1273 const auto& crt_align32_int8_arr_lit =
1274 align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
1275 int32_t len = crt_align32_int8_arr_lit.size();
1277 off_and_len |=
static_cast<int16_t
>(len);
1278 int32_t align32_int8_array_bytesize = len;
1279 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1280 memcpy(&serialized[crt_align32_int8_arr_lit_off],
1281 crt_align32_int8_arr_lit.data(),
1282 align32_int8_array_bytesize);
1283 ++crt_align32_int8_arr_lit_idx;
1284 crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
1314 const int64_t agg_init_val,
1315 const int8_t out_byte_width,
1316 const int64_t* out_vec,
1317 const size_t out_vec_sz,
1318 const bool is_group_by,
1319 const bool float_argument_input) {
1324 if (0 != agg_init_val) {
1326 int64_t agg_result = agg_init_val;
1327 for (
size_t i = 0; i < out_vec_sz; ++i) {
1330 return {agg_result, 0};
1333 switch (out_byte_width) {
1335 int agg_result =
static_cast<int32_t
>(agg_init_val);
1336 for (
size_t i = 0; i < out_vec_sz; ++i) {
1339 *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1340 *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1342 const int64_t converted_bin =
1343 float_argument_input
1344 ?
static_cast<int64_t
>(agg_result)
1346 return {converted_bin, 0};
1350 int64_t agg_result = agg_init_val;
1351 for (
size_t i = 0; i < out_vec_sz; ++i) {
1354 *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1355 *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1357 return {agg_result, 0};
1366 int64_t agg_result = 0;
1367 for (
size_t i = 0; i < out_vec_sz; ++i) {
1368 agg_result += out_vec[i];
1370 return {agg_result, 0};
1373 switch (out_byte_width) {
1376 for (
size_t i = 0; i < out_vec_sz; ++i) {
1377 r += *
reinterpret_cast<const float*
>(may_alias_ptr(&out_vec[i]));
1379 const auto float_bin = *
reinterpret_cast<const int32_t*
>(may_alias_ptr(&r));
1380 const int64_t converted_bin =
1382 return {converted_bin, 0};
1386 for (
size_t i = 0; i < out_vec_sz; ++i) {
1387 r += *
reinterpret_cast<const double*
>(may_alias_ptr(&out_vec[i]));
1389 return {*
reinterpret_cast<const int64_t*
>(may_alias_ptr(&r)), 0};
1398 uint64_t agg_result = 0;
1399 for (
size_t i = 0; i < out_vec_sz; ++i) {
1400 const uint64_t out =
static_cast<uint64_t
>(out_vec[i]);
1403 return {
static_cast<int64_t
>(agg_result), 0};
1407 int64_t agg_result = agg_init_val;
1408 for (
size_t i = 0; i < out_vec_sz; ++i) {
1411 return {agg_result, 0};
1413 switch (out_byte_width) {
1415 int32_t agg_result =
static_cast<int32_t
>(agg_init_val);
1416 for (
size_t i = 0; i < out_vec_sz; ++i) {
1419 *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1420 *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1422 const int64_t converted_bin =
1423 float_argument_input
1424 ?
static_cast<int64_t
>(agg_result)
1426 return {converted_bin, 0};
1429 int64_t agg_result = agg_init_val;
1430 for (
size_t i = 0; i < out_vec_sz; ++i) {
1433 *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1434 *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1436 return {agg_result, 0};
1445 int64_t agg_result = agg_init_val;
1446 for (
size_t i = 0; i < out_vec_sz; ++i) {
1449 return {agg_result, 0};
1451 switch (out_byte_width) {
1453 int32_t agg_result =
static_cast<int32_t
>(agg_init_val);
1454 for (
size_t i = 0; i < out_vec_sz; ++i) {
1457 *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1458 *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1460 const int64_t converted_bin =
1461 float_argument_input ?
static_cast<int64_t
>(agg_result)
1463 return {converted_bin, 0};
1466 int64_t agg_result = agg_init_val;
1467 for (
size_t i = 0; i < out_vec_sz; ++i) {
1470 *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1471 *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1473 return {agg_result, 0};
1480 int64_t agg_result = agg_init_val;
1481 for (
size_t i = 0; i < out_vec_sz; ++i) {
1482 if (out_vec[i] != agg_init_val) {
1483 if (agg_result == agg_init_val) {
1484 agg_result = out_vec[i];
1485 }
else if (out_vec[i] != agg_result) {
1490 return {agg_result, 0};
1493 int64_t agg_result = agg_init_val;
1494 for (
size_t i = 0; i < out_vec_sz; ++i) {
1495 if (out_vec[i] != agg_init_val) {
1496 agg_result = out_vec[i];
1500 return {agg_result, 0};
1511 std::vector<std::pair<
ResultSetPtr, std::vector<size_t>>>& results_per_device,
1512 std::vector<TargetInfo>
const& targets) {
1513 auto& first = results_per_device.front().first;
1516 if (first_target_idx) {
1517 first->translateDictEncodedColumns(targets, *first_target_idx);
1519 for (
size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
1520 const auto& next = results_per_device[dev_idx].first;
1522 if (first_target_idx) {
1523 next->translateDictEncodedColumns(targets, *first_target_idx);
1525 first->append(*next);
1527 return std::move(first);
1542 auto const targets = shared::transform<std::vector<TargetInfo>>(
1544 if (results_per_device.empty()) {
1545 return std::make_shared<ResultSet>(targets,
1552 using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
1554 results_per_device.end(),
1555 [](
const IndexedResultSet& lhs,
const IndexedResultSet& rhs) {
1556 CHECK_GE(lhs.second.size(), size_t(1));
1557 CHECK_GE(rhs.second.size(), size_t(1));
1558 return lhs.second.front() < rhs.second.front();
1566 std::vector<std::pair<
ResultSetPtr, std::vector<size_t>>>& results_per_device,
1567 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1574 if (results_per_device.empty()) {
1575 auto const targets = shared::transform<std::vector<TargetInfo>>(
1577 return std::make_shared<ResultSet>(targets,
1598 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>
1600 const std::vector<std::pair<
ResultSetPtr, std::vector<size_t>>>& results_per_device)
1602 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> unique_thread_results;
1603 if (results_per_device.empty()) {
1604 return unique_thread_results;
1606 auto max_ti = [](
int acc,
auto& e) {
return std::max(acc, e.first->getThreadIdx()); };
1607 int const max_thread_idx =
1608 std::accumulate(results_per_device.begin(), results_per_device.end(), -1, max_ti);
1609 std::vector<bool> seen_thread_idxs(max_thread_idx + 1,
false);
1610 for (
const auto&
result : results_per_device) {
1611 const int32_t result_thread_idx =
result.first->getThreadIdx();
1612 if (!seen_thread_idxs[result_thread_idx]) {
1613 seen_thread_idxs[result_thread_idx] =
true;
1614 unique_thread_results.emplace_back(
result);
1617 return unique_thread_results;
1623 const size_t executor_id,
1624 std::vector<std::pair<
ResultSetPtr, std::vector<size_t>>>& results_per_device,
1625 int64_t* compilation_queue_time) {
1628 *compilation_queue_time =
timer_stop(clock_begin);
1629 const auto& this_result_set = results_per_device[0].first;
1631 this_result_set->getTargetInfos(),
1632 this_result_set->getTargetInitVals(),
1634 return reduction_jit.
codegen();
1640 std::vector<std::pair<
ResultSetPtr, std::vector<size_t>>>& results_per_device,
1641 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1644 std::shared_ptr<ResultSet> reduced_results;
1646 const auto& first = results_per_device.front().first;
1650 results_per_device.size() > 1) {
1652 results_per_device.begin(),
1653 results_per_device.end(),
1655 [](
const size_t init,
const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
1656 const auto& r = rs.first;
1657 return init + r->getQueryMemDesc().getEntryCount();
1659 CHECK(total_entry_count);
1660 auto query_mem_desc = first->getQueryMemDesc();
1662 reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
1668 auto result_storage = reduced_results->allocateStorage(
plan_state_->init_agg_vals_);
1669 reduced_results->initializeStorage();
1670 switch (query_mem_desc.getEffectiveKeyWidth()) {
1672 first->getStorage()->moveEntriesToBuffer<int32_t>(
1673 result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1676 first->getStorage()->moveEntriesToBuffer<int64_t>(
1677 result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1683 reduced_results = first;
1686 int64_t compilation_queue_time = 0;
1687 const auto reduction_code =
1690 for (
size_t i = 1; i < results_per_device.size(); ++i) {
1691 reduced_results->getStorage()->reduce(
1692 *(results_per_device[i].first->getStorage()), {}, reduction_code,
executor_id_);
1694 reduced_results->addCompilationQueueTime(compilation_queue_time);
1695 reduced_results->invalidateCachedRowCount();
1696 return reduced_results;
1701 std::vector<std::pair<
ResultSetPtr, std::vector<size_t>>>& results_per_device,
1702 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1704 if (results_per_device.size() == 1) {
1705 return std::move(results_per_device.front().first);
1710 for (
const auto&
result : results_per_device) {
1719 std::max(
size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1724 return m.
asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc,
this, top_n, desc);
1729 std::unordered_set<int> available_gpus;
1734 for (
int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1735 available_gpus.insert(gpu_id);
1738 return available_gpus;
1742 const size_t cpu_count,
1743 const size_t gpu_count) {
1745 :
static_cast<size_t>(cpu_count);
1758 <<
"Exploiting a result of filtered count query as output buffer entry count: "
1763 using checked_size_t = boost::multiprecision::number<
1764 boost::multiprecision::cpp_int_backend<64,
1766 boost::multiprecision::unsigned_magnitude,
1767 boost::multiprecision::checked,
1769 checked_size_t checked_max_groups_buffer_entry_guess = 1;
1772 constexpr
size_t max_groups_buffer_entry_guess_cap = 100000000;
1775 for (
const auto& table_info : query_infos) {
1776 CHECK(!table_info.info.fragments.empty());
1777 checked_size_t table_cardinality = 0;
1778 std::for_each(table_info.info.fragments.begin(),
1779 table_info.info.fragments.end(),
1780 [&table_cardinality](
const FragmentInfo& frag_info) {
1781 table_cardinality += frag_info.getNumTuples();
1783 checked_max_groups_buffer_entry_guess *= table_cardinality;
1786 checked_max_groups_buffer_entry_guess = max_groups_buffer_entry_guess_cap;
1787 VLOG(1) <<
"Detect overflow when approximating output buffer entry count, "
1789 << max_groups_buffer_entry_guess_cap;
1791 size_t max_groups_buffer_entry_guess =
1792 std::min(static_cast<size_t>(checked_max_groups_buffer_entry_guess),
1793 max_groups_buffer_entry_guess_cap);
1794 VLOG(1) <<
"Set an approximated output entry count as: "
1795 << max_groups_buffer_entry_guess;
1796 return max_groups_buffer_entry_guess;
1806 return td->tableName;
1813 size_t watchdog_max_projected_rows_per_device,
1815 const int device_count) {
1817 return device_count * watchdog_max_projected_rows_per_device;
1819 return watchdog_max_projected_rows_per_device;
1823 const std::vector<InputTableInfo>& table_infos,
1825 const int device_count) {
1826 for (
const auto target_expr : ra_exe_unit.
target_exprs) {
1827 if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1831 size_t watchdog_max_projected_rows_per_device =
1835 watchdog_max_projected_rows_per_device =
1837 VLOG(1) <<
"Set the watchdog per device maximum projection limit: "
1838 << watchdog_max_projected_rows_per_device <<
" by a query hint";
1840 if (!ra_exe_unit.
scan_limit && table_infos.size() == 1 &&
1841 table_infos.front().info.getPhysicalNumTuples() <
1842 watchdog_max_projected_rows_per_device) {
1856 watchdog_max_projected_rows_per_device, device_type, device_count))) {
1857 std::vector<std::string> table_names;
1858 const auto& input_descs = ra_exe_unit.
input_descs;
1859 for (
const auto& input_desc : input_descs) {
1864 "Projection query would require a scan without a limit on table(s): " +
1868 "Projection query output result set on table(s): " +
1871 " rows, which is more than the current system limit of " +
1873 watchdog_max_projected_rows_per_device, device_type, device_count)));
1882 const auto inner_table_key = ra_exe_unit.
input_descs.back().getTableKey();
1884 std::optional<size_t> inner_table_idx;
1885 for (
size_t i = 0; i < query_infos.size(); ++i) {
1886 if (query_infos[i].table_key == inner_table_key) {
1887 inner_table_idx = i;
1891 CHECK(inner_table_idx);
1892 return query_infos[*inner_table_idx].info.getNumTuples();
1897 template <
typename T>
1899 std::vector<std::string> expr_strs;
1900 for (
const auto& expr : expr_container) {
1902 expr_strs.emplace_back(
"NULL");
1904 expr_strs.emplace_back(expr->toString());
1912 const std::list<Analyzer::OrderEntry>& expr_container) {
1913 std::vector<std::string> expr_strs;
1914 for (
const auto& expr : expr_container) {
1915 expr_strs.emplace_back(expr.toString());
1921 switch (algorithm) {
1925 return "Speculative Top N";
1927 return "Streaming Top N";
1938 std::ostringstream os;
1940 const auto& scan_desc = input_col_desc->getScanDesc();
1941 os << scan_desc.getTableKey() <<
"," << input_col_desc->getColId() <<
","
1942 << scan_desc.getNestLevel();
1943 table_keys.emplace(scan_desc.getTableKey());
1948 os << qual->toString() <<
",";
1952 if (!ra_exe_unit.
quals.empty()) {
1953 for (
const auto& qual : ra_exe_unit.
quals) {
1955 os << qual->toString() <<
",";
1960 for (
size_t i = 0; i < ra_exe_unit.
join_quals.size(); i++) {
1961 const auto& join_condition = ra_exe_unit.
join_quals[i];
1963 for (
const auto& qual : join_condition.quals) {
1965 os << qual->toString() <<
",";
1973 os << qual->toString() <<
",";
1979 os << expr->toString() <<
",";
1988 return key == other.
key;
1996 return table_keys.find(table_key) != table_keys.end();
2001 os <<
"\n\tTable/Col/Levels: ";
2003 const auto& scan_desc = input_col_desc->getScanDesc();
2004 os <<
"(" << scan_desc.getTableKey() <<
", " << input_col_desc->getColId() <<
", "
2005 << scan_desc.getNestLevel() <<
") ";
2008 os <<
"\n\tSimple Quals: "
2012 if (!ra_exe_unit.
quals.empty()) {
2017 os <<
"\n\tJoin Quals: ";
2018 for (
size_t i = 0; i < ra_exe_unit.
join_quals.size(); i++) {
2019 const auto& join_condition = ra_exe_unit.
join_quals[i];
2025 os <<
"\n\tGroup By: "
2029 os <<
"\n\tProjected targets: "
2032 os <<
"\n\tSort Info: ";
2033 const auto& sort_info = ra_exe_unit.
sort_info;
2034 os <<
"\n\t Order Entries: "
2037 std::string limit_str = sort_info.limit ?
std::to_string(*sort_info.limit) :
"N/A";
2038 os <<
"\n\t Limit: " << limit_str;
2043 os <<
"\n\tUnion: " << std::string(*ra_exe_unit.
union_all ?
"UNION ALL" :
"UNION");
2051 const size_t new_scan_limit) {
2055 ra_exe_unit_in.
quals,
2076 const std::vector<InputTableInfo>& query_infos,
2081 const bool has_cardinality_estimation,
2083 VLOG(1) <<
"Executor " <<
executor_id_ <<
" is executing work unit:" << ra_exe_unit_in;
2106 has_cardinality_estimation,
2112 result->setValidationOnlyRes();
2127 has_cardinality_estimation,
2133 result->setValidationOnlyRes();
2141 size_t& max_groups_buffer_entry_guess,
2143 const bool allow_single_frag_table_opt,
2144 const std::vector<InputTableInfo>& query_infos,
2148 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
2150 const bool has_cardinality_estimation,
2153 const auto [ra_exe_unit, deleted_cols_map] =
addDeletedColumn(ra_exe_unit_in, co);
2155 CHECK(!query_infos.empty());
2156 if (!max_groups_buffer_entry_guess) {
2161 max_groups_buffer_entry_guess =
2175 auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
2176 std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
2180 query_mem_desc_owned =
2181 query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
2183 has_cardinality_estimation,
2192 CHECK(query_mem_desc_owned);
2193 crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
2195 VLOG(1) << e.what();
2201 plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
2202 CHECK(!query_mem_desc_owned);
2203 query_mem_desc_owned.reset(
2210 if (query_mem_desc_owned->canUsePerDeviceCardinality(ra_exe_unit)) {
2211 auto const max_rows_per_device =
2212 query_mem_desc_owned->getMaxPerDeviceCardinality(ra_exe_unit);
2213 if (max_rows_per_device && *max_rows_per_device >= 0 &&
2214 *max_rows_per_device < query_mem_desc_owned->getEntryCount()) {
2215 VLOG(1) <<
"Setting the max per device cardinality of {max_rows_per_device} as "
2216 "the new scan limit: "
2217 << *max_rows_per_device;
2226 const auto context_count =
2235 allow_single_frag_table_opt,
2237 *query_comp_desc_owned,
2238 *query_mem_desc_owned,
2245 query_comp_desc_owned->getDeviceType(),
2246 ra_exe_unit.input_descs,
2247 *query_mem_desc_owned);
2250 shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
2262 static_cast<size_t>(crt_min_byte_width << 1) <=
sizeof(int64_t)) {
2263 crt_min_byte_width <<= 1;
2272 std::string curRunningSession{
""};
2273 std::string curRunningQuerySubmittedTime{
""};
2274 bool sessionEnrolled =
false;
2279 curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
2283 if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
2286 curRunningQuerySubmittedTime,
2293 auto row =
result.first->getNextRow(
false,
false);
2295 auto scalar_r = boost::get<ScalarTargetValue>(&row[0]);
2297 auto p = boost::get<int64_t>(scalar_r);
2300 auto frag_ids =
result.second;
2302 <<
"} : " <<
static_cast<size_t>(*p);
2304 static_cast<size_t>(*p));
2305 result.first->moveToBegin();
2310 *query_mem_desc_owned,
2311 query_comp_desc_owned->getDeviceType(),
2316 crt_min_byte_width <<= 1;
2320 <<
", what(): " << e.what();
2326 }
while (static_cast<size_t>(crt_min_byte_width) <=
sizeof(int64_t));
2328 return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2343 const std::set<size_t>& fragment_indexes_param) {
2344 const auto [ra_exe_unit, deleted_cols_map] =
addDeletedColumn(ra_exe_unit_in, co);
2347 std::vector<InputTableInfo> table_infos{table_info};
2351 auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
2352 std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
2354 query_mem_desc_owned =
2355 query_comp_desc_owned->compile(0,
2367 CHECK(query_mem_desc_owned);
2368 CHECK_EQ(
size_t(1), ra_exe_unit.input_descs.size());
2369 const auto table_key = ra_exe_unit.input_descs[0].getTableKey();
2372 std::set<size_t> fragment_indexes;
2373 if (fragment_indexes_param.empty()) {
2377 for (
size_t i = 0; i < outer_fragments.size(); i++) {
2378 fragment_indexes.emplace(i);
2381 fragment_indexes = fragment_indexes_param;
2389 for (
auto fragment_index : fragment_indexes) {
2392 FragmentsList fragments_list{{table_key, {fragment_index}}};
2398 *query_comp_desc_owned,
2399 *query_mem_desc_owned,
2404 kernel.
run(
this, 0, kernel_context);
2410 for (
const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
2411 CHECK_EQ(result_fragment_indexes.size(), 1);
2412 cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
2418 const std::vector<InputTableInfo>& table_infos,
2426 return std::make_shared<ResultSet>(
2450 std::shared_ptr<CompilationContext> compilation_context;
2459 compilation_context =
2460 tf_compilation_context.
compile(exe_unit,
true );
2464 compilation_context,
2470 std::shared_ptr<CompilationContext> compilation_context;
2478 compilation_context =
2479 tf_compilation_context.compile(exe_unit,
false );
2481 return exe_context.
execute(exe_unit,
2483 compilation_context,
2491 return std::make_shared<ResultSet>(query_comp_desc.
getIR());
2496 const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
2500 [
this, &dict_id_visitor, &row_set_mem_owner](
const Analyzer::Expr* expr) {
2504 const auto& dict_key = dict_id_visitor.
visit(expr);
2505 if (dict_key.dict_id >= 0) {
2509 visitor.
visit(expr);
2514 visit_expr(group_expr.get());
2517 for (
const auto& group_expr : ra_exe_unit.
quals) {
2518 visit_expr(group_expr.get());
2521 for (
const auto& group_expr : ra_exe_unit.
simple_quals) {
2522 visit_expr(group_expr.get());
2525 const auto visit_target_expr = [&](
const Analyzer::Expr* target_expr) {
2526 const auto& target_type = target_expr->get_type_info();
2527 if (!target_type.is_string() || target_type.get_compression() ==
kENCODING_DICT) {
2531 if (agg_expr->get_is_distinct() || agg_expr->get_aggtype() ==
kSINGLE_VALUE ||
2532 agg_expr->get_aggtype() ==
kSAMPLE || agg_expr->get_aggtype() ==
kMODE) {
2533 visit_expr(agg_expr->get_arg());
2536 visit_expr(target_expr);
2541 std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
2543 std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
2552 for (
const auto target_expr : ra_exe_unit.
target_exprs) {
2556 if ((agg_info.agg_kind ==
kAVG || agg_info.agg_kind ==
kSUM ||
2557 agg_info.agg_kind ==
kSUM_IF) &&
2558 agg_info.agg_arg_type.get_type() ==
kDOUBLE) {
2562 if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
2566 return requested_device_type;
2575 int64_t float_null_val = 0;
2576 *
reinterpret_cast<float*
>(may_alias_ptr(&float_null_val)) =
2578 return float_null_val;
2581 return *
reinterpret_cast<const int64_t*
>(may_alias_ptr(&double_null_val));
2587 std::vector<int64_t>& entry,
2588 const std::vector<Analyzer::Expr*>& target_exprs,
2590 for (
size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
2591 const auto target_expr = target_exprs[target_idx];
2593 CHECK(agg_info.is_agg);
2594 target_infos.push_back(agg_info);
2596 const auto executor = query_mem_desc.
getExecutor();
2598 auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2599 CHECK(row_set_mem_owner);
2600 const auto& count_distinct_desc =
2603 CHECK(row_set_mem_owner);
2604 auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
2605 count_distinct_desc.bitmapPaddedSizeBytes(),
2607 entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
2612 CHECK(row_set_mem_owner);
2613 row_set_mem_owner->addCountDistinctSet(count_distinct_set);
2614 entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
2619 if (shared::is_any<kCOUNT, kCOUNT_IF, kAPPROX_COUNT_DISTINCT>(agg_info.agg_kind)) {
2621 }
else if (shared::is_any<kAVG>(agg_info.agg_kind)) {
2624 }
else if (shared::is_any<kSINGLE_VALUE, kSAMPLE>(agg_info.agg_kind)) {
2625 if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
2626 for (
int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2629 }
else if (agg_info.sql_type.is_varlen()) {
2633 entry.push_back(
inline_null_val(agg_info.sql_type, float_argument_input));
2636 entry.push_back(
inline_null_val(agg_info.sql_type, float_argument_input));
2642 const std::vector<Analyzer::Expr*>& target_exprs_in,
2645 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
2646 std::vector<Analyzer::Expr*> target_exprs;
2647 for (
const auto target_expr : target_exprs_in) {
2648 const auto target_expr_copy =
2650 CHECK(target_expr_copy);
2651 auto ti = target_expr->get_type_info();
2653 target_expr_copy->set_type_info(ti);
2654 if (target_expr_copy->get_arg()) {
2655 auto arg_ti = target_expr_copy->get_arg()->get_type_info();
2656 arg_ti.set_notnull(
false);
2657 target_expr_copy->get_arg()->set_type_info(arg_ti);
2659 target_exprs_owned_copies.push_back(target_expr_copy);
2660 target_exprs.push_back(target_expr_copy.get());
2662 std::vector<TargetInfo> target_infos;
2663 std::vector<int64_t> entry;
2665 const auto executor = query_mem_desc.
getExecutor();
2667 auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2668 CHECK(row_set_mem_owner);
2669 auto rs = std::make_shared<ResultSet>(target_infos,
2673 executor->blockSize(),
2674 executor->gridSize());
2675 rs->allocateStorage();
2676 rs->fillOneEntry(entry);
2687 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2693 ra_exe_unit.
target_exprs, query_mem_desc, device_type);
2698 ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2699 }
catch (
const std::bad_alloc&) {
2703 const auto shard_count =
2708 if (shard_count && !result_per_device.empty()) {
2712 ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2729 size_t output_row_index,
2731 const std::vector<uint32_t>& top_permutation) {
2734 for (
const auto sorted_idx : top_permutation) {
2736 for (
size_t group_idx = 0; group_idx < input_query_mem_desc.
getKeyCount();
2738 const auto input_column_ptr =
2741 const auto output_column_ptr =
2744 output_row_index * output_query_mem_desc.
groupColWidth(group_idx);
2745 memcpy(output_column_ptr,
2750 for (
size_t slot_idx = 0; slot_idx < input_query_mem_desc.
getSlotCount();
2752 const auto input_column_ptr =
2755 const auto output_column_ptr =
2758 memcpy(output_column_ptr,
2764 return output_row_index;
2778 size_t output_row_index,
2780 const std::vector<uint32_t>& top_permutation) {
2783 for (
const auto sorted_idx : top_permutation) {
2784 const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.
getRowSize();
2785 memcpy(output_buffer + output_row_index * output_query_mem_desc.
getRowSize(),
2790 return output_row_index;
2802 const auto first_result_set = result_per_device.front().first;
2803 CHECK(first_result_set);
2804 auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2805 CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2808 top_query_mem_desc.setEntryCount(0);
2809 for (
auto&
result : result_per_device) {
2810 const auto result_set =
result.first;
2813 size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2814 top_query_mem_desc.setEntryCount(new_entry_cnt);
2816 auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2817 first_result_set->getDeviceType(),
2819 first_result_set->getRowSetMemOwner(),
2822 auto top_storage = top_result_set->allocateStorage();
2823 size_t top_output_row_idx{0};
2824 for (
auto&
result : result_per_device) {
2825 const auto result_set =
result.first;
2827 const auto& top_permutation = result_set->getPermutationBuffer();
2828 CHECK_LE(top_permutation.size(), top_n);
2829 if (top_query_mem_desc.didOutputColumnar()) {
2831 result_set->getQueryMemDesc(),
2844 CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2845 return top_result_set;
2848 std::unordered_map<shared::TableKey, const Analyzer::BinOper*>
2850 std::unordered_map<shared::TableKey, const Analyzer::BinOper*> id_to_cond;
2852 CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2853 for (
size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2854 const auto& inner_table_key = join_info.join_hash_tables_[i]->getInnerTableId();
2856 std::make_pair(inner_table_key, join_info.equi_join_tautologies_[i].get()));
2864 for (
const auto& col : fetched_cols) {
2865 if (col.is_lazily_fetched) {
2878 const std::vector<InputTableInfo>& table_infos,
2881 const bool allow_single_frag_table_opt,
2882 const size_t context_count,
2886 std::unordered_set<int>& available_gpus,
2887 int& available_cpus) {
2888 std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2895 : std::vector<Data_Namespace::MemoryInfo>{},
2901 const bool uses_lazy_fetch =
2906 const auto device_count =
deviceCount(device_type);
2909 fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2913 use_multifrag_kernel,
2916 if (eo.
with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2920 if (use_multifrag_kernel) {
2921 VLOG(1) <<
"Creating multifrag execution kernels";
2929 auto multifrag_kernel_dispatch = [&ra_exe_unit,
2935 render_info](
const int device_id,
2937 const int64_t rowid_lookup_key) {
2938 execution_kernels.emplace_back(
2939 std::make_unique<ExecutionKernel>(ra_exe_unit,
2951 fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2953 VLOG(1) <<
"Creating one execution kernel per fragment";
2958 table_infos.size() == 1 && table_infos.front().table_key.table_id > 0) {
2959 const auto max_frag_size =
2960 table_infos.front().info.getFragmentNumTuplesUpperBound();
2963 <<
" to match max fragment size " << max_frag_size
2964 <<
" for kernel per fragment execution path.";
2969 size_t frag_list_idx{0};
2970 auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2978 render_info](
const int device_id,
2980 const int64_t rowid_lookup_key) {
2981 if (!frag_list.size()) {
2986 execution_kernels.emplace_back(
2987 std::make_unique<ExecutionKernel>(ra_exe_unit,
3001 fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
3005 return execution_kernels;
3009 std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
3011 const size_t requested_num_threads) {
3013 const size_t num_threads =
3015 ? std::min(kernels.size(),
static_cast<size_t>(
cpu_threads()))
3016 : requested_num_threads;
3017 tbb::task_arena local_arena(num_threads);
3022 LOG(
EXECUTOR) <<
"Launching query step with " << num_threads <<
" threads.";
3026 kernels.empty() ?
nullptr : &kernels[0]->ra_exe_unit_;
3030 shared_context.setThreadPool(&tg);
3032 ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(
nullptr); });
3035 VLOG(1) <<
"Launching " << kernels.size() <<
" kernels for query on "
3037 <<
" using pool of " << num_threads <<
" threads.";
3038 size_t kernel_idx = 1;
3040 for (
auto& kernel : kernels) {
3041 CHECK(kernel.get());
3043 local_arena.execute([&] {
3050 crt_kernel_idx = kernel_idx++] {
3056 const size_t old_thread_idx = crt_kernel_idx % num_threads;
3057 const size_t thread_idx = tbb::this_task_arena::current_thread_index();
3059 <<
" Old thread idx: " << old_thread_idx;
3061 const size_t thread_idx = crt_kernel_idx % num_threads;
3063 kernel->run(
this, thread_idx, shared_context);
3070 local_arena.execute([&] { tg.
wait(); });
3075 for (
auto& exec_ctx : shared_context.getTlsExecutionContext()) {
3082 results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
3084 results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
3093 std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
3105 std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
3107 const std::vector<InputDescriptor>& input_descs,
3112 const size_t num_kernels = kernels.size();
3113 constexpr
bool cap_slots =
false;
3114 const size_t num_compute_slots =
3116 ? std::min(num_kernels,
3118 ->get_resource_info(
3124 const size_t cpu_result_mem_bytes_per_kernel =
3127 std::vector<std::pair<int32_t, FragmentsList>> kernel_fragments_list;
3128 kernel_fragments_list.reserve(num_kernels);
3129 for (
auto& kernel : kernels) {
3130 const auto device_id = kernel->get_chosen_device_id();
3131 const auto frag_list = kernel->get_fragment_list();
3132 if (!frag_list.empty()) {
3133 kernel_fragments_list.emplace_back(std::make_pair(device_id, frag_list));
3137 device_type, input_descs, shared_context.
getQueryInfos(), kernel_fragments_list);
3139 auto gen_resource_request_info = [device_type,
3141 cpu_result_mem_bytes_per_kernel,
3142 &chunk_request_info,
3147 static_cast<size_t>(0),
3148 static_cast<size_t>(0),
3149 static_cast<size_t>(0),
3152 cpu_result_mem_bytes_per_kernel * num_compute_slots,
3153 cpu_result_mem_bytes_per_kernel * num_compute_slots,
3157 const size_t min_cpu_slots{1};
3158 const size_t min_cpu_result_mem =
3160 ? cpu_result_mem_bytes_per_kernel * min_cpu_slots
3161 : cpu_result_mem_bytes_per_kernel * num_compute_slots;
3164 static_cast<size_t>(0),
3169 cpu_result_mem_bytes_per_kernel * num_compute_slots,
3173 .threadsCanReuseGroupByBuffers());
3177 const auto resource_request_info = gen_resource_request_info();
3180 const bool is_empty_request =
3181 resource_request_info.cpu_slots == 0UL && resource_request_info.gpu_slots == 0UL;
3182 auto resource_handle =
3183 is_empty_request ?
nullptr
3185 const auto num_cpu_threads =
3186 is_empty_request ? 0UL : resource_handle->get_resource_grant().cpu_slots;
3188 const auto num_gpu_slots =
3189 is_empty_request ? 0UL : resource_handle->get_resource_grant().gpu_slots;
3190 VLOG(1) <<
"In Executor::LaunchKernels executor " <<
getExecutorId() <<
" requested "
3191 <<
"between " << resource_request_info.min_gpu_slots <<
" and "
3192 << resource_request_info.gpu_slots <<
" GPU slots, and was granted "
3193 << num_gpu_slots <<
" GPU slots.";
3195 VLOG(1) <<
"In Executor::LaunchKernels executor " <<
getExecutorId() <<
" requested "
3196 <<
"between " << resource_request_info.min_cpu_slots <<
" and "
3197 << resource_request_info.cpu_slots <<
" CPU slots, and was granted "
3198 << num_cpu_threads <<
" CPU slots.";
3201 launchKernelsImpl(shared_context, std::move(kernels), device_type, num_cpu_threads);
3207 const size_t table_idx,
3208 const size_t outer_frag_idx,
3209 std::map<shared::TableKey, const TableFragments*>& selected_tables_fragments,
3210 const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
3211 inner_table_id_to_join_condition) {
3212 const auto& table_key = ra_exe_unit.
input_descs[table_idx].getTableKey();
3213 auto table_frags_it = selected_tables_fragments.find(table_key);
3214 CHECK(table_frags_it != selected_tables_fragments.end());
3215 const auto& outer_input_desc = ra_exe_unit.
input_descs[0];
3216 const auto outer_table_fragments_it =
3217 selected_tables_fragments.find(outer_input_desc.getTableKey());
3218 const auto outer_table_fragments = outer_table_fragments_it->second;
3219 CHECK(outer_table_fragments_it != selected_tables_fragments.end());
3220 CHECK_LT(outer_frag_idx, outer_table_fragments->size());
3222 return {outer_frag_idx};
3224 const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
3225 auto& inner_frags = table_frags_it->second;
3227 std::vector<size_t> all_frag_ids;
3228 for (
size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
3230 const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
3234 inner_table_id_to_join_condition,
3239 all_frag_ids.push_back(inner_frag_idx);
3241 return all_frag_ids;
3249 const int table_idx,
3250 const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
3251 inner_table_id_to_join_condition,
3257 CHECK(table_idx >= 0 &&
3258 static_cast<size_t>(table_idx) < ra_exe_unit.
input_descs.size());
3259 const auto& inner_table_key = ra_exe_unit.
input_descs[table_idx].getTableKey();
3261 if (outer_fragment_info.
shard == -1 || inner_fragment_info.
shard == -1 ||
3262 outer_fragment_info.
shard == inner_fragment_info.
shard) {
3267 CHECK(!inner_table_id_to_join_condition.empty());
3268 auto condition_it = inner_table_id_to_join_condition.find(inner_table_key);
3269 CHECK(condition_it != inner_table_id_to_join_condition.end());
3270 join_condition = condition_it->second;
3271 CHECK(join_condition);
3274 plan_state_->join_info_.join_hash_tables_.size());
3275 for (
size_t i = 0; i <
plan_state_->join_info_.join_hash_tables_.size(); ++i) {
3276 if (
plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
3278 CHECK(!join_condition);
3279 join_condition =
plan_state_->join_info_.equi_join_tautologies_[i].get();
3283 if (!join_condition) {
3287 if (join_condition->is_bbox_intersect_oper()) {
3290 size_t shard_count{0};
3291 if (dynamic_cast<const Analyzer::ExpressionTuple*>(
3292 join_condition->get_left_operand())) {
3293 auto inner_outer_pairs =
3296 join_condition,
this, inner_outer_pairs);
3300 if (shard_count && !ra_exe_unit.
join_quals.empty()) {
3301 plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
3310 const auto col_id = col_desc->
getColId();
3317 const std::vector<InputDescriptor>& input_descs,
3318 const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
3319 std::map<shared::TableKey, std::vector<uint64_t>> tab_id_to_frag_offsets;
3320 for (
auto& desc : input_descs) {
3321 const auto fragments_it = all_tables_fragments.find(desc.getTableKey());
3322 CHECK(fragments_it != all_tables_fragments.end());
3323 const auto& fragments = *fragments_it->second;
3324 std::vector<uint64_t> frag_offsets(fragments.size(), 0);
3325 for (
size_t i = 0, off = 0; i < fragments.size(); ++i) {
3326 frag_offsets[i] = off;
3327 off += fragments[i].getNumTuples();
3329 tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableKey(), frag_offsets));
3331 return tab_id_to_frag_offsets;
3334 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
3337 const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
3338 const std::vector<InputDescriptor>& input_descs,
3339 const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
3340 std::vector<std::vector<int64_t>> all_num_rows;
3341 std::vector<std::vector<uint64_t>> all_frag_offsets;
3342 const auto tab_id_to_frag_offsets =
3344 std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
3345 for (
const auto& selected_frag_ids : frag_ids_crossjoin) {
3346 std::vector<int64_t> num_rows;
3347 std::vector<uint64_t> frag_offsets;
3349 CHECK_EQ(selected_frag_ids.size(), input_descs.size());
3351 for (
size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
3352 const auto frag_id = ra_exe_unit.
union_all ? 0 : selected_frag_ids[tab_idx];
3353 const auto fragments_it =
3354 all_tables_fragments.find(input_descs[tab_idx].getTableKey());
3355 CHECK(fragments_it != all_tables_fragments.end());
3356 const auto& fragments = *fragments_it->second;
3357 if (ra_exe_unit.
join_quals.empty() || tab_idx == 0 ||
3358 plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
3359 const auto& fragment = fragments[frag_id];
3360 num_rows.push_back(fragment.getNumTuples());
3362 size_t total_row_count{0};
3363 for (
const auto& fragment : fragments) {
3364 total_row_count += fragment.getNumTuples();
3366 num_rows.push_back(total_row_count);
3368 const auto frag_offsets_it =
3369 tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableKey());
3370 CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
3371 const auto& offsets = frag_offsets_it->second;
3373 frag_offsets.push_back(offsets[frag_id]);
3375 all_num_rows.push_back(num_rows);
3377 all_frag_offsets.push_back(frag_offsets);
3379 return {all_num_rows, all_frag_offsets};
3387 const auto& input_descs = ra_exe_unit.
input_descs;
3389 if (nest_level < 1 ||
3391 ra_exe_unit.
join_quals.empty() || input_descs.size() < 2 ||
3393 plan_state_->isLazyFetchColumn(inner_col_desc))) {
3397 CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
3398 CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
3399 const auto& fragments = selected_fragments[nest_level].fragment_ids;
3400 return fragments.size() > 1;
3411 CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
3412 CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
3413 const auto& fragments = selected_fragments[nest_level].fragment_ids;
3414 auto need_linearize =
3417 return table_key.table_id > 0 && need_linearize && fragments.size() > 1;
3429 const int device_id,
3431 const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
3433 std::list<ChunkIter>& chunk_iterators,
3434 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3436 const size_t thread_idx,
3437 const bool allow_runtime_interrupt) {
3441 std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3442 std::vector<size_t> local_col_to_frag_pos;
3444 local_col_to_frag_pos,
3450 selected_fragments_crossjoin);
3451 std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
3452 std::vector<std::vector<int64_t>> all_num_rows;
3453 std::vector<std::vector<uint64_t>> all_frag_offsets;
3454 for (
const auto& selected_frag_ids : frag_ids_crossjoin) {
3455 std::vector<const int8_t*> frag_col_buffers(
3457 for (
const auto& col_id : col_global_ids) {
3458 if (allow_runtime_interrupt) {
3459 bool isInterrupted =
false;
3467 if (isInterrupted) {
3476 if (cd && cd->isVirtualCol) {
3480 const auto& table_key = col_id->getScanDesc().getTableKey();
3481 const auto fragments_it = all_tables_fragments.find(table_key);
3482 CHECK(fragments_it != all_tables_fragments.end());
3483 const auto fragments = fragments_it->second;
3484 auto it =
plan_state_->global_to_local_col_ids_.find(*col_id);
3486 CHECK_LT(static_cast<size_t>(it->second),
3488 const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
3489 if (!fragments->size()) {
3492 CHECK_LT(frag_id, fragments->size());
3493 auto memory_level_for_column = memory_level;
3495 col_id->getColId()};
3500 frag_col_buffers[it->second] =
3502 memory_level_for_column,
3513 cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3514 bool for_lazy_fetch =
false;
3515 if (
plan_state_->isColumnToNotFetch(tbl_col_key)) {
3516 for_lazy_fetch =
true;
3517 VLOG(2) <<
"Try to linearize lazy fetch column (col_id: " << cd->columnId
3518 <<
", col_name: " << cd->columnName <<
")";
3521 col_id->getScanDesc().getTableKey(),
3523 all_tables_fragments,
3527 for_lazy_fetch ? 0 : device_id,
3532 col_id->getScanDesc().getTableKey(),
3534 all_tables_fragments,
3535 memory_level_for_column,
3542 col_id->getScanDesc().getTableKey(),
3545 all_tables_fragments,
3548 memory_level_for_column,
3554 all_frag_col_buffers.push_back(frag_col_buffers);
3557 ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.
input_descs, all_tables_fragments);
3558 return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3563 std::vector<InputDescriptor>
const& input_descs) {
3564 auto const has_table_key = [&table_key](
InputDescriptor const& input_desc) {
3565 return table_key == input_desc.getTableKey();
3567 return std::find_if(input_descs.begin(), input_descs.end(), has_table_key) -
3568 input_descs.begin();
3573 std::list<std::shared_ptr<InputColDescriptor const>>
const& input_col_descs) {
3574 auto const has_table_key = [&table_key](
auto const& input_desc) {
3575 return table_key == input_desc->getScanDesc().getTableKey();
3577 return std::distance(
3578 input_col_descs.begin(),
3579 std::find_if(input_col_descs.begin(), input_col_descs.end(), has_table_key));
3584 std::list<std::shared_ptr<InputColDescriptor const>>
const& input_col_descs) {
3585 std::list<std::shared_ptr<const InputColDescriptor>> selected;
3586 for (
auto const& input_col_desc : input_col_descs) {
3587 if (table_key == input_col_desc->getScanDesc().getTableKey()) {
3588 selected.push_back(input_col_desc);
3596 int8_t
const*
const ptr,
3597 size_t const local_col_id,
3599 size_t const begin = local_col_id - local_col_id %
N;
3600 size_t const end = begin +
N;
3601 CHECK_LE(end, frag_col_buffers.size()) << (
void*)ptr <<
' ' << local_col_id <<
' ' <<
N;
3602 for (
size_t i = begin; i < end; ++i) {
3603 frag_col_buffers[i] = ptr;
3613 const int device_id,
3615 const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
3617 std::list<ChunkIter>& chunk_iterators,
3618 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3620 const size_t thread_idx,
3621 const bool allow_runtime_interrupt) {
3625 CHECK_EQ(1u, selected_fragments.size());
3628 auto const& input_descs = ra_exe_unit.
input_descs;
3629 const auto& selected_table_key = selected_fragments.front().table_key;
3630 size_t const input_descs_index =
3632 CHECK_LT(input_descs_index, input_descs.size());
3633 size_t const input_col_descs_index =
3636 VLOG(2) <<
"selected_table_key=" << selected_table_key
3637 <<
" input_descs_index=" << input_descs_index
3638 <<
" input_col_descs_index=" << input_col_descs_index
3640 <<
" ra_exe_unit.input_col_descs="
3643 std::list<std::shared_ptr<const InputColDescriptor>> selected_input_col_descs =
3645 std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3648 selected_fragments_crossjoin, selected_fragments, ra_exe_unit);
3651 selected_fragments_crossjoin);
3653 if (allow_runtime_interrupt) {
3654 bool isInterrupted =
false;
3661 if (isInterrupted) {
3665 std::vector<const int8_t*> frag_col_buffers(
3667 for (
const auto& col_id : selected_input_col_descs) {
3670 if (cd && cd->isVirtualCol) {
3674 const auto fragments_it = all_tables_fragments.find(selected_table_key);
3675 CHECK(fragments_it != all_tables_fragments.end());
3676 const auto fragments = fragments_it->second;
3677 auto it =
plan_state_->global_to_local_col_ids_.find(*col_id);
3679 size_t const local_col_id = it->second;
3681 constexpr
size_t frag_id = 0;
3682 if (fragments->empty()) {
3686 plan_state_->isColumnToFetch({selected_table_key, col_id->getColId()})
3692 col_id.get(), memory_level_for_column, device_id, device_allocator, thread_idx);
3696 all_tables_fragments,
3697 memory_level_for_column,
3705 all_tables_fragments,
3708 memory_level_for_column,
3713 set_mod_range(frag_col_buffers, ptr, local_col_id, input_descs.size());
3716 ra_exe_unit, frag_ids_crossjoin, input_descs, all_tables_fragments);
3721 <<
" input_descs_index=" << input_descs_index
3722 <<
" input_col_descs_index=" << input_col_descs_index;
3723 return {{std::move(frag_col_buffers)},
3724 {{num_rows[0][input_descs_index]}},
3725 {{frag_offsets[0][input_descs_index]}}};
3729 const size_t scan_idx,
3733 !
plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
3734 !selected_fragments[scan_idx].fragment_ids.empty()) {
3739 return selected_fragments[scan_idx].fragment_ids;
3743 std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3744 std::vector<size_t>& local_col_to_frag_pos,
3745 const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3748 local_col_to_frag_pos.resize(
plan_state_->global_to_local_col_ids_.size());
3750 const auto& input_descs = ra_exe_unit.
input_descs;
3751 for (
size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3752 const auto& table_key = input_descs[scan_idx].getTableKey();
3753 CHECK_EQ(selected_fragments[scan_idx].table_key, table_key);
3754 selected_fragments_crossjoin.push_back(
3756 for (
const auto& col_id : col_global_ids) {
3758 const auto& input_desc = col_id->getScanDesc();
3759 if (input_desc.getTableKey() != table_key ||
3760 input_desc.getNestLevel() !=
static_cast<int>(scan_idx)) {
3763 auto it =
plan_state_->global_to_local_col_ids_.find(*col_id);
3765 CHECK_LT(static_cast<size_t>(it->second),
3767 local_col_to_frag_pos[it->second] = frag_pos;
3774 std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3777 const auto& input_descs = ra_exe_unit.
input_descs;
3778 for (
size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3780 if (selected_fragments[0].table_key == input_descs[scan_idx].getTableKey()) {
3781 selected_fragments_crossjoin.push_back({size_t(1)});
3790 OutVecOwner(
const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
3792 for (
auto out : out_vec_) {
3805 const bool hoist_literals,
3807 const std::vector<Analyzer::Expr*>& target_exprs,
3809 std::vector<std::vector<const int8_t*>>& col_buffers,
3811 const std::vector<std::vector<int64_t>>& num_rows,
3812 const std::vector<std::vector<uint64_t>>& frag_offsets,
3814 const int device_id,
3815 const uint32_t start_rowid,
3816 const uint32_t num_tables,
3817 const bool allow_runtime_interrupt,
3819 const bool optimize_cuda_block_and_grid_sizes,
3820 const int64_t rows_to_process) {
3823 CHECK(!results || !(*results));
3824 if (col_buffers.empty()) {
3833 <<
"CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
3834 "currently unsupported.";
3839 std::vector<int64_t*> out_vec;
3842 std::unique_ptr<OutVecOwner> output_memory_scope;
3843 if (allow_runtime_interrupt) {
3844 bool isInterrupted =
false;
3851 if (isInterrupted) {
3861 CHECK(cpu_generated_code);
3873 join_hash_table_ptrs,
3875 output_memory_scope.reset(
new OutVecOwner(out_vec));
3879 CHECK(gpu_generated_code);
3897 allow_runtime_interrupt,
3898 join_hash_table_ptrs,
3899 render_allocator_map_ptr,
3900 optimize_cuda_block_and_grid_sizes);
3901 output_memory_scope.reset(
new OutVecOwner(out_vec));
3904 }
catch (
const std::exception& e) {
3905 LOG(
FATAL) <<
"Error launching the GPU kernel: " << e.what();
3927 std::vector<int64_t> reduced_outs;
3928 const auto num_frags = col_buffers.size();
3929 const size_t entry_count =
3935 if (
size_t(1) == entry_count) {
3936 for (
auto out : out_vec) {
3938 reduced_outs.push_back(*out);
3941 size_t out_vec_idx = 0;
3943 for (
const auto target_expr : target_exprs) {
3945 CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3946 << target_expr->toString();
3948 const int num_iterations = agg_info.sql_type.is_geometry()
3949 ? agg_info.sql_type.get_physical_coord_cols()
3952 for (
int i = 0; i < num_iterations; i++) {
3956 shared::is_any<kAPPROX_QUANTILE, kMODE>(agg_info.agg_kind)) {
3957 bool const check = shared::
3958 is_any<kCOUNT, kAPPROX_COUNT_DISTINCT, kAPPROX_QUANTILE, kMODE, kCOUNT_IF>(
3960 CHECK(check) << agg_info.agg_kind;
3961 val1 = out_vec[out_vec_idx][0];
3964 const auto chosen_bytes =
static_cast<size_t>(
3970 float_argument_input ?
sizeof(int32_t) : chosen_bytes,
3971 out_vec[out_vec_idx],
3974 float_argument_input);
3979 reduced_outs.push_back(val1);
3980 if (agg_info.agg_kind ==
kAVG ||
3981 (agg_info.agg_kind ==
kSAMPLE &&
3982 (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3983 const auto chosen_bytes =
static_cast<size_t>(
3988 agg_info.agg_kind ==
kAVG ?
kCOUNT : agg_info.agg_kind,
3991 float_argument_input ?
sizeof(int32_t) : chosen_bytes,
3992 out_vec[out_vec_idx + 1],
3999 reduced_outs.push_back(val2);
4012 auto rows_ptr = std::shared_ptr<ResultSet>(
4014 rows_ptr->fillOneEntry(reduced_outs);
4015 *results = std::move(rows_ptr);
4023 return results && results->rowCount() < scan_limit;
4031 const bool hoist_literals,
4034 std::vector<std::vector<const int8_t*>>& col_buffers,
4035 const std::vector<size_t> outer_tab_frag_ids,
4037 const std::vector<std::vector<int64_t>>& num_rows,
4038 const std::vector<std::vector<uint64_t>>& frag_offsets,
4040 const int device_id,
4042 const int64_t scan_limit,
4043 const uint32_t start_rowid,
4044 const uint32_t num_tables,
4045 const bool allow_runtime_interrupt,
4047 const bool optimize_cuda_block_and_grid_sizes,
4048 const int64_t rows_to_process) {
4052 CHECK(!results || !(*results));
4053 if (col_buffers.empty()) {
4064 if (allow_runtime_interrupt) {
4065 bool isInterrupted =
false;
4072 if (isInterrupted) {
4085 VLOG(2) <<
"bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.
union_all)
4086 <<
" ra_exe_unit.input_descs="
4088 <<
" ra_exe_unit.input_col_descs="
4090 <<
" ra_exe_unit.scan_limit=" << ra_exe_unit.
scan_limit
4093 <<
" query_exe_context->query_buffers_->num_rows_="
4095 <<
" query_exe_context->query_mem_desc_.getEntryCount()="
4097 <<
" device_id=" << device_id <<
" outer_table_key=" << outer_table_key
4098 <<
" scan_limit=" << scan_limit <<
" start_rowid=" << start_rowid
4099 <<
" num_tables=" << num_tables;
4106 std::stable_sort(ra_exe_unit_copy.
input_descs.begin(),
4108 [outer_table_key](
auto const&
a,
auto const& b) {
4109 return a.getTableKey() == outer_table_key &&
4110 b.getTableKey() != outer_table_key;
4113 ra_exe_unit_copy.
input_descs.back().getTableKey() != outer_table_key) {
4118 [outer_table_key](
auto const& input_col_desc) {
4119 return input_col_desc->getScanDesc().getTableKey() != outer_table_key;
4125 const int32_t scan_limit_for_query =
4127 const int32_t max_matched = scan_limit_for_query == 0
4129 : scan_limit_for_query;
4132 CHECK(cpu_generated_code);
4144 join_hash_table_ptrs,
4150 CHECK(gpu_generated_code);
4167 allow_runtime_interrupt,
4168 join_hash_table_ptrs,
4169 render_allocator_map_ptr,
4170 optimize_cuda_block_and_grid_sizes);
4177 }
catch (
const std::exception& e) {
4178 LOG(
FATAL) <<
"Error launching the GPU kernel: " << e.what();
4194 *results = query_exe_context->
getRowSet(ra_exe_unit_copy,
4197 VLOG(2) <<
"results->rowCount()=" << (*results)->rowCount();
4198 (*results)->holdLiterals(hoist_buf);
4200 if (error_code < 0 && render_allocator_map_ptr) {
4201 auto const adjusted_scan_limit =
4205 if (adjusted_scan_limit != 0) {
4211 if (results && error_code &&
4220 const int device_id) {
4221 std::vector<int8_t*> table_ptrs;
4222 const auto& join_hash_tables =
plan_state_->join_info_.join_hash_tables_;
4223 for (
auto hash_table : join_hash_tables) {
4225 CHECK(table_ptrs.empty());
4228 table_ptrs.push_back(hash_table->getJoinHashBuffer(
4235 const std::vector<InputTableInfo>& query_infos,
4240 const bool contains_left_deep_outer_join =
4241 ra_exe_unit && std::find_if(ra_exe_unit->
join_quals.begin(),
4247 new CgenState(query_infos.size(), contains_left_deep_outer_join,
this));
4255 const std::vector<InputTableInfo>& query_infos) {
4257 const auto ld_count = input_descs.size();
4259 for (
size_t i = 0; i < ld_count; ++i) {
4261 const auto frag_count = query_infos[i].info.fragments.size();
4265 if (frag_count > 1) {
4267 frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));
4276 const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
4277 const std::vector<InputTableInfo>& query_infos,
4280 const HashType preferred_hash_type,
4287 "Bounding box intersection disabled, attempting to fall back to loop join"};
4297 preferred_hash_type,
4301 hashtable_build_dag_map,
4303 table_id_to_node_map);
4306 return {
nullptr, e.what()};
4312 CHECK(!dev_props.empty());
4313 return dev_props.front().warpSize;
4328 return std::max((
unsigned)2,
4364 return static_cast<int64_t
>(dev_props.front().clockKhz) * milliseconds;
4371 if (value->getType()->isIntegerTy() && from_ti.
is_number() && to_ti.
is_fp() &&
4376 fp_type = llvm::Type::getFloatTy(
cgen_state_->context_);
4379 fp_type = llvm::Type::getDoubleTy(
cgen_state_->context_);
4384 value =
cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
4396 CHECK(val->getType()->isPointerTy());
4398 const auto val_ptr_type =
static_cast<llvm::PointerType*
>(val->getType());
4399 const auto val_type = val_ptr_type->getPointerElementType();
4400 size_t val_width = 0;
4401 if (val_type->isIntegerTy()) {
4402 val_width = val_type->getIntegerBitWidth();
4404 if (val_type->isFloatTy()) {
4407 CHECK(val_type->isDoubleTy());
4412 if (bitWidth == val_width) {
4419 #define EXECUTE_INCLUDE
4426 #undef EXECUTE_INCLUDE
4432 auto deleted_cols_it = deleted_cols_map.find(table_key);
4433 if (deleted_cols_it == deleted_cols_map.end()) {
4434 CHECK(deleted_cols_map.insert(std::make_pair(table_key, deleted_cd)).second);
4436 CHECK_EQ(deleted_cd, deleted_cols_it->second);
4447 auto ra_exe_unit_with_deleted = ra_exe_unit;
4449 for (
const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
4453 const auto& table_key = input_table.getTableKey();
4454 const auto catalog =
4457 const auto td = catalog->getMetadataForTable(table_key.table_id);
4459 const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
4463 CHECK(deleted_cd->columnType.is_boolean());
4466 for (
const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
4467 if (input_col.get()->getColId() == deleted_cd->columnId &&
4468 input_col.get()->getScanDesc().getTableKey() == table_key &&
4469 input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
4477 ra_exe_unit_with_deleted.input_col_descs.emplace_back(
4479 deleted_cd->tableId,
4481 input_table.getNestLevel()));
4485 return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
4493 const int64_t chunk_min,
4494 const int64_t chunk_max,
4499 CHECK(ldim != rdim);
4503 return {
true, chunk_min / scale, chunk_max / scale};
4507 boost::multiprecision::cpp_int_backend<64,
4509 boost::multiprecision::signed_magnitude,
4510 boost::multiprecision::checked,
4515 std::make_tuple(
true,
4519 }
catch (
const std::overflow_error& e) {
4522 return std::make_tuple(
false, chunk_min, chunk_max);
4532 if (table_key.table_id < 0) {
4536 const auto catalog =
4539 const auto td = catalog->getMetadataForTable(fragment.
physicalTableId);
4541 const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
4546 const auto& chunk_type = deleted_cd->columnType;
4547 CHECK(chunk_type.is_boolean());
4549 const auto deleted_col_id = deleted_cd->columnId;
4552 const int64_t chunk_min =
4554 const int64_t chunk_max =
4556 if (chunk_min == 1 && chunk_max == 1) {
4574 double chunk_min{0.};
4575 double chunk_max{0.};
4579 if (chunk_min > chunk_max) {
4588 const auto rhs_val = rhs_type ==
kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
4594 if (chunk_max < rhs_val) {
4599 if (chunk_max <= rhs_val) {
4604 if (chunk_min > rhs_val) {
4609 if (chunk_min >= rhs_val) {
4614 if (chunk_min > rhs_val || chunk_max < rhs_val) {
4627 const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
4628 const std::vector<uint64_t>& frag_offsets,
4629 const size_t frag_idx) {
4633 <<
", fragment id: " << frag_idx;
4637 for (
const auto& simple_qual : simple_quals) {
4638 const auto comp_expr =
4646 if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4650 CHECK(lhs_uexpr->get_optype() ==
4653 if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4660 const auto rhs = comp_expr->get_right_operand();
4666 if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
4667 !lhs->get_type_info().is_fp()) {
4670 if (lhs->get_type_info().is_fp()) {
4671 const auto fragment_skip_status =
4673 switch (fragment_skip_status) {
4688 if (lhs_col->get_type_info().is_timestamp() &&
4689 rhs_const->get_type_info().is_any<
kTIME>()) {
4696 const int col_id = lhs_col->getColumnKey().column_id;
4698 int64_t chunk_min{0};
4699 int64_t chunk_max{0};
4700 bool is_rowid{
false};
4701 size_t start_rowid{0};
4705 if (cd->isVirtualCol) {
4706 CHECK(cd->columnName ==
"rowid");
4708 start_rowid = table_generation.start_rowid;
4709 chunk_min = frag_offsets[frag_idx] + start_rowid;
4710 chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
4714 const auto& chunk_type = lhs_col->get_type_info();
4720 if (chunk_min > chunk_max) {
4724 if (lhs->get_type_info().is_timestamp() &&
4725 (lhs_col->get_type_info().get_dimension() !=
4726 rhs_const->get_type_info().get_dimension()) &&
4727 (lhs_col->get_type_info().is_high_precision_timestamp() ||
4728 rhs_const->get_type_info().is_high_precision_timestamp())) {
4737 std::tie(is_valid, chunk_min, chunk_max) =
4739 chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
4741 VLOG(4) <<
"Overflow/Underflow detecting in fragments skipping logic.\nChunk min "
4745 <<
"\nLHS col precision is: "
4747 <<
"\nRHS precision is: "
4748 <<
std::to_string(rhs_const->get_type_info().get_dimension()) <<
".";
4752 if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
4757 chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
4759 chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
4761 llvm::LLVMContext local_context;
4762 CgenState local_cgen_state(local_context);
4765 const auto rhs_val =
4768 switch (comp_expr->get_optype()) {
4770 if (chunk_max < rhs_val) {
4775 if (chunk_max <= rhs_val) {
4780 if (chunk_min > rhs_val) {
4785 if (chunk_min >= rhs_val) {
4790 if (chunk_min > rhs_val || chunk_max < rhs_val) {
4792 }
else if (is_rowid) {
4793 return {
false, rhs_val - start_rowid};
4831 const std::vector<uint64_t>& frag_offsets,
4832 const size_t frag_idx) {
4833 std::pair<bool, int64_t> skip_frag{
false, -1};
4834 for (
auto& inner_join : ra_exe_unit.
join_quals) {
4841 std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
4842 for (
auto& qual : inner_join.quals) {
4844 inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
4845 temp_qual.simple_quals.begin(),
4846 temp_qual.simple_quals.end());
4849 table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
4850 if (temp_skip_frag.second != -1) {
4851 skip_frag.second = temp_skip_frag.second;
4854 skip_frag.first = skip_frag.first || temp_skip_frag.first;
4861 const std::unordered_set<PhysicalInput>& phys_inputs) {
4863 std::unordered_set<shared::TableKey> phys_table_keys;
4864 for (
const auto& phys_input : phys_inputs) {
4865 phys_table_keys.emplace(phys_input.db_id, phys_input.table_id);
4867 std::vector<InputTableInfo> query_infos;
4868 for (
const auto& table_key : phys_table_keys) {
4871 for (
const auto& phys_input : phys_inputs) {
4872 auto db_id = phys_input.db_id;
4873 auto table_id = phys_input.table_id;
4874 auto column_id = phys_input.col_id;
4879 const auto col_var = std::make_unique<Analyzer::ColumnVar>(
4882 agg_col_range_cache.
setColRange(phys_input, col_range);
4885 return agg_col_range_cache;
4889 const std::unordered_set<PhysicalInput>& phys_inputs) {
4895 for (
const auto& phys_input : phys_inputs) {
4896 const auto catalog =
4899 const auto cd = catalog->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4901 const auto& col_ti =
4902 cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
4903 if (col_ti.is_string() && col_ti.get_compression() ==
kENCODING_DICT) {
4904 const auto& dict_key = col_ti.getStringDictKey();
4905 const auto dd = catalog->getMetadataForDict(dict_key.dict_id);
4906 CHECK(dd && dd->stringDict);
4908 dd->stringDict->storageEntryCount());
4911 return string_dictionary_generations;
4915 const std::unordered_set<shared::TableKey>& phys_table_keys) {
4917 for (
const auto& table_key : phys_table_keys) {
4921 TableGeneration{
static_cast<int64_t
>(table_info.getPhysicalNumTuples()), 0});
4923 return table_generations;
4927 const std::unordered_set<shared::TableKey>& phys_table_ids) {
4962 return !candidate_query_session.empty() &&
4974 ->second.getQueryStatus();
4976 return QuerySessionStatus::QueryStatus::UNDEFINED;
4986 const std::string& query_str,
4987 const std::string& query_submitted_time) {
4988 if (!query_session_id.empty()) {
4992 query_session_id, query_submitted_time,
executor_id_, write_lock);
4994 query_submitted_time,
4995 QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
4998 return {query_session_id, query_str};
5005 if (query_session.empty()) {
5012 VLOG(1) <<
"Interrupting pending query is not available since the query session is "
5017 <<
"Interrupting pending query is not available since its interrupt flag is "
5028 const std::string& submitted_time_str) {
5031 if (query_session.empty()) {
5043 const std::string& submitted_time_str,
5047 if (query_session.empty()) {
5050 if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
5054 query_session, submitted_time_str, new_query_status, session_write_lock);
5059 const std::string& query_str,
5060 const std::string& submitted_time_str,
5061 const size_t executor_id,
5065 if (query_session.empty()) {
5073 query_session_status,
5074 session_write_lock);
5076 if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
5088 const std::string& query_str,
5089 const std::string& submitted_time_str,
5090 const size_t executor_id,
5098 .emplace(submitted_time_str,
5106 .emplace(submitted_time_str,
5114 std::map<std::string, QuerySessionStatus> executor_per_query_map;
5115 executor_per_query_map.emplace(
5118 query_session, executor_id, query_str, submitted_time_str, query_status));
5126 const std::string& submitted_time_str,
5130 if (query_session.empty()) {
5135 auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
5137 if (submitted_time_str.compare(target_submitted_t_str) == 0) {
5138 auto prev_status = query_status.second.getQueryStatus();
5139 if (prev_status == updated_query_status) {
5142 query_status.second.setQueryStatus(updated_query_status);
5152 const std::string& submitted_time_str,
5153 const size_t executor_id,
5156 if (query_session.empty()) {
5161 for (
auto it = storage.begin(); it != storage.end(); it++) {
5162 auto target_submitted_t_str = it->second.getQuerySubmittedTime();
5164 if (submitted_time_str.compare(target_submitted_t_str) == 0) {
5166 .at(submitted_time_str)
5167 .setExecutorId(executor_id);
5177 const std::string& submitted_time_str,
5179 if (query_session.empty()) {
5184 if (storage.size() > 1) {
5186 for (
auto it = storage.begin(); it != storage.end(); it++) {
5187 auto target_submitted_t_str = it->second.getQuerySubmittedTime();
5190 submitted_time_str.compare(target_submitted_t_str) == 0) {
5195 }
else if (storage.size() == 1) {
5212 if (query_session.empty()) {
5223 if (query_session.empty()) {
5234 if (query_session.empty()) {
5241 const double runtime_query_check_freq,
5242 const unsigned pending_query_check_freq)
const {
5256 const size_t cache_value) {
5260 VLOG(1) <<
"Put estimated cardinality to the cache";
5269 VLOG(1) <<
"Reuse cached cardinality";
5286 if (it->first.containsTableKey(table_key)) {
5300 std::vector<QuerySessionStatus> ret;
5301 for (
auto& info : query_infos) {
5302 ret.emplace_back(query_session,
5303 info.second.getExecutorId(),
5304 info.second.getQueryStr(),
5305 info.second.getQuerySubmittedTime(),
5306 info.second.getQueryStatus());
5315 std::vector<size_t>
res;
5319 for (
auto& kv : it->second) {
5320 if (kv.second.getQueryStatus() ==
5321 QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
5322 res.push_back(kv.second.getExecutorId());
5354 const size_t num_cpu_slots,
5355 const size_t num_gpu_slots,
5356 const size_t cpu_result_mem,
5357 const size_t cpu_buffer_pool_mem,
5358 const size_t gpu_buffer_pool_mem,
5359 const double per_query_max_cpu_slots_ratio,
5360 const double per_query_max_cpu_result_mem_ratio,
5361 const bool allow_cpu_kernel_concurrency,
5362 const bool allow_cpu_gpu_kernel_concurrency,
5363 const bool allow_cpu_slot_oversubscription_concurrency,
5364 const bool allow_cpu_result_mem_oversubscription_concurrency,
5365 const double max_available_resource_use_ratio) {
5366 const double per_query_max_pinned_cpu_buffer_pool_mem_ratio{1.0};
5367 const double per_query_max_pageable_cpu_buffer_pool_mem_ratio{0.5};
5372 cpu_buffer_pool_mem,
5373 gpu_buffer_pool_mem,
5374 per_query_max_cpu_slots_ratio,
5375 per_query_max_cpu_result_mem_ratio,
5376 per_query_max_pinned_cpu_buffer_pool_mem_ratio,
5377 per_query_max_pageable_cpu_buffer_pool_mem_ratio,
5378 allow_cpu_kernel_concurrency,
5379 allow_cpu_gpu_kernel_concurrency,
5380 allow_cpu_slot_oversubscription_concurrency,
5382 allow_cpu_result_mem_oversubscription_concurrency,
5383 max_available_resource_use_ratio);
5388 throw std::runtime_error(
5389 "Executor queue cannot be paused as it requires Executor Resource Manager to be "
5397 throw std::runtime_error(
5398 "Executor queue cannot be resumed as it requires Executor Resource Manager to be "
5407 throw std::runtime_error(
5408 "ExecutorResourceMgr must be enabled to obtain executor resource pool stats.");
5416 throw std::runtime_error(
5417 "ExecutorResourceMgr must be enabled to obtain executor resource pool stats.");
5424 const size_t resource_quantity) {
5426 throw std::runtime_error(
5427 "ExecutorResourceMgr must be enabled to set executor resource pool resource.");
5436 throw std::runtime_error(
5437 "ExecutorResourceMgr must be enabled to set executor concurrent resource grant "
5445 concurrent_resource_grant_policy) {
5447 throw std::runtime_error(
5448 "ExecutorResourceMgr must be enabled to set executor concurrent resource grant "
5452 concurrent_resource_grant_policy);
5474 std::shared_ptr<ExecutorResourceMgr_Namespace::ExecutorResourceMgr>
5487 std::stringstream ss;
5488 ss <<
"colRangeCache: ";
5490 ss <<
"{" << phys_input.col_id <<
", " << phys_input.table_id
5491 <<
"} = " << exp_range.toString() <<
", ";
5493 ss <<
"stringDictGenerations: ";
5494 for (
auto& [key, val] :
row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
5495 ss << key <<
" = " << val <<
", ";
5497 ss <<
"tableGenerations: ";
5499 ss << key <<
" = {" << val.tuple_count <<
", " << val.start_rowid <<
"}, ";
void logSystemGPUMemoryStatus(std::string const &tag, size_t const thread_idx) const
A container for various stats about the current state of the ExecutorResourcePool. Note that ExecutorResourcePool does not persist a struct of this type, but rather builds one on the fly when ExecutorResourcePool::get_resource_info() is called.
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
void executeWorkUnitPerFragment(const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb, const std::set< size_t > &fragment_indexes_param)
Compiles and dispatches a work unit per fragment processing results with the per fragment callback...
bool is_agg(const Analyzer::Expr *expr)
std::vector< Analyzer::Expr * > target_exprs
A container to store requested and minimum neccessary resource requests across all resource types cur...
AggregatedColRange computeColRangesCache(const std::unordered_set< PhysicalInput > &phys_inputs)
void enableRuntimeQueryInterrupt(const double runtime_query_check_freq, const unsigned pending_query_check_freq) const
size_t getSlotCount() const
constexpr size_t kArenaBlockOverhead
const QueryPlanDAG getLatestQueryPlanDagExtracted() const
std::vector< std::unique_ptr< ExecutionKernel > > createKernels(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
size_t getBufferSizeBytes(const RelAlgExecutionUnit &ra_exe_unit, const unsigned thread_count, const ExecutorDeviceType device_type) const
std::shared_ptr< ExecutorResourceMgr > generate_executor_resource_mgr(const size_t num_cpu_slots, const size_t num_gpu_slots, const size_t cpu_result_mem, const size_t cpu_buffer_pool_mem, const size_t gpu_buffer_pool_mem, const double per_query_max_cpu_slots_ratio, const double per_query_max_cpu_result_mem_ratio, const double per_query_max_pinned_cpu_buffer_pool_mem_ratio, const double per_query_max_pageable_cpu_buffer_pool_mem_ratio, const bool allow_cpu_kernel_concurrency, const bool allow_cpu_gpu_kernel_concurrency, const bool allow_cpu_slot_oversubscription_concurrency, const bool allow_gpu_slot_oversubscription, const bool allow_cpu_result_mem_oversubscription_concurrency, const double max_available_resource_use_ratio)
Convenience factory-esque method that allows us to use the same logic to generate an ExecutorResource...
std::vector< int > ChunkKey
double g_running_query_interrupt_freq
robin_hood::unordered_set< int64_t > CountDistinctSet
std::string get_cuda_libdevice_dir(void)
void reduce(SpeculativeTopNMap &that)
static heavyai::shared_mutex execute_mutex_
static QuerySessionMap queries_session_map_
CudaMgr_Namespace::CudaMgr * cudaMgr() const
void log_system_memory_info_impl(std::string const &mem_log, size_t executor_id, size_t log_time_ms, std::string const &log_tag, size_t const thread_idx)
unsigned ceil_div(unsigned const dividend, unsigned const divisor)
bool checkIsQuerySessionInterrupted(const std::string &query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
int64_t kernel_queue_time_ms_
size_t maxGpuSlabSize() const
HOST DEVICE int get_size() const
size_t getEntryCount() const
bool useCudaBuffers() const
Data_Namespace::DataMgr * data_mgr_
size_t getKeyCount() const
std::vector< size_t > getTableFragmentIndices(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< shared::TableKey, const TableFragments * > &selected_tables_fragments, const std::unordered_map< shared::TableKey, const Analyzer::BinOper * > &inner_table_id_to_join_condition)
int32_t getErrorCode() const
ExecutorDeviceType getDeviceType() const
int64_t compilation_queue_time_ms_
size_t g_cpu_sub_task_size
ResultSetPtr get_merged_result(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, std::vector< TargetInfo > const &targets)
block_size_x_(block_size_x)
static void initialize_extension_module_sources()
void checkPendingQueryStatus(const QuerySessionId &query_session)
const StringDictionaryProxy::IdMap * getJoinIntersectionStringProxyTranslationMap(const StringDictionaryProxy *source_proxy, StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &source_string_op_infos, const std::vector< StringOps_Namespace::StringOpInfo > &dest_source_string_op_infos, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner) const
static const int32_t ERR_INTERRUPTED
class for a per-database catalog. also includes metadata for the current database and the current use...
std::vector< int8_t * > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
std::unordered_map< shared::TableKey, const ColumnDescriptor * > DeletedColumnsMap
bool g_allow_memory_status_log
void setEntryCount(const size_t val)
input_table_info_cache_(this)
grid_size_x_(grid_size_x)
void set_mod_range(std::vector< int8_t const * > &frag_col_buffers, int8_t const *const ptr, size_t const local_col_id, size_t const N)
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const ExecutorDeviceType device_type, const int device_count)
const std::vector< uint64_t > & getFragOffsets()
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 64, 64, boost::multiprecision::signed_magnitude, boost::multiprecision::checked, void >> checked_int64_t
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner() const
std::atomic< bool > interrupted_
void setGeneration(const shared::StringDictKey &dict_key, const uint64_t generation)
std::map< shared::TableKey, std::vector< uint64_t > > get_table_id_to_frag_offsets(const std::vector< InputDescriptor > &input_descs, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments)
static ResultSetRecyclerHolder resultset_recycler_holder_
std::tuple< bool, int64_t, int64_t > get_hpt_overflow_underflow_safe_scaled_values(const int64_t chunk_min, const int64_t chunk_max, const SQLTypeInfo &lhs_type, const SQLTypeInfo &rhs_type)
const StringDictionaryProxy::IdMap * getOrAddStringProxyTranslationMap(const shared::StringDictKey &source_dict_id_in, const shared::StringDictKey &dest_dict_id_in, const bool with_generation, const StringTranslationType translation_map_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::string get_root_abs_path()
std::string toString() const
QueryPlanHash query_plan_dag_hash
static const int max_gpu_count
GpuSharedMemoryContext gpu_smem_context
OutVecOwner(const std::vector< int64_t * > &out_vec)
static ExecutorResourceMgr_Namespace::ResourcePoolInfo get_executor_resource_pool_info()
bool with_dynamic_watchdog
const std::optional< bool > union_all
unsigned g_pending_query_interrupt_freq
int64_t float_to_double_bin(int32_t val, bool nullable=false)
const table_functions::TableFunction table_func
std::map< const QuerySessionId, std::map< std::string, QuerySessionStatus >> QuerySessionMap
std::vector< size_t > outer_fragment_indices
bool isArchPascalOrLater(const ExecutorDeviceType dt) const
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
SystemMemoryUsage getSystemMemoryUsage() const
HOST DEVICE int get_scale() const
Cache for physical column ranges. Set by the aggregator on the leaves.
std::pair< QuerySessionId, std::string > CurrentQueryStatus
size_t getDeviceBasedWatchdogScanLimit(size_t watchdog_max_projected_rows_per_device, const ExecutorDeviceType device_type, const int device_count)
void prepare_string_dictionaries(const std::unordered_set< PhysicalInput > &phys_inputs)
static std::shared_ptr< ExecutorResourceMgr_Namespace::ExecutorResourceMgr > executor_resource_mgr_
size_t getSharedMemorySize() const
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr * > &target_exprs) const
void updateQuerySessionStatus(const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus new_query_status)
void clearMemory(const MemoryLevel memLevel)
std::unordered_set< int > get_available_gpus(const Data_Namespace::DataMgr *data_mgr)
std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > addDeletedColumn(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
void addDeviceResults(ResultSetPtr &&device_results, std::vector< size_t > outer_table_fragment_ids)
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t start_rowid, const uint32_t num_tables, const std::vector< int8_t * > &join_hash_tables, const int64_t num_rows_to_process=-1)
std::vector< InputDescriptor > input_descs
bool hasLazyFetchColumns(const std::vector< Analyzer::Expr * > &target_exprs) const
size_t g_preflight_count_query_threshold
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
void launchKernelsImpl(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels, const ExecutorDeviceType device_type, const size_t requested_num_threads)
const TableDescriptor * get_metadata_for_table(const ::shared::TableKey &table_key, bool populate_fragmenter)
DEVICE void sort(ARGS &&...args)
std::unique_ptr< llvm::Module > read_llvm_module_from_ir_string(const std::string &udf_ir_string, llvm::LLVMContext &ctx, bool is_gpu=false)
bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter *fragmenter)
std::optional< size_t > first_dict_encoded_idx(std::vector< TargetInfo > const &)
ExpressionRange getColRange(const PhysicalInput &) const
const ColumnDescriptor * get_metadata_for_column(const ::shared::ColumnKey &column_key)
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
ResourceType
Stores the resource type for a ExecutorResourcePool request.
FetchResult fetchUnionChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< shared::TableKey, const TableFragments * > &, const FragmentsList &selected_fragments, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type) const
static std::pair< int64_t, int32_t > reduceResults(const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
TypeR::rep timer_stop(Type clock_begin)
Functions to support geospatial operations used by the executor.
const StringDictionaryProxy::IdMap * getStringProxyTranslationMap(const shared::StringDictKey &source_dict_key, const shared::StringDictKey &dest_dict_key, const RowSetMemoryOwner::StringTranslationType translation_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool with_generation) const
QuerySessionId current_query_session_
ResultSetRecyclerHolder & getResultSetRecyclerHolder()
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
heavyai::shared_mutex & getSessionLock()
static const int32_t ERR_GEOS
ExecutorResourceMgr_Namespace::ChunkRequestInfo getChunkRequestInfo(const ExecutorDeviceType device_type, const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos, const std::vector< std::pair< int32_t, FragmentsList >> &device_fragment_lists) const
Determines a unique list of chunks and their associated byte sizes for a given query plan...
AggregatedColRange agg_col_range_cache_
std::shared_ptr< ResultSet > ResultSetPtr
static void * gpu_active_modules_[max_gpu_count]
std::vector< FragmentInfo > fragments
std::unique_ptr< CgenState > cgen_state_
void fill_entries_for_empty_input(std::vector< TargetInfo > &target_infos, std::vector< int64_t > &entry, const std::vector< Analyzer::Expr * > &target_exprs, const QueryMemoryDescriptor &query_mem_desc)
std::string toString(const QueryDescriptionType &type)
bool g_enable_dynamic_watchdog
void enrollQuerySession(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted_time_str, const size_t executor_id, const QuerySessionStatus::QueryStatus query_session_status)
static void init_resource_mgr(const size_t num_cpu_slots, const size_t num_gpu_slots, const size_t cpu_result_mem, const size_t cpu_buffer_pool_mem, const size_t gpu_buffer_pool_mem, const double per_query_max_cpu_slots_ratio, const double per_query_max_cpu_result_mem_ratio, const bool allow_cpu_kernel_concurrency, const bool allow_cpu_gpu_kernel_concurrency, const bool allow_cpu_slot_oversubscription_concurrency, const bool allow_cpu_result_mem_oversubscription, const double max_available_resource_use_ratio)
T visit(const Analyzer::Expr *expr) const
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
bool takes_float_argument(const TargetInfo &target_info)
Specifies the policies for resource grants in the presence of other requests, both under situations o...
int32_t executePlanWithoutGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr *results, const std::vector< Analyzer::Expr * > &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info, const bool optimize_cuda_block_and_grid_sizes, const int64_t rows_to_process=-1)
static uint32_t gpu_active_modules_device_mask_
HOST DEVICE SQLTypes get_type() const
FragmentSkipStatus canSkipFragmentForFpQual(const Analyzer::BinOper *comp_expr, const Analyzer::ColumnVar *lhs_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Analyzer::Constant *rhs_const) const
static void invalidateCaches()
int deviceCount(const ExecutorDeviceType) const
quantile::TDigest * nullTDigest(double const q)
bool isSharedMemoryUsed() const
llvm::Value * castToIntPtrTyIn(llvm::Value *val, const size_t bit_width)
void reset(bool discard_runtime_modules_only=false)
const int8_t * getOneTableColumnFragment(const shared::TableKey &table_key, const int frag_id, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
static std::mutex kernel_mutex_
unsigned numBlocksPerMP() const
StringDictionaryProxy * getStringDictionaryProxy(const shared::StringDictKey &dict_key, const bool with_generation) const
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
Container for compilation results and assorted options for a single execution unit.
bool checkCurrentQuerySession(const std::string &candidate_query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
void addTransientStringLiterals(const RelAlgExecutionUnit &ra_exe_unit, const std::shared_ptr< RowSetMemoryOwner > &row_set_mem_owner)
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
std::vector< FragmentsPerTable > FragmentsList
bool needFetchAllFragments(const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
TargetInfo get_target_info(const Analyzer::Expr *target_expr, const bool bigint_count)
RUNTIME_EXPORT void agg_sum_float_skip_val(int32_t *agg, const float val, const float skip_val)
static size_t literalBytes(const CgenState::LiteralValue &lit)
bool filter_on_deleted_column
bool updateQuerySessionStatusWithLock(const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus updated_query_status, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
ResultSetPtr executeWorkUnit(size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
static void clearCardinalityCache()
bool checkNonKernelTimeInterrupted() const
size_t getRowSize() const
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
const int8_t * getAllTableColumnFragments(const shared::TableKey &table_key, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
int8_t * getUnderlyingBuffer() const
bool g_inner_join_fragment_skipping
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::string &submitted_time_str, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
std::vector< Analyzer::Expr * > target_exprs_union
static void resume_executor_queue()
CardinalityCacheKey(const RelAlgExecutionUnit &ra_exe_unit)
bool g_enable_string_functions
std::unordered_map< shared::TableKey, const Analyzer::BinOper * > getInnerTabIdToJoinCond() const
std::shared_lock< T > shared_lock
std::unique_ptr< QueryMemoryInitializer > query_buffers_
size_t g_watchdog_none_encoded_string_translation_limit
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags(const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments)
size_t compute_buffer_entry_guess(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
bool g_enable_executor_resource_mgr
SQLOps get_optype() const
This file contains the class specification and related data structures for Catalog.
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
const ExecutorId executor_id_
Data_Namespace::DataMgr & getDataMgr() const
bool updateQuerySessionExecutorAssignment(const QuerySessionId &query_session, const std::string &submitted_time_str, const size_t executor_id, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
RUNTIME_EXPORT void agg_sum_double_skip_val(int64_t *agg, const double val, const double skip_val)
std::map< QuerySessionId, bool > InterruptFlagMap
const StringDictionaryProxy::TranslationMap< Datum > * getOrAddStringProxyNumericTranslationMap(const shared::StringDictKey &source_dict_id_in, const bool with_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
const size_t max_gpu_slab_size_
TargetInfo operator()(Analyzer::Expr const *const target_expr) const
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
ResultSetPtr collectAllDeviceResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
const ColumnDescriptor * getPhysicalColumnDescriptor(const Analyzer::ColumnVar *, int) const
int8_t groupColWidth(const size_t key_idx) const
bool containsPreFlightFn() const
bool key_does_not_shard_to_leaf(const ChunkKey &key)
static const int32_t ERR_DIV_BY_ZERO
void setGeneration(const shared::TableKey &table_key, const TableGeneration &generation)
std::vector< std::pair< std::vector< size_t >, size_t > > per_device_cardinality
static SysCatalog & instance()
max_gpu_slab_size_(max_gpu_slab_size)
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
static void set_executor_resource_pool_resource(const ExecutorResourceMgr_Namespace::ResourceType resource_type, const size_t resource_quantity)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Classes representing a parse tree.
void logSystemCPUMemoryStatus(std::string const &tag, size_t const thread_idx) const
int getDeviceCount() const
size_t get_context_count(const ExecutorDeviceType device_type, const size_t cpu_count, const size_t gpu_count)
int64_t deviceCycles(int milliseconds) const
ExecutorType executor_type
void init(LogOptions const &log_opts)
std::mutex str_dict_mutex_
#define INJECT_TIMER(DESC)
static size_t addAligned(const size_t off_in, const size_t alignment)
const ColumnDescriptor * get_column_descriptor_maybe(const shared::ColumnKey &column_key)
Fragmenter_Namespace::TableInfo getTableInfo(const shared::TableKey &table_key) const
static const int32_t ERR_OUT_OF_RENDER_MEM
const JoinQualsPerNestingLevel join_quals
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
std::vector< std::string > expr_container_to_string(const T &expr_container)
static void set_concurrent_resource_grant_policy(const ExecutorResourceMgr_Namespace::ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy)
std::unique_ptr< llvm::Module > read_llvm_module_from_ir_file(const std::string &udf_ir_filename, llvm::LLVMContext &ctx, bool is_gpu=false)
bool threadsCanReuseGroupByBuffers() const
void populate_string_dictionary(int32_t table_id, int32_t col_id, int32_t db_id)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
static QueryPlanDAG latest_query_plan_extracted_
void addToCardinalityCache(const CardinalityCacheKey &cache_key, const size_t cache_value)
friend class QueryMemoryDescriptor
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr *results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const shared::TableKey &outer_table_key, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info, const bool optimize_cuda_block_and_grid_sizes, const int64_t rows_to_process=-1)
std::optional< size_t > limit
size_t getNumCurentSessionsEnrolled() const
TableIdToNodeMap table_id_to_node_map
bool estimate_output_cardinality
int64_t inline_null_val(const SQLTypeInfo &ti, const bool float_argument_input)
const ColumnDescriptor * get_column_descriptor(const shared::ColumnKey &column_key)
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
std::list< Analyzer::OrderEntry > order_entries
CachedCardinality getCachedCardinality(const CardinalityCacheKey &cache_key)
size_t g_watchdog_max_projected_rows_per_device
static InterruptFlagMap queries_interrupt_flag_
std::unique_lock< T > unique_lock
std::unique_ptr< PlanState > plan_state_
RUNTIME_EXPORT void agg_max_double_skip_val(int64_t *agg, const double val, const double skip_val)
static const int32_t ERR_OUT_OF_TIME
std::vector< int64_t * > out_vec_
void add_deleted_col_to_map(PlanState::DeletedColumnsMap &deleted_cols_map, const ColumnDescriptor *deleted_cd, const shared::TableKey &table_key)
static const ExecutorResourceMgr_Namespace::ConcurrentResourceGrantPolicy get_concurrent_resource_grant_policy(const ExecutorResourceMgr_Namespace::ResourceType resource_type)
Checked json field retrieval.
DEVICE auto accumulate(ARGS &&...args)
ResultSetPtr executeTableFunction(const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo)
Compiles and dispatches a table function; that is, a function that takes as input one or more columns...
llvm::Value * castToFP(llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
size_t get_selected_input_descs_index(const shared::TableKey &table_key, std::vector< InputDescriptor > const &input_descs)
std::pair< bool, size_t > CachedCardinality
size_t watchdog_max_projected_rows_per_device
void setupCaching(const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< shared::TableKey > &phys_table_keys)
static void invalidateCardinalityCacheForTable(const shared::TableKey &table_key)
bool is_dict_encoded_type() const
bool is_distinct_target(const TargetInfo &target_info)
size_t g_approx_quantile_buffer
const ColumnDescriptor * getColumnDescriptor(const Analyzer::ColumnVar *) const
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
bool containsTableKey(const shared::TableKey &table_key) const
bool checkIsQuerySessionEnrolled(const QuerySessionId &query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
specifies the content in-memory of a row in the column metadata table
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
int64_t getAggInitValForIndex(const size_t index) const
const ChunkMetadataMap & getChunkMetadataMap() const
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
const std::shared_ptr< Analyzer::Estimator > estimator
static std::map< int, std::shared_ptr< Executor > > executors_
QuerySessionStatus::QueryStatus getQuerySessionStatus(const QuerySessionId &candidate_query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
size_t get_selected_input_col_descs_index(const shared::TableKey &table_key, std::list< std::shared_ptr< InputColDescriptor const >> const &input_col_descs)
static const int32_t ERR_OUT_OF_GPU_MEM
This file includes the class specification for the buffer manager (BufferMgr), and related data struc...
static const size_t auto_num_threads
const TemporaryTables * getTemporaryTables()
int32_t getOrAddTransient(const std::string &)
RUNTIME_EXPORT void agg_min_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
const SQLTypeInfo & get_type_info() const
size_t get_col_byte_width(const shared::ColumnKey &column_key)
QueryDescriptionType getQueryDescriptionType() const
void freeTemporaryCpuLinearizedIdxBuf()
std::shared_ptr< CompilationContext > generated_code
QueryMemoryDescriptor query_mem_desc_
ExecutorDeviceType device_type
void launchKernelsLocked(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels, const ExecutorDeviceType device_type)
virtual ReductionCode codegen() const
Executor(const ExecutorId id, Data_Namespace::DataMgr *data_mgr, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
static std::unordered_map< CardinalityCacheKey, size_t > cardinality_cache_
std::string dumpCache() const
std::string sort_algorithm_to_string(const SortAlgorithm algorithm)
std::unordered_map< shared::TableKey, const RelAlgNode * > TableIdToNodeMap
const std::vector< size_t > getExecutorIdsRunningQuery(const QuerySessionId &interrupt_session) const
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
ResultSetPtr resultsUnion(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
void registerExtractedQueryPlanDag(const QueryPlanDAG &query_plan_dag)
#define REGULAR_DICT(TRANSIENTID)
static llvm::ConstantInt * codegenIntConst(const Analyzer::Constant *constant, CgenState *cgen_state)
ReductionCode get_reduction_code(const size_t executor_id, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, int64_t *compilation_queue_time)
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
QuerySessionId & getCurrentQuerySession(heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const std::shared_ptr< CompilationContext > &compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor, bool is_pre_launch_udtf)
const shared::ColumnKey & getColumnKey() const
void setGridSize(unsigned grid_size)
static heavyai::shared_mutex recycler_mutex_
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
InputTableInfoCache input_table_info_cache_
size_t getNumBytesForFetchedRow(const std::set< shared::TableKey > &table_keys_to_fetch) const
bool g_enable_bbox_intersect_hashjoin
void setBlockSize(unsigned block_size)
Speculative top N algorithm.
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit)
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::chrono::steady_clock::time_point lock_queue_clock_
bool isHintRegistered(const QueryHint hint) const
Datum get_constval() const
std::pair< bool, int64_t > skipFragment(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
std::unordered_map< size_t, SQLTypeInfo > target_exprs_original_type_infos
const int8_t * linearizeColumnFragments(const shared::TableKey &table_key, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
const TableGeneration & getGeneration(const shared::TableKey &table_key) const
unsigned gridSize() const
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
std::unordered_map< int, CgenState::LiteralValues > literal_values
HOST DEVICE int get_dimension() const
TableGenerations computeTableGenerations(const std::unordered_set< shared::TableKey > &phys_table_keys)
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
static std::map< ExtModuleKinds, std::string > extension_module_sources
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
StringDictionaryGenerations computeStringDictionaryGenerations(const std::unordered_set< PhysicalInput > &phys_inputs)
static const int32_t ERR_WIDTH_BUCKET_INVALID_ARGUMENT
Data_Namespace::DataMgr * getDataMgr() const
bool needLinearizeAllFragments(const ColumnDescriptor *cd, const InputColDescriptor &inner_col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments, const Data_Namespace::MemoryLevel memory_level) const
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
std::vector< MemoryInfo > getMemoryInfo(const MemoryLevel memLevel) const
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
static heavyai::shared_mutex executor_session_mutex_
RUNTIME_EXPORT ALWAYS_INLINE DEVICE int64_t DateTruncateHighPrecisionToDate(const int64_t timeval, const int64_t scale)
bool hasTableFunctionSpecifiedParameter() const
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
std::vector< std::vector< const int8_t * > > col_buffers
std::pair< bool, int64_t > skipFragmentInnerJoins(const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
void buildSelectedFragsMapping(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
TableGenerations table_generations_
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr * > &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> PerFragmentCallBack
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool g_use_estimator_result_cache
std::string getIR() const
const std::vector< InputTableInfo > & getQueryInfos() const
void buildSelectedFragsMappingForUnion(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
RUNTIME_EXPORT void agg_min_double_skip_val(int64_t *agg, const double val, const double skip_val)
const std::unordered_map< PhysicalInput, ExpressionRange > & asMap() const
std::string QuerySessionId
ResultSetPtr reduceMultiDeviceResultSets(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
const std::unordered_map< shared::TableKey, TableGeneration > & asMap() const
RegisteredQueryHint query_hint
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > getUniqueThreadSharedResultSets(const std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device) const
constexpr char const * EMPTY_QUERY_PLAN
QueryPlanDagCache & getQueryPlanDagCache()
#define DEBUG_TIMER(name)
RUNTIME_EXPORT void agg_max_float_skip_val(int32_t *agg, const float val, const float skip_val)
std::map< shared::ColumnKey, size_t > getColumnByteWidthMap(const std::set< shared::TableKey > &table_ids_to_fetch, const bool include_lazy_fetched_cols) const
constexpr int64_t get_timestamp_precision_scale(const int32_t dimen)
static const int32_t ERR_OUT_OF_SLOTS
uint64_t exp_to_scale(const unsigned exp)
double gpu_input_mem_limit_percent
static std::pair< std::vector< InnerOuter >, std::vector< InnerOuterStringOpInfos > > normalizeColumnPairs(const Analyzer::BinOper *condition, const TemporaryTables *temporary_tables)
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
bool g_enable_cpu_sub_tasks
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
CgenStateManager(Executor &executor)
std::mutex compilation_mutex_
RUNTIME_EXPORT ALWAYS_INLINE int64_t agg_sum_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
heavyai::shared_mutex & getDataRecyclerLock()
std::vector< std::vector< int64_t > > num_rows
static void pause_executor_queue()
const ColumnDescriptor * try_get_column_descriptor(const InputColDescriptor *col_desc)
size_t g_approx_quantile_centroids
constexpr int8_t MAX_BYTE_WIDTH_SUPPORTED
static std::shared_ptr< QueryEngine > getInstance()
void setColRange(const PhysicalInput &, const ExpressionRange &)
std::shared_ptr< CompilationContext > compile(const TableFunctionExecutionUnit &exe_unit, bool emit_only_preflight_fn)
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
const Expr * get_left_operand() const
static bool typeSupportsRange(const SQLTypeInfo &ti)
ExecutorDeviceType getDeviceTypeForTargets(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
void invalidateRunningQuerySession(heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
std::shared_ptr< const query_state::QueryState > query_state
void setNumAllocatedThreads(size_t num_threads)
ResultSetPtr reduce_estimator_results(const RelAlgExecutionUnit &ra_exe_unit, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
ExpressionRange getColRange(const PhysicalInput &) const
CurrentQueryStatus attachExecutorToQuerySession(const QuerySessionId &query_session_id, const std::string &query_str, const std::string &query_submitted_time)
bool skipFragmentPair(const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< shared::TableKey, const Analyzer::BinOper * > &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
std::vector< Analyzer::Expr * > target_exprs
void launchKernelsViaResourceMgr(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels, const ExecutorDeviceType device_type, const std::vector< InputDescriptor > &input_descs, const QueryMemoryDescriptor &query_mem_desc)
Launches a vector of kernels for a given query step, gated/scheduled by ExecutorResourceMgr.
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
static constexpr ExecutorId UNITARY_EXECUTOR_ID
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
const TableGeneration & getTableGeneration(const shared::TableKey &table_key) const
const std::vector< DeviceProperties > & getAllDeviceProperties() const
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
RUNTIME_EXPORT void agg_min_float_skip_val(int32_t *agg, const float val, const float skip_val)
unsigned blockSize() const
std::vector< std::vector< uint64_t > > frag_offsets
std::shared_timed_mutex shared_mutex
static std::mutex register_runtime_extension_functions_mutex_
Specifies all DataMgr chunks needed for a query step/request, along with their sizes in bytes...
HOST DEVICE bool get_notnull() const
ExecutorId getExecutorId() const
std::unique_ptr< ResultSet > estimator_result_set_
static size_t align(const size_t off_in, const size_t alignment)
static heavyai::shared_mutex executors_cache_mutex_
bool operator==(const CardinalityCacheKey &other) const
bool allow_runtime_query_interrupt
void clearQuerySessionStatus(const QuerySessionId &query_session, const std::string &submitted_time_str)
RUNTIME_EXPORT void agg_max_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
void setQuerySessionAsInterrupted(const QuerySessionId &query_session, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
static size_t get_executor_resource_pool_total_resource_quantity(const ExecutorResourceMgr_Namespace::ResourceType resource_type)
temporary_tables_(nullptr)
std::list< std::shared_ptr< const InputColDescriptor > > get_selected_input_col_descs(const shared::TableKey &table_key, std::list< std::shared_ptr< InputColDescriptor const >> const &input_col_descs)
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
std::vector< QuerySessionStatus > getQuerySessionInfo(const QuerySessionId &query_session, heavyai::shared_lock< heavyai::shared_mutex > &read_lock)
Descriptor for the fragments required for an execution kernel.
size_t getColOffInBytes(const size_t col_idx) const
static size_t getArenaBlockSize()
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
const StringDictionaryProxy::TranslationMap< Datum > * getStringProxyNumericTranslationMap(const shared::StringDictKey &source_dict_key, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool with_generation) const
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
int32_t getIdOfString(const std::string &str) const
bool g_enable_runtime_query_interrupt
ThreadLocalIds thread_local_ids()
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, heavyai::unique_lock< heavyai::shared_mutex > &write_lock)
static QueryPlanDagCache query_plan_dag_cache_
Functions to support array operations used by the executor.
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
static constexpr int32_t literalsDictId
const Executor * getExecutor() const
static std::mutex gpu_active_modules_mutex_
std::unique_ptr< llvm::Module > read_llvm_module_from_bc_file(const std::string &udf_ir_filename, llvm::LLVMContext &ctx)
std::string get_table_name(int32_t db_id, int32_t table_id)
void clearMetaInfoCache()
const TemporaryTables * temporary_tables_
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CompilationContext *compilation_context, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const bool allow_runtime_interrupt, const std::vector< int8_t * > &join_hash_tables, RenderAllocatorMap *render_allocator_map, bool optimize_cuda_block_and_grid_sizes)
HashTableBuildDagMap hash_table_build_plan_dag
void update_extension_modules(bool update_runtime_modules_only=false)
FetchResult fetchChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< shared::TableKey, const TableFragments * > &, const FragmentsList &selected_fragments, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
StringDictionaryProxy * getOrAddStringDictProxy(const shared::StringDictKey &dict_key, const bool with_generation)
size_t get_loop_join_size(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
ResultSetPtr executeExplain(const QueryCompilationDescriptor &)
bool isFragmentFullyDeleted(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &fragment)
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const