46 #include <boost/algorithm/cxx11/any_of.hpp>
47 #include <boost/range/adaptor/reversed.hpp>
69 const auto compound =
dynamic_cast<const RelCompound*
>(ra);
70 const auto aggregate =
dynamic_cast<const RelAggregate*
>(ra);
71 return ((compound && compound->isAggregate()) || aggregate);
78 std::unordered_set<PhysicalInput> phys_inputs2;
79 for (
auto& phi : phys_inputs) {
88 std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
89 parallelism_hints_per_table;
91 int table_id = physical_input.table_id;
94 !table->is_in_memory_system_table) {
98 for (
const auto& fragment :
99 foreign_table->fragmenter->getFragmentsForQuery().fragments) {
102 catalog.
getDatabaseId(), table_id, col_id, fragment.fragmentId};
111 if (!chunk.isChunkOnDevice(
113 parallelism_hints_per_table[{catalog.
getDatabaseId(), table_id}].insert(
115 fragment.fragmentId});
120 if (!parallelism_hints_per_table.empty()) {
121 auto foreign_storage_mgr =
123 CHECK(foreign_storage_mgr);
124 foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
152 std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
154 int table_id = physical_input.table_id;
156 if (table && table->is_in_memory_system_table) {
158 system_table_columns_by_table_id[table_id].emplace_back(column_id);
162 if (!system_table_columns_by_table_id.empty() &&
167 for (
const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
181 CHECK(td->fragmenter);
182 auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
183 CHECK_LE(fragment_count, static_cast<size_t>(1))
184 <<
"In-memory system tables are expected to have a single fragment.";
185 if (fragment_count > 0) {
186 for (
auto column_id : column_ids) {
206 std::list<Analyzer::OrderEntry>
result;
209 result.emplace_back(sort_field.getField() + 1,
217 const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
218 const std::vector<TargetMetaInfo>& targets_meta) {
219 CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
221 for (
size_t i = 0; i < targets_meta.size(); ++i) {
222 render_info.
targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
223 targets_meta[i].get_resname(),
224 work_unit_target_exprs[i]->get_shared_ptr(),
237 return {left_deep_join_tree->
getId()};
242 const std::vector<unsigned>& aggregate,
243 const std::vector<unsigned>& next_result)
const override {
245 std::copy(next_result.begin(), next_result.end(), std::back_inserter(
result));
255 const size_t text_encoding_casts)
256 : text_decoding_casts(text_decoding_casts)
257 , text_encoding_casts(text_encoding_casts) {}
263 : default_disregard_casts_to_none_encoding_(
264 default_disregard_casts_to_none_encoding) {}
271 const bool disregard_casts_to_none_encoding = disregard_casts_to_none_encoding_;
272 result = aggregateResult(result, visit(u_oper->
get_operand()));
278 if (!operand_ti.is_string() || !casted_ti.is_string()) {
285 if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
288 if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
289 if (!disregard_casts_to_none_encoding) {
310 const auto prev_disregard_casts_to_none_encoding_state =
311 disregard_casts_to_none_encoding_;
312 const auto left_u_oper =
314 if (left_u_oper && left_u_oper->get_optype() ==
kCAST) {
315 disregard_casts_to_none_encoding_ =
false;
316 result = aggregateResult(result, visitUOper(left_u_oper));
318 disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
322 const auto right_u_oper =
324 if (right_u_oper && right_u_oper->get_optype() ==
kCAST) {
325 disregard_casts_to_none_encoding_ =
false;
326 result = aggregateResult(result, visitUOper(right_u_oper));
328 disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
331 disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
342 const auto prev_disregard_casts_to_none_encoding_state =
343 disregard_casts_to_none_encoding_;
344 if (u_oper && u_oper->get_optype() ==
kCAST) {
345 disregard_casts_to_none_encoding_ =
true;
346 result = aggregateResult(result, visitUOper(u_oper));
347 disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
349 result = aggregateResult(result, visit(like->
get_arg()));
351 result = aggregateResult(result, visit(like->
get_like_expr()));
368 disregard_casts_to_none_encoding_ = default_disregard_casts_to_none_encoding_;
376 mutable bool disregard_casts_to_none_encoding_ =
false;
383 auto check_node_for_text_casts = [&cast_counts](
385 const bool disregard_casts_to_none_encoding) {
390 const auto this_node_cast_counts = visitor.
visit(expr);
395 for (
const auto& qual : ra_exe_unit.
quals) {
396 check_node_for_text_casts(qual.get(),
false);
398 for (
const auto& simple_qual : ra_exe_unit.
simple_quals) {
399 check_node_for_text_casts(simple_qual.get(),
false);
402 check_node_for_text_casts(groupby_expr.get(),
false);
404 for (
const auto& target_expr : ra_exe_unit.
target_exprs) {
405 check_node_for_text_casts(target_expr,
false);
407 for (
const auto& join_condition : ra_exe_unit.
join_quals) {
408 for (
const auto& join_qual : join_condition.quals) {
414 check_node_for_text_casts(join_qual.get(),
422 const std::vector<InputTableInfo>& query_infos,
427 auto const tuples_upper_bound =
431 [](
auto max,
auto const& query_info) {
432 return std::max(max, query_info.info.getNumTuples());
439 const bool has_text_casts =
440 text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;
442 if (!has_text_casts) {
445 std::ostringstream oss;
446 oss <<
"Query requires one or more casts between none-encoded and dictionary-encoded "
447 <<
"strings, and the estimated table size (" << tuples_upper_bound <<
" rows) "
448 <<
"exceeds the configured watchdog none-encoded string translation limit of "
450 throw std::runtime_error(oss.str());
461 !query_for_partial_outer_frag &&
480 auto lock =
executor_->acquireExecuteMutex();
491 CHECK(!ed_seq.empty());
492 if (ed_seq.size() > 1) {
501 auto exec_desc_ptr = ed_seq.getDescriptor(0);
502 CHECK(exec_desc_ptr);
503 auto& exec_desc = *exec_desc_ptr;
504 const auto body = exec_desc.getBody();
509 const auto project =
dynamic_cast<const RelProject*
>(body);
518 const auto compound =
dynamic_cast<const RelCompound*
>(body);
520 if (compound->isDeleteViaSelect()) {
522 }
else if (compound->isUpdateViaSelect()) {
525 if (compound->isAggregate()) {
529 const auto work_unit =
542 const bool just_explain_plan,
546 << static_cast<int>(
query_dag_->getBuildState());
552 auto execution_result =
555 constexpr
bool vlog_result_set_summary{
false};
556 if constexpr (vlog_result_set_summary) {
557 VLOG(1) << execution_result.getRows()->summaryToString();
561 VLOG(1) <<
"Running post execution callback.";
562 (*post_execution_callback_)();
564 return execution_result;
574 LOG(
INFO) <<
"Query unable to run in GPU mode, retrying on CPU";
585 const bool just_explain_plan,
589 auto timer_setup =
DEBUG_TIMER(
"Query pre-execution steps");
599 std::string query_session{
""};
600 std::string query_str{
"N/A"};
601 std::string query_submitted_time{
""};
604 query_session =
query_state_->getConstSessionInfo()->get_session_id();
606 query_submitted_time =
query_state_->getQuerySubmittedTime();
609 auto validate_or_explain_query =
611 auto interruptable = !render_info && !query_session.empty() &&
618 std::tie(query_session, query_str) =
executor_->attachExecutorToQuerySession(
619 query_session, query_str, query_submitted_time);
625 query_submitted_time,
626 QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
631 [
this, &query_session, &interruptable, &query_submitted_time] {
635 executor_->clearQuerySessionStatus(query_session, query_submitted_time);
639 auto acquire_execute_mutex = [](Executor * executor) ->
auto {
640 auto ret = executor->acquireExecuteMutex();
645 auto lock = acquire_execute_mutex(
executor_);
654 executor_->checkPendingQueryStatus(query_session);
657 throw std::runtime_error(
"Query execution has been interrupted (pending query)");
661 throw std::runtime_error(
"Checking pending query status failed: unknown error");
664 int64_t queue_time_ms =
timer_stop(clock_begin);
677 if (just_explain_plan) {
678 std::stringstream ss;
679 std::vector<const RelAlgNode*> nodes;
680 for (
size_t i = 0; i < ed_seq.size(); i++) {
681 nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
683 size_t ctr_node_id_in_plan = nodes.size();
687 auto node_id_in_plan_tree = ctr_node_id_in_plan--;
688 body->setIdInPlanTree(node_id_in_plan_tree);
690 size_t ctr = nodes.size();
695 const auto index = ctr--;
696 const auto tabs = std::string(tab_ctr++,
'\t');
698 ss << tabs <<
std::to_string(index) <<
" : " << body->toString(config) <<
"\n";
699 if (
auto sort = dynamic_cast<const RelSort*>(body)) {
700 ss << tabs <<
" : " <<
sort->getInput(0)->toString(config) <<
"\n";
702 if (dynamic_cast<const RelProject*>(body) ||
703 dynamic_cast<const RelCompound*>(body)) {
704 if (
auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
705 ss << tabs <<
" : " <<
join->toString(config) <<
"\n";
710 if (!subqueries.empty()) {
713 for (
const auto& subquery : subqueries) {
714 const auto ra = subquery->getRelAlg();
715 ss <<
"\t" << ra->toString(config) <<
"\n";
718 auto rs = std::make_shared<ResultSet>(ss.str());
726 ed_seq, co, eo, render_info, queue_time_ms);
732 const auto subquery_ra = subquery->getRelAlg();
734 if (subquery_ra->hasContextData()) {
741 subquery->setExecutionResult(std::make_shared<ExecutionResult>(
result));
749 return executor_->computeColRangesCache(phys_inputs);
754 return executor_->computeStringDictionaryGenerations(phys_inputs);
759 return executor_->computeTableGenerations(phys_table_ids);
771 std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
773 auto sort_node =
dynamic_cast<const RelSort*
>(root_node);
779 RelLeftDeepTreeIdsCollector visitor;
780 auto left_deep_tree_ids = visitor.visit(root_node);
788 const auto source = sort->
getInput(0);
789 if (dynamic_cast<const RelSort*>(source)) {
790 throw std::runtime_error(
"Sort node not supported as input to another sort");
798 const size_t step_idx,
806 const auto sort =
dynamic_cast<const RelSort*
>(exe_desc_ptr->getBody());
808 size_t shard_count{0};
818 source_work_unit.exe_unit, *
executor_->getCatalog());
822 const auto source =
sort->getInput(0);
825 CHECK_EQ(temp_seq.size(), size_t(1));
856 std::make_pair(step_idx, step_idx + 1),
861 merge_type(exe_desc_ptr->getBody()),
862 exe_desc_ptr->getBody()->getId(),
865 VLOG(1) <<
"Running post execution callback.";
866 (*post_execution_callback_)();
883 executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
884 executor_->table_generations_ = table_generations;
885 executor_->agg_col_range_cache_ = agg_col_range;
892 const int64_t queue_time_ms,
893 const bool with_existing_temp_tables) {
896 if (!with_existing_temp_tables) {
907 auto get_descriptor_count = [&seq, &eo]() ->
size_t {
922 const auto exec_desc_count = get_descriptor_count();
936 const auto cached_res =
937 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
943 const auto num_steps = exec_desc_count - 1;
944 for (
size_t i = 0; i < exec_desc_count; i++) {
945 VLOG(1) <<
"Executing query step " << i <<
" / " << num_steps;
948 seq, i, co, eo_copied, (i == num_steps) ? render_info :
nullptr, queue_time_ms);
957 LOG(
INFO) <<
"Retrying current query step " << i <<
" / " << num_steps <<
" on CPU";
959 if (render_info && i == num_steps) {
967 (i == num_steps) ? render_info :
nullptr,
973 auto eo_extern = eo_copied;
974 eo_extern.executor_type = ::ExecutorType::Extern;
976 const auto body = exec_desc_ptr->
getBody();
977 const auto compound =
dynamic_cast<const RelCompound*
>(body);
978 if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
979 LOG(
INFO) <<
"Also failed to run the query using interoperability";
983 seq, i, co, eo_extern, (i == num_steps) ? render_info :
nullptr, queue_time_ms);
992 const std::pair<size_t, size_t> interval,
996 const int64_t queue_time_ms) {
1002 for (
size_t i = interval.first; i < interval.second; i++) {
1009 (i == interval.second - 1) ? render_info :
nullptr,
1019 LOG(
INFO) <<
"Retrying current query step " << i <<
" on CPU";
1021 if (render_info && i == interval.second - 1) {
1028 (i == interval.second - 1) ? render_info :
nullptr,
1037 const size_t step_idx,
1041 const int64_t queue_time_ms) {
1045 CHECK(exec_desc_ptr);
1046 auto& exec_desc = *exec_desc_ptr;
1047 const auto body = exec_desc.getBody();
1048 if (body->isNop()) {
1073 auto handle_hint = [co,
1076 this]() -> std::pair<CompilationOptions, ExecutionOptions> {
1079 auto target_node = body;
1080 if (
auto sort_body = dynamic_cast<const RelSort*>(body)) {
1081 target_node = sort_body->getInput(0);
1084 auto columnar_output_hint_enabled =
false;
1085 auto rowwise_output_hint_enabled =
false;
1088 VLOG(1) <<
"A user forces to run the query on the CPU execution mode";
1093 VLOG(1) <<
"A user enables keeping query resultset but is skipped since data "
1094 "recycler is disabled";
1097 VLOG(1) <<
"A user enables keeping query resultset but is skipped since query "
1098 "resultset recycler is disabled";
1100 VLOG(1) <<
"A user enables keeping query resultset";
1107 VLOG(1) <<
"A user enables keeping table function's resultset but is skipped "
1108 "since data recycler is disabled";
1111 VLOG(1) <<
"A user enables keeping table function's resultset but is skipped "
1112 "since query resultset recycler is disabled";
1114 VLOG(1) <<
"A user enables keeping table function's resultset";
1119 VLOG(1) <<
"A user forces the query to run with columnar output";
1120 columnar_output_hint_enabled =
true;
1122 VLOG(1) <<
"A user forces the query to run with rowwise output";
1123 rowwise_output_hint_enabled =
true;
1126 auto columnar_output_enabled = eo_work_unit.output_columnar_hint
1127 ? !rowwise_output_hint_enabled
1128 : columnar_output_hint_enabled;
1129 if (
g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
1130 LOG(
INFO) <<
"Currently, we do not support applying query hint to change query "
1131 "output layout in distributed mode.";
1134 return std::make_pair(co_hint_applied, eo_hint_applied);
1137 auto hint_applied = handle_hint();
1141 if (
auto cached_resultset =
1142 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
1143 body->getQueryPlanDagHash())) {
1144 VLOG(1) <<
"recycle resultset of the root node " << body->getRelNodeDagId()
1145 <<
" from resultset cache";
1146 body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
1148 std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
1149 executor_->getRecultSetRecyclerHolder().getTargetExprs(
1150 body->getQueryPlanDagHash());
1151 std::vector<Analyzer::Expr*> copied_target_exprs;
1152 for (
const auto& expr : cached_target_exprs) {
1153 copied_target_exprs.push_back(expr.get());
1156 *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
1158 exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
1164 const auto compound =
dynamic_cast<const RelCompound*
>(body);
1166 if (compound->isDeleteViaSelect()) {
1167 executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1168 }
else if (compound->isUpdateViaSelect()) {
1169 executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1172 compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1173 VLOG(3) <<
"Returned from executeCompound(), addTemporaryTable("
1174 <<
static_cast<int>(-compound->getId()) <<
", ...)"
1175 <<
" exec_desc.getResult().getDataPtr()->rowCount()="
1176 << exec_desc.getResult().getDataPtr()->rowCount();
1177 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1184 const auto project =
dynamic_cast<const RelProject*
>(body);
1186 if (project->isDeleteViaSelect()) {
1187 executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
1188 }
else if (project->isUpdateViaSelect()) {
1189 executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
1191 std::optional<size_t> prev_count;
1197 if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
1202 const auto& prev_exe_result = prev_exec_desc->getResult();
1203 const auto prev_result = prev_exe_result.getRows();
1205 prev_count = prev_result->rowCount();
1206 VLOG(3) <<
"Setting output row count for projection node to previous node ("
1207 << prev_exec_desc->getBody()->toString(
1209 <<
") to " << *prev_count;
1216 hint_applied.second,
1220 VLOG(3) <<
"Returned from executeProject(), addTemporaryTable("
1221 <<
static_cast<int>(-project->getId()) <<
", ...)"
1222 <<
" exec_desc.getResult().getDataPtr()->rowCount()="
1223 << exec_desc.getResult().getDataPtr()->rowCount();
1224 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1231 const auto aggregate =
dynamic_cast<const RelAggregate*
>(body);
1234 aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1238 const auto filter =
dynamic_cast<const RelFilter*
>(body);
1241 filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1245 const auto sort =
dynamic_cast<const RelSort*
>(body);
1248 sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1249 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1256 if (logical_values) {
1258 addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
1261 const auto modify =
dynamic_cast<const RelModify*
>(body);
1263 exec_desc.setResult(
executeModify(modify, hint_applied.second));
1266 const auto logical_union =
dynamic_cast<const RelLogicalUnion*
>(body);
1267 if (logical_union) {
1271 hint_applied.second,
1280 table_func, hint_applied.first, hint_applied.second, queue_time_ms));
1284 LOG(
FATAL) <<
"Unhandled body type: "
1291 CHECK(dynamic_cast<const RelAggregate*>(body));
1292 CHECK_EQ(
size_t(1), body->inputCount());
1293 const auto input = body->getInput(0);
1294 body->setOutputMetainfo(input->getOutputMetainfo());
1300 ed.setResult({it->second, input->getOutputMetainfo()});
1310 return synthesized_physical_inputs_owned;
1314 const RexInput* rex_input)
const override {
1317 const auto scan_ra =
dynamic_cast<const RelScan*
>(input_ra);
1321 const auto col_id = rex_input->
getIndex();
1323 if (cd && cd->columnType.get_physical_cols() > 0) {
1325 std::unordered_set<const RexInput*> synthesized_physical_inputs;
1326 for (
auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
1327 auto physical_input =
1329 synthesized_physical_inputs_owned.emplace_back(physical_input);
1330 synthesized_physical_inputs.insert(physical_input);
1332 return synthesized_physical_inputs;
1341 const std::unordered_set<const RexInput*>& aggregate,
1342 const std::unordered_set<const RexInput*>& next_result)
const override {
1344 result.insert(next_result.begin(), next_result.end());
1354 if (
auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
1357 if (
auto join = dynamic_cast<const RelJoin*>(ra_node)) {
1361 if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1364 auto only_src = ra_node->
getInput(0);
1365 const bool is_join =
dynamic_cast<const RelJoin*
>(only_src) ||
1366 dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
1367 return is_join ? only_src : ra_node;
1370 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1374 std::unordered_set<const RexInput*> used_inputs =
1375 filter_expr ? visitor.
visit(filter_expr) : std::unordered_set<const RexInput*>{};
1377 for (
size_t i = 0; i < sources_size; ++i) {
1379 used_inputs.insert(source_inputs.begin(), source_inputs.end());
1381 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1382 return std::make_pair(used_inputs, used_inputs_owned);
1385 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1388 std::unordered_set<const RexInput*> used_inputs;
1389 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1390 const auto source = aggregate->
getInput(0);
1393 CHECK_GE(in_metainfo.size(), group_count);
1394 for (
size_t i = 0; i < group_count; ++i) {
1395 auto synthesized_used_input =
new RexInput(source, i);
1396 used_inputs_owned.emplace_back(synthesized_used_input);
1397 used_inputs.insert(synthesized_used_input);
1399 for (
const auto& agg_expr : aggregate->
getAggExprs()) {
1400 for (
size_t i = 0; i < agg_expr->size(); ++i) {
1401 const auto operand_idx = agg_expr->getOperand(i);
1402 CHECK_GE(in_metainfo.size(),
static_cast<size_t>(operand_idx));
1403 auto synthesized_used_input =
new RexInput(source, operand_idx);
1404 used_inputs_owned.emplace_back(synthesized_used_input);
1405 used_inputs.insert(synthesized_used_input);
1408 return std::make_pair(used_inputs, used_inputs_owned);
1411 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1414 std::unordered_set<const RexInput*> used_inputs;
1415 for (
size_t i = 0; i < project->
size(); ++i) {
1417 used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
1419 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1420 return std::make_pair(used_inputs, used_inputs_owned);
1423 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1427 std::unordered_set<const RexInput*> used_inputs;
1430 used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
1432 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1433 return std::make_pair(used_inputs, used_inputs_owned);
1436 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1438 std::unordered_set<const RexInput*> used_inputs;
1439 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1441 for (
size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
1442 const auto source = data_sink_node->getInput(nest_level);
1443 const auto scan_source =
dynamic_cast<const RelScan*
>(source);
1445 CHECK(source->getOutputMetainfo().empty());
1446 for (
size_t i = 0; i < scan_source->size(); ++i) {
1447 auto synthesized_used_input =
new RexInput(scan_source, i);
1448 used_inputs_owned.emplace_back(synthesized_used_input);
1449 used_inputs.insert(synthesized_used_input);
1452 const auto& partial_in_metadata = source->getOutputMetainfo();
1453 for (
size_t i = 0; i < partial_in_metadata.size(); ++i) {
1454 auto synthesized_used_input =
new RexInput(source, i);
1455 used_inputs_owned.emplace_back(synthesized_used_input);
1456 used_inputs.insert(synthesized_used_input);
1460 return std::make_pair(used_inputs, used_inputs_owned);
1463 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1465 std::unordered_set<const RexInput*> used_inputs(logical_union->
inputCount());
1466 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1467 used_inputs_owned.reserve(logical_union->
inputCount());
1468 VLOG(3) <<
"logical_union->inputCount()=" << logical_union->
inputCount();
1469 auto const n_inputs = logical_union->
inputCount();
1470 for (
size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
1471 auto input = logical_union->
getInput(nest_level);
1472 for (
size_t i = 0; i < input->size(); ++i) {
1473 used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
1474 used_inputs.insert(used_inputs_owned.back().get());
1477 return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
1481 const auto scan_ra =
dynamic_cast<const RelScan*
>(ra_node);
1487 return -ra_node->
getId();
1492 const std::vector<size_t>& input_permutation) {
1494 std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1495 for (
size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1496 const auto input_node_idx =
1497 input_permutation.empty() ? input_idx : input_permutation[input_idx];
1498 const auto input_ra = data_sink_node->getInput(input_node_idx);
1502 size_t const idx =
dynamic_cast<const RelLogicalUnion*
>(ra_node) ? 0 : input_idx;
1503 const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1504 CHECK(it_ok.second);
1507 <<
" to nest level " << input_idx;
1509 return input_to_nest_level;
1512 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1516 if (
auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1518 const auto condition =
join->getCondition();
1520 auto condition_inputs = visitor.
visit(condition);
1521 std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1523 return std::make_pair(condition_inputs, condition_inputs_owned);
1526 if (
auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1527 CHECK_GE(left_deep_join->inputCount(), 2u);
1528 const auto condition = left_deep_join->getInnerCondition();
1531 for (
size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1533 const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1534 if (outer_condition) {
1535 const auto outer_result = visitor.
visit(outer_condition);
1536 result.insert(outer_result.begin(), outer_result.end());
1539 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1540 return std::make_pair(
result, used_inputs_owned);
1543 if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1546 }
else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1554 return std::make_pair(std::unordered_set<const RexInput*>{},
1555 std::vector<std::shared_ptr<RexInput>>{});
1559 std::vector<InputDescriptor>& input_descs,
1561 std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1563 const std::unordered_set<const RexInput*>& source_used_inputs,
1564 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1566 <<
" input_col_descs_unique.size()=" << input_col_descs_unique.size()
1567 <<
" source_used_inputs.size()=" << source_used_inputs.size();
1568 for (
const auto used_input : source_used_inputs) {
1569 const auto input_ra = used_input->getSourceNode();
1571 const auto col_id = used_input->getIndex();
1572 auto it = input_to_nest_level.find(input_ra);
1573 if (it != input_to_nest_level.end()) {
1574 const int input_desc = it->second;
1575 input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1576 dynamic_cast<const RelScan*>(input_ra)
1581 }
else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1582 throw std::runtime_error(
"Bushy joins not supported");
1588 std::pair<std::vector<InputDescriptor>,
1589 std::list<std::shared_ptr<const InputColDescriptor>>>
1591 const std::unordered_set<const RexInput*>& used_inputs,
1592 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1593 const std::vector<size_t>& input_permutation,
1595 std::vector<InputDescriptor> input_descs;
1597 for (
size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1598 const auto input_node_idx =
1599 input_permutation.empty() ? input_idx : input_permutation[input_idx];
1600 auto input_ra = data_sink_node->getInput(input_node_idx);
1602 input_descs.emplace_back(table_id, input_idx);
1604 std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1607 input_col_descs_unique,
1610 input_to_nest_level);
1611 std::unordered_set<const RexInput*> join_source_used_inputs;
1612 std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1613 std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1617 input_col_descs_unique,
1619 join_source_used_inputs,
1620 input_to_nest_level);
1621 std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1622 input_col_descs_unique.begin(), input_col_descs_unique.end());
1625 input_col_descs.end(),
1626 [](std::shared_ptr<const InputColDescriptor>
const& lhs,
1627 std::shared_ptr<const InputColDescriptor>
const& rhs) {
1628 return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1630 lhs->getScanDesc().getTableId()) <
1631 std::make_tuple(rhs->getScanDesc().getNestLevel(),
1633 rhs->getScanDesc().getTableId());
1635 return {input_descs,
1636 std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1637 input_col_descs.end())};
1641 std::tuple<std::vector<InputDescriptor>,
1642 std::list<std::shared_ptr<const InputColDescriptor>>,
1643 std::vector<std::shared_ptr<RexInput>>>
1645 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1646 const std::vector<size_t>& input_permutation,
1648 std::unordered_set<const RexInput*> used_inputs;
1649 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1650 std::tie(used_inputs, used_inputs_owned) =
get_used_inputs(ra_node, cat);
1651 VLOG(3) <<
"used_inputs.size() = " << used_inputs.size();
1653 ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1654 return std::make_tuple(
1655 input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1663 return project->
size();
1683 const std::shared_ptr<Analyzer::Expr> expr) {
1684 const auto& ti = expr->get_type_info();
1688 auto transient_dict_ti = ti;
1691 transient_dict_ti.set_fixed_size();
1692 return expr->add_cast(transient_dict_ti);
1696 std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1697 const std::shared_ptr<Analyzer::Expr>& expr) {
1701 scalar_sources.push_back(
fold_expr(expr.get()));
1706 const std::shared_ptr<Analyzer::Expr>& input) {
1707 const auto& input_ti = input->get_type_info();
1708 if (input_ti.is_string() && input_ti.get_compression() ==
kENCODING_DICT) {
1719 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1721 VLOG(3) <<
"get_scalar_sources_size("
1723 <<
") = " << scalar_sources_size;
1724 for (
size_t i = 0; i < scalar_sources_size; ++i) {
1725 const auto scalar_rex =
scalar_at(i, ra_node);
1726 if (dynamic_cast<const RexRef*>(scalar_rex)) {
1732 const auto scalar_expr =
1734 const auto rewritten_expr =
rewrite_expr(scalar_expr.get());
1738 scalar_sources.push_back(
fold_expr(rewritten_expr.get()));
1744 return scalar_sources;
1754 size_t starting_projection_column_idx) {
1755 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1757 const auto scalar_rex =
scalar_at(i, ra_node);
1758 if (dynamic_cast<const RexRef*>(scalar_rex)) {
1764 std::shared_ptr<Analyzer::Expr> translated_expr;
1769 colNames[i - starting_projection_column_idx]);
1774 const auto rewritten_expr =
rewrite_expr(scalar_expr.get());
1778 return scalar_sources;
1783 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1787 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1788 for (
size_t group_idx = 0; group_idx < compound->
getGroupByCount(); ++group_idx) {
1791 return groupby_exprs;
1796 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1797 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1798 for (
size_t group_idx = 0; group_idx < aggregate->
getGroupByCount(); ++group_idx) {
1801 return groupby_exprs;
1807 const auto filter_expr =
1819 if (agg_expr->get_is_distinct()) {
1820 SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1822 target_expr = target_expr->deep_copy();
1831 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1832 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1833 const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1837 std::vector<Analyzer::Expr*> target_exprs;
1838 for (
size_t i = 0; i < compound->
size(); ++i) {
1840 const auto target_rex_agg =
dynamic_cast<const RexAgg*
>(target_rex);
1841 std::shared_ptr<Analyzer::Expr> target_expr;
1842 if (target_rex_agg) {
1847 const auto target_rex_scalar =
dynamic_cast<const RexScalar*
>(target_rex);
1848 const auto target_rex_ref =
dynamic_cast<const RexRef*
>(target_rex_scalar);
1849 if (target_rex_ref) {
1850 const auto ref_idx = target_rex_ref->
getIndex();
1852 CHECK_LE(ref_idx, groupby_exprs.size());
1853 const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1858 target_expr =
fold_expr(rewritten_expr.get());
1871 target_exprs_owned.push_back(target_expr);
1872 target_exprs.push_back(target_expr.get());
1874 return target_exprs;
1878 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1879 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1880 const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1883 std::vector<Analyzer::Expr*> target_exprs;
1884 size_t group_key_idx = 1;
1885 for (
const auto& groupby_expr : groupby_exprs) {
1888 target_exprs_owned.push_back(target_expr);
1889 target_exprs.push_back(target_expr.get());
1892 for (
const auto& target_rex_agg : aggregate->
getAggExprs()) {
1896 target_expr =
fold_expr(target_expr.get());
1897 target_exprs_owned.push_back(target_expr);
1898 target_exprs.push_back(target_expr.get());
1900 return target_exprs;
1910 if (agg_expr && agg_expr->get_contains_agg()) {
1923 }
else if (
is_agg(&expr)) {
1932 const std::vector<Analyzer::Expr*>& target_exprs) {
1933 std::vector<TargetMetaInfo> targets_meta;
1934 CHECK_EQ(ra_node->size(), target_exprs.size());
1935 for (
size_t i = 0; i < ra_node->size(); ++i) {
1936 CHECK(target_exprs[i]);
1938 targets_meta.emplace_back(ra_node->getFieldName(i),
1940 target_exprs[i]->get_type_info());
1942 return targets_meta;
1948 const std::vector<Analyzer::Expr*>& target_exprs) {
1950 if (
auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1952 }
else if (
auto const* input = dynamic_cast<RelProject const*>(input0)) {
1954 }
else if (
auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1956 }
else if (
auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1958 }
else if (
auto const* input = dynamic_cast<RelScan const*>(input0)) {
1971 const int64_t queue_time_ms) {
1979 auto execute_update_for_node = [
this, &co, &eo_in](
const auto node,
1981 const bool is_aggregate) {
1982 auto table_descriptor = node->getModifiedTableDescriptor();
1983 CHECK(table_descriptor);
1984 if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
1985 throw std::runtime_error(
1986 "UPDATE queries involving variable length columns are only supported on tables "
1987 "with the vacuum attribute set to 'delayed'");
1992 auto updated_table_desc = node->getModifiedTableDescriptor();
1994 std::make_unique<UpdateTransactionParameters>(updated_table_desc,
1995 node->getTargetColumns(),
1996 node->getOutputMetainfo(),
1997 node->isVarlenUpdateRequired());
2001 auto execute_update_ra_exe_unit =
2002 [
this, &co, &eo_in, &table_infos, &updated_table_desc](
2016 CHECK(update_transaction_parameters);
2019 auto table_update_metadata =
2033 table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2042 auto query_rewrite = std::make_unique<QueryRewriter>(table_infos,
executor_);
2046 auto update_transaction_params =
2048 CHECK(update_transaction_params);
2049 const auto td = update_transaction_params->getTableDescriptor();
2051 const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2052 if (update_column_names.size() > 1) {
2053 throw std::runtime_error(
2054 "Multi-column update is not yet supported for temporary tables.");
2059 auto projected_column_to_update =
2060 makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
2061 const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2062 work_unit.exe_unit, projected_column_to_update);
2063 if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2064 throw std::runtime_error(
2065 "Variable length updates not yet supported on temporary tables.");
2067 execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2069 execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2073 if (
auto compound = dynamic_cast<const RelCompound*>(node)) {
2077 execute_update_for_node(compound, work_unit, compound->isAggregate());
2078 }
else if (
auto project = dynamic_cast<const RelProject*>(node)) {
2082 if (project->isSimple()) {
2083 CHECK_EQ(
size_t(1), project->inputCount());
2084 const auto input_ra = project->getInput(0);
2085 if (dynamic_cast<const RelSort*>(input_ra)) {
2086 const auto& input_table =
2089 work_unit.exe_unit.scan_limit = input_table->rowCount();
2093 execute_update_for_node(project, work_unit,
false);
2095 throw std::runtime_error(
"Unsupported parent node for update: " +
2103 const int64_t queue_time_ms) {
2107 auto execute_delete_for_node = [
this, &co, &eo_in](
const auto node,
2109 const bool is_aggregate) {
2110 auto* table_descriptor = node->getModifiedTableDescriptor();
2111 CHECK(table_descriptor);
2112 if (!table_descriptor->hasDeletedCol) {
2113 throw std::runtime_error(
2114 "DELETE queries are only supported on tables with the vacuum attribute set to "
2122 auto execute_delete_ra_exe_unit =
2123 [
this, &table_infos, &table_descriptor, &eo_in, &co](
const auto& exe_unit,
2124 const bool is_aggregate) {
2126 std::make_unique<DeleteTransactionParameters>(table_descriptor);
2129 CHECK(delete_params);
2139 CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2143 auto table_update_metadata =
2157 table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2165 auto query_rewrite = std::make_unique<QueryRewriter>(table_infos,
executor_);
2168 auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2169 cd->columnType, table_descriptor->tableId, cd->columnId, 0);
2170 const auto rewritten_exe_unit =
2171 query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2172 execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2174 execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2178 if (
auto compound = dynamic_cast<const RelCompound*>(node)) {
2179 const auto work_unit =
2181 execute_delete_for_node(compound, work_unit, compound->isAggregate());
2182 }
else if (
auto project = dynamic_cast<const RelProject*>(node)) {
2185 if (project->isSimple()) {
2186 CHECK_EQ(
size_t(1), project->inputCount());
2187 const auto input_ra = project->getInput(0);
2188 if (dynamic_cast<const RelSort*>(input_ra)) {
2189 const auto& input_table =
2192 work_unit.exe_unit.scan_limit = input_table->rowCount();
2195 execute_delete_for_node(project, work_unit,
false);
2197 throw std::runtime_error(
"Unsupported parent node for delete: " +
2206 const int64_t queue_time_ms) {
2208 const auto work_unit =
2224 const int64_t queue_time_ms) {
2255 const int64_t queue_time_ms,
2256 const std::optional<size_t> previous_count) {
2262 const auto input_ra = project->
getInput(0);
2263 if (dynamic_cast<const RelSort*>(input_ra)) {
2265 const auto& input_table =
2268 work_unit.exe_unit.scan_limit =
2269 std::min(input_table->getLimit(), input_table->rowCount());
2285 const int64_t queue_time_ms) {
2292 throw std::runtime_error(
"Table functions not supported in distributed mode yet");
2295 throw std::runtime_error(
"Table function support is disabled");
2301 const auto body = table_func_work_unit.body;
2304 const auto table_infos =
2319 auto cached_resultset =
2320 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
2322 if (cached_resultset) {
2323 VLOG(1) <<
"recycle table function's resultset of the root node "
2325 result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2334 table_func_work_unit.exe_unit, table_infos, co, eo,
cat_),
2335 body->getOutputMetainfo()};
2339 throw std::runtime_error(
"Table function ran out of memory during execution");
2341 auto query_exec_time =
timer_stop(query_exec_time_begin);
2342 result.setQueueTime(queue_time_ms);
2343 auto resultset_ptr =
result.getDataPtr();
2344 auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2346 resultset_ptr->getBufferSizeBytes(co.
device_type) <=
2349 if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2351 resultset_ptr->setExecTime(query_exec_time);
2352 resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2353 resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2355 resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2356 if (allow_auto_caching_resultset) {
2357 VLOG(1) <<
"Automatically keep table function's query resultset to recycler";
2359 executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
2360 table_func_work_unit.exe_unit.query_plan_dag_hash,
2361 resultset_ptr->getInputTableKeys(),
2363 resultset_ptr->getBufferSizeBytes(co.
device_type),
2368 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since we do not "
2369 "support resultset recycling on distributed mode";
2371 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since a query "
2372 "has union-(all) operator";
2374 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since a query "
2375 "is either validate or explain query";
2377 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored";
2394 std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2395 for (
const auto& element : tuple->getTuple()) {
2398 return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2402 throw std::runtime_error(
"Only columns supported in the window partition for now");
2404 return makeExpr<Analyzer::ColumnVar>(
2405 col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
2414 const int64_t queue_time_ms) {
2416 CHECK_EQ(query_infos.size(), size_t(1));
2417 if (query_infos.front().info.fragments.size() != 1) {
2418 throw std::runtime_error(
2419 "Only single fragment tables supported for window functions for now");
2424 query_infos.push_back(query_infos.front());
2433 std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2434 std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2435 sorted_partition_cache;
2436 std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2437 std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2438 window_function_context_map;
2448 std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2449 if (partition_keys.size() >= 1) {
2450 std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2451 if (partition_keys.size() > 1) {
2452 partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2454 CHECK_EQ(partition_keys.size(), size_t(1));
2455 partition_key_tuple = partition_keys.front();
2458 partition_key_cond =
2459 makeExpr<Analyzer::BinOper>(
kBOOLEAN,
2462 partition_key_tuple,
2467 partition_key_cond ,
2469 sorted_partition_key_ref_count_map,
2475 CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2478 for (
auto& kv : window_function_context_map) {
2479 kv.second->compute(sorted_partition_key_ref_count_map, sorted_partition_cache);
2480 window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2486 const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2487 std::unordered_map<
QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2488 std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2490 const std::vector<InputTableInfo>& query_infos,
2493 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2494 const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2498 std::unique_ptr<WindowFunctionContext> context;
2500 if (partition_key_cond) {
2501 auto partition_cond_str = partition_key_cond->toString();
2502 auto partition_key_hash = boost::hash_value(partition_cond_str);
2503 boost::hash_combine(partition_cache_key, partition_key_hash);
2504 std::shared_ptr<HashJoin> partition_ptr;
2505 auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2506 if (cached_hash_table_it != partition_cache.end()) {
2507 partition_ptr = cached_hash_table_it->second;
2508 VLOG(1) <<
"Reuse a hash table to compute window function context (key: "
2509 << partition_cache_key <<
", partition condition: " << partition_cond_str
2512 const auto hash_table_or_err =
executor_->buildHashTableForQualifier(
2522 if (!hash_table_or_err.fail_reason.empty()) {
2523 throw std::runtime_error(hash_table_or_err.fail_reason);
2526 partition_ptr = hash_table_or_err.hash_table;
2527 CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2529 VLOG(1) <<
"Put a generated hash table for computing window function context to "
2531 << partition_cache_key <<
", partition condition: " << partition_cond_str
2534 CHECK(partition_ptr);
2538 VLOG(1) <<
"Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2540 context = std::make_unique<WindowFunctionContext>(window_func,
2541 partition_cache_key,
2546 aggregate_tree_fanout);
2548 context = std::make_unique<WindowFunctionContext>(
2549 window_func, elem_count, co.
device_type, row_set_mem_owner);
2552 if (!order_keys.empty()) {
2553 auto sorted_partition_cache_key = partition_cache_key;
2554 for (
auto& order_key : order_keys) {
2555 boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2558 boost::hash_combine(sorted_partition_cache_key, collation.toString());
2560 context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2561 auto cache_key_cnt_it =
2562 sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2563 if (!cache_key_cnt_it.second) {
2564 sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2565 cache_key_cnt_it.first->second + 1;
2568 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2569 for (
const auto& order_key : order_keys) {
2570 const auto order_col =
2573 throw std::runtime_error(
"Only order by columns supported for now");
2575 const int8_t* column;
2576 size_t join_col_elem_count;
2577 std::tie(column, join_col_elem_count) =
2580 query_infos.front().info.fragments.front(),
2588 CHECK_EQ(join_col_elem_count, elem_count);
2589 context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2592 if (context->needsToBuildAggregateTree()) {
2596 auto& window_function_expression_args = window_func->
getArgs();
2597 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2598 for (
auto& expr : window_function_expression_args) {
2601 const int8_t* column;
2602 size_t join_col_elem_count;
2603 std::tie(column, join_col_elem_count) =
2606 query_infos.front().info.fragments.front(),
2614 CHECK_EQ(join_col_elem_count, elem_count);
2615 context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2625 const int64_t queue_time_ms) {
2627 const auto work_unit =
2630 work_unit, filter->
getOutputMetainfo(),
false, co, eo, render_info, queue_time_ms);
2634 std::vector<TargetMetaInfo>
const& rhs) {
2635 if (lhs.size() == rhs.size()) {
2636 for (
size_t i = 0; i < lhs.size(); ++i) {
2637 if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2655 const int64_t queue_time_ms) {
2657 if (!logical_union->
isAll()) {
2658 throw std::runtime_error(
"UNION without ALL is not supported yet.");
2664 throw std::runtime_error(
"UNION does not support subqueries with geo-columns.");
2687 for (
size_t i = 0; i < tuple_type.size(); ++i) {
2688 auto& target_meta_info = tuple_type[i];
2689 if (target_meta_info.get_type_info().is_varlen()) {
2690 throw std::runtime_error(
"Variable length types not supported in VALUES yet.");
2692 if (target_meta_info.get_type_info().get_type() ==
kNULLT) {
2698 {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2702 std::vector<TargetInfo> target_infos;
2703 for (
const auto& tuple_type_component : tuple_type) {
2706 tuple_type_component.get_type_info(),
2713 std::shared_ptr<ResultSet> rs{
2722 return {rs, tuple_type};
2729 const std::string& columnName,
2740 CHECK(dd && dd->stringDict);
2741 int32_t str_id = dd->stringDict->getOrAdd(str);
2742 if (!dd->dictIsTemp) {
2743 const auto checkpoint_ok = dd->stringDict->checkpoint();
2744 if (!checkpoint_ok) {
2745 throw std::runtime_error(
"Failed to checkpoint dictionary for column " +
2749 const bool invalid = str_id > max_valid_int_value<T>();
2750 if (invalid || str_id == inline_int_null_value<int32_t>()) {
2752 LOG(
ERROR) <<
"Could not encode string: " << str
2753 <<
", the encoded value doesn't fit in " <<
sizeof(
T) * 8
2754 <<
" bits. Will store NULL instead.";
2777 throw std::runtime_error(
"EXPLAIN not supported for ModifyTable");
2788 std::vector<TargetMetaInfo> empty_targets;
2789 return {rs, empty_targets};
2804 size_t rows_number = values_lists.size();
2808 size_t rows_per_leaf = rows_number;
2809 if (td->nShards == 0) {
2811 ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2813 auto max_number_of_rows_per_package =
2814 std::max(
size_t(1), std::min(rows_per_leaf,
size_t(64 * 1024)));
2816 std::vector<const ColumnDescriptor*> col_descriptors;
2817 std::vector<int> col_ids;
2818 std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2819 std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2820 std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2821 std::unordered_map<int, int> sequential_ids;
2823 for (
const int col_id : col_id_list) {
2825 const auto col_enc = cd->columnType.get_compression();
2826 if (cd->columnType.is_string()) {
2830 str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2831 CHECK(it_ok.second);
2837 const auto it_ok = col_buffers.emplace(
2839 std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2840 max_number_of_rows_per_package));
2841 CHECK(it_ok.second);
2847 }
else if (cd->columnType.is_geometry()) {
2849 str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2850 CHECK(it_ok.second);
2851 }
else if (cd->columnType.is_array()) {
2853 arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2854 CHECK(it_ok.second);
2856 const auto it_ok = col_buffers.emplace(
2858 std::unique_ptr<uint8_t[]>(
new uint8_t[cd->columnType.get_logical_size() *
2859 max_number_of_rows_per_package]()));
2860 CHECK(it_ok.second);
2862 col_descriptors.push_back(cd);
2863 sequential_ids[col_id] = col_ids.size();
2864 col_ids.push_back(col_id);
2869 auto table_key = boost::hash_value(table_chunk_key_prefix);
2873 size_t start_row = 0;
2874 size_t rows_left = rows_number;
2875 while (rows_left != 0) {
2877 for (
const auto& kv : col_buffers) {
2878 memset(kv.second.get(), 0, max_number_of_rows_per_package);
2880 for (
auto& kv : str_col_buffers) {
2883 for (
auto& kv : arr_col_buffers) {
2887 auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2892 for (
size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2893 const auto& values_list = values_lists[row_idx + start_row];
2894 for (
size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
2895 CHECK(values_list.size() == col_descriptors.size());
2900 dynamic_cast<const Analyzer::UOper*
>(values_list[col_idx]->get_expr());
2906 const auto cd = col_descriptors[col_idx];
2907 auto col_datum = col_cv->get_constval();
2908 auto col_type = cd->columnType.get_type();
2909 uint8_t* col_data_bytes{
nullptr};
2910 if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2911 (!cd->columnType.is_string() ||
2913 const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2914 CHECK(col_data_bytes_it != col_buffers.end());
2915 col_data_bytes = col_data_bytes_it->second.get();
2919 auto col_data =
reinterpret_cast<int8_t*
>(col_data_bytes);
2920 auto null_bool_val =
2922 col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2924 : (col_datum.boolval ? 1 : 0);
2928 auto col_data =
reinterpret_cast<int8_t*
>(col_data_bytes);
2929 col_data[row_idx] = col_cv->get_is_null()
2931 : col_datum.tinyintval;
2935 auto col_data =
reinterpret_cast<int16_t*
>(col_data_bytes);
2936 col_data[row_idx] = col_cv->get_is_null()
2938 : col_datum.smallintval;
2942 auto col_data =
reinterpret_cast<int32_t*
>(col_data_bytes);
2943 col_data[row_idx] = col_cv->get_is_null()
2951 auto col_data =
reinterpret_cast<int64_t*
>(col_data_bytes);
2952 col_data[row_idx] = col_cv->get_is_null()
2954 : col_datum.bigintval;
2958 auto col_data =
reinterpret_cast<float*
>(col_data_bytes);
2959 col_data[row_idx] = col_datum.floatval;
2963 auto col_data =
reinterpret_cast<double*
>(col_data_bytes);
2964 col_data[row_idx] = col_datum.doubleval;
2970 switch (cd->columnType.get_compression()) {
2972 str_col_buffers[col_ids[col_idx]].push_back(
2973 col_datum.stringval ? *col_datum.stringval :
"");
2976 switch (cd->columnType.get_size()) {
2979 &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
2986 &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
2993 &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3011 auto col_data =
reinterpret_cast<int64_t*
>(col_data_bytes);
3012 col_data[row_idx] = col_cv->get_is_null()
3014 : col_datum.bigintval;
3018 const auto is_null = col_cv->get_is_null();
3019 const auto size = cd->columnType.get_size();
3022 const auto is_point_coords =
3024 if (
is_null && !is_point_coords) {
3028 for (int8_t* p = buf + elem_ti.
get_size(); (p - buf) < size;
3030 put_null(static_cast<void*>(p), elem_ti,
"");
3032 arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf,
is_null);
3034 arr_col_buffers[col_ids[col_idx]].emplace_back(0,
nullptr,
is_null);
3038 const auto l = col_cv->get_value_list();
3039 size_t len = l.size() * elem_ti.
get_size();
3040 if (size > 0 && static_cast<size_t>(size) != len) {
3041 throw std::runtime_error(
"Array column " + cd->columnName +
" expects " +
3043 " values, " +
"received " +
3051 int32_t* p =
reinterpret_cast<int32_t*
>(buf);
3058 &p[elemIndex], cd->columnName, elem_ti, c.get(),
cat_);
3080 str_col_buffers[col_ids[col_idx]].push_back(
3081 col_datum.stringval ? *col_datum.stringval :
"");
3088 start_row += package_size;
3089 rows_left -= package_size;
3093 insert_data.
tableId = table_id;
3094 insert_data.
data.resize(col_ids.size());
3096 for (
const auto& kv : col_buffers) {
3098 p.
numbersPtr =
reinterpret_cast<int8_t*
>(kv.second.get());
3099 insert_data.
data[sequential_ids[kv.first]] = p;
3101 for (
auto& kv : str_col_buffers) {
3104 insert_data.
data[sequential_ids[kv.first]] = p;
3106 for (
auto& kv : arr_col_buffers) {
3109 insert_data.
data[sequential_ids[kv.first]] = p;
3111 insert_data.
numRows = package_size;
3123 std::vector<TargetMetaInfo> empty_targets;
3124 return {rs, empty_targets};
3130 const auto aggregate =
dynamic_cast<const RelAggregate*
>(ra);
3134 const auto compound =
dynamic_cast<const RelCompound*
>(ra);
3135 return (compound && compound->isAggregate()) ? 0 : limit;
3139 return !order_entries.empty() && order_entries.front().is_desc;
3148 const int64_t queue_time_ms) {
3151 const auto source = sort->
getInput(0);
3158 executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3161 auto& aggregated_result = it->second;
3162 auto& result_rows = aggregated_result.rs;
3163 const size_t limit = sort->
getLimit();
3164 const size_t offset = sort->
getOffset();
3165 if (limit || offset) {
3166 if (!order_entries.empty()) {
3167 result_rows->sort(order_entries, limit + offset,
executor_);
3169 result_rows->dropFirstN(offset);
3171 result_rows->keepFirstN(limit);
3181 source_work_unit.exe_unit.target_exprs,
3182 aggregated_result.targets_meta);
3191 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3192 bool is_desc{
false};
3193 bool use_speculative_top_n_sort{
false};
3195 auto execute_sort_query = [
this,
3207 const size_t limit = sort->
getLimit();
3208 const size_t offset = sort->
getOffset();
3210 auto source_node = sort->
getInput(0);
3213 auto source_query_plan_dag = source_node->getQueryPlanDagHash();
3217 if (
auto cached_resultset =
3218 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
3219 source_query_plan_dag)) {
3220 CHECK(cached_resultset->canUseSpeculativeTopNSort());
3221 VLOG(1) <<
"recycle resultset of the root node " << source_node->getRelNodeDagId()
3222 <<
" from resultset cache";
3224 ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3228 use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3230 source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3234 if (!source_result.getDataPtr()) {
3242 eo.allow_loop_joins,
3246 eo.with_dynamic_watchdog,
3247 eo.dynamic_watchdog_time_limit,
3248 eo.find_push_down_candidates,
3249 eo.just_calcite_explain,
3250 eo.gpu_input_mem_limit_percent,
3251 eo.allow_runtime_query_interrupt,
3252 eo.running_query_interrupt_freq,
3253 eo.pending_query_interrupt_freq,
3257 groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3259 source->getOutputMetainfo(),
3265 use_speculative_top_n_sort =
3266 source_result.
getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3268 source_result.getRows()->getQueryMemDesc());
3270 if (render_info && render_info->isPotentialInSituRender()) {
3271 return source_result;
3273 if (source_result.isFilterPushDownEnabled()) {
3274 return source_result;
3276 auto rows_to_sort = source_result.getRows();
3277 if (eo.just_explain) {
3278 return {rows_to_sort, {}};
3280 if (sort->
collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3281 !use_speculative_top_n_sort) {
3282 const size_t top_n = limit == 0 ? 0 : limit + offset;
3283 rows_to_sort->sort(order_entries, top_n,
executor_);
3285 if (limit || offset) {
3287 if (offset >= rows_to_sort->rowCount()) {
3288 rows_to_sort->dropFirstN(offset);
3290 rows_to_sort->keepFirstN(limit + offset);
3293 rows_to_sort->dropFirstN(offset);
3295 rows_to_sort->keepFirstN(limit);
3299 return {rows_to_sort, source_result.getTargetsMeta()};
3303 return execute_sort_query();
3305 CHECK_EQ(
size_t(1), groupby_exprs.size());
3306 CHECK(groupby_exprs.front());
3308 return execute_sort_query();
3314 std::list<Analyzer::OrderEntry>& order_entries,
3316 const auto source = sort->
getInput(0);
3317 const size_t limit = sort->
getLimit();
3318 const size_t offset = sort->
getOffset();
3320 const size_t scan_total_limit =
3322 size_t max_groups_buffer_entry_guess{
3328 const auto& source_exe_unit = source_work_unit.exe_unit;
3331 for (
auto order_entry : order_entries) {
3333 const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3335 if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3336 throw std::runtime_error(
3337 "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3341 if (source_exe_unit.groupby_exprs.size() == 1) {
3342 if (!source_exe_unit.groupby_exprs.front()) {
3356 std::move(source_exe_unit.input_col_descs),
3357 source_exe_unit.simple_quals,
3358 source_exe_unit.quals,
3359 source_exe_unit.join_quals,
3360 source_exe_unit.groupby_exprs,
3361 source_exe_unit.target_exprs,
3363 {sort_info.order_entries,
3367 sort_info.limit_delivered},
3369 source_exe_unit.query_hint,
3370 source_exe_unit.query_plan_dag_hash,
3371 source_exe_unit.hash_table_build_plan_dag,
3372 source_exe_unit.table_id_to_node_map,
3373 source_exe_unit.use_bump_allocator,
3374 source_exe_unit.union_all,
3375 source_exe_unit.query_state},
3377 max_groups_buffer_entry_guess,
3378 std::move(source_work_unit.query_rewriter),
3379 source_work_unit.input_permutation,
3380 source_work_unit.left_deep_join_input_sizes};
3392 CHECK(!table_infos.empty());
3393 const auto& first_table = table_infos.front();
3394 size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3395 for (
const auto& table_info : table_infos) {
3396 if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3397 max_num_groups = table_info.info.getNumTuplesUpperBound();
3400 return std::max(max_num_groups,
size_t(1));
3420 for (
const auto& target_expr : ra_exe_unit.
target_exprs) {
3423 if (target_expr->get_type_info().is_varlen()) {
3427 if (
auto top_project = dynamic_cast<const RelProject*>(body)) {
3428 if (top_project->isRowwiseOutputForced()) {
3447 for (
const auto target_expr : ra_exe_unit.
target_exprs) {
3448 if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3460 return !(ra_exe_unit.
quals.empty() && ra_exe_unit.
join_quals.empty() &&
3466 const std::vector<InputTableInfo>& table_infos,
3467 const Executor* executor,
3469 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3471 for (
size_t i = 0; i < ra_exe_unit.
target_exprs.size(); ++i) {
3477 CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3480 const auto& arg_ti = arg->get_type_info();
3486 if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3487 (arg_ti.is_string() && arg_ti.get_compression() ==
kENCODING_DICT))) {
3498 const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3499 const auto sub_bitmap_count =
3501 int64_t approx_bitmap_sz_bits{0};
3502 const auto error_rate =
static_cast<Analyzer::AggExpr*
>(target_expr)->get_arg1();
3504 CHECK(error_rate->get_type_info().get_type() ==
kINT);
3505 CHECK_GE(error_rate->get_constval().intval, 1);
3511 arg_range.getIntMin(),
3512 approx_bitmap_sz_bits,
3517 arg_range.getIntMin(),
3522 if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3523 precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3524 auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3526 target_exprs_owned.push_back(precise_count_distinct);
3527 ra_exe_unit.
target_exprs[i] = precise_count_distinct.get();
3544 const std::vector<TargetMetaInfo>& targets_meta,
3549 const int64_t queue_time_ms,
3550 const std::optional<size_t> previous_count) {
3561 ScopeGuard clearWindowContextIfNecessary = [&]() {
3568 throw std::runtime_error(
"Window functions support is disabled");
3572 computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3584 const auto body = work_unit.
body;
3587 VLOG(3) <<
"body->getId()=" << body->getId()
3589 <<
" it==leaf_results_.end()=" << (it ==
leaf_results_.end());
3593 auto& aggregated_result = it->second;
3594 auto& result_rows = aggregated_result.rs;
3596 body->setOutputMetainfo(aggregated_result.targets_meta);
3610 auto candidate =
query_dag_->getQueryHint(body);
3612 ra_exe_unit.query_hint = *candidate;
3617 CHECK_EQ(table_infos.size(), size_t(1));
3618 CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3619 max_groups_buffer_entry_guess =
3620 table_infos.front().info.fragments.front().getNumTuples();
3621 ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3624 ra_exe_unit.scan_limit = *previous_count;
3628 ra_exe_unit.scan_limit = 0;
3629 ra_exe_unit.use_bump_allocator =
true;
3631 ra_exe_unit.scan_limit = 0;
3634 if (filter_count_all) {
3635 ra_exe_unit.scan_limit = std::max(*filter_count_all,
size_t(1));
3646 VLOG(1) <<
"Using columnar layout for projection as output size of "
3647 << ra_exe_unit.scan_limit <<
" rows exceeds threshold of "
3664 auto execute_and_handle_errors = [&](
const auto max_groups_buffer_entry_guess_in,
3665 const bool has_cardinality_estimation,
3670 auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3672 return {
executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3680 has_cardinality_estimation,
3689 {ra_exe_unit, work_unit.
body, local_groups_buffer_entry_guess},
3701 for (
const auto& table_info : table_infos) {
3703 if (td && (td->isTemporaryTable() || td->isView)) {
3704 use_resultset_cache =
false;
3706 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has either "
3707 "temporary table or view";
3714 auto cached_cardinality =
executor_->getCachedCardinality(cache_key);
3715 auto card = cached_cardinality.second;
3716 if (cached_cardinality.first && card >= 0) {
3717 result = execute_and_handle_errors(
3720 result = execute_and_handle_errors(
3721 max_groups_buffer_entry_guess,
3727 auto cached_cardinality =
executor_->getCachedCardinality(cache_key);
3728 auto card = cached_cardinality.second;
3729 if (cached_cardinality.first && card >= 0) {
3730 result = execute_and_handle_errors(card,
true,
true);
3732 const auto ndv_groups_estimation =
3734 const auto estimated_groups_buffer_entry_guess =
3735 ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3738 CHECK_GT(estimated_groups_buffer_entry_guess,
size_t(0));
3739 result = execute_and_handle_errors(
3740 estimated_groups_buffer_entry_guess,
true,
true);
3742 executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3747 result.setQueueTime(queue_time_ms);
3752 return {std::make_shared<ResultSet>(
3756 ?
executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3762 for (
auto& target_info :
result.getTargetsMeta()) {
3763 if (target_info.get_type_info().is_string() &&
3764 !target_info.get_type_info().is_dict_encoded_string()) {
3766 use_resultset_cache =
false;
3768 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has non-encoded "
3769 "string column projection";
3775 auto allow_auto_caching_resultset =
3778 if (use_resultset_cache && (eo.
keep_result || allow_auto_caching_resultset) &&
3780 auto query_exec_time =
timer_stop(query_exec_time_begin);
3781 res->setExecTime(query_exec_time);
3782 res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3783 res->setTargetMetaInfo(body->getOutputMetainfo());
3785 res->setInputTableKeys(std::move(input_table_keys));
3786 if (allow_auto_caching_resultset) {
3787 VLOG(1) <<
"Automatically keep query resultset to recycler";
3789 res->setUseSpeculativeTopNSort(
3791 executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
3792 ra_exe_unit.query_plan_dag_hash,
3793 res->getInputTableKeys(),
3800 VLOG(1) <<
"Query hint \'keep_result\' is ignored since we do not support "
3801 "resultset recycling on distributed mode";
3803 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has union-(all) "
3806 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query is classified as "
3807 "a in-situ rendering query";
3809 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query is either "
3810 "validate or explain query";
3812 VLOG(1) <<
"Query hint \'keep_result\' is ignored";
3830 const auto count_all_exe_unit =
3852 }
catch (
const std::exception& e) {
3853 LOG(
WARNING) <<
"Failed to run pre-flight filtered count with error " << e.what();
3854 return std::nullopt;
3856 const auto count_row = count_all_result->getNextRow(
false,
false);
3857 CHECK_EQ(
size_t(1), count_row.size());
3858 const auto& count_tv = count_row.front();
3859 const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
3860 CHECK(count_scalar_tv);
3861 const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
3864 auto count_upper_bound =
static_cast<size_t>(*count_ptr);
3865 return std::max(count_upper_bound,
size_t(1));
3869 const auto& ra_exe_unit = work_unit.
exe_unit;
3870 if (ra_exe_unit.input_descs.size() != 1) {
3873 const auto& table_desc = ra_exe_unit.
input_descs.front();
3877 const int table_id = table_desc.getTableId();
3878 for (
const auto& simple_qual : ra_exe_unit.simple_quals) {
3879 const auto comp_expr =
3881 if (!comp_expr || comp_expr->get_optype() !=
kEQ) {
3886 if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3889 const auto rhs = comp_expr->get_right_operand();
3895 if (cd->isVirtualCol) {
3905 const std::vector<TargetMetaInfo>& targets_meta,
3910 const bool was_multifrag_kernel_launch,
3911 const int64_t queue_time_ms) {
3916 auto ra_exe_unit_in = work_unit.
exe_unit;
3949 if (was_multifrag_kernel_launch) {
3953 LOG(
WARNING) <<
"Multifrag query ran out of memory, retrying with multifragment "
3954 "kernels disabled.";
3969 result.setQueueTime(queue_time_ms);
3972 LOG(
WARNING) <<
"Kernel per fragment query ran out of memory, retrying on CPU.";
3986 VLOG(1) <<
"Resetting max groups buffer entry guess.";
3987 max_groups_buffer_entry_guess = 0;
3989 int iteration_ctr = -1;
4013 CHECK(max_groups_buffer_entry_guess);
4017 throw std::runtime_error(
"Query ran out of output slots in the result");
4019 max_groups_buffer_entry_guess *= 2;
4020 LOG(
WARNING) <<
"Query ran out of slots in the output buffer, retrying with max "
4021 "groups buffer entry "
4023 << max_groups_buffer_entry_guess;
4029 result.setQueueTime(queue_time_ms);
4036 LOG(
ERROR) <<
"Query execution failed with error "
4042 LOG(
INFO) <<
"Query ran out of GPU memory, attempting punt to CPU";
4044 throw std::runtime_error(
4045 "Query ran out of GPU memory, unable to automatically retry on CPU");
4054 const char* code{
nullptr};
4055 const char* description{
nullptr};
4060 switch (error_code) {
4062 return {
"ERR_DIV_BY_ZERO",
"Division by zero"};
4064 return {
"ERR_OUT_OF_GPU_MEM",
4066 "Query couldn't keep the entire working set of columns in GPU memory"};
4068 return {
"ERR_UNSUPPORTED_SELF_JOIN",
"Self joins not supported yet"};
4070 return {
"ERR_OUT_OF_CPU_MEM",
"Not enough host memory to execute the query"};
4072 return {
"ERR_OVERFLOW_OR_UNDERFLOW",
"Overflow or underflow"};
4074 return {
"ERR_OUT_OF_TIME",
"Query execution has exceeded the time limit"};
4076 return {
"ERR_INTERRUPTED",
"Query execution has been interrupted"};
4078 return {
"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
4079 "Columnar conversion not supported for variable length types"};
4081 return {
"ERR_TOO_MANY_LITERALS",
"Too many literals in the query"};
4083 return {
"ERR_STRING_CONST_IN_RESULTSET",
4085 "NONE ENCODED String types are not supported as input result set."};
4087 return {
"ERR_OUT_OF_RENDER_MEM",
4089 "Insufficient GPU memory for query results in render output buffer "
4090 "sized by render-mem-bytes"};
4092 return {
"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
4093 "Streaming-Top-N not supported in Render Query"};
4095 return {
"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
4096 "Multiple distinct values encountered"};
4098 return {
"ERR_GEOS",
"ERR_GEOS"};
4100 return {
"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
4102 "Arguments of WIDTH_BUCKET function does not satisfy the condition"};
4104 return {
nullptr,
nullptr};
4111 if (error_code < 0) {
4112 return "Ran out of slots in the query output buffer";
4116 if (errorInfo.code) {
4117 return errorInfo.code +
": "s + errorInfo.description;
4125 VLOG(1) <<
"Running post execution callback.";
4126 (*post_execution_callback_)();
4133 const auto compound =
dynamic_cast<const RelCompound*
>(node);
4137 const auto project =
dynamic_cast<const RelProject*
>(node);
4141 const auto aggregate =
dynamic_cast<const RelAggregate*
>(node);
4145 const auto filter =
dynamic_cast<const RelFilter*
>(node);
4149 LOG(
FATAL) <<
"Unhandled node type: "
4158 if (
auto join = dynamic_cast<const RelJoin*>(sink)) {
4159 return join->getJoinType();
4161 if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
4169 const auto condition =
dynamic_cast<const RexOperator*
>(scalar);
4170 if (!condition || condition->getOperator() !=
kOR || condition->size() != 2) {
4173 const auto equi_join_condition =
4174 dynamic_cast<const RexOperator*
>(condition->getOperand(0));
4175 if (!equi_join_condition || equi_join_condition->getOperator() !=
kEQ) {
4178 const auto both_are_null_condition =
4179 dynamic_cast<const RexOperator*
>(condition->getOperand(1));
4180 if (!both_are_null_condition || both_are_null_condition->getOperator() !=
kAND ||
4181 both_are_null_condition->size() != 2) {
4184 const auto lhs_is_null =
4185 dynamic_cast<const RexOperator*
>(both_are_null_condition->getOperand(0));
4186 const auto rhs_is_null =
4187 dynamic_cast<const RexOperator*
>(both_are_null_condition->getOperand(1));
4188 if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() !=
kISNULL ||
4189 rhs_is_null->getOperator() !=
kISNULL) {
4192 CHECK_EQ(
size_t(1), lhs_is_null->size());
4193 CHECK_EQ(
size_t(1), rhs_is_null->size());
4194 CHECK_EQ(
size_t(2), equi_join_condition->size());
4195 const auto eq_lhs =
dynamic_cast<const RexInput*
>(equi_join_condition->getOperand(0));
4196 const auto eq_rhs =
dynamic_cast<const RexInput*
>(equi_join_condition->getOperand(1));
4197 const auto is_null_lhs =
dynamic_cast<const RexInput*
>(lhs_is_null->getOperand(0));
4198 const auto is_null_rhs =
dynamic_cast<const RexInput*
>(rhs_is_null->getOperand(0));
4199 if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4202 std::vector<std::unique_ptr<const RexScalar>> eq_operands;
4203 if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4205 auto lhs_op_copy = deep_copy_visitor.
visit(equi_join_condition->getOperand(0));
4206 auto rhs_op_copy = deep_copy_visitor.
visit(equi_join_condition->getOperand(1));
4207 eq_operands.emplace_back(lhs_op_copy.release());
4208 eq_operands.emplace_back(rhs_op_copy.release());
4209 return boost::make_unique<const RexOperator>(
4210 kBW_EQ, eq_operands, equi_join_condition->getType());
4217 const auto condition =
dynamic_cast<const RexOperator*
>(scalar);
4218 if (condition && condition->getOperator() ==
kAND) {
4219 CHECK_GE(condition->size(), size_t(2));
4224 for (
size_t i = 1; i < condition->size(); ++i) {
4225 std::vector<std::unique_ptr<const RexScalar>> and_operands;
4226 and_operands.emplace_back(std::move(acc));
4229 boost::make_unique<const RexOperator>(
kAND, and_operands, condition->getType());
4239 for (
size_t nesting_level = 1; nesting_level <= left_deep_join->
inputCount() - 1;
4244 auto cur_level_join_type = left_deep_join->
getJoinType(nesting_level);
4246 join_types[nesting_level - 1] = cur_level_join_type;
4254 std::vector<InputDescriptor>& input_descs,
4255 std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4257 std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4259 const std::vector<InputTableInfo>& query_infos,
4260 const Executor* executor) {
4266 const auto&
cat = *executor->getCatalog();
4267 for (
const auto& table_info : query_infos) {
4268 if (table_info.table_id < 0) {
4271 const auto td =
cat.getMetadataForTable(table_info.table_id);
4277 const auto input_permutation =
4280 std::tie(input_descs, input_col_descs, std::ignore) =
4282 return input_permutation;
4287 std::vector<size_t> input_sizes;
4288 for (
size_t i = 0; i < left_deep_join->
inputCount(); ++i) {
4290 input_sizes.push_back(inputs.size());
4296 const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4297 std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4298 for (
const auto& qual : quals) {
4300 rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4302 return rewritten_quals;
4311 std::vector<InputDescriptor> input_descs;
4312 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4314 std::tie(input_descs, input_col_descs, std::ignore) =
4319 const auto left_deep_join =
4324 std::vector<size_t> input_permutation;
4325 std::vector<size_t> left_deep_join_input_sizes;
4326 std::optional<unsigned> left_deep_tree_id;
4327 if (left_deep_join) {
4328 left_deep_tree_id = left_deep_join->getId();
4331 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4333 std::find(join_types.begin(), join_types.end(),
JoinType::LEFT) ==
4337 left_deep_join_quals,
4338 input_to_nest_level,
4343 std::tie(input_descs, input_col_descs, std::ignore) =
4346 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4352 input_to_nest_level,
4356 const auto scalar_sources =
4369 auto candidate =
query_dag_->getQueryHint(compound);
4371 query_hint = *candidate;
4379 left_deep_join_quals,
4392 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
4393 auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4394 const auto targets_meta =
get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4397 if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4398 left_deep_trees_info.emplace(left_deep_tree_id.value(),
4399 rewritten_exe_unit.join_quals);
4403 compound, left_deep_tree_id, left_deep_trees_info,
executor_);
4404 rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4405 rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4407 return {rewritten_exe_unit,
4410 std::move(query_rewriter),
4412 left_deep_join_input_sizes};
4418 const auto left_deep_join =
4422 return std::make_shared<RelAlgTranslator>(
4430 const auto bin_oper =
dynamic_cast<const RexOperator*
>(qual_expr);
4431 if (!bin_oper || bin_oper->getOperator() !=
kAND) {
4434 CHECK_GE(bin_oper->size(), size_t(2));
4436 for (
size_t i = 1; i < bin_oper->size(); ++i) {
4438 lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4444 const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4446 CHECK(!factors.empty());
4447 auto acc = factors.front();
4448 for (
size_t i = 1; i < factors.size(); ++i) {
4454 template <
class QualsList>
4456 const std::shared_ptr<Analyzer::Expr>& needle) {
4457 for (
const auto& qual : haystack) {
4458 if (*qual == *needle) {
4469 const std::shared_ptr<Analyzer::Expr>& expr) {
4471 CHECK_GE(expr_terms.size(), size_t(1));
4472 const auto& first_term = expr_terms.front();
4474 std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4477 for (
const auto& first_term_factor : first_term_factors.quals) {
4479 expr_terms.size() > 1;
4480 for (
size_t i = 1; i < expr_terms.size(); ++i) {
4488 common_factors.push_back(first_term_factor);
4491 if (common_factors.empty()) {
4495 std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4496 for (
const auto& term : expr_terms) {
4498 std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4499 term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4500 for (
const auto& qual : term_cf.quals) {
4502 remaining_quals.push_back(qual);
4505 if (!remaining_quals.empty()) {
4511 if (remaining_terms.empty()) {
4522 const std::vector<JoinType>& join_types,
4523 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4524 const bool just_explain)
const {
4528 std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4529 for (
const auto rex_condition_component : rex_condition_cf) {
4531 const auto join_condition =
4533 bw_equals ? bw_equals.get() : rex_condition_component));
4536 auto append_folded_cf_quals = [&join_condition_quals](
const auto& cf_quals) {
4537 for (
const auto& cf_qual : cf_quals) {
4538 join_condition_quals.emplace_back(
fold_expr(cf_qual.get()));
4542 append_folded_cf_quals(join_condition_cf.quals);
4543 append_folded_cf_quals(join_condition_cf.simple_quals);
4553 const std::vector<InputDescriptor>& input_descs,
4554 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4555 const bool just_explain) {
4561 std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4562 for (
size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4564 if (outer_condition) {
4565 result[rte_idx - 1].quals =
4566 makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4567 CHECK_LE(rte_idx, join_types.size());
4572 for (
const auto& qual : join_condition_quals) {
4573 if (visited_quals.count(qual)) {
4576 const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4577 if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4578 const auto it_ok = visited_quals.emplace(qual);
4579 CHECK(it_ok.second);
4580 result[rte_idx - 1].quals.push_back(qual);
4583 CHECK_LE(rte_idx, join_types.size());
4587 result[rte_idx - 1].type = join_types[rte_idx - 1];
4596 const size_t nest_level,
4597 const std::vector<TargetMetaInfo>& in_metainfo,
4598 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4601 const auto input = ra_node->
getInput(nest_level);
4602 const auto it_rte_idx = input_to_nest_level.find(input);
4603 CHECK(it_rte_idx != input_to_nest_level.end());
4604 const int rte_idx = it_rte_idx->second;
4606 std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4607 const auto scan_ra =
dynamic_cast<const RelScan*
>(input);
4609 for (
const auto& input_meta : in_metainfo) {
4611 std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
4613 scan_ra ? input_idx + 1 : input_idx,
4621 std::vector<std::shared_ptr<Analyzer::Expr>>
const& input) {
4622 std::vector<Analyzer::Expr*> output(input.size());
4623 auto const raw_ptr = [](
auto& shared_ptr) {
return shared_ptr.get(); };
4624 std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4633 const bool just_explain) {
4634 std::vector<InputDescriptor> input_descs;
4635 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4636 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4638 std::tie(input_descs, input_col_descs, used_inputs_owned) =
4645 input_to_nest_level,
4650 const auto source = aggregate->
getInput(0);
4652 const auto scalar_sources =
4664 auto candidate =
query_dag_->getQueryHint(aggregate);
4666 query_hint = *candidate;
4683 join_info.hash_table_plan_dag,
4684 join_info.table_id_to_node_map,
4697 std::vector<InputDescriptor> input_descs;
4698 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4700 std::tie(input_descs, input_col_descs, std::ignore) =
4704 const auto left_deep_join =
4709 std::vector<size_t> input_permutation;
4710 std::vector<size_t> left_deep_join_input_sizes;
4711 std::optional<unsigned> left_deep_tree_id;
4712 if (left_deep_join) {
4713 left_deep_tree_id = left_deep_join->getId();
4717 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4721 left_deep_join_quals,
4722 input_to_nest_level,
4727 std::tie(input_descs, input_col_descs, std::ignore) =
4730 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4737 input_to_nest_level,
4741 const auto target_exprs_owned =
4749 auto candidate =
query_dag_->getQueryHint(project);
4751 query_hint = *candidate;
4758 left_deep_join_quals,
4771 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
4772 auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4773 const auto targets_meta =
get_targets_meta(project, rewritten_exe_unit.target_exprs);
4776 if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4777 left_deep_trees_info.emplace(left_deep_tree_id.value(),
4778 rewritten_exe_unit.join_quals);
4782 project, left_deep_tree_id, left_deep_trees_info,
executor_);
4783 rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4784 rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4786 return {rewritten_exe_unit,
4789 std::move(query_rewriter),
4791 left_deep_join_input_sizes};
4800 const int negative_node_id = -input_node->
getId();
4801 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4802 target_exprs.reserve(tmis.size());
4803 for (
size_t i = 0; i < tmis.size(); ++i) {
4804 target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4805 tmis[i].get_type_info(), negative_node_id, i, 0));
4807 return target_exprs;
4816 std::vector<InputDescriptor> input_descs;
4817 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4820 std::tie(input_descs, input_col_descs, std::ignore) =
4823 auto const max_num_tuples =
4827 [](
auto max,
auto const& query_info) {
4828 return std::max(max, query_info.info.getNumTuples());
4831 VLOG(3) <<
"input_to_nest_level.size()=" << input_to_nest_level.size() <<
" Pairs are:";
4832 for (
auto& pair : input_to_nest_level) {
4834 << pair.second <<
')';
4842 std::vector<Analyzer::Expr*> target_exprs_pair[2];
4843 for (
unsigned i = 0; i < 2; ++i) {
4845 CHECK(!input_exprs_owned.empty())
4846 <<
"No metainfo found for input node(" << i <<
") "
4848 VLOG(3) <<
"i(" << i <<
") input_exprs_owned.size()=" << input_exprs_owned.size();
4849 for (
auto& input_expr : input_exprs_owned) {
4850 VLOG(3) <<
" " << input_expr->toString();
4858 <<
" target_exprs.size()=" << target_exprs_pair[0].size()
4859 <<
" max_num_tuples=" << max_num_tuples;
4867 target_exprs_pair[0],
4876 logical_union->
isAll(),
4878 target_exprs_pair[1]};
4879 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
4880 const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4883 if (
auto const* node = dynamic_cast<const RelCompound*>(input0)) {
4886 }
else if (
auto const* node = dynamic_cast<const RelProject*>(input0)) {
4889 }
else if (
auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
4892 }
else if (
auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4895 }
else if (
auto const* node = dynamic_cast<const RelScan*>(input0)) {
4898 }
else if (
auto const* node = dynamic_cast<const RelFilter*>(input0)) {
4901 }
else if (dynamic_cast<const RelSort*>(input0)) {
4902 throw QueryNotSupported(
"LIMIT and OFFSET are not currently supported with UNION.");
4907 VLOG(3) <<
"logical_union->getOutputMetainfo()="
4909 <<
" rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
4910 << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
4912 return {rewritten_exe_unit,
4915 std::move(query_rewriter)};
4920 const bool just_explain,
4921 const bool is_gpu) {
4922 std::vector<InputDescriptor> input_descs;
4923 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4925 std::tie(input_descs, input_col_descs, std::ignore) =
4936 const auto table_function_impl_and_type_infos = [=]() {
4942 LOG(
WARNING) <<
"createTableFunctionWorkUnit[GPU]: " << e.what()
4944 <<
" step to run on CPU.";
4952 LOG(
WARNING) <<
"createTableFunctionWorkUnit[CPU]: " << e.what();
4957 const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4958 const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4960 size_t output_row_sizing_param = 0;
4961 if (table_function_impl
4962 .hasUserSpecifiedOutputSizeParameter()) {
4963 const auto parameter_index =
4964 table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
4965 CHECK_GT(parameter_index,
size_t(0));
4967 const auto parameter_expr =
4969 const auto parameter_expr_literal =
dynamic_cast<const RexLiteral*
>(parameter_expr);
4970 if (!parameter_expr_literal) {
4971 throw std::runtime_error(
4972 "Provided output buffer sizing parameter is not a literal. Only literal "
4973 "values are supported with output buffer sizing configured table "
4976 int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
4977 if (literal_val < 0) {
4978 throw std::runtime_error(
"Provided output sizing parameter " +
4980 " must be positive integer.");
4982 output_row_sizing_param =
static_cast<size_t>(literal_val);
4985 output_row_sizing_param = 1;
4987 static auto DEFAULT_ROW_MULTIPLIER_EXPR =
4988 makeExpr<Analyzer::Constant>(
kINT,
false, d);
4990 input_exprs.insert(input_exprs.begin() + parameter_index - 1,
4991 DEFAULT_ROW_MULTIPLIER_EXPR.get());
4993 }
else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
4994 output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
4999 std::vector<Analyzer::ColumnVar*> input_col_exprs;
5000 size_t input_index = 0;
5001 size_t arg_index = 0;
5002 const auto table_func_args = table_function_impl.getInputArgs();
5003 CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5004 for (
const auto& ti : table_function_type_infos) {
5005 if (ti.is_column_list()) {
5006 for (
int i = 0; i < ti.get_dimension(); i++) {
5007 auto& input_expr = input_exprs[input_index];
5013 auto type_info = input_expr->get_type_info();
5014 type_info.set_subtype(type_info.get_type());
5015 type_info.set_type(ti.get_type());
5016 type_info.set_dimension(ti.get_dimension());
5017 input_expr->set_type_info(type_info);
5019 input_col_exprs.push_back(col_var);
5022 }
else if (ti.is_column()) {
5023 auto& input_expr = input_exprs[input_index];
5029 auto type_info = input_expr->get_type_info();
5030 type_info.set_subtype(type_info.get_type());
5031 type_info.set_type(ti.get_type());
5032 input_expr->set_type_info(type_info);
5034 input_col_exprs.push_back(col_var);
5037 auto input_expr = input_exprs[input_index];
5039 if (ext_func_arg_ti != input_expr->get_type_info()) {
5040 input_exprs[input_index] = input_expr->add_cast(ext_func_arg_ti).get();
5047 std::vector<Analyzer::Expr*> table_func_outputs;
5048 constexpr int32_t transient_pos{-1};
5049 for (
size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5050 auto ti = table_function_impl.getOutputSQLType(i);
5051 if (ti.is_dict_encoded_string()) {
5052 auto p = table_function_impl.getInputID(i);
5054 int32_t input_pos = p.first;
5055 if (input_pos == transient_pos) {
5061 for (
int j = 0; j < input_pos; j++) {
5062 const auto ti = table_function_type_infos[j];
5063 offset += ti.is_column_list() ? ti.get_dimension() : 1;
5065 input_pos = offset + p.second;
5067 CHECK_LT(input_pos, input_exprs.size());
5068 int32_t comp_param =
5069 input_exprs_owned[input_pos]->get_type_info().get_comp_param();
5070 ti.set_comp_param(comp_param);
5082 output_row_sizing_param,
5083 table_function_impl};
5086 return {exe_unit, rel_table_func};
5091 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
5094 const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
5095 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
5096 std::vector<TargetMetaInfo> in_metainfo;
5097 std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
5099 auto input_it = inputs_owned.begin();
5100 for (
size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
5101 const auto source = data_sink_node->getInput(nest_level);
5102 const auto scan_source =
dynamic_cast<const RelScan*
>(source);
5104 CHECK(source->getOutputMetainfo().empty());
5105 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
5106 for (
size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
5109 const auto source_metadata =
5112 in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5114 exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5116 const auto& source_metadata = source->getOutputMetainfo();
5117 input_it += source_metadata.size();
5119 in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5121 data_sink_node, nest_level, source_metadata, input_to_nest_level);
5123 exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5126 return std::make_pair(in_metainfo, exprs_owned);
5133 const bool just_explain) {
5135 std::vector<InputDescriptor> input_descs;
5136 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5137 std::vector<TargetMetaInfo> in_metainfo;
5138 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5139 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5142 std::tie(input_descs, input_col_descs, used_inputs_owned) =
5148 input_to_nest_level,
5152 std::tie(in_metainfo, target_exprs_owned) =
5153 get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
5154 const auto filter_expr = translator.translateScalarRex(filter->
getCondition());
5157 const auto qual =
fold_expr(filter_expr.get());
5166 auto candidate =
query_dag_->getQueryHint(filter);
5168 query_hint = *candidate;
5173 return {{input_descs,
5176 {rewritten_qual ? rewritten_qual : qual},
5185 join_info.hash_table_plan_dag,
5186 join_info.table_id_to_node_map},
5195 if (
auto foreign_storage_mgr =
5200 foreign_storage_mgr->setParallelismHints({});
5209 executor_->setupCaching(phys_inputs, phys_table_ids);
const size_t getGroupByCount() const
bool is_agg(const Analyzer::Expr *expr)
Analyzer::ExpressionPtr rewrite_array_elements(Analyzer::Expr const *expr)
std::vector< Analyzer::Expr * > target_exprs
SortField getCollation(const size_t i) const
static void invalidateCachesByTable(size_t table_key)
const foreign_storage::ForeignTable * getForeignTable(const std::string &tableName) const