60 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
61 #include <boost/filesystem/operations.hpp>
62 #include <boost/filesystem/path.hpp>
138 const size_t block_size_x,
139 const size_t grid_size_x,
140 const size_t max_gpu_slab_size,
141 const std::string& debug_dir,
142 const std::string& debug_file)
158 const std::string& debug_dir,
159 const std::string& debug_file,
168 auto executor = std::make_shared<Executor>(executor_id,
174 CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
179 switch (memory_level) {
182 mapd_unique_lock<mapd_shared_mutex> flush_lock(
196 throw std::runtime_error(
197 "Clearing memory levels other than the CPU level or GPU level is not "
208 const int dict_id_in,
209 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
210 const bool with_generation) const {
211 CHECK(row_set_mem_owner);
212 std::lock_guard<std::mutex> lock(
214 return row_set_mem_owner->getOrAddStringDictProxy(
215     dict_id_in, with_generation, catalog_);
219 const int dict_id_in,
220 const bool with_generation,
222 const int dict_id{dict_id_in < 0 ? REGULAR_DICT(dict_id_in) : dict_id_in};
226 CHECK(dd->stringDict);
228 const int64_t generation =
229 with_generation ? string_dictionary_generations_.getGeneration(dict_id) : -1;
230 return addStringDict(dd->stringDict, dict_id, generation);
233 if (!lit_str_dict_proxy_) {
234 std::shared_ptr<StringDictionary> tsd =
238 return lit_str_dict_proxy_.get();
242 std::lock_guard<std::mutex> lock(state_mutex_);
243 auto& td = t_digests_.emplace_back(std::make_unique<quantile::TDigest>());
267 if (!cd || n > cd->columnType.get_physical_cols()) {
303 size_t num_bytes = 0;
307 for (const auto& fetched_col_pair : plan_state_->columns_to_fetch_) {
308 if (table_ids_to_fetch.count(fetched_col_pair.first) == 0) {
312 if (fetched_col_pair.first < 0) {
333 const std::vector<Analyzer::Expr*>& target_exprs) const {
336 std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
337 for (const auto target_expr : target_exprs) {
338 if (!plan_state_->isLazyFetchColumn(target_expr)) {
339 col_lazy_fetch_info.emplace_back(
344 auto col_id = col_var->get_column_id();
345 auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
346 auto cd = (col_var->get_table_id() > 0)
349 if (cd && IS_GEO(cd->columnType.get_type())) {
356 CHECK(!cd0->isVirtualCol);
357 auto col0_var = makeExpr<Analyzer::ColumnVar>(
358 col0_ti, col_var->get_table_id(), cd0->columnId, rte_idx);
359 auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
360 col_lazy_fetch_info.emplace_back(
364 auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
365 const auto& col_ti = col_var->get_type_info();
370 return col_lazy_fetch_info;
380 const std::unordered_map<int, CgenState::LiteralValues>& literals,
381 const int device_id) {
382 if (literals.empty()) {
385 const auto dev_literals_it = literals.find(device_id);
386 CHECK(dev_literals_it != literals.end());
387 const auto& dev_literals = dev_literals_it->second;
388 size_t lit_buf_size{0};
389 std::vector<std::string> real_strings;
390 std::vector<std::vector<double>> double_array_literals;
391 std::vector<std::vector<int8_t>> align64_int8_array_literals;
392 std::vector<std::vector<int32_t>> int32_array_literals;
393 std::vector<std::vector<int8_t>> align32_int8_array_literals;
394 std::vector<std::vector<int8_t>> int8_array_literals;
395 for (const auto& lit : dev_literals) {
397 if (lit.which() == 7) {
398 const auto p = boost::get<std::string>(&lit);
400 real_strings.push_back(*p);
401 } else if (lit.which() == 8) {
402 const auto p = boost::get<std::vector<double>>(&lit);
404 double_array_literals.push_back(*p);
405 } else if (lit.which() == 9) {
406 const auto p = boost::get<std::vector<int32_t>>(&lit);
408 int32_array_literals.push_back(*p);
409 } else if (lit.which() == 10) {
410 const auto p = boost::get<std::vector<int8_t>>(&lit);
412 int8_array_literals.push_back(*p);
413 } else if (lit.which() == 11) {
414 const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
416 if (p->second == 64) {
417 align64_int8_array_literals.push_back(p->first);
418 } else if (p->second == 32) {
419 align32_int8_array_literals.push_back(p->first);
425 if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int16_t>::max())) {
428 int16_t crt_real_str_off = lit_buf_size;
429 for (const auto& real_str : real_strings) {
430 CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
431 lit_buf_size += real_str.size();
433 if (double_array_literals.size() > 0) {
434 lit_buf_size = align(lit_buf_size, sizeof(double));
436 int16_t crt_double_arr_lit_off = lit_buf_size;
437 for (const auto& double_array_literal : double_array_literals) {
438 CHECK_LE(double_array_literal.size(),
439          static_cast<size_t>(std::numeric_limits<int16_t>::max()));
440 lit_buf_size += double_array_literal.size() * sizeof(double);
442 if (align64_int8_array_literals.size() > 0) {
443 lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
445 int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
446 for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
447 CHECK_LE(align64_int8_array_literals.size(),
448          static_cast<size_t>(std::numeric_limits<int16_t>::max()));
449 lit_buf_size += align64_int8_array_literal.size();
451 if (int32_array_literals.size() > 0) {
452 lit_buf_size = align(lit_buf_size, sizeof(int32_t));
454 int16_t crt_int32_arr_lit_off = lit_buf_size;
455 for (const auto& int32_array_literal : int32_array_literals) {
456 CHECK_LE(int32_array_literal.size(),
457          static_cast<size_t>(std::numeric_limits<int16_t>::max()));
458 lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
460 if (align32_int8_array_literals.size() > 0) {
461 lit_buf_size = align(lit_buf_size, sizeof(int32_t));
463 int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
464 for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
465 CHECK_LE(align32_int8_array_literals.size(),
466          static_cast<size_t>(std::numeric_limits<int16_t>::max()));
467 lit_buf_size += align32_int8_array_literal.size();
469 int16_t crt_int8_arr_lit_off = lit_buf_size;
470 for (const auto& int8_array_literal : int8_array_literals) {
472          static_cast<size_t>(std::numeric_limits<int16_t>::max()));
473 lit_buf_size += int8_array_literal.size();
475 unsigned crt_real_str_idx = 0;
476 unsigned crt_double_arr_lit_idx = 0;
477 unsigned crt_align64_int8_arr_lit_idx = 0;
478 unsigned crt_int32_arr_lit_idx = 0;
479 unsigned crt_align32_int8_arr_lit_idx = 0;
480 unsigned crt_int8_arr_lit_idx = 0;
481 std::vector<int8_t> serialized(lit_buf_size);
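// --- Illustrative sketch (not part of the original file) ---
// The serialization below packs a 16-bit buffer offset and a 16-bit length into
// one 32-bit literal slot, which is why lit_buf_size is checked against
// std::numeric_limits<int16_t>::max() above. A minimal, hedged sketch of that
// packing and of an align() helper assumed to round the running offset up to a
// multiple of the element size (helper names here are assumptions):
#include <cstddef>
#include <cstdint>
#include <utility>

inline size_t align_up(size_t off, size_t alignment) {
  return (off + alignment - 1) / alignment * alignment;  // align_up(13, 8) == 16
}

inline int32_t pack_off_and_len(int16_t off, int16_t len) {
  return (static_cast<int32_t>(off) << 16) | static_cast<uint16_t>(len);
}

inline std::pair<int16_t, int16_t> unpack_off_and_len(int32_t off_and_len) {
  return {static_cast<int16_t>(off_and_len >> 16),
          static_cast<int16_t>(off_and_len & 0xFFFF)};
}
// --- end sketch ---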
483 for (const auto& lit : dev_literals) {
486 switch (lit.which()) {
488 const auto p = boost::get<int8_t>(&lit);
490 serialized[off - lit_bytes] = *p;
494 const auto p = boost::get<int16_t>(&lit);
496 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
500 const auto p = boost::get<int32_t>(&lit);
502 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
506 const auto p = boost::get<int64_t>(&lit);
508 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
512 const auto p = boost::get<float>(&lit);
514 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
518 const auto p = boost::get<double>(&lit);
520 memcpy(&serialized[off - lit_bytes], p, lit_bytes);
524 const auto p = boost::get<std::pair<std::string, int>>(&lit);
532 memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
536 const auto p = boost::get<std::string>(&lit);
538 int32_t off_and_len = crt_real_str_off << 16;
539 const auto& crt_real_str = real_strings[crt_real_str_idx];
540 off_and_len |= static_cast<int16_t>(crt_real_str.size());
541 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
542 memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
544 crt_real_str_off += crt_real_str.size();
548 const auto p = boost::get<std::vector<double>>(&lit);
550 int32_t off_and_len = crt_double_arr_lit_off << 16;
551 const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
552 int32_t len = crt_double_arr_lit.size();
554 off_and_len |= static_cast<int16_t>(len);
555 int32_t double_array_bytesize = len * sizeof(double);
556 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
557 memcpy(&serialized[crt_double_arr_lit_off],
558 crt_double_arr_lit.data(),
559 double_array_bytesize);
560 ++crt_double_arr_lit_idx;
561 crt_double_arr_lit_off += double_array_bytesize;
565 const auto p = boost::get<std::vector<int32_t>>(&lit);
567 int32_t off_and_len = crt_int32_arr_lit_off << 16;
568 const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
569 int32_t len = crt_int32_arr_lit.size();
571 off_and_len |= static_cast<int16_t>(len);
572 int32_t int32_array_bytesize = len * sizeof(int32_t);
573 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
574 memcpy(&serialized[crt_int32_arr_lit_off],
575 crt_int32_arr_lit.data(),
576 int32_array_bytesize);
577 ++crt_int32_arr_lit_idx;
578 crt_int32_arr_lit_off += int32_array_bytesize;
582 const auto p = boost::get<std::vector<int8_t>>(&lit);
584 int32_t off_and_len = crt_int8_arr_lit_off << 16;
585 const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
586 int32_t len = crt_int8_arr_lit.size();
588 off_and_len |= static_cast<int16_t>(len);
589 int32_t int8_array_bytesize = len;
590 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
591 memcpy(&serialized[crt_int8_arr_lit_off],
592 crt_int8_arr_lit.data(),
593 int8_array_bytesize);
594 ++crt_int8_arr_lit_idx;
595 crt_int8_arr_lit_off += int8_array_bytesize;
599 const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
601 if (p->second == 64) {
602 int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
603 const auto& crt_align64_int8_arr_lit =
604 align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
605 int32_t len = crt_align64_int8_arr_lit.size();
607 off_and_len |= static_cast<int16_t>(len);
608 int32_t align64_int8_array_bytesize = len;
609 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
610 memcpy(&serialized[crt_align64_int8_arr_lit_off],
611 crt_align64_int8_arr_lit.data(),
612 align64_int8_array_bytesize);
613 ++crt_align64_int8_arr_lit_idx;
614 crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
615 } else if (p->second == 32) {
616 int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
617 const auto& crt_align32_int8_arr_lit =
618 align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
619 int32_t len = crt_align32_int8_arr_lit.size();
621 off_and_len |= static_cast<int16_t>(len);
622 int32_t align32_int8_array_bytesize = len;
623 memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
624 memcpy(&serialized[crt_align32_int8_arr_lit_off],
625 crt_align32_int8_arr_lit.data(),
626 align32_int8_array_bytesize);
627 ++crt_align32_int8_arr_lit_idx;
628 crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
645 return cuda_mgr->getDeviceCount();
660 const int64_t agg_init_val,
661 const int8_t out_byte_width,
662 const int64_t* out_vec,
663 const size_t out_vec_sz,
664 const bool is_group_by,
665 const bool float_argument_input) {
669 if (0 != agg_init_val) {
671 int64_t agg_result = agg_init_val;
672 for (size_t i = 0; i < out_vec_sz; ++i) {
675 return {agg_result, 0};
678 switch (out_byte_width) {
680 int agg_result = static_cast<int32_t>(agg_init_val);
681 for (size_t i = 0; i < out_vec_sz; ++i) {
684 *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
685 *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
687 const int64_t converted_bin =
689     ? static_cast<int64_t>(agg_result)
691 return {converted_bin, 0};
695 int64_t agg_result = agg_init_val;
696 for (size_t i = 0; i < out_vec_sz; ++i) {
699 *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
700 *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
702 return {agg_result, 0};
711 int64_t agg_result = 0;
712 for (size_t i = 0; i < out_vec_sz; ++i) {
713 agg_result += out_vec[i];
715 return {agg_result, 0};
718 switch (out_byte_width) {
721 for (size_t i = 0; i < out_vec_sz; ++i) {
722 r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
724 const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
725 const int64_t converted_bin =
727 return {converted_bin, 0};
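// --- Illustrative sketch (not part of the original file) ---
// For a SUM over a float column each device writes its partial sum as the bit
// pattern of a float stored in an int64_t slot; the reduction above therefore
// reinterprets the bits before adding. A hedged, standalone version of the
// same idea using memcpy instead of may_alias_ptr:
#include <cstdint>
#include <cstring>
#include <vector>

inline int64_t reduce_float_sum_bits(const std::vector<int64_t>& out_vec) {
  float r = 0;
  for (const auto slot : out_vec) {
    float v;
    std::memcpy(&v, &slot, sizeof(float));  // low 32 bits hold the float
    r += v;
  }
  int32_t bits;
  std::memcpy(&bits, &r, sizeof(float));
  return static_cast<int64_t>(bits);  // caller widens when the target type is double
}
// --- end sketch ---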
731 for (size_t i = 0; i < out_vec_sz; ++i) {
732 r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
734 return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
742 uint64_t agg_result = 0;
743 for (size_t i = 0; i < out_vec_sz; ++i) {
744 const uint64_t out = static_cast<uint64_t>(out_vec[i]);
747 return {static_cast<int64_t>(agg_result), 0};
751 int64_t agg_result = agg_init_val;
752 for (size_t i = 0; i < out_vec_sz; ++i) {
755 return {agg_result, 0};
757 switch (out_byte_width) {
759 int32_t agg_result = static_cast<int32_t>(agg_init_val);
760 for (size_t i = 0; i < out_vec_sz; ++i) {
763 *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
764 *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
766 const int64_t converted_bin =
768     ? static_cast<int64_t>(agg_result)
770 return {converted_bin, 0};
773 int64_t agg_result = agg_init_val;
774 for (size_t i = 0; i < out_vec_sz; ++i) {
777 *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
778 *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
780 return {agg_result, 0};
789 int64_t agg_result = agg_init_val;
790 for (size_t i = 0; i < out_vec_sz; ++i) {
793 return {agg_result, 0};
795 switch (out_byte_width) {
797 int32_t agg_result = static_cast<int32_t>(agg_init_val);
798 for (size_t i = 0; i < out_vec_sz; ++i) {
801 *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
802 *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
804 const int64_t converted_bin =
805     float_argument_input ? static_cast<int64_t>(agg_result)
807 return {converted_bin, 0};
810 int64_t agg_result = agg_init_val;
811 for (size_t i = 0; i < out_vec_sz; ++i) {
814 *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
815 *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
817 return {agg_result, 0};
824 int64_t agg_result = agg_init_val;
825 for (size_t i = 0; i < out_vec_sz; ++i) {
826 if (out_vec[i] != agg_init_val) {
827 if (agg_result == agg_init_val) {
828 agg_result = out_vec[i];
829 } else if (out_vec[i] != agg_result) {
834 return {agg_result, 0};
837 int64_t agg_result = agg_init_val;
838 for (size_t i = 0; i < out_vec_sz; ++i) {
839 if (out_vec[i] != agg_init_val) {
840 agg_result = out_vec[i];
844 return {agg_result, 0};
855 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device) {
856 auto& first = results_per_device.front().first;
858 for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
859 const auto& next = results_per_device[dev_idx].first;
861 first->append(*next);
863 return std::move(first);
871 if (results_per_device.empty()) {
872 std::vector<TargetInfo> targets;
873 for (const auto target_expr : ra_exe_unit.target_exprs) {
876 return std::make_shared<ResultSet>(targets,
884 using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
886 results_per_device.end(),
887 [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
888 CHECK_GE(lhs.second.size(), size_t(1));
889 CHECK_GE(rhs.second.size(), size_t(1));
890 return lhs.second.front() < rhs.second.front();
898 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
899 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
906 if (results_per_device.empty()) {
907 std::vector<TargetInfo> targets;
908 for (const auto target_expr : ra_exe_unit.target_exprs) {
911 return std::make_shared<ResultSet>(targets,
929 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
930 int64_t* compilation_queue_time) {
933 *compilation_queue_time = timer_stop(clock_begin);
934 const auto& this_result_set = results_per_device[0].first;
936 this_result_set->getTargetInfos(),
937 this_result_set->getTargetInitVals());
938 return reduction_jit.codegen();
944 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
945 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
948 std::shared_ptr<ResultSet> reduced_results;
950 const auto& first = results_per_device.front().first;
954 results_per_device.size() > 1) {
956 results_per_device.begin(),
957 results_per_device.end(),
959 [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
960 const auto& r = rs.first;
961 return init + r->getQueryMemDesc().getEntryCount();
963 CHECK(total_entry_count);
964 auto query_mem_desc = first->getQueryMemDesc();
966 reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
973 auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
974 reduced_results->initializeStorage();
975 switch (query_mem_desc.getEffectiveKeyWidth()) {
977 first->getStorage()->moveEntriesToBuffer<int32_t>(
978 result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
981 first->getStorage()->moveEntriesToBuffer<int64_t>(
982 result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
988 reduced_results = first;
991 int64_t compilation_queue_time = 0;
992 const auto reduction_code =
995 for (size_t i = 1; i < results_per_device.size(); ++i) {
996 reduced_results->getStorage()->reduce(
997 *(results_per_device[i].first->getStorage()), {}, reduction_code);
999 reduced_results->addCompilationQueueTime(compilation_queue_time);
1000 return reduced_results;
1005 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1006 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1008 if (results_per_device.size() == 1) {
1009 return std::move(results_per_device.front().first);
1013 for (const auto& result : results_per_device) {
1014 auto rows = result.first;
1022 std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1027 return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1031 std::unordered_set<int> available_gpus;
1035 for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1036 available_gpus.insert(gpu_id);
1039 return available_gpus;
1043 const size_t cpu_count,
1044 const size_t gpu_count) {
1046 : static_cast<size_t>(cpu_count);
1057 using checked_size_t = boost::multiprecision::number<
1058 boost::multiprecision::cpp_int_backend<64,
1060 boost::multiprecision::unsigned_magnitude,
1061 boost::multiprecision::checked,
1063 checked_size_t max_groups_buffer_entry_guess = 1;
1064 for (const auto& query_info : query_infos) {
1065 CHECK(!query_info.info.fragments.empty());
1066 auto it = std::max_element(query_info.info.fragments.begin(),
1067 query_info.info.fragments.end(),
1068 [](const FragmentInfo& f1, const FragmentInfo& f2) {
1069 return f1.getNumTuples() < f2.getNumTuples();
1071 max_groups_buffer_entry_guess *= it->getNumTuples();
1075 constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1077 return std::min(static_cast<size_t>(max_groups_buffer_entry_guess),
1078 max_groups_buffer_entry_guess_cap);
1080 return max_groups_buffer_entry_guess_cap;
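// --- Illustrative sketch (not part of the original file) ---
// The guess above multiplies the largest fragment of every input table through
// a checked 64-bit integer, so an overflow throws instead of wrapping and the
// hard cap of 100000000 entries is returned. A hedged sketch of the pattern:
#include <boost/multiprecision/cpp_int.hpp>
#include <algorithm>
#include <cstddef>
#include <stdexcept>
#include <vector>

inline size_t guess_entry_count(const std::vector<size_t>& max_frag_tuples) {
  using namespace boost::multiprecision;
  using checked_size_t =
      number<cpp_int_backend<64, 64, unsigned_magnitude, checked, void>>;
  constexpr size_t cap = 100000000;
  try {
    checked_size_t guess = 1;
    for (const auto n : max_frag_tuples) {
      guess *= n;  // throws std::overflow_error past 64 bits
    }
    return std::min(static_cast<size_t>(guess), cap);
  } catch (const std::overflow_error&) {
    return cap;
  }
}
// --- end sketch ---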
1090 return td->tableName;
1097 const int device_count) {
1105 const std::vector<InputTableInfo>& table_infos,
1108 const int device_count) {
1109 for (const auto target_expr : ra_exe_unit.target_exprs) {
1110 if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1114 if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1128 std::vector<std::string> table_names;
1129 const auto& input_descs = ra_exe_unit.input_descs;
1130 for (const auto& input_desc : input_descs) {
1135 "Projection query would require a scan without a limit on table(s): " +
1139 "Projection query output result set on table(s): " +
1142 " rows, which is more than the current system limit of " +
1158 const auto inner_table_id = ra_exe_unit.input_descs.back().getTableId();
1160 std::optional<size_t> inner_table_idx;
1161 for (size_t i = 0; i < query_infos.size(); ++i) {
1162 if (query_infos[i].table_id == inner_table_id) {
1163 inner_table_idx = i;
1167 CHECK(inner_table_idx);
1168 return query_infos[*inner_table_idx].info.getNumTuples() <=
1174 template <typename T>
1176 std::vector<std::string> expr_strs;
1177 for (const auto& expr : expr_container) {
1179 expr_strs.emplace_back("NULL");
1181 expr_strs.emplace_back(expr->toString());
1189 const std::list<Analyzer::OrderEntry>& expr_container) {
1190 std::vector<std::string> expr_strs;
1191 for (const auto& expr : expr_container) {
1192 expr_strs.emplace_back(expr.toString());
1211 switch (algorithm) {
1215 return "Speculative Top N";
1217 return "Streaming Top N";
1228 std::ostringstream os;
1230 const auto& scan_desc = input_col_desc->getScanDesc();
1231 os << scan_desc.getTableId() << "," << input_col_desc->getColId() << ","
1232    << scan_desc.getNestLevel();
1237 os << qual->toString() << ",";
1241 if (!ra_exe_unit.quals.empty()) {
1242 for (const auto& qual : ra_exe_unit.quals) {
1244 os << qual->toString() << ",";
1249 for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1250 const auto& join_condition = ra_exe_unit.join_quals[i];
1252 for (const auto& qual : join_condition.quals) {
1254 os << qual->toString() << ",";
1262 os << qual->toString() << ",";
1268 os << expr->toString() << ",";
1277 os << "\n\tTable/Col/Levels: ";
1279 const auto& scan_desc = input_col_desc->getScanDesc();
1280 os << "(" << scan_desc.getTableId() << ", " << input_col_desc->getColId() << ", "
1281    << scan_desc.getNestLevel() << ") ";
1284 os << "\n\tSimple Quals: "
1288 if (!ra_exe_unit.quals.empty()) {
1293 os << "\n\tJoin Quals: ";
1294 for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1295 const auto& join_condition = ra_exe_unit.join_quals[i];
1302 os << "\n\tGroup By: "
1306 os << "\n\tProjected targets: "
1309 os << "\n\tSort Info: ";
1310 const auto& sort_info = ra_exe_unit.sort_info;
1311 os << "\n\t Order Entries: "
1319 os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
1327 const size_t new_scan_limit) {
1331 ra_exe_unit_in.quals,
1348 const std::vector<InputTableInfo>& query_infos,
1354 const bool has_cardinality_estimation,
1356 VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1378 has_cardinality_estimation,
1384 result->setValidationOnlyRes();
1400 has_cardinality_estimation,
1406 result->setValidationOnlyRes();
1414 size_t& max_groups_buffer_entry_guess,
1416 const bool allow_single_frag_table_opt,
1417 const std::vector<InputTableInfo>& query_infos,
1422 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1424 const bool has_cardinality_estimation,
1427 const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1429 CHECK(!query_infos.empty());
1430 if (!max_groups_buffer_entry_guess) {
1442 auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1443 std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1451 query_mem_desc_owned =
1452 query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1454 has_cardinality_estimation,
1470 CHECK(query_mem_desc_owned);
1471 crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1478 plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1479 CHECK(!query_mem_desc_owned);
1480 query_mem_desc_owned.reset(
1487 for (const auto target_expr : ra_exe_unit.target_exprs) {
1488 plan_state_->target_exprs_.push_back(target_expr);
1495 const auto context_count =
1504 allow_single_frag_table_opt,
1506 *query_comp_desc_owned,
1507 *query_mem_desc_owned,
1513 VLOG(1) << "Using TBB thread pool for kernel dispatch.";
1514 launchKernels<threadpool::TbbThreadPool<void>>(shared_context, std::move(kernels));
1517 throw std::runtime_error(
1518     "This build is not TBB enabled. Restart the server with "
1519     "\"enable-modern-thread-pool\" disabled.");
1522 launchKernels<threadpool::FuturesThreadPool<void>>(shared_context, std::move(kernels));
1536 static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1537 crt_min_byte_width <<= 1;
1547 *query_mem_desc_owned,
1548 query_comp_desc_owned->getDeviceType(),
1553 crt_min_byte_width <<= 1;
1559 } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
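// --- Illustrative sketch (not part of the original file) ---
// The do/while above retries the whole execution with a doubled minimum output
// byte width whenever an aggregate overflows, stopping once int64 width is
// reached. A hedged sketch of that control flow (the retry signal is assumed
// here to surface as an exception):
#include <cstddef>
#include <cstdint>
#include <stdexcept>

template <typename RunFn>
void run_with_widening(RunFn run) {
  size_t crt_min_byte_width = sizeof(int8_t);
  do {
    try {
      run(crt_min_byte_width);
      return;  // success, no overflow
    } catch (const std::overflow_error&) {
      crt_min_byte_width <<= 1;  // retry with wider aggregate slots
    }
  } while (crt_min_byte_width <= sizeof(int64_t));
}
// --- end sketch ---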
1561 return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1576 const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
1579 std::vector<InputTableInfo> table_infos{table_info};
1583 auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1584 std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1589 query_mem_desc_owned =
1590 query_comp_desc_owned->compile(0,
1602 CHECK(query_mem_desc_owned);
1603 CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1604 const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1612 for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1622 *query_comp_desc_owned,
1623 *query_mem_desc_owned,
1628 kernel.run(this, kernel_context);
1634 for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1636 const auto fragment_results = all_fragment_results[fragment_index];
1637 cb(fragment_results.first, outer_fragments[fragment_index]);
1643 const std::vector<InputTableInfo>& table_infos,
1655 compilation_context.compile(exe_unit, co, this);
1659 exe_unit, table_infos, &compilation_context, column_fetcher, co.device_type, this);
1663 return std::make_shared<ResultSet>(query_comp_desc.getIR());
1669 for (const auto target_expr : ra_exe_unit.target_exprs) {
1673 if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM) &&
1674     agg_info.agg_arg_type.get_type() == kDOUBLE) {
1678 if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
1682 return requested_device_type;
1691 int64_t float_null_val = 0;
1692 *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
1694 return float_null_val;
1697 return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
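// --- Illustrative sketch (not part of the original file) ---
// inline_null_val stores a float NULL sentinel in the low 32 bits of an
// int64_t slot, and a double sentinel in the full 64 bits. The same bit-pattern
// trick with memcpy instead of may_alias_ptr:
#include <cstdint>
#include <cstring>

inline int64_t float_bits_as_int64(float f) {
  int64_t out = 0;
  std::memcpy(&out, &f, sizeof(float));
  return out;
}
// --- end sketch ---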
1703 std::vector<int64_t>& entry,
1704 const std::vector<Analyzer::Expr*>& target_exprs,
1706 for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
1707 const auto target_expr = target_exprs[target_idx];
1709 CHECK(agg_info.is_agg);
1710 target_infos.push_back(agg_info);
1712 const auto executor = query_mem_desc.getExecutor();
1714 auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1715 CHECK(row_set_mem_owner);
1716 const auto& count_distinct_desc =
1719 CHECK(row_set_mem_owner);
1720 auto count_distinct_buffer = row_set_mem_owner->allocateCountDistinctBuffer(
1721 count_distinct_desc.bitmapPaddedSizeBytes());
1722 entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
1726 auto count_distinct_set = new std::set<int64_t>();
1727 CHECK(row_set_mem_owner);
1728 row_set_mem_owner->addCountDistinctSet(count_distinct_set);
1729 entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
1736 } else if (agg_info.agg_kind == kAVG) {
1737 entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1740 if (agg_info.sql_type.is_geometry()) {
1741 for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
1744 } else if (agg_info.sql_type.is_varlen()) {
1748 entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1751 entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
1757 const std::vector<Analyzer::Expr*>& target_exprs_in,
1760 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
1761 std::vector<Analyzer::Expr*> target_exprs;
1762 for (const auto target_expr : target_exprs_in) {
1763 const auto target_expr_copy =
1765 CHECK(target_expr_copy);
1766 auto ti = target_expr->get_type_info();
1767 ti.set_notnull(false);
1768 target_expr_copy->set_type_info(ti);
1769 if (target_expr_copy->get_arg()) {
1770 auto arg_ti = target_expr_copy->get_arg()->get_type_info();
1771 arg_ti.set_notnull(false);
1772 target_expr_copy->get_arg()->set_type_info(arg_ti);
1774 target_exprs_owned_copies.push_back(target_expr_copy);
1775 target_exprs.push_back(target_expr_copy.get());
1777 std::vector<TargetInfo> target_infos;
1778 std::vector<int64_t> entry;
1780 const auto executor = query_mem_desc.getExecutor();
1782 auto row_set_mem_owner = executor->getRowSetMemoryOwner();
1783 CHECK(row_set_mem_owner);
1784 auto rs = std::make_shared<ResultSet>(target_infos,
1788 executor->getCatalog(),
1789 executor->blockSize(),
1790 executor->gridSize());
1791 rs->allocateStorage();
1792 rs->fillOneEntry(entry);
1803 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1809 ra_exe_unit.target_exprs, query_mem_desc, device_type);
1814 ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1815 } catch (const std::bad_alloc&) {
1819 const auto shard_count =
1824 if (shard_count && !result_per_device.empty()) {
1828 ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1845 size_t output_row_index,
1847 const std::vector<uint32_t>& top_permutation) {
1850 for (const auto sorted_idx : top_permutation) {
1852 for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
1854 const auto input_column_ptr =
1857 const auto output_column_ptr =
1860 output_row_index * output_query_mem_desc.groupColWidth(group_idx);
1861 memcpy(output_column_ptr,
1866 for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
1868 const auto input_column_ptr =
1871 const auto output_column_ptr =
1874 memcpy(output_column_ptr,
1880 return output_row_index;
1894 size_t output_row_index,
1896 const std::vector<uint32_t>& top_permutation) {
1899 for (const auto sorted_idx : top_permutation) {
1900 const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
1901 memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
1906 return output_row_index;
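// --- Illustrative sketch (not part of the original file) ---
// The row-wise permutation above copies whole fixed-size rows in the order
// given by the top-n permutation buffer. A hedged, standalone version:
#include <cstdint>
#include <cstring>
#include <vector>

inline size_t permute_rows(const int8_t* input_buffer,
                           int8_t* output_buffer,
                           const size_t row_size,
                           const std::vector<uint32_t>& top_permutation,
                           size_t output_row_index) {
  for (const auto sorted_idx : top_permutation) {
    std::memcpy(output_buffer + output_row_index * row_size,
                input_buffer + sorted_idx * row_size,
                row_size);
    ++output_row_index;
  }
  return output_row_index;
}
// --- end sketch ---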
1917 const auto first_result_set = result_per_device.front().first;
1918 CHECK(first_result_set);
1919 auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1920 CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1922 top_query_mem_desc.setEntryCount(0);
1923 for (auto& result : result_per_device) {
1927 size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1928 top_query_mem_desc.setEntryCount(new_entry_cnt);
1930 auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1931 first_result_set->getDeviceType(),
1933 first_result_set->getRowSetMemOwner(),
1937 auto top_storage = top_result_set->allocateStorage();
1938 size_t top_output_row_idx{0};
1939 for (auto& result : result_per_device) {
1942 const auto& top_permutation = result_set->getPermutationBuffer();
1943 CHECK_LE(top_permutation.size(), top_n);
1944 if (top_query_mem_desc.didOutputColumnar()) {
1959 CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
1960 return top_result_set;
1965 std::unordered_map<int, const Analyzer::BinOper*> id_to_cond;
1967 CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
1968 for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
1969 int inner_table_id = join_info.join_hash_tables_[i]->getInnerTableId();
1971 std::make_pair(inner_table_id, join_info.equi_join_tautologies_[i].get()));
1979 for (const auto& col : fetched_cols) {
1980 if (col.is_lazily_fetched) {
1993 const std::vector<InputTableInfo>& table_infos,
1996 const bool allow_single_frag_table_opt,
1997 const size_t context_count,
2001 std::unordered_set<int>& available_gpus,
2002 int& available_cpus) {
2003 std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2010 : std::vector<Data_Namespace::MemoryInfo>{},
2016 const bool uses_lazy_fetch =
2021 const auto device_count = deviceCount(device_type);
2024 fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2028 use_multifrag_kernel,
2031 if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2035 if (use_multifrag_kernel) {
2036 VLOG(1) << "Creating multifrag execution kernels";
2044 auto multifrag_kernel_dispatch = [&ra_exe_unit,
2050 render_info](const int device_id,
2052 const int64_t rowid_lookup_key) {
2053 execution_kernels.emplace_back(
2054 std::make_unique<ExecutionKernel>(ra_exe_unit,
2066 fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2068 VLOG(1) << "Creating one execution kernel per fragment";
2073 table_infos.size() == 1 && table_infos.front().table_id > 0) {
2074 const auto max_frag_size =
2075 table_infos.front().info.getFragmentNumTuplesUpperBound();
2078 << " to match max fragment size " << max_frag_size
2079 << " for kernel per fragment execution path.";
2084 size_t frag_list_idx{0};
2085 auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2093 render_info](const int device_id,
2095 const int64_t rowid_lookup_key) {
2096 if (!frag_list.size()) {
2101 execution_kernels.emplace_back(
2102 std::make_unique<ExecutionKernel>(ra_exe_unit,
2116 fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2120 return execution_kernels;
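// --- Illustrative sketch (not part of the original file) ---
// createKernels builds one ExecutionKernel per (device, fragment list)
// assignment by handing a dispatch lambda to the fragment descriptor. A hedged
// sketch of that shape, with hypothetical types standing in for the real ones:
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

struct KernelSketch {
  int device_id;
  std::vector<size_t> frag_ids;
};

inline std::vector<std::unique_ptr<KernelSketch>> build_kernels(
    const std::vector<std::pair<int, std::vector<size_t>>>& assignments) {
  std::vector<std::unique_ptr<KernelSketch>> kernels;
  auto dispatch = [&kernels](const int device_id,
                             const std::vector<size_t>& frag_ids,
                             const int64_t /*rowid_lookup_key*/) {
    if (frag_ids.empty()) {
      return;  // nothing to execute for this assignment
    }
    kernels.push_back(std::make_unique<KernelSketch>(KernelSketch{device_id, frag_ids}));
  };
  for (const auto& assignment : assignments) {
    dispatch(assignment.first, assignment.second, -1);
  }
  return kernels;
}
// --- end sketch ---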
2123 template <typename THREAD_POOL>
2125 std::vector<std::unique_ptr<ExecutionKernel>>&& kernels) {
2130 THREAD_POOL thread_pool;
2131 VLOG(1) << "Launching " << kernels.size() << " kernels for query.";
2132 for (auto& kernel : kernels) {
2138 kernel->run(this, shared_context);
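// --- Illustrative sketch (not part of the original file) ---
// launchKernels submits every kernel to a thread pool and joins them before the
// per-device results are reduced. The same pattern with std::async standing in
// for the pool:
#include <future>
#include <vector>

template <typename Kernel, typename Context>
void launch_all(std::vector<Kernel>& kernels, Context& shared_context) {
  std::vector<std::future<void>> tasks;
  for (auto& kernel : kernels) {
    tasks.push_back(std::async(std::launch::async,
                               [&kernel, &shared_context] { kernel.run(shared_context); }));
  }
  for (auto& task : tasks) {
    task.get();  // rethrows any exception raised on a worker thread
  }
}
// --- end sketch ---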
2148 const size_t table_idx,
2149 const size_t outer_frag_idx,
2150 std::map<int, const TableFragments*>& selected_tables_fragments,
2151 const std::unordered_map<int, const Analyzer::BinOper*>&
2152 inner_table_id_to_join_condition) {
2153 const int table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2154 auto table_frags_it = selected_tables_fragments.find(table_id);
2155 CHECK(table_frags_it != selected_tables_fragments.end());
2156 const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2157 const auto outer_table_fragments_it =
2158 selected_tables_fragments.find(outer_input_desc.getTableId());
2159 const auto outer_table_fragments = outer_table_fragments_it->second;
2160 CHECK(outer_table_fragments_it != selected_tables_fragments.end());
2161 CHECK_LT(outer_frag_idx, outer_table_fragments->size());
2163 return {outer_frag_idx};
2165 const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
2166 auto& inner_frags = table_frags_it->second;
2168 std::vector<size_t> all_frag_ids;
2169 for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
2171 const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
2175 inner_table_id_to_join_condition,
2180 all_frag_ids.push_back(inner_frag_idx);
2182 return all_frag_ids;
2190 const int table_idx,
2191 const std::unordered_map<int, const Analyzer::BinOper*>&
2192 inner_table_id_to_join_condition,
2198 CHECK(table_idx >= 0 &&
2199       static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
2200 const int inner_table_id = ra_exe_unit.input_descs[table_idx].getTableId();
2202 if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
2203     outer_fragment_info.shard == inner_fragment_info.shard) {
2208 CHECK(!inner_table_id_to_join_condition.empty());
2209 auto condition_it = inner_table_id_to_join_condition.find(inner_table_id);
2210 CHECK(condition_it != inner_table_id_to_join_condition.end());
2211 join_condition = condition_it->second;
2212 CHECK(join_condition);
2215 plan_state_->join_info_.join_hash_tables_.size());
2216 for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
2217 if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
2219 CHECK(!join_condition);
2220 join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
2224 if (!join_condition) {
2228 if (join_condition->is_overlaps_oper()) {
2231 size_t shard_count{0};
2232 if (dynamic_cast<const Analyzer::ExpressionTuple*>(
2233 join_condition->get_left_operand())) {
2234 auto inner_outer_pairs =
2237 join_condition, this, inner_outer_pairs);
2241 if (shard_count && !ra_exe_unit.join_quals.empty()) {
2242 plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
2252 const int col_id = col_desc->getColId();
2259 const std::vector<InputDescriptor>& input_descs,
2260 const std::map<int, const TableFragments*>& all_tables_fragments) {
2261 std::map<size_t, std::vector<uint64_t>> tab_id_to_frag_offsets;
2262 for (auto& desc : input_descs) {
2263 const auto fragments_it = all_tables_fragments.find(desc.getTableId());
2264 CHECK(fragments_it != all_tables_fragments.end());
2265 const auto& fragments = *fragments_it->second;
2266 std::vector<uint64_t> frag_offsets(fragments.size(), 0);
2267 for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
2268 frag_offsets[i] = off;
2269 off += fragments[i].getNumTuples();
2271 tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableId(), frag_offsets));
2273 return tab_id_to_frag_offsets;
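// --- Illustrative sketch (not part of the original file) ---
// The per-table fragment offsets computed above are an exclusive prefix sum of
// the fragment row counts:
#include <cstddef>
#include <cstdint>
#include <vector>

inline std::vector<uint64_t> exclusive_prefix_offsets(
    const std::vector<uint64_t>& fragment_num_tuples) {
  std::vector<uint64_t> frag_offsets(fragment_num_tuples.size(), 0);
  uint64_t off = 0;
  for (size_t i = 0; i < fragment_num_tuples.size(); ++i) {
    frag_offsets[i] = off;
    off += fragment_num_tuples[i];
  }
  return frag_offsets;
}
// --- end sketch ---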
2276 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
2279 const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
2280 const std::vector<InputDescriptor>& input_descs,
2281 const std::map<int, const TableFragments*>& all_tables_fragments) {
2282 std::vector<std::vector<int64_t>> all_num_rows;
2283 std::vector<std::vector<uint64_t>> all_frag_offsets;
2284 const auto tab_id_to_frag_offsets =
2286 std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
2287 for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2288 std::vector<int64_t> num_rows;
2289 std::vector<uint64_t> frag_offsets;
2291 CHECK_EQ(selected_frag_ids.size(), input_descs.size());
2293 for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
2294 const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
2295 const auto fragments_it =
2296 all_tables_fragments.find(input_descs[tab_idx].getTableId());
2297 CHECK(fragments_it != all_tables_fragments.end());
2298 const auto& fragments = *fragments_it->second;
2299 if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
2300 plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
2301 const auto& fragment = fragments[frag_id];
2302 num_rows.push_back(fragment.getNumTuples());
2304 size_t total_row_count{0};
2305 for (const auto& fragment : fragments) {
2306 total_row_count += fragment.getNumTuples();
2308 num_rows.push_back(total_row_count);
2310 const auto frag_offsets_it =
2311 tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableId());
2312 CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
2313 const auto& offsets = frag_offsets_it->second;
2315 frag_offsets.push_back(offsets[frag_id]);
2317 all_num_rows.push_back(num_rows);
2319 all_frag_offsets.push_back(frag_offsets);
2321 return {all_num_rows, all_frag_offsets};
2329 const auto& input_descs = ra_exe_unit.input_descs;
2331 if (nest_level < 1 ||
2333     ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
2335 plan_state_->isLazyFetchColumn(inner_col_desc))) {
2339 CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
2340 CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
2341 const auto& fragments = selected_fragments[nest_level].fragment_ids;
2342 return fragments.size() > 1;
2354 const int device_id,
2356 const std::map<int, const TableFragments*>& all_tables_fragments,
2359 std::list<ChunkIter>& chunk_iterators,
2360 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2365 std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2366 std::vector<size_t> local_col_to_frag_pos;
2368 local_col_to_frag_pos,
2374 selected_fragments_crossjoin);
2376 std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2377 std::vector<std::vector<int64_t>> all_num_rows;
2378 std::vector<std::vector<uint64_t>> all_frag_offsets;
2380 for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2381 std::vector<const int8_t*> frag_col_buffers(
2383 for (const auto& col_id : col_global_ids) {
2391 const int table_id = col_id->getScanDesc().getTableId();
2393 if (cd && cd->isVirtualCol) {
2397 const auto fragments_it = all_tables_fragments.find(table_id);
2398 CHECK(fragments_it != all_tables_fragments.end());
2399 const auto fragments = fragments_it->second;
2400 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2402 CHECK_LT(static_cast<size_t>(it->second),
2404 const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2405 if (!fragments->size()) {
2408 CHECK_LT(frag_id, fragments->size());
2409 auto memory_level_for_column = memory_level;
2411 std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2417 col_id.get(), memory_level_for_column, device_id, device_allocator);
2420 frag_col_buffers[it->second] =
2423 all_tables_fragments,
2424 memory_level_for_column,
2428 frag_col_buffers[it->second] =
2432 all_tables_fragments,
2435 memory_level_for_column,
2441 all_frag_col_buffers.push_back(frag_col_buffers);
2444 ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2445 return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2453 const int device_id,
2455 const std::map<int, const TableFragments*>& all_tables_fragments,
2458 std::list<ChunkIter>& chunk_iterators,
2459 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
2464 std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2465 std::vector<std::vector<int64_t>> all_num_rows;
2466 std::vector<std::vector<uint64_t>> all_frag_offsets;
2468 CHECK(!selected_fragments.empty());
2471 using TableId = int;
2472 TableId const selected_table_id = selected_fragments.front().table_id;
2473 bool const input_descs_index =
2474     selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2475 if (!input_descs_index) {
2478 bool const input_col_descs_index =
2479 selected_table_id ==
2480 (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2481 if (!input_col_descs_index) {
2485 VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2486         << " selected_table_id=" << selected_table_id
2487         << " input_descs_index=" << int(input_descs_index)
2488         << " input_col_descs_index=" << int(input_col_descs_index)
2489         << " ra_exe_unit.input_descs="
2491         << " ra_exe_unit.input_col_descs="
2495 std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2496 table_id_to_input_col_descs;
2498 TableId const table_id = input_col_desc->getScanDesc().getTableId();
2499 table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2501 for (auto const& pair : table_id_to_input_col_descs) {
2502 std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2503 std::vector<size_t> local_col_to_frag_pos;
2506 local_col_to_frag_pos,
2512 selected_fragments_crossjoin);
2514 for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2515 std::vector<const int8_t*> frag_col_buffers(
2517 for (const auto& col_id : pair.second) {
2519 const int table_id = col_id->getScanDesc().getTableId();
2522 if (cd && cd->isVirtualCol) {
2526 const auto fragments_it = all_tables_fragments.find(table_id);
2527 CHECK(fragments_it != all_tables_fragments.end());
2528 const auto fragments = fragments_it->second;
2529 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2531 CHECK_LT(static_cast<size_t>(it->second),
2533 const size_t frag_id = ra_exe_unit.union_all
2535     : selected_frag_ids[local_col_to_frag_pos[it->second]];
2536 if (!fragments->size()) {
2539 CHECK_LT(frag_id, fragments->size());
2540 auto memory_level_for_column = memory_level;
2542 std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2548 col_id.get(), memory_level_for_column, device_id, device_allocator);
2551 frag_col_buffers[it->second] =
2554 all_tables_fragments,
2555 memory_level_for_column,
2559 frag_col_buffers[it->second] =
2563 all_tables_fragments,
2566 memory_level_for_column,
2572 all_frag_col_buffers.push_back(frag_col_buffers);
2574 std::vector<std::vector<int64_t>> num_rows;
2575 std::vector<std::vector<uint64_t>> frag_offsets;
2577 ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2578 all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end());
2579 all_frag_offsets.insert(
2580 all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end());
2584 for (size_t i = 0; i < all_frag_col_buffers.front().size(); ++i) {
2585 all_frag_col_buffers[i & 1][i] = all_frag_col_buffers[i & 1][i ^ 1];
2587 if (input_descs_index == input_col_descs_index) {
2588 std::swap(all_frag_col_buffers[0], all_frag_col_buffers[1]);
2594 <<
" input_col_descs_index=" << input_col_descs_index;
2595 return {{all_frag_col_buffers[input_descs_index]},
2596 {{all_num_rows[0][input_descs_index]}},
2597 {{all_frag_offsets[0][input_descs_index]}}};
2601 const size_t scan_idx,
2605 !
plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
2606 !selected_fragments[scan_idx].fragment_ids.empty()) {
2611 return selected_fragments[scan_idx].fragment_ids;
2615 std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2616 std::vector<size_t>& local_col_to_frag_pos,
2617 const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2620 local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2622 const auto& input_descs = ra_exe_unit.input_descs;
2623 for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2624 const int table_id = input_descs[scan_idx].getTableId();
2625 CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2626 selected_fragments_crossjoin.push_back(
2628 for (const auto& col_id : col_global_ids) {
2630 const auto& input_desc = col_id->getScanDesc();
2631 if (input_desc.getTableId() != table_id ||
2632     input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2635 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2637 CHECK_LT(static_cast<size_t>(it->second),
2639 local_col_to_frag_pos[it->second] = frag_pos;
2646 std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
2647 std::vector<size_t>& local_col_to_frag_pos,
2648 const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
2651 local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2653 const auto& input_descs = ra_exe_unit.input_descs;
2654 for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2655 const int table_id = input_descs[scan_idx].getTableId();
2658 if (selected_fragments[0].table_id != table_id) {
2662 selected_fragments_crossjoin.push_back(
2665 for (const auto& col_id : col_global_ids) {
2667 const auto& input_desc = col_id->getScanDesc();
2668 if (input_desc.getTableId() != table_id ||
2669     input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2672 auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2674 CHECK_LT(static_cast<size_t>(it->second),
2676 local_col_to_frag_pos[it->second] = frag_pos;
2686 OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
2688 for (auto out : out_vec_) {
2701 const bool hoist_literals,
2703 const std::vector<Analyzer::Expr*>& target_exprs,
2705 std::vector<std::vector<const int8_t*>>& col_buffers,
2707 const std::vector<std::vector<int64_t>>& num_rows,
2708 const std::vector<std::vector<uint64_t>>& frag_offsets,
2710 const int device_id,
2711 const uint32_t start_rowid,
2712 const uint32_t num_tables,
2717 if (col_buffers.empty()) {
2726 << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2727    "currently unsupported.";
2732 std::vector<int64_t*> out_vec;
2735 std::unique_ptr<OutVecOwner> output_memory_scope;
2744 CHECK(cpu_generated_code);
2746 cpu_generated_code.get(),
2755 join_hash_table_ptrs);
2756 output_memory_scope.reset(new OutVecOwner(out_vec));
2760 CHECK(gpu_generated_code);
2764 gpu_generated_code.get(),
2778 join_hash_table_ptrs,
2779 render_allocator_map_ptr);
2780 output_memory_scope.reset(new OutVecOwner(out_vec));
2783 } catch (const std::exception& e) {
2784 LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2801 std::vector<int64_t> reduced_outs;
2802 const auto num_frags = col_buffers.size();
2803 const size_t entry_count =
2809 if (size_t(1) == entry_count) {
2810 for (auto out : out_vec) {
2812 reduced_outs.push_back(*out);
2815 size_t out_vec_idx = 0;
2817 for (const auto target_expr : target_exprs) {
2819 CHECK(agg_info.is_agg);
2821 const int num_iterations = agg_info.sql_type.is_geometry()
2822 ? agg_info.sql_type.get_physical_coord_cols()
2825 for (int i = 0; i < num_iterations; i++) {
2831 val1 = out_vec[out_vec_idx][0];
2834 const auto chosen_bytes = static_cast<size_t>(
2840 float_argument_input ? sizeof(int32_t) : chosen_bytes,
2841 out_vec[out_vec_idx],
2844 float_argument_input);
2849 reduced_outs.push_back(val1);
2850 if (agg_info.agg_kind == kAVG ||
2851     (agg_info.agg_kind == kSAMPLE &&
2852      (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
2853 const auto chosen_bytes = static_cast<size_t>(
2858 agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
2861 float_argument_input ? sizeof(int32_t) : chosen_bytes,
2862 out_vec[out_vec_idx + 1],
2869 reduced_outs.push_back(val2);
2882 auto rows_ptr = std::shared_ptr<ResultSet>(
2884 rows_ptr->fillOneEntry(reduced_outs);
2885 results = std::move(rows_ptr);
2893 return results && results->rowCount() < scan_limit;
2901 const bool hoist_literals,
2904 std::vector<std::vector<const int8_t*>>& col_buffers,
2905 const std::vector<size_t> outer_tab_frag_ids,
2907 const std::vector<std::vector<int64_t>>& num_rows,
2908 const std::vector<std::vector<uint64_t>>& frag_offsets,
2910 const int device_id,
2911 const int outer_table_id,
2912 const int64_t scan_limit,
2913 const uint32_t start_rowid,
2914 const uint32_t num_tables,
2919 if (col_buffers.empty()) {
2940 VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
2941         << " ra_exe_unit.input_descs="
2943         << " ra_exe_unit.input_col_descs="
2945         << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
2948         << " query_exe_context->query_buffers_->num_rows_="
2950         << " query_exe_context->query_mem_desc_.getEntryCount()="
2952         << " device_id=" << device_id << " outer_table_id=" << outer_table_id
2953         << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
2954         << " num_tables=" << num_tables;
2961 std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
2963                  [outer_table_id](auto const& a, auto const& b) {
2964 return a.getTableId() == outer_table_id &&
2965 b.getTableId() != outer_table_id;
2968 ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
2973 [outer_table_id](auto const& input_col_desc) {
2974 return input_col_desc->getScanDesc().getTableId() != outer_table_id;
2982 CHECK(cpu_generated_code);
2985 cpu_generated_code.get(),
2994 join_hash_table_ptrs);
2999 CHECK(gpu_generated_code);
3002 gpu_generated_code.get(),
3016 join_hash_table_ptrs,
3017 render_allocator_map_ptr);
3024 } catch (const std::exception& e) {
3025 LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3040 results = query_exe_context->getRowSet(ra_exe_unit_copy,
3043 VLOG(2) << "results->rowCount()=" << results->rowCount();
3044 results->holdLiterals(hoist_buf);
3046 if (error_code < 0 && render_allocator_map_ptr) {
3047 auto const adjusted_scan_limit =
3051 if (adjusted_scan_limit != 0) {
3065 const int device_id) {
3066 std::vector<int64_t> table_ptrs;
3067 const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
3068 for (auto hash_table : join_hash_tables) {
3070 CHECK(table_ptrs.empty());
3073 table_ptrs.push_back(hash_table->getJoinHashBuffer(
3080 const std::vector<InputTableInfo>& query_infos,
3085 const bool contains_left_deep_outer_join =
3086 ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
3099 const std::vector<InputTableInfo>& query_infos) {
3101 const auto ld_count = input_descs.size();
3103 for (size_t i = 0; i < ld_count; ++i) {
3105 const auto frag_count = query_infos[i].info.fragments.size();
3109 if (frag_count > 1) {
3111 cgen_state_->ir_builder_.CreateLoad(frag_off_ptr));
3120 const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
3121 const std::vector<InputTableInfo>& query_infos,
3123 const HashType preferred_hash_type,
3127 return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3139 preferred_hash_type,
3146 return {nullptr, e.what()};
3154 const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3155 CHECK(!dev_props.empty());
3156 return dev_props.front().warpSize;
3194 const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3195 return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3202 if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3207 fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3210 fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3215 value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3227 CHECK(val->getType()->isPointerTy());
3229 const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3230 const auto val_type = val_ptr_type->getElementType();
3231 size_t val_width = 0;
3232 if (val_type->isIntegerTy()) {
3233 val_width = val_type->getIntegerBitWidth();
3235 if (val_type->isFloatTy()) {
3238 CHECK(val_type->isDoubleTy());
3243 if (bitWidth == val_width) {
3250 #define EXECUTE_INCLUDE
3254 #undef EXECUTE_INCLUDE
3262 auto ra_exe_unit_with_deleted = ra_exe_unit;
3264 for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3274 CHECK(deleted_cd->columnType.is_boolean());
3277 for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3278 if (input_col.get()->getColId() == deleted_cd->columnId &&
3279 input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3280 input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3287 deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3288 auto deleted_cols_it = deleted_cols_map.find(deleted_cd->tableId);
3289 if (deleted_cols_it == deleted_cols_map.end()) {
3290 CHECK(deleted_cols_map.insert(std::make_pair(deleted_cd->tableId, deleted_cd))
3293 CHECK_EQ(deleted_cd, deleted_cols_it->second);
3297 return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3305 const int64_t chunk_min,
3306 const int64_t chunk_max,
3311 CHECK(ldim != rdim);
3315 return {true, chunk_min / scale, chunk_max / scale};
3319 boost::multiprecision::cpp_int_backend<64,
3321 boost::multiprecision::signed_magnitude,
3322 boost::multiprecision::checked,
3327 std::make_tuple(true,
3331 }
catch (const std::overflow_error& e) {
3334 return std::make_tuple(false, chunk_min, chunk_max);
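// --- Illustrative sketch (not part of the original file) ---
// The precision adjustment above rescales a fragment's [chunk_min, chunk_max]
// between timestamp precisions: dividing when the column is coarser, and
// multiplying through a checked 64-bit integer so an overflow falls back to
// "cannot decide" instead of wrapping. A hedged sketch (scale is assumed to be
// a power of ten derived from the dimension difference):
#include <boost/multiprecision/cpp_int.hpp>
#include <cstdint>
#include <stdexcept>
#include <tuple>

inline std::tuple<bool, int64_t, int64_t> rescale_range(int64_t chunk_min,
                                                        int64_t chunk_max,
                                                        int64_t scale,
                                                        bool lhs_is_coarser) {
  if (lhs_is_coarser) {
    return {true, chunk_min / scale, chunk_max / scale};  // division cannot overflow
  }
  using namespace boost::multiprecision;
  using checked_int64 = number<cpp_int_backend<64, 64, signed_magnitude, checked, void>>;
  try {
    const checked_int64 lo = checked_int64(chunk_min) * scale;
    const checked_int64 hi = checked_int64(chunk_max) * scale;
    return {true, static_cast<int64_t>(lo), static_cast<int64_t>(hi)};
  } catch (const std::overflow_error&) {
    return {false, chunk_min, chunk_max};
  }
}
// --- end sketch ---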
3342 const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
3343 const std::vector<uint64_t>& frag_offsets,
3344 const size_t frag_idx) {
3345 const int table_id = table_desc.getTableId();
3346 for (const auto& simple_qual : simple_quals) {
3347 const auto comp_expr =
3355 if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3359 CHECK(lhs_uexpr->get_optype() ==
3362 if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3369 const auto rhs = comp_expr->get_right_operand();
3375 if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time()) {
3378 const int col_id = lhs_col->get_column_id();
3380 int64_t chunk_min{0};
3381 int64_t chunk_max{0};
3382 bool is_rowid{false};
3383 size_t start_rowid{0};
3386 if (cd->isVirtualCol) {
3387 CHECK(cd->columnName == "rowid");
3389 start_rowid = table_generation.start_rowid;
3390 chunk_min = frag_offsets[frag_idx] + start_rowid;
3391 chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
3395 const auto& chunk_type = lhs_col->get_type_info();
3396 chunk_min = extract_min_stat(chunk_meta_it->second->chunkStats, chunk_type);
3397 chunk_max = extract_max_stat(chunk_meta_it->second->chunkStats, chunk_type);
3399 if (chunk_min > chunk_max) {
3403 if (lhs->get_type_info().is_timestamp() &&
3404 (lhs_col->get_type_info().get_dimension() !=
3405 rhs_const->get_type_info().get_dimension()) &&
3406 (lhs_col->get_type_info().is_high_precision_timestamp() ||
3407 rhs_const->get_type_info().is_high_precision_timestamp())) {
3416 std::tie(is_valid, chunk_min, chunk_max) =
3418 chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
3420 VLOG(4) << "Overflow/Underflow detecting in fragments skipping logic.\nChunk min "
3424         << "\nLHS col precision is: "
3426         << "\nRHS precision is: "
3427         << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
3431 llvm::LLVMContext local_context;
3432 CgenState local_cgen_state(local_context);
3435 const auto rhs_val =
3438 switch (comp_expr->get_optype()) {
3440 if (chunk_max < rhs_val) {
3445 if (chunk_max <= rhs_val) {
3450 if (chunk_min > rhs_val) {
3455 if (chunk_min >= rhs_val) {
3460 if (chunk_min > rhs_val || chunk_max < rhs_val) {
3462 }
else if (is_rowid) {
3463 return {
false, rhs_val - start_rowid};
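The switch above is the core of the fragment-skipping test: a fragment can be pruned when its per-chunk [min, max] range cannot intersect the predicate. Here is a simplified, standalone sketch of that decision; the types and names are invented for illustration and rowid handling is omitted.

#include <cstdint>
#include <iostream>

enum class CompOp { GE, GT, LE, LT, EQ };

// Chunk-level statistics, as a fragment's metadata would expose them.
struct ChunkStatsSimple {
  int64_t min;
  int64_t max;
};

// True when no row in the fragment can satisfy `col <op> rhs`, so the whole
// fragment can be skipped without reading it.
bool can_skip_fragment(const ChunkStatsSimple& stats, const CompOp op, const int64_t rhs) {
  switch (op) {
    case CompOp::GE:
      return stats.max < rhs;
    case CompOp::GT:
      return stats.max <= rhs;
    case CompOp::LE:
      return stats.min > rhs;
    case CompOp::LT:
      return stats.min >= rhs;
    case CompOp::EQ:
      return stats.min > rhs || stats.max < rhs;
  }
  return false;
}

int main() {
  ChunkStatsSimple stats{/*min=*/100, /*max=*/200};
  std::cout << can_skip_fragment(stats, CompOp::GT, 300) << "\n";  // 1: max <= 300
  std::cout << can_skip_fragment(stats, CompOp::EQ, 150) << "\n";  // 0: 150 lies in [100, 200]
  return 0;
}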
std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
    const InputDescriptor& table_desc,
    const RelAlgExecutionUnit& ra_exe_unit,
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const std::vector<uint64_t>& frag_offsets,
    const size_t frag_idx) {
  std::pair<bool, int64_t> skip_frag{false, -1};
  for (auto& inner_join : ra_exe_unit.join_quals) {
    // ... (only inner-join levels are considered)
    // Collect the simple (single-column comparison) quals of this join level.
    std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
    for (auto& qual : inner_join.quals) {
      auto temp_qual = qual_to_conjunctive_form(qual);
      inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
                                     temp_qual.simple_quals.begin(),
                                     temp_qual.simple_quals.end());
    }
    auto temp_skip_frag = skipFragment(
        table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
    if (temp_skip_frag.second != -1) {
      skip_frag.second = temp_skip_frag.second;
      return skip_frag;
    } else {
      skip_frag.first = skip_frag.first || temp_skip_frag.first;
    }
  }
  return skip_frag;
}
AggregatedColRange Executor::computeColRangesCache(
    const std::unordered_set<PhysicalInput>& phys_inputs) {
  AggregatedColRange agg_col_range_cache;
  std::unordered_set<int> phys_table_ids;
  for (const auto& phys_input : phys_inputs) {
    phys_table_ids.insert(phys_input.table_id);
  }
  std::vector<InputTableInfo> query_infos;
  for (const int table_id : phys_table_ids) {
    query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
  }
  for (const auto& phys_input : phys_inputs) {
    // ... (look up the column descriptor; only range-supporting types are cached)
    const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
        cd->columnType, phys_input.table_id, phys_input.col_id, 0);
    const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
    agg_col_range_cache.setColRange(phys_input, col_range);
  }
  return agg_col_range_cache;
}
StringDictionaryGenerations Executor::computeStringDictionaryGenerations(
    const std::unordered_set<PhysicalInput>& phys_inputs) {
  StringDictionaryGenerations string_dictionary_generations;
  for (const auto& phys_input : phys_inputs) {
    // ... (cd is the column descriptor for this physical input)
    const auto& col_ti =
        cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
    if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
      const int dict_id = col_ti.get_comp_param();
      const auto dd = catalog_->getMetadataForDict(dict_id);
      CHECK(dd && dd->stringDict);
      string_dictionary_generations.setGeneration(dict_id,
                                                  dd->stringDict->storageEntryCount());
    }
  }
  return string_dictionary_generations;
}
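As a rough mental model of what a dictionary generation provides (a toy class, not the StringDictionaryGenerations class itself): capturing the entry count at query start lets later lookups treat ids appended afterwards as not yet visible.

#include <cstdint>
#include <unordered_map>

// Toy generation map: dictionary id -> number of entries visible to this query.
class ToyDictionaryGenerations {
 public:
  void setGeneration(const uint32_t dict_id, const uint64_t entry_count) {
    generations_[dict_id] = entry_count;
  }
  // Ids at or beyond the recorded generation were appended after the snapshot.
  bool isVisible(const uint32_t dict_id, const uint64_t string_id) const {
    const auto it = generations_.find(dict_id);
    return it != generations_.end() && string_id < it->second;
  }

 private:
  std::unordered_map<uint32_t, uint64_t> generations_;
};

int main() {
  ToyDictionaryGenerations gens;
  gens.setGeneration(/*dict_id=*/7, /*entry_count=*/1000);
  return gens.isVisible(7, 999) && !gens.isVisible(7, 1000) ? 0 : 1;
}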
TableGenerations Executor::computeTableGenerations(std::unordered_set<int> phys_table_ids) {
  TableGenerations table_generations;
  for (const int table_id : phys_table_ids) {
    const auto table_info = getTableInfo(table_id);
    table_generations.setGeneration(
        table_id,
        TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
  }
  return table_generations;
}
// Executor::setupCaching: seed the executor's column-range, string-dictionary-generation
// and table-generation caches from the query's physical inputs.
void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
                            const std::unordered_set<int>& phys_table_ids) {
  // ...
}

QuerySessionId& Executor::getCurrentQuerySession(
    mapd_shared_lock<mapd_shared_mutex>& read_lock) {
  return current_query_session_;
}

bool Executor::checkCurrentQuerySession(const std::string& candidate_query_session,
                                        mapd_shared_lock<mapd_shared_mutex>& read_lock) {
  // A candidate session is "current" only when it is non-empty and matches the session
  // registered as running on this executor.
  return !candidate_query_session.empty() &&
         (current_query_session_ == candidate_query_session);
}

void Executor::invalidateRunningQuerySession(
    mapd_unique_lock<mapd_shared_mutex>& write_lock) {
  // ... (reset the current query session)
}

CurrentQueryStatus Executor::attachExecutorToQuerySession(
    std::shared_ptr<const query_state::QueryState>& query_state) {
  QuerySessionId query_session_id;
  std::string query_str = "N/A";
  if (query_state != nullptr && query_state->getConstSessionInfo() != nullptr) {
    query_session_id = query_state->getConstSessionInfo()->get_session_id();
    query_str = query_state->getQueryStr();
  }
  if (!query_session_id.empty()) {
    // ... (enroll the session with query_state->getQuerySubmittedTime() and
    //      QuerySessionStatus::QueryStatus::PENDING_EXECUTOR)
  }
  return {query_session_id, query_str};
}
void Executor::checkPendingQueryStatus(const QuerySessionId& query_session) {
  if (query_session.empty()) {
    return;
  }
  // ... (the session must be enrolled and must have an interrupt flag registered)
  VLOG(1) << "Interrupting pending query is not available since the query session is "
          /* ... */;
  VLOG(1) << "Interrupting pending query is not available since its interrupt flag is "
          /* ... */;
}

void Executor::clearQuerySessionStatus(
    const QuerySessionId& query_session,
    const std::chrono::time_point<std::chrono::system_clock> submitted,
    bool acquire_spin_lock) {
  if (query_session.empty()) {
    return;
  }
  // ... (drop the session from the session list and reset its interrupt state)
  if (acquire_spin_lock) {
    // ... (release the global execution spin lock)
  }
}

void Executor::updateQuerySessionStatus(
    std::shared_ptr<const query_state::QueryState>& query_state,
    const QuerySessionStatus::QueryStatus new_query_status) {
  auto query_session = query_state->getConstSessionInfo()->get_session_id();
  if (query_session.empty()) {
    return;
  }
  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING) {
    // ... (record this session as the executor's current running session)
  }
  // ... (forwards query_state->getQuerySubmittedTime(), new_query_status and
  //      session_write_lock to updateQuerySessionStatusWithLock; an overload taking the
  //      session id and submitted time directly performs the same steps)
}
void Executor::enrollQuerySession(
    const QuerySessionId& query_session,
    const std::string& query_str,
    const std::chrono::time_point<std::chrono::system_clock> submitted,
    const size_t executor_id,
    const QuerySessionStatus::QueryStatus query_session_status) {
  if (query_session.empty()) {
    return;
  }
  // ... (take the session write lock, then register the session via addToQuerySessionList
  //      with query_session_status and the session_write_lock)
}

bool Executor::addToQuerySessionList(
    const QuerySessionId& query_session,
    const std::string& query_str,
    const std::chrono::time_point<std::chrono::system_clock> submitted,
    const size_t executor_id,
    const QuerySessionStatus::QueryStatus query_status,
    mapd_unique_lock<mapd_shared_mutex>& write_lock) {
  if (queries_session_map_.count(query_session)) {
    // The session already exists: (re)register this query under its submitted time as
    //   QuerySessionStatus(query_session, executor_id, query_str, submitted, query_status)
  } else {
    std::map<std::string, QuerySessionStatus> executor_per_query_map;
    executor_per_query_map.emplace(
        ::toString(submitted),
        QuerySessionStatus(query_session, executor_id, query_str, submitted, query_status));
    queries_session_map_.emplace(query_session, executor_per_query_map);
  }
  // ...
}
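The session bookkeeping above is a two-level map: session id on the outside, the query's submitted-time string on the inside, so one session can track several concurrent queries. Below is a compact sketch of that shape with invented, simplified types; only the container layout mirrors QuerySessionMap.

#include <map>
#include <string>
#include <utility>

struct ToyQueryStatus {
  size_t executor_id;
  std::string query_str;
  std::string status;  // e.g. "PENDING_EXECUTOR", "RUNNING"
};

// session id -> (submitted-time string -> status)
using ToySessionMap = std::map<std::string, std::map<std::string, ToyQueryStatus>>;

void enroll(ToySessionMap& sessions,
            const std::string& session_id,
            const std::string& submitted_time,
            ToyQueryStatus status) {
  sessions[session_id][submitted_time] = std::move(status);
}

int main() {
  ToySessionMap sessions;
  enroll(sessions, "session-42", "2021-01-01 10:00:00",
         {0, "SELECT COUNT(*) FROM t;", "PENDING_EXECUTOR"});
  // Promote the query once an executor picks it up.
  sessions["session-42"]["2021-01-01 10:00:00"].status = "RUNNING";
  return 0;
}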
bool Executor::updateQuerySessionStatusWithLock(
    const QuerySessionId& query_session,
    const std::chrono::time_point<std::chrono::system_clock> submitted,
    const QuerySessionStatus::QueryStatus updated_query_status,
    mapd_unique_lock<mapd_shared_mutex>& write_lock) {
  if (query_session.empty()) {
    return false;
  }
  // ... (t_str is ::toString(submitted))
  for (auto& query_status : queries_session_map_.at(query_session)) {
    auto target_submitted_t_str = ::toString(query_status.second.getQuerySubmittedTime());
    if (t_str.compare(target_submitted_t_str) == 0) {
      auto prev_status = query_status.second.getQueryStatus();
      if (prev_status == updated_query_status) {
        return false;
      }
      query_status.second.setQueryStatus(updated_query_status);
      return true;
    }
  }
  return false;
}

bool Executor::updateQuerySessionExecutorAssignment(
    const QuerySessionId& query_session,
    const std::chrono::time_point<std::chrono::system_clock> submitted,
    const size_t executor_id,
    mapd_unique_lock<mapd_shared_mutex>& write_lock) {
  if (query_session.empty()) {
    return false;
  }
  // ... (storage is the session's per-query map; t_str is ::toString(submitted))
  for (auto it = storage.begin(); it != storage.end(); it++) {
    auto target_submitted_t_str = ::toString(it->second.getQuerySubmittedTime());
    if (t_str.compare(target_submitted_t_str) == 0) {
      // ... (re-register this query with the given executor_id)
      return true;
    }
  }
  return false;
}

bool Executor::removeFromQuerySessionList(
    const QuerySessionId& query_session,
    const std::chrono::time_point<std::chrono::system_clock> submitted,
    mapd_unique_lock<mapd_shared_mutex>& write_lock) {
  if (query_session.empty()) {
    return false;
  }
  // ... (storage is the session's per-query map; t_str is ::toString(submitted))
  if (storage.size() > 1) {
    // More than one query is enrolled for this session: erase only the matching one.
    for (auto it = storage.begin(); it != storage.end(); it++) {
      auto target_submitted_t_str = ::toString(it->second.getQuerySubmittedTime());
      if (t_str.compare(target_submitted_t_str) == 0) {
        // ... (erase this entry)
        return true;
      }
    }
  } else if (storage.size() == 1) {
    // ... (the session's only query: drop the whole session entry)
  }
  return false;
}
// (tail of the interrupt reset path)
  VLOG(1) << "RESET Executor " << this << " that had previously been interrupted";

void Executor::setQuerySessionAsInterrupted(
    const QuerySessionId& query_session,
    mapd_unique_lock<mapd_shared_mutex>& write_lock) {
  if (query_session.empty()) {
    return;
  }
  // ... (mark the session's entry in queries_interrupt_flag_)
}

// checkIsQuerySessionInterrupted(query_session, read_lock) and
// checkIsQuerySessionEnrolled(query_session, read_lock) report whether the session's
// interrupt flag is set and whether the session is present in queries_session_map_,
// respectively.
void Executor::enableRuntimeQueryInterrupt(const double runtime_query_check_freq,
                                           const unsigned pending_query_check_freq) const {
  // ... (enable runtime query interrupt and record the running/pending check frequencies)
}
void Executor::addToCardinalityCache(const std::string& cache_key, const size_t cache_value) {
  // ... (store cache_key -> cache_value under the recycler write lock)
  VLOG(1) << "Put estimated cardinality to the cache";
}

CachedCardinality Executor::getCachedCardinality(const std::string& cache_key) {
  // ... (on a hit, log and return {true, cached value}; otherwise {false, 0})
  VLOG(1) << "Reuse cached cardinality";
  // ...
}
std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
    const QuerySessionId& query_session,
    mapd_shared_lock<mapd_shared_mutex>& read_lock) {
  // ... (query_infos is the session's per-query map)
  std::vector<QuerySessionStatus> ret;
  for (auto& info : query_infos) {
    ret.emplace_back(QuerySessionStatus(query_session,
                                        info.second.getExecutorId(),
                                        info.second.getQueryStr(),
                                        info.second.getQuerySubmittedTime(),
                                        info.second.getQueryStatus()));
  }
  return ret;
}
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr *> &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
int32_t getIdOfString(const std::string &str) const
const std::string debug_dir_
bool is_agg(const Analyzer::Expr *expr)
ALWAYS_INLINE int64_t agg_sum_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
std::vector< Analyzer::Expr * > target_exprs
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr *> &target_exprs) const
std::map< size_t, std::vector< uint64_t > > get_table_id_to_frag_offsets(const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments *> &all_tables_fragments)
static mapd_shared_mutex executor_session_mutex_
AggregatedColRange computeColRangesCache(const std::unordered_set< PhysicalInput > &phys_inputs)
constexpr size_t kArenaBlockOverhead
int32_t getErrorCode() const
std::vector< std::unique_ptr< ExecutionKernel > > createKernels(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
double g_running_query_interrupt_freq
bool g_enable_smem_group_by
bool updateQuerySessionExecutorAssignment(const QuerySessionId &query_session, const std::chrono::time_point< std::chrono::system_clock > submitted, const size_t executor_id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
void reduce(SpeculativeTopNMap &that)
float g_filter_push_down_low_frac
std::unordered_map< int, const Analyzer::BinOper * > getInnerTabIdToJoinCond() const
const ColumnDescriptor * try_get_column_descriptor(const InputColDescriptor *col_desc, const Catalog_Namespace::Catalog &cat)
static QuerySessionMap queries_session_map_
static mapd_shared_mutex execute_mutex_
void agg_max_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
CurrentQueryStatus attachExecutorToQuerySession(std::shared_ptr< const query_state::QueryState > &query_state)
int64_t kernel_queue_time_ms_
ResultSetPtr reduceMultiDeviceResultSets(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
size_t g_constrained_by_in_threshold
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const GpuCompilationContext *cu_functions, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t *>> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t *>> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int outer_table_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
int64_t compilation_queue_time_ms_
const ColumnDescriptor * getPhysicalColumnDescriptor(const Analyzer::ColumnVar *, int) const
double g_bump_allocator_step_reduction
std::string toString(const ExtArgumentType &sig_type)
void invalidateRunningQuerySession(mapd_unique_lock< mapd_shared_mutex > &write_lock)
size_t getSlotCount() const
DEVICE void sort(ARGS &&... args)
int get_column_id() const
const ChunkMetadataMap & getChunkMetadataMap() const
std::string getIR() const
void checkPendingQueryStatus(const QuerySessionId &query_session)
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
static const int32_t ERR_INTERRUPTED
class for a per-database catalog. also includes metadata for the current database and the current use...
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::chrono::time_point< std::chrono::system_clock > submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
void setEntryCount(const size_t val)
bool is_trivial_loop_join(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
bool g_strip_join_covered_quals
std::unique_ptr< llvm::Module > udf_gpu_module
const std::vector< uint64_t > & getFragOffsets()
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const QueryHint &query_hint)
size_t getSharedMemorySize() const
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool g_enable_direct_columnarization
std::tuple< bool, int64_t, int64_t > get_hpt_overflow_underflow_safe_scaled_values(const int64_t chunk_min, const int64_t chunk_max, const SQLTypeInfo &lhs_type, const SQLTypeInfo &rhs_type)
void agg_max_double_skip_val(int64_t *agg, const double val, const double skip_val)
std::string get_table_name(const InputDescriptor &input_desc, const Catalog_Namespace::Catalog &cat)
void clearQuerySessionStatus(const QuerySessionId &query_session, const std::chrono::time_point< std::chrono::system_clock > submitted, bool acquire_spin_lock)
ResultSetPtr executeTableFunction(const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat)
Compiles and dispatches a table function; that is, a function that takes as input one or more columns...
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t *>> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables)
Data_Namespace::DataMgr & getDataMgr() const
std::unordered_set< int > get_available_gpus(const Catalog_Namespace::Catalog &cat)
static const int max_gpu_count
GpuSharedMemoryContext gpu_smem_context
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
const std::optional< bool > union_all
Data_Namespace::DataMgr & getDataMgr() const
unsigned g_pending_query_interrupt_freq
int64_t float_to_double_bin(int32_t val, bool nullable=false)
static const size_t code_cache_size
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
static std::atomic_flag execute_spin_lock_
bool g_enable_columnar_output
void agg_min_skip_val(int64_t *agg, const int64_t val, const int64_t skip_val)
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
std::pair< QuerySessionId, std::string > CurrentQueryStatus
static mapd_shared_mutex executors_cache_mutex_
const std::list< Analyzer::OrderEntry > order_entries
const ColumnDescriptor * getDeletedColumnIfRowsDeleted(const TableDescriptor *td) const
unsigned gridSize() const
void clearMemory(const MemoryLevel memLevel)
int32_t executePlanWithoutGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr *> &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t *>> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
std::string join_type_to_string(const JoinType type)
std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > addDeletedColumn(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
TableGenerations computeTableGenerations(std::unordered_set< int > phys_table_ids)
std::vector< InputDescriptor > input_descs
const Catalog_Namespace::Catalog * getCatalog() const
const SortAlgorithm algorithm
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
OutVecOwner(const std::vector< int64_t *> &out_vec)
TypeR::rep timer_stop(Type clock_begin)
static std::pair< int64_t, int32_t > reduceResults(const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
HOST DEVICE int get_size() const
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
bool checkCurrentQuerySession(const std::string &candidate_query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
size_t g_filter_push_down_passing_row_ubound
static const int32_t ERR_GEOS
std::ostream & operator<<(std::ostream &os, const RelAlgExecutionUnit &ra_exe_unit)
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters system_parameters=SystemParameters())
AggregatedColRange agg_col_range_cache_
std::shared_ptr< ResultSet > ResultSetPtr
static void * gpu_active_modules_[max_gpu_count]
std::vector< FragmentInfo > fragments
std::unique_ptr< CgenState > cgen_state_
bool with_dynamic_watchdog
size_t maxGpuSlabSize() const
ExecutorOptLevel opt_level
bool g_enable_dynamic_watchdog
bool g_enable_experimental_string_functions
size_t compute_buffer_entry_guess(const std::vector< InputTableInfo > &query_infos)
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
bool takes_float_argument(const TargetInfo &target_info)
unsigned g_trivial_loop_join_threshold
static uint32_t gpu_active_modules_device_mask_
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner() const
bool isArchPascalOrLater(const ExecutorDeviceType dt) const
static void invalidateCaches()
int64_t getAggInitValForIndex(const size_t index) const
void buildSelectedFragsMappingForUnion(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
bool isSharedMemoryUsed() const
llvm::Value * castToIntPtrTyIn(llvm::Value *val, const size_t bit_width)
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
int getDeviceCount() const
static std::mutex kernel_mutex_
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
Container for compilation results and assorted options for a single execution unit.
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::vector< QuerySessionStatus > getQuerySessionInfo(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
HOST DEVICE int get_scale() const
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
std::vector< FragmentsPerTable > FragmentsList
std::string toString() const
virtual ReductionCode codegen() const
static size_t literalBytes(const CgenState::LiteralValue &lit)
bool filter_on_deleted_column
mapd_shared_mutex & getSessionLock()
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
bool g_enable_overlaps_hashjoin
size_t getRowSize() const
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
int8_t get_min_byte_width()
size_t getNumBytesForFetchedRow(const std::set< int > &table_ids_to_fetch) const
bool g_inner_join_fragment_skipping
const Executor * getExecutor() const
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query's parse tree etc.
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const TableFunctionCompilationContext *compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor)
int64_t deviceCycles(int milliseconds) const
bool useCudaBuffers() const
static const size_t high_scan_limit
bool g_enable_smem_non_grouped_agg
ResultSetPtr get_merged_result(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
void run(Executor *executor, SharedKernelContext &shared_context)
CodeCache gpu_code_cache_
std::shared_ptr< ResultSet > asRows(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const QueryMemoryDescriptor &query_mem_desc, const Executor *executor, const size_t top_n, const bool desc) const
std::unique_ptr< QueryMemoryInitializer > query_buffers_
std::vector< int64_t > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
QuerySessionId & getCurrentQuerySession(mapd_shared_lock< mapd_shared_mutex > &read_lock)
bool checkIsQuerySessionEnrolled(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
HOST DEVICE bool get_notnull() const
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
const ExecutorId executor_id_
std::map< QuerySessionId, bool > InterruptFlagMap
const size_t max_gpu_slab_size_
ResultSetPtr collectAllDeviceResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
bool isPotentialInSituRender() const
static const int32_t ERR_DIV_BY_ZERO
static SysCatalog & instance()
const bool allow_multifrag
bool g_from_table_reordering
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Classes representing a parse tree.
void setGeneration(const uint32_t id, const uint64_t generation)
bool g_enable_hashjoin_many_to_many
void enableRuntimeQueryInterrupt(const double runtime_query_check_freq, const unsigned pending_query_check_freq) const
FetchResult fetchUnionChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments *> &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator)
size_t get_context_count(const ExecutorDeviceType device_type, const size_t cpu_count, const size_t gpu_count)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
FetchResult fetchChunks(const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments *> &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator)
ExecutorType executor_type
void init(LogOptions const &log_opts)
std::mutex str_dict_mutex_
const Catalog_Namespace::Catalog * catalog_
#define INJECT_TIMER(DESC)
static size_t addAligned(const size_t off_in, const size_t alignment)
const bool with_dynamic_watchdog
void setQuerySessionAsInterrupted(const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
static const int32_t ERR_OUT_OF_RENDER_MEM
std::unordered_map< TableId, const ColumnDescriptor * > DeletedColumnsMap
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
const JoinQualsPerNestingLevel join_quals
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
std::shared_timed_mutex mapd_shared_mutex
const std::string debug_file_
int deviceCount(const ExecutorDeviceType) const
const double gpu_input_mem_limit_percent
CachedCardinality getCachedCardinality(const std::string &cache_key)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
void agg_min_double_skip_val(int64_t *agg, const double val, const double skip_val)
void setCatalog(const Catalog_Namespace::Catalog *catalog)
friend class QueryMemoryDescriptor
ExecutorDeviceType getDeviceType() const
ExecutorExplainType explain_type
int64_t inline_null_val(const SQLTypeInfo &ti, const bool float_argument_input)
size_t g_big_group_threshold
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
static std::unordered_map< std::string, size_t > cardinality_cache_
float g_filter_push_down_high_frac
static InterruptFlagMap queries_interrupt_flag_
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
std::unique_ptr< PlanState > plan_state_
bool updateQuerySessionStatusWithLock(const QuerySessionId &query_session, const std::chrono::time_point< std::chrono::system_clock > submitted, const QuerySessionStatus::QueryStatus updated_query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
static const int32_t ERR_OUT_OF_TIME
std::vector< int64_t * > out_vec_
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::chrono::time_point< std::chrono::system_clock > submitted, mapd_unique_lock< mapd_shared_mutex > &write_lock)
void agg_min_float_skip_val(int32_t *agg, const float val, const float skip_val)
llvm::Value * castToFP(llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
std::pair< bool, size_t > CachedCardinality
size_t g_max_memory_allocation_size
bool is_distinct_target(const TargetInfo &target_info)
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
size_t g_approx_quantile_buffer
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
void agg_max_float_skip_val(int32_t *agg, const float val, const float skip_val)
const unsigned block_size_x_
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
const unsigned grid_size_x_
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
quantile::TDigest * newTDigest()
specifies the content in-memory of a row in the column metadata table
size_t g_overlaps_max_table_size_bytes
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
const std::shared_ptr< Analyzer::Estimator > estimator
static std::map< int, std::shared_ptr< Executor > > executors_
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
std::vector< MemoryInfo > getMemoryInfo(const MemoryLevel memLevel)
static const int32_t ERR_OUT_OF_GPU_MEM
const TemporaryTables * getTemporaryTables()
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
ExpressionRange getColRange(const PhysicalInput &) const
std::shared_ptr< CompilationContext > generated_code
QueryMemoryDescriptor query_mem_desc_
ExecutorDeviceType device_type
bool g_enable_window_functions
void enrollQuerySession(const QuerySessionId &query_session, const std::string &query_str, const std::chrono::time_point< std::chrono::system_clock > submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_session_status)
int8_t groupColWidth(const size_t key_idx) const
std::string sort_algorithm_to_string(const SortAlgorithm algorithm)
std::vector< size_t > getTableFragmentIndices(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< int, const TableFragments *> &selected_tables_fragments, const std::unordered_map< int, const Analyzer::BinOper *> &inner_table_id_to_join_condition)
const std::vector< size_t > outer_fragment_indices
size_t g_min_memory_allocation_size
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
ResultSetPtr resultsUnion(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
#define REGULAR_DICT(TRANSIENTID)
static QuerySessionId current_query_session_
static llvm::ConstantInt * codegenIntConst(const Analyzer::Constant *constant, CgenState *cgen_state)
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
InputTableInfoCache input_table_info_cache_
Speculative top N algorithm.
void setGeneration(const uint32_t id, const TableGeneration &generation)
void launchKernels(SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
bool g_enable_smem_grouped_non_count_agg
CodeCache cpu_code_cache_
HOST DEVICE int get_dimension() const
bool checkIsQuerySessionInterrupted(const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
std::pair< bool, int64_t > skipFragment(const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
bool needFetchAllFragments(const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
std::unordered_map< int, CgenState::LiteralValues > literal_values
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
std::vector< std::string > expr_container_to_string(const std::list< Analyzer::OrderEntry > &expr_container)
std::map< const QuerySessionId, std::map< std::string, QuerySessionStatus > > QuerySessionMap
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
StringDictionaryGenerations computeStringDictionaryGenerations(const std::unordered_set< PhysicalInput > &phys_inputs)
std::unique_ptr< llvm::Module > udf_cpu_module
bool g_enable_filter_function
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
void updateQuerySessionStatus(std::shared_ptr< const query_state::QueryState > &query_state, const QuerySessionStatus::QueryStatus new_query_status)
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
std::vector< std::vector< const int8_t * > > col_buffers
unsigned numBlocksPerMP() const
std::pair< bool, int64_t > skipFragmentInnerJoins(const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
void buildSelectedFragsMapping(std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
TableGenerations table_generations_
int64_t extract_min_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> PerFragmentCallBack
mapd_shared_lock< mapd_shared_mutex > read_lock
size_t getDeviceBasedScanLimit(const ExecutorDeviceType device_type, const int device_count)
bool g_enable_filter_push_down
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool g_use_estimator_result_cache
bool g_enable_bump_allocator
size_t getRunningExecutorId(mapd_shared_lock< mapd_shared_mutex > &read_lock)
std::string QuerySessionId
void addToCardinalityCache(const std::string &cache_key, const size_t cache_value)
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
static std::atomic< bool > interrupted_
#define DEBUG_TIMER(name)
void agg_sum_double_skip_val(int64_t *agg, const double val, const double skip_val)
HOST DEVICE SQLTypes get_type() const
int32_t getOrAddTransient(const std::string &str)
constexpr int64_t get_timestamp_precision_scale(const int32_t dimen)
static const int32_t ERR_OUT_OF_SLOTS
uint64_t exp_to_scale(const unsigned exp)
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
static std::mutex compilation_mutex_
std::vector< std::vector< int64_t > > num_rows
size_t g_approx_quantile_centroids
constexpr int8_t MAX_BYTE_WIDTH_SUPPORTED
void setColRange(const PhysicalInput &, const ExpressionRange &)
static size_t running_query_executor_id_
DEVICE void swap(ARGS &&... args)
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
static bool typeSupportsRange(const SQLTypeInfo &ti)
size_t getColOffInBytes(const size_t col_idx) const
ExecutorDeviceType getDeviceTypeForTargets(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags(const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments *> &all_tables_fragments)
std::shared_ptr< const query_state::QueryState > query_state
ReductionCode get_reduction_code(std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device, int64_t *compilation_queue_time)
bool register_intel_jit_listener
ResultSetPtr reduce_estimator_results(const RelAlgExecutionUnit &ra_exe_unit, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &results_per_device)
Executor(const ExecutorId id, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 64, 64, boost::multiprecision::signed_magnitude, boost::multiprecision::checked, void > > checked_int64_t
int8_t * getUnderlyingBuffer() const
StringDictionaryProxy * getStringDictionaryProxy(const int dict_id, const bool with_generation) const
mapd_unique_lock< mapd_shared_mutex > write_lock
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
size_t getKeyCount() const
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
void executeWorkUnitPerFragment(const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb)
Compiles and dispatches a work unit per fragment processing results with the per fragment callback...
std::vector< std::vector< uint64_t > > frag_offsets
const TableGeneration & getTableGeneration(const int table_id) const
const bool allow_runtime_query_interrupt
const std::vector< DeviceProperties > & getAllDeviceProperties() const
std::unique_ptr< ResultSet > estimator_result_set_
static size_t align(const size_t off_in, const size_t alignment)
const ColumnDescriptor * getColumnDescriptor(const Analyzer::ColumnVar *) const
bool skipFragmentPair(const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper *> &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
unsigned g_dynamic_watchdog_time_limit
QueryDescriptionType getQueryDescriptionType() const
DEVICE auto accumulate(ARGS &&... args)
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
void fill_entries_for_empty_input(std::vector< TargetInfo > &target_infos, std::vector< int64_t > &entry, const std::vector< Analyzer::Expr *> &target_exprs, const QueryMemoryDescriptor &query_mem_desc)
Descriptor for the fragments required for an execution kernel.
void agg_sum_float_skip_val(int32_t *agg, const float val, const float skip_val)
static size_t getArenaBlockSize()
ResultSetPtr executeWorkUnit(size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
ExpressionRange getColRange(const PhysicalInput &) const
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
size_t getEntryCount() const
bool g_enable_runtime_query_interrupt
static mapd_shared_mutex recycler_mutex_
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const
Functions to support array operations used by the executor.
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
int64_t extract_max_stat(const ChunkStats &stats, const SQLTypeInfo &ti)
static std::mutex gpu_active_modules_mutex_
void setupCaching(const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< int > &phys_table_ids)
void clearMetaInfoCache()
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
bool g_enable_table_functions
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
const TableGeneration & getGeneration(const uint32_t id) const
const TemporaryTables * temporary_tables_
unsigned blockSize() const
static const ExecutorId UNITARY_EXECUTOR_ID
size_t g_gpu_smem_threshold
ResultSetPtr executeExplain(const QueryCompilationDescriptor &)
const Expr * get_left_operand() const
void compile(const TableFunctionExecutionUnit &exe_unit, const CompilationOptions &co, Executor *executor)