47 #include <boost/algorithm/cxx11/any_of.hpp>
48 #include <boost/range/adaptor/reversed.hpp>
70 const auto compound =
dynamic_cast<const RelCompound*
>(ra);
71 const auto aggregate =
dynamic_cast<const RelAggregate*
>(ra);
72 return ((compound && compound->isAggregate()) || aggregate);
79 std::unordered_set<PhysicalInput> phys_inputs2;
80 for (
auto& phi : phys_inputs) {
89 std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
90 parallelism_hints_per_table;
92 int table_id = physical_input.table_id;
95 !table->is_system_table) {
99 for (
const auto& fragment :
100 foreign_table->fragmenter->getFragmentsForQuery().fragments) {
103 catalog.
getDatabaseId(), table_id, col_id, fragment.fragmentId};
112 if (!chunk.isChunkOnDevice(
114 parallelism_hints_per_table[{catalog.
getDatabaseId(), table_id}].insert(
116 fragment.fragmentId});
121 if (!parallelism_hints_per_table.empty()) {
122 auto foreign_storage_mgr =
124 CHECK(foreign_storage_mgr);
125 foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
153 std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
155 int table_id = physical_input.table_id;
157 if (table && table->is_system_table) {
159 system_table_columns_by_table_id[table_id].emplace_back(column_id);
163 if (!system_table_columns_by_table_id.empty() &&
168 for (
const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
175 CHECK(td->fragmenter);
176 auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
177 CHECK_LE(fragment_count, static_cast<size_t>(1))
178 <<
"In-memory system tables are expected to have a single fragment.";
179 if (fragment_count > 0) {
180 for (
auto column_id : column_ids) {
200 std::list<Analyzer::OrderEntry>
result;
203 result.emplace_back(sort_field.getField() + 1,
211 const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
212 const std::vector<TargetMetaInfo>& targets_meta) {
213 CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
215 for (
size_t i = 0; i < targets_meta.size(); ++i) {
216 render_info.
targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
217 targets_meta[i].get_resname(),
218 work_unit_target_exprs[i]->get_shared_ptr(),
231 return {left_deep_join_tree->
getId()};
236 const std::vector<unsigned>& aggregate,
237 const std::vector<unsigned>& next_result)
const override {
239 std::copy(next_result.begin(), next_result.end(), std::back_inserter(
result));
249 const size_t text_encoding_casts)
250 : text_decoding_casts(text_decoding_casts)
251 , text_encoding_casts(text_encoding_casts) {}
257 const bool disregard_cast_to_none_encoding = disregard_cast_to_none_encoding_;
258 result = aggregateResult(result, visit(u_oper->
get_operand()));
264 if (!operand_ti.is_string() || !casted_ti.is_string()) {
271 if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
274 if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
275 if (!disregard_cast_to_none_encoding) {
287 if (u_oper && u_oper->get_optype() ==
kCAST) {
288 disregard_cast_to_none_encoding_ =
true;
289 result = aggregateResult(result, visitUOper(u_oper));
291 result = aggregateResult(result, visit(like->
get_arg()));
293 result = aggregateResult(result, visit(like->
get_like_expr()));
309 void visitBegin()
const override { disregard_cast_to_none_encoding_ =
false; }
316 mutable bool disregard_cast_to_none_encoding_ =
false;
322 auto check_node_for_text_casts = [&cast_counts](
const Analyzer::Expr* expr) {
327 const auto this_node_cast_counts = visitor.
visit(expr);
332 for (
const auto& qual : ra_exe_unit.
quals) {
333 check_node_for_text_casts(qual.get());
335 for (
const auto& simple_qual : ra_exe_unit.
simple_quals) {
336 check_node_for_text_casts(simple_qual.get());
339 check_node_for_text_casts(groupby_expr.get());
341 for (
const auto& target_expr : ra_exe_unit.
target_exprs) {
342 check_node_for_text_casts(target_expr);
344 for (
const auto& join_condition : ra_exe_unit.
join_quals) {
345 for (
const auto& join_qual : join_condition.quals) {
346 check_node_for_text_casts(join_qual.get());
353 const std::vector<InputTableInfo>& query_infos,
358 auto const tuples_upper_bound =
362 [](
auto max,
auto const& query_info) {
363 return std::max(max, query_info.info.getNumTuples());
370 const bool has_text_casts =
371 text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;
373 if (!has_text_casts) {
376 std::ostringstream oss;
377 oss <<
"Query requires one or more casts between none-encoded and dictionary-encoded "
378 <<
"strings, and the estimated table size (" << tuples_upper_bound <<
" rows) "
379 <<
"exceeds the configured watchdog none-encoded string translation limit of "
381 throw std::runtime_error(oss.str());
392 !query_for_partial_outer_frag &&
411 auto lock =
executor_->acquireExecuteMutex();
422 CHECK(!ed_seq.empty());
423 if (ed_seq.size() > 1) {
433 auto exec_desc_ptr = ed_seq.getDescriptor(0);
434 CHECK(exec_desc_ptr);
435 auto& exec_desc = *exec_desc_ptr;
436 const auto body = exec_desc.getBody();
441 const auto project =
dynamic_cast<const RelProject*
>(body);
450 const auto compound =
dynamic_cast<const RelCompound*
>(body);
452 if (compound->isDeleteViaSelect()) {
454 }
else if (compound->isUpdateViaSelect()) {
457 if (compound->isAggregate()) {
461 const auto work_unit =
474 const bool just_explain_plan,
481 auto execution_result =
484 constexpr
bool vlog_result_set_summary{
false};
485 if constexpr (vlog_result_set_summary) {
486 VLOG(1) << execution_result.getRows()->summaryToString();
490 VLOG(1) <<
"Running post execution callback.";
491 (*post_execution_callback_)();
493 return execution_result;
503 LOG(
INFO) <<
"Query unable to run in GPU mode, retrying on CPU";
514 const bool just_explain_plan,
518 auto timer_setup =
DEBUG_TIMER(
"Query pre-execution steps");
528 std::string query_session{
""};
529 std::string query_str{
"N/A"};
530 std::string query_submitted_time{
""};
533 query_session =
query_state_->getConstSessionInfo()->get_session_id();
535 query_submitted_time =
query_state_->getQuerySubmittedTime();
538 auto validate_or_explain_query =
540 auto interruptable = !render_info && !query_session.empty() &&
547 std::tie(query_session, query_str) =
executor_->attachExecutorToQuerySession(
548 query_session, query_str, query_submitted_time);
554 query_submitted_time,
555 QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
560 [
this, &query_session, &interruptable, &query_submitted_time] {
564 executor_->clearQuerySessionStatus(query_session, query_submitted_time);
568 auto acquire_execute_mutex = [](Executor * executor) ->
auto {
569 auto ret = executor->acquireExecuteMutex();
574 auto lock = acquire_execute_mutex(
executor_);
583 executor_->checkPendingQueryStatus(query_session);
586 throw std::runtime_error(
"Query execution has been interrupted (pending query)");
590 throw std::runtime_error(
"Checking pending query status failed: unknown error");
593 int64_t queue_time_ms =
timer_stop(clock_begin);
606 if (just_explain_plan) {
607 std::stringstream ss;
608 std::vector<const RelAlgNode*> nodes;
609 for (
size_t i = 0; i < ed_seq.size(); i++) {
610 nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
612 size_t ctr_node_id_in_plan = nodes.size();
616 auto node_id_in_plan_tree = ctr_node_id_in_plan--;
617 body->setIdInPlanTree(node_id_in_plan_tree);
619 size_t ctr = nodes.size();
624 const auto index = ctr--;
625 const auto tabs = std::string(tab_ctr++,
'\t');
627 ss << tabs <<
std::to_string(index) <<
" : " << body->toString(config) <<
"\n";
628 if (
auto sort = dynamic_cast<const RelSort*>(body)) {
629 ss << tabs <<
" : " <<
sort->getInput(0)->toString(config) <<
"\n";
631 if (dynamic_cast<const RelProject*>(body) ||
632 dynamic_cast<const RelCompound*>(body)) {
633 if (
auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
634 ss << tabs <<
" : " <<
join->toString(config) <<
"\n";
639 if (!subqueries.empty()) {
642 for (
const auto& subquery : subqueries) {
643 const auto ra = subquery->getRelAlg();
644 ss <<
"\t" << ra->toString(config) <<
"\n";
647 auto rs = std::make_shared<ResultSet>(ss.str());
655 ed_seq, co, eo, render_info, queue_time_ms);
661 const auto subquery_ra = subquery->getRelAlg();
663 if (subquery_ra->hasContextData()) {
670 subquery->setExecutionResult(std::make_shared<ExecutionResult>(
result));
678 return executor_->computeColRangesCache(phys_inputs);
683 return executor_->computeStringDictionaryGenerations(phys_inputs);
688 return executor_->computeTableGenerations(phys_table_ids);
700 std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
702 auto sort_node =
dynamic_cast<const RelSort*
>(root_node);
708 RelLeftDeepTreeIdsCollector visitor;
709 auto left_deep_tree_ids = visitor.visit(root_node);
717 const auto source = sort->
getInput(0);
718 if (dynamic_cast<const RelSort*>(source)) {
719 throw std::runtime_error(
"Sort node not supported as input to another sort");
727 const size_t step_idx,
735 const auto sort =
dynamic_cast<const RelSort*
>(exe_desc_ptr->getBody());
737 size_t shard_count{0};
747 source_work_unit.exe_unit, *
executor_->getCatalog());
751 const auto source =
sort->getInput(0);
754 CHECK_EQ(temp_seq.size(), size_t(1));
785 std::make_pair(step_idx, step_idx + 1),
790 merge_type(exe_desc_ptr->getBody()),
791 exe_desc_ptr->getBody()->getId(),
794 VLOG(1) <<
"Running post execution callback.";
795 (*post_execution_callback_)();
812 executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
813 executor_->table_generations_ = table_generations;
814 executor_->agg_col_range_cache_ = agg_col_range;
821 const int64_t queue_time_ms,
822 const bool with_existing_temp_tables) {
825 if (!with_existing_temp_tables) {
836 auto get_descriptor_count = [&seq, &eo]() ->
size_t {
851 const auto exec_desc_count = get_descriptor_count();
865 const auto cached_res =
866 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
872 const auto num_steps = exec_desc_count - 1;
873 for (
size_t i = 0; i < exec_desc_count; i++) {
874 VLOG(1) <<
"Executing query step " << i <<
" / " << num_steps;
877 seq, i, co, eo_copied, (i == num_steps) ? render_info :
nullptr, queue_time_ms);
886 LOG(
INFO) <<
"Retrying current query step " << i <<
" / " << num_steps <<
" on CPU";
888 if (render_info && i == num_steps) {
896 (i == num_steps) ? render_info :
nullptr,
902 auto eo_extern = eo_copied;
903 eo_extern.executor_type = ::ExecutorType::Extern;
905 const auto body = exec_desc_ptr->
getBody();
906 const auto compound =
dynamic_cast<const RelCompound*
>(body);
907 if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
908 LOG(
INFO) <<
"Also failed to run the query using interoperability";
912 seq, i, co, eo_extern, (i == num_steps) ? render_info :
nullptr, queue_time_ms);
921 const std::pair<size_t, size_t> interval,
925 const int64_t queue_time_ms) {
931 for (
size_t i = interval.first; i < interval.second; i++) {
938 (i == interval.second - 1) ? render_info :
nullptr,
948 LOG(
INFO) <<
"Retrying current query step " << i <<
" on CPU";
950 if (render_info && i == interval.second - 1) {
957 (i == interval.second - 1) ? render_info :
nullptr,
966 const size_t step_idx,
970 const int64_t queue_time_ms) {
975 CHECK(exec_desc_ptr);
976 auto& exec_desc = *exec_desc_ptr;
977 const auto body = exec_desc.getBody();
1003 auto handle_hint = [co,
1006 this]() -> std::pair<CompilationOptions, ExecutionOptions> {
1009 auto target_node = body;
1010 if (
auto sort_body = dynamic_cast<const RelSort*>(body)) {
1011 target_node = sort_body->getInput(0);
1014 auto columnar_output_hint_enabled =
false;
1015 auto rowwise_output_hint_enabled =
false;
1018 VLOG(1) <<
"A user forces to run the query on the CPU execution mode";
1023 VLOG(1) <<
"A user enables keeping query resultset but is skipped since data "
1024 "recycler is disabled";
1027 VLOG(1) <<
"A user enables keeping query resultset but is skipped since query "
1028 "resultset recycler is disabled";
1030 VLOG(1) <<
"A user enables keeping query resultset";
1037 VLOG(1) <<
"A user enables keeping table function's resultset but is skipped "
1038 "since data recycler is disabled";
1041 VLOG(1) <<
"A user enables keeping table function's resultset but is skipped "
1042 "since query resultset recycler is disabled";
1044 VLOG(1) <<
"A user enables keeping table function's resultset";
1049 VLOG(1) <<
"A user forces the query to run with columnar output";
1050 columnar_output_hint_enabled =
true;
1052 VLOG(1) <<
"A user forces the query to run with rowwise output";
1053 rowwise_output_hint_enabled =
true;
1056 auto columnar_output_enabled = eo_work_unit.output_columnar_hint
1057 ? !rowwise_output_hint_enabled
1058 : columnar_output_hint_enabled;
1059 if (
g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
1060 LOG(
INFO) <<
"Currently, we do not support applying query hint to change query "
1061 "output layout in distributed mode.";
1064 return std::make_pair(co_hint_applied, eo_hint_applied);
1067 auto hint_applied = handle_hint();
1071 if (
auto cached_resultset =
1072 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
1073 body->getQueryPlanDagHash())) {
1074 VLOG(1) <<
"recycle resultset of the root node " << body->getRelNodeDagId()
1075 <<
" from resultset cache";
1076 body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
1078 std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
1079 executor_->getRecultSetRecyclerHolder().getTargetExprs(
1080 body->getQueryPlanDagHash());
1081 std::vector<Analyzer::Expr*> copied_target_exprs;
1082 for (
const auto& expr : cached_target_exprs) {
1083 copied_target_exprs.push_back(expr.get());
1086 *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
1088 exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
1094 const auto compound =
dynamic_cast<const RelCompound*
>(body);
1096 if (compound->isDeleteViaSelect()) {
1097 executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1098 }
else if (compound->isUpdateViaSelect()) {
1099 executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1102 compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1103 VLOG(3) <<
"Returned from executeCompound(), addTemporaryTable("
1104 <<
static_cast<int>(-compound->getId()) <<
", ...)"
1105 <<
" exec_desc.getResult().getDataPtr()->rowCount()="
1106 << exec_desc.getResult().getDataPtr()->rowCount();
1107 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1114 const auto project =
dynamic_cast<const RelProject*
>(body);
1116 if (project->isDeleteViaSelect()) {
1117 executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
1118 }
else if (project->isUpdateViaSelect()) {
1119 executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
1121 std::optional<size_t> prev_count;
1127 if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
1132 const auto& prev_exe_result = prev_exec_desc->getResult();
1133 const auto prev_result = prev_exe_result.getRows();
1135 prev_count = prev_result->rowCount();
1136 VLOG(3) <<
"Setting output row count for projection node to previous node ("
1137 << prev_exec_desc->getBody()->toString(
1139 <<
") to " << *prev_count;
1146 hint_applied.second,
1150 VLOG(3) <<
"Returned from executeProject(), addTemporaryTable("
1151 <<
static_cast<int>(-project->getId()) <<
", ...)"
1152 <<
" exec_desc.getResult().getDataPtr()->rowCount()="
1153 << exec_desc.getResult().getDataPtr()->rowCount();
1154 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1161 const auto aggregate =
dynamic_cast<const RelAggregate*
>(body);
1164 aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1168 const auto filter =
dynamic_cast<const RelFilter*
>(body);
1171 filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1175 const auto sort =
dynamic_cast<const RelSort*
>(body);
1178 sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1179 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1186 if (logical_values) {
1188 addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
1191 const auto modify =
dynamic_cast<const RelModify*
>(body);
1193 exec_desc.setResult(
executeModify(modify, hint_applied.second));
1196 const auto logical_union =
dynamic_cast<const RelLogicalUnion*
>(body);
1197 if (logical_union) {
1201 hint_applied.second,
1210 table_func, hint_applied.first, hint_applied.second, queue_time_ms));
1214 LOG(
FATAL) <<
"Unhandled body type: "
1221 CHECK(dynamic_cast<const RelAggregate*>(body));
1222 CHECK_EQ(
size_t(1), body->inputCount());
1223 const auto input = body->getInput(0);
1224 body->setOutputMetainfo(input->getOutputMetainfo());
1230 ed.setResult({it->second, input->getOutputMetainfo()});
1240 return synthesized_physical_inputs_owned;
1244 const RexInput* rex_input)
const override {
1247 const auto scan_ra =
dynamic_cast<const RelScan*
>(input_ra);
1251 const auto col_id = rex_input->
getIndex();
1253 if (cd && cd->columnType.get_physical_cols() > 0) {
1255 std::unordered_set<const RexInput*> synthesized_physical_inputs;
1256 for (
auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
1257 auto physical_input =
1259 synthesized_physical_inputs_owned.emplace_back(physical_input);
1260 synthesized_physical_inputs.insert(physical_input);
1262 return synthesized_physical_inputs;
1271 const std::unordered_set<const RexInput*>& aggregate,
1272 const std::unordered_set<const RexInput*>& next_result)
const override {
1274 result.insert(next_result.begin(), next_result.end());
1284 if (
auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
1287 if (
auto join = dynamic_cast<const RelJoin*>(ra_node)) {
1291 if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1294 auto only_src = ra_node->
getInput(0);
1295 const bool is_join =
dynamic_cast<const RelJoin*
>(only_src) ||
1296 dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
1297 return is_join ? only_src : ra_node;
1300 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1304 std::unordered_set<const RexInput*> used_inputs =
1305 filter_expr ? visitor.
visit(filter_expr) : std::unordered_set<const RexInput*>{};
1307 for (
size_t i = 0; i < sources_size; ++i) {
1309 used_inputs.insert(source_inputs.begin(), source_inputs.end());
1311 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1312 return std::make_pair(used_inputs, used_inputs_owned);
1315 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1318 std::unordered_set<const RexInput*> used_inputs;
1319 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1320 const auto source = aggregate->
getInput(0);
1323 CHECK_GE(in_metainfo.size(), group_count);
1324 for (
size_t i = 0; i < group_count; ++i) {
1325 auto synthesized_used_input =
new RexInput(source, i);
1326 used_inputs_owned.emplace_back(synthesized_used_input);
1327 used_inputs.insert(synthesized_used_input);
1329 for (
const auto& agg_expr : aggregate->
getAggExprs()) {
1330 for (
size_t i = 0; i < agg_expr->size(); ++i) {
1331 const auto operand_idx = agg_expr->getOperand(i);
1332 CHECK_GE(in_metainfo.size(),
static_cast<size_t>(operand_idx));
1333 auto synthesized_used_input =
new RexInput(source, operand_idx);
1334 used_inputs_owned.emplace_back(synthesized_used_input);
1335 used_inputs.insert(synthesized_used_input);
1338 return std::make_pair(used_inputs, used_inputs_owned);
1341 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1344 std::unordered_set<const RexInput*> used_inputs;
1345 for (
size_t i = 0; i < project->
size(); ++i) {
1347 used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
1349 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1350 return std::make_pair(used_inputs, used_inputs_owned);
1353 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1357 std::unordered_set<const RexInput*> used_inputs;
1360 used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
1362 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1363 return std::make_pair(used_inputs, used_inputs_owned);
1366 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1368 std::unordered_set<const RexInput*> used_inputs;
1369 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1371 for (
size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
1372 const auto source = data_sink_node->getInput(nest_level);
1373 const auto scan_source =
dynamic_cast<const RelScan*
>(source);
1375 CHECK(source->getOutputMetainfo().empty());
1376 for (
size_t i = 0; i < scan_source->size(); ++i) {
1377 auto synthesized_used_input =
new RexInput(scan_source, i);
1378 used_inputs_owned.emplace_back(synthesized_used_input);
1379 used_inputs.insert(synthesized_used_input);
1382 const auto& partial_in_metadata = source->getOutputMetainfo();
1383 for (
size_t i = 0; i < partial_in_metadata.size(); ++i) {
1384 auto synthesized_used_input =
new RexInput(source, i);
1385 used_inputs_owned.emplace_back(synthesized_used_input);
1386 used_inputs.insert(synthesized_used_input);
1390 return std::make_pair(used_inputs, used_inputs_owned);
1393 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1395 std::unordered_set<const RexInput*> used_inputs(logical_union->
inputCount());
1396 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1397 used_inputs_owned.reserve(logical_union->
inputCount());
1398 VLOG(3) <<
"logical_union->inputCount()=" << logical_union->
inputCount();
1399 auto const n_inputs = logical_union->
inputCount();
1400 for (
size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
1401 auto input = logical_union->
getInput(nest_level);
1402 for (
size_t i = 0; i < input->size(); ++i) {
1403 used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
1404 used_inputs.insert(used_inputs_owned.back().get());
1407 return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
1411 const auto scan_ra =
dynamic_cast<const RelScan*
>(ra_node);
1417 return -ra_node->
getId();
1422 const std::vector<size_t>& input_permutation) {
1424 std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1425 for (
size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1426 const auto input_node_idx =
1427 input_permutation.empty() ? input_idx : input_permutation[input_idx];
1428 const auto input_ra = data_sink_node->getInput(input_node_idx);
1432 size_t const idx =
dynamic_cast<const RelLogicalUnion*
>(ra_node) ? 0 : input_idx;
1433 const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1434 CHECK(it_ok.second);
1437 <<
" to nest level " << input_idx;
1439 return input_to_nest_level;
1442 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1446 if (
auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1448 const auto condition =
join->getCondition();
1450 auto condition_inputs = visitor.
visit(condition);
1451 std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1453 return std::make_pair(condition_inputs, condition_inputs_owned);
1456 if (
auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1457 CHECK_GE(left_deep_join->inputCount(), 2u);
1458 const auto condition = left_deep_join->getInnerCondition();
1461 for (
size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1463 const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1464 if (outer_condition) {
1465 const auto outer_result = visitor.
visit(outer_condition);
1466 result.insert(outer_result.begin(), outer_result.end());
1469 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1470 return std::make_pair(
result, used_inputs_owned);
1473 if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1476 }
else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1484 return std::make_pair(std::unordered_set<const RexInput*>{},
1485 std::vector<std::shared_ptr<RexInput>>{});
1489 std::vector<InputDescriptor>& input_descs,
1491 std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1493 const std::unordered_set<const RexInput*>& source_used_inputs,
1494 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1496 <<
" input_col_descs_unique.size()=" << input_col_descs_unique.size()
1497 <<
" source_used_inputs.size()=" << source_used_inputs.size();
1498 for (
const auto used_input : source_used_inputs) {
1499 const auto input_ra = used_input->getSourceNode();
1501 const auto col_id = used_input->getIndex();
1502 auto it = input_to_nest_level.find(input_ra);
1503 if (it != input_to_nest_level.end()) {
1504 const int input_desc = it->second;
1505 input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1506 dynamic_cast<const RelScan*>(input_ra)
1511 }
else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1512 throw std::runtime_error(
"Bushy joins not supported");
1518 std::pair<std::vector<InputDescriptor>,
1519 std::list<std::shared_ptr<const InputColDescriptor>>>
1521 const std::unordered_set<const RexInput*>& used_inputs,
1522 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1523 const std::vector<size_t>& input_permutation,
1525 std::vector<InputDescriptor> input_descs;
1527 for (
size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1528 const auto input_node_idx =
1529 input_permutation.empty() ? input_idx : input_permutation[input_idx];
1530 auto input_ra = data_sink_node->getInput(input_node_idx);
1532 input_descs.emplace_back(table_id, input_idx);
1534 std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1537 input_col_descs_unique,
1540 input_to_nest_level);
1541 std::unordered_set<const RexInput*> join_source_used_inputs;
1542 std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1543 std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1547 input_col_descs_unique,
1549 join_source_used_inputs,
1550 input_to_nest_level);
1551 std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1552 input_col_descs_unique.begin(), input_col_descs_unique.end());
1555 input_col_descs.end(),
1556 [](std::shared_ptr<const InputColDescriptor>
const& lhs,
1557 std::shared_ptr<const InputColDescriptor>
const& rhs) {
1558 return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1560 lhs->getScanDesc().getTableId()) <
1561 std::make_tuple(rhs->getScanDesc().getNestLevel(),
1563 rhs->getScanDesc().getTableId());
1565 return {input_descs,
1566 std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1567 input_col_descs.end())};
1571 std::tuple<std::vector<InputDescriptor>,
1572 std::list<std::shared_ptr<const InputColDescriptor>>,
1573 std::vector<std::shared_ptr<RexInput>>>
1575 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1576 const std::vector<size_t>& input_permutation,
1578 std::unordered_set<const RexInput*> used_inputs;
1579 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1580 std::tie(used_inputs, used_inputs_owned) =
get_used_inputs(ra_node, cat);
1581 VLOG(3) <<
"used_inputs.size() = " << used_inputs.size();
1583 ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1584 return std::make_tuple(
1585 input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1593 return project->
size();
1613 const std::shared_ptr<Analyzer::Expr> expr) {
1614 const auto& ti = expr->get_type_info();
1618 auto transient_dict_ti = ti;
1621 transient_dict_ti.set_fixed_size();
1622 return expr->add_cast(transient_dict_ti);
1626 std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1627 const std::shared_ptr<Analyzer::Expr>& expr) {
1631 scalar_sources.push_back(
fold_expr(expr.get()));
1636 const std::shared_ptr<Analyzer::Expr>& input) {
1637 const auto& input_ti = input->get_type_info();
1638 if (input_ti.is_string() && input_ti.get_compression() ==
kENCODING_DICT) {
1649 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1651 VLOG(3) <<
"get_scalar_sources_size("
1653 <<
") = " << scalar_sources_size;
1654 for (
size_t i = 0; i < scalar_sources_size; ++i) {
1655 const auto scalar_rex =
scalar_at(i, ra_node);
1656 if (dynamic_cast<const RexRef*>(scalar_rex)) {
1662 const auto scalar_expr =
1664 const auto rewritten_expr =
rewrite_expr(scalar_expr.get());
1668 scalar_sources.push_back(
fold_expr(rewritten_expr.get()));
1674 return scalar_sources;
1684 size_t starting_projection_column_idx) {
1685 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1687 const auto scalar_rex =
scalar_at(i, ra_node);
1688 if (dynamic_cast<const RexRef*>(scalar_rex)) {
1694 std::shared_ptr<Analyzer::Expr> translated_expr;
1699 colNames[i - starting_projection_column_idx]);
1704 const auto rewritten_expr =
rewrite_expr(scalar_expr.get());
1708 return scalar_sources;
1713 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1717 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1718 for (
size_t group_idx = 0; group_idx < compound->
getGroupByCount(); ++group_idx) {
1721 return groupby_exprs;
1726 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1727 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1728 for (
size_t group_idx = 0; group_idx < aggregate->
getGroupByCount(); ++group_idx) {
1731 return groupby_exprs;
1737 const auto filter_expr =
1749 if (agg_expr->get_is_distinct()) {
1750 SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1752 target_expr = target_expr->deep_copy();
1761 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1762 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1763 const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1767 std::vector<Analyzer::Expr*> target_exprs;
1768 for (
size_t i = 0; i < compound->
size(); ++i) {
1770 const auto target_rex_agg =
dynamic_cast<const RexAgg*
>(target_rex);
1771 std::shared_ptr<Analyzer::Expr> target_expr;
1772 if (target_rex_agg) {
1777 const auto target_rex_scalar =
dynamic_cast<const RexScalar*
>(target_rex);
1778 const auto target_rex_ref =
dynamic_cast<const RexRef*
>(target_rex_scalar);
1779 if (target_rex_ref) {
1780 const auto ref_idx = target_rex_ref->
getIndex();
1782 CHECK_LE(ref_idx, groupby_exprs.size());
1783 const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1788 target_expr =
fold_expr(rewritten_expr.get());
1801 target_exprs_owned.push_back(target_expr);
1802 target_exprs.push_back(target_expr.get());
1804 return target_exprs;
1808 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1809 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1810 const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1813 std::vector<Analyzer::Expr*> target_exprs;
1814 size_t group_key_idx = 1;
1815 for (
const auto& groupby_expr : groupby_exprs) {
1818 target_exprs_owned.push_back(target_expr);
1819 target_exprs.push_back(target_expr.get());
1822 for (
const auto& target_rex_agg : aggregate->
getAggExprs()) {
1826 target_expr =
fold_expr(target_expr.get());
1827 target_exprs_owned.push_back(target_expr);
1828 target_exprs.push_back(target_expr.get());
1830 return target_exprs;
1840 if (agg_expr && agg_expr->get_contains_agg()) {
1853 }
else if (
is_agg(&expr)) {
1862 const std::vector<Analyzer::Expr*>& target_exprs) {
1863 std::vector<TargetMetaInfo> targets_meta;
1864 CHECK_EQ(ra_node->size(), target_exprs.size());
1865 for (
size_t i = 0; i < ra_node->size(); ++i) {
1866 CHECK(target_exprs[i]);
1868 targets_meta.emplace_back(ra_node->getFieldName(i),
1870 target_exprs[i]->get_type_info());
1872 return targets_meta;
1878 const std::vector<Analyzer::Expr*>& target_exprs) {
1880 if (
auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1882 }
else if (
auto const* input = dynamic_cast<RelProject const*>(input0)) {
1884 }
else if (
auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1886 }
else if (
auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1888 }
else if (
auto const* input = dynamic_cast<RelScan const*>(input0)) {
1901 const int64_t queue_time_ms) {
1909 auto execute_update_for_node = [
this, &co, &eo_in](
const auto node,
1911 const bool is_aggregate) {
1912 auto table_descriptor = node->getModifiedTableDescriptor();
1913 CHECK(table_descriptor);
1914 if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
1915 throw std::runtime_error(
1916 "UPDATE queries involving variable length columns are only supported on tables "
1917 "with the vacuum attribute set to 'delayed'");
1922 auto updated_table_desc = node->getModifiedTableDescriptor();
1924 std::make_unique<UpdateTransactionParameters>(updated_table_desc,
1925 node->getTargetColumns(),
1926 node->getOutputMetainfo(),
1927 node->isVarlenUpdateRequired());
1931 auto execute_update_ra_exe_unit =
1932 [
this, &co, &eo_in, &table_infos, &updated_table_desc](
1946 CHECK(update_transaction_parameters);
1949 auto table_update_metadata =
1963 table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1972 auto query_rewrite = std::make_unique<QueryRewriter>(table_infos,
executor_);
1976 auto update_transaction_params =
1978 CHECK(update_transaction_params);
1979 const auto td = update_transaction_params->getTableDescriptor();
1981 const auto update_column_names = update_transaction_params->getUpdateColumnNames();
1982 if (update_column_names.size() > 1) {
1983 throw std::runtime_error(
1984 "Multi-column update is not yet supported for temporary tables.");
1989 auto projected_column_to_update =
1990 makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1991 const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1992 work_unit.exe_unit, projected_column_to_update);
1993 if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1994 throw std::runtime_error(
1995 "Variable length updates not yet supported on temporary tables.");
1997 execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1999 execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2003 if (
auto compound = dynamic_cast<const RelCompound*>(node)) {
2007 execute_update_for_node(compound, work_unit, compound->isAggregate());
2008 }
else if (
auto project = dynamic_cast<const RelProject*>(node)) {
2012 if (project->isSimple()) {
2013 CHECK_EQ(
size_t(1), project->inputCount());
2014 const auto input_ra = project->getInput(0);
2015 if (dynamic_cast<const RelSort*>(input_ra)) {
2016 const auto& input_table =
2019 work_unit.exe_unit.scan_limit = input_table->rowCount();
2023 execute_update_for_node(project, work_unit,
false);
2025 throw std::runtime_error(
"Unsupported parent node for update: " +
2033 const int64_t queue_time_ms) {
2037 auto execute_delete_for_node = [
this, &co, &eo_in](
const auto node,
2039 const bool is_aggregate) {
2040 auto* table_descriptor = node->getModifiedTableDescriptor();
2041 CHECK(table_descriptor);
2042 if (!table_descriptor->hasDeletedCol) {
2043 throw std::runtime_error(
2044 "DELETE queries are only supported on tables with the vacuum attribute set to "
2052 auto execute_delete_ra_exe_unit =
2053 [
this, &table_infos, &table_descriptor, &eo_in, &co](
const auto& exe_unit,
2054 const bool is_aggregate) {
2056 std::make_unique<DeleteTransactionParameters>(table_descriptor);
2059 CHECK(delete_params);
2069 CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2073 auto table_update_metadata =
2087 table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2095 auto query_rewrite = std::make_unique<QueryRewriter>(table_infos,
executor_);
2098 auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2099 cd->columnType, table_descriptor->tableId, cd->columnId, 0);
2100 const auto rewritten_exe_unit =
2101 query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2102 execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2104 execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2108 if (
auto compound = dynamic_cast<const RelCompound*>(node)) {
2109 const auto work_unit =
2111 execute_delete_for_node(compound, work_unit, compound->isAggregate());
2112 }
else if (
auto project = dynamic_cast<const RelProject*>(node)) {
2115 if (project->isSimple()) {
2116 CHECK_EQ(
size_t(1), project->inputCount());
2117 const auto input_ra = project->getInput(0);
2118 if (dynamic_cast<const RelSort*>(input_ra)) {
2119 const auto& input_table =
2122 work_unit.exe_unit.scan_limit = input_table->rowCount();
2125 execute_delete_for_node(project, work_unit,
false);
2127 throw std::runtime_error(
"Unsupported parent node for delete: " +
2136 const int64_t queue_time_ms) {
2138 const auto work_unit =
2154 const int64_t queue_time_ms) {
2185 const int64_t queue_time_ms,
2186 const std::optional<size_t> previous_count) {
2192 const auto input_ra = project->
getInput(0);
2193 if (dynamic_cast<const RelSort*>(input_ra)) {
2195 const auto& input_table =
2198 work_unit.exe_unit.scan_limit =
2199 std::min(input_table->getLimit(), input_table->rowCount());
2215 const int64_t queue_time_ms) {
2222 throw std::runtime_error(
"Table functions not supported in distributed mode yet");
2225 throw std::runtime_error(
"Table function support is disabled");
2231 const auto body = table_func_work_unit.body;
2234 const auto table_infos =
2249 auto cached_resultset =
2250 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
2252 if (cached_resultset) {
2253 VLOG(1) <<
"recycle table function's resultset of the root node "
2255 result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2264 table_func_work_unit.exe_unit, table_infos, co, eo,
cat_),
2265 body->getOutputMetainfo()};
2269 throw std::runtime_error(
"Table function ran out of memory during execution");
2271 auto query_exec_time =
timer_stop(query_exec_time_begin);
2272 result.setQueueTime(queue_time_ms);
2273 auto resultset_ptr =
result.getDataPtr();
2274 auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2276 resultset_ptr->getBufferSizeBytes(co.
device_type) <=
2279 if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2281 resultset_ptr->setExecTime(query_exec_time);
2282 resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2283 resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2285 resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2286 if (allow_auto_caching_resultset) {
2287 VLOG(1) <<
"Automatically keep table function's query resultset to recycler";
2289 executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
2290 table_func_work_unit.exe_unit.query_plan_dag_hash,
2291 resultset_ptr->getInputTableKeys(),
2293 resultset_ptr->getBufferSizeBytes(co.
device_type),
2298 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since we do not "
2299 "support resultset recycling on distributed mode";
2301 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since a query "
2302 "has union-(all) operator";
2304 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since a query "
2305 "is either validate or explain query";
2307 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored";
2324 std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2325 for (
const auto& element : tuple->getTuple()) {
2328 return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2332 throw std::runtime_error(
"Only columns supported in the window partition for now");
2334 return makeExpr<Analyzer::ColumnVar>(
2335 col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
2344 const int64_t queue_time_ms) {
2346 CHECK_EQ(query_infos.size(), size_t(1));
2347 if (query_infos.front().info.fragments.size() != 1) {
2348 throw std::runtime_error(
2349 "Only single fragment tables supported for window functions for now");
2354 query_infos.push_back(query_infos.front());
2363 std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2364 std::unordered_map<QueryPlanHash, std::unique_ptr<int64_t[]>> sorted_partition_cache;
2365 std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2366 std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2367 window_function_context_map;
2377 std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2378 if (partition_keys.size() >= 1) {
2379 std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2380 if (partition_keys.size() > 1) {
2381 partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2383 CHECK_EQ(partition_keys.size(), size_t(1));
2384 partition_key_tuple = partition_keys.front();
2387 partition_key_cond =
2388 makeExpr<Analyzer::BinOper>(
kBOOLEAN,
2391 partition_key_tuple,
2396 partition_key_cond ,
2398 sorted_partition_key_ref_count_map,
2404 CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2407 for (
auto& kv : window_function_context_map) {
2408 kv.second->compute(sorted_partition_key_ref_count_map, sorted_partition_cache);
2409 window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2415 const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2416 std::unordered_map<
QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2417 std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2419 const std::vector<InputTableInfo>& query_infos,
2422 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2423 const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2427 std::unique_ptr<WindowFunctionContext> context;
2429 if (partition_key_cond) {
2430 auto partition_cond_str = partition_key_cond->toString();
2431 auto partition_key_hash = boost::hash_value(partition_cond_str);
2432 boost::hash_combine(partition_cache_key, partition_key_hash);
2433 std::shared_ptr<HashJoin> partition_ptr;
2434 auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2435 if (cached_hash_table_it != partition_cache.end()) {
2436 partition_ptr = cached_hash_table_it->second;
2437 VLOG(1) <<
"Reuse a hash table to compute window function context (key: "
2438 << partition_cache_key <<
", partition condition: " << partition_cond_str
2441 const auto hash_table_or_err =
executor_->buildHashTableForQualifier(
2451 if (!hash_table_or_err.fail_reason.empty()) {
2452 throw std::runtime_error(hash_table_or_err.fail_reason);
2455 partition_ptr = hash_table_or_err.hash_table;
2456 CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2458 VLOG(1) <<
"Put a generated hash table for computing window function context to "
2460 << partition_cache_key <<
", partition condition: " << partition_cond_str
2463 CHECK(partition_ptr);
2464 context = std::make_unique<WindowFunctionContext>(window_func,
2465 partition_cache_key,
2471 context = std::make_unique<WindowFunctionContext>(
2472 window_func, elem_count, co.
device_type, row_set_mem_owner);
2475 if (!order_keys.empty()) {
2476 auto sorted_partition_cache_key = partition_cache_key;
2477 for (
auto& order_key : order_keys) {
2478 boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2481 boost::hash_combine(sorted_partition_cache_key, collation.toString());
2483 context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2484 auto cache_key_cnt_it =
2485 sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2486 if (!cache_key_cnt_it.second) {
2487 sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2488 cache_key_cnt_it.first->second + 1;
2491 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2492 for (
const auto& order_key : order_keys) {
2493 const auto order_col =
2496 throw std::runtime_error(
"Only order by columns supported for now");
2498 const int8_t* column;
2499 size_t join_col_elem_count;
2500 std::tie(column, join_col_elem_count) =
2503 query_infos.front().info.fragments.front(),
2511 CHECK_EQ(join_col_elem_count, elem_count);
2512 context->addOrderColumn(column, order_col.get(), chunks_owner);
2521 const int64_t queue_time_ms) {
2523 const auto work_unit =
2526 work_unit, filter->
getOutputMetainfo(),
false, co, eo, render_info, queue_time_ms);
2530 std::vector<TargetMetaInfo>
const& rhs) {
2531 if (lhs.size() == rhs.size()) {
2532 for (
size_t i = 0; i < lhs.size(); ++i) {
2533 if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2551 const int64_t queue_time_ms) {
2553 if (!logical_union->
isAll()) {
2554 throw std::runtime_error(
"UNION without ALL is not supported yet.");
2560 throw std::runtime_error(
"UNION does not support subqueries with geo-columns.");
2583 for (
size_t i = 0; i < tuple_type.size(); ++i) {
2584 auto& target_meta_info = tuple_type[i];
2585 if (target_meta_info.get_type_info().is_varlen()) {
2586 throw std::runtime_error(
"Variable length types not supported in VALUES yet.");
2588 if (target_meta_info.get_type_info().get_type() ==
kNULLT) {
2594 {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2598 std::vector<TargetInfo> target_infos;
2599 for (
const auto& tuple_type_component : tuple_type) {
2602 tuple_type_component.get_type_info(),
2609 std::shared_ptr<ResultSet> rs{
2618 return {rs, tuple_type};
2625 const std::string& columnName,
2636 CHECK(dd && dd->stringDict);
2637 int32_t str_id = dd->stringDict->getOrAdd(str);
2638 if (!dd->dictIsTemp) {
2639 const auto checkpoint_ok = dd->stringDict->checkpoint();
2640 if (!checkpoint_ok) {
2641 throw std::runtime_error(
"Failed to checkpoint dictionary for column " +
2645 const bool invalid = str_id > max_valid_int_value<T>();
2646 if (invalid || str_id == inline_int_null_value<int32_t>()) {
2648 LOG(
ERROR) <<
"Could not encode string: " << str
2649 <<
", the encoded value doesn't fit in " <<
sizeof(
T) * 8
2650 <<
" bits. Will store NULL instead.";
2673 throw std::runtime_error(
"EXPLAIN not supported for ModifyTable");
2684 std::vector<TargetMetaInfo> empty_targets;
2685 return {rs, empty_targets};
2700 size_t rows_number = values_lists.size();
2704 size_t rows_per_leaf = rows_number;
2705 if (td->nShards == 0) {
2707 ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2709 auto max_number_of_rows_per_package =
2710 std::max(
size_t(1), std::min(rows_per_leaf,
size_t(64 * 1024)));
2712 std::vector<const ColumnDescriptor*> col_descriptors;
2713 std::vector<int> col_ids;
2714 std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2715 std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2716 std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2717 std::unordered_map<int, int> sequential_ids;
2719 for (
const int col_id : col_id_list) {
2721 const auto col_enc = cd->columnType.get_compression();
2722 if (cd->columnType.is_string()) {
2726 str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2727 CHECK(it_ok.second);
2733 const auto it_ok = col_buffers.emplace(
2735 std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2736 max_number_of_rows_per_package));
2737 CHECK(it_ok.second);
2743 }
else if (cd->columnType.is_geometry()) {
2745 str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2746 CHECK(it_ok.second);
2747 }
else if (cd->columnType.is_array()) {
2749 arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2750 CHECK(it_ok.second);
2752 const auto it_ok = col_buffers.emplace(
2754 std::unique_ptr<uint8_t[]>(
new uint8_t[cd->columnType.get_logical_size() *
2755 max_number_of_rows_per_package]()));
2756 CHECK(it_ok.second);
2758 col_descriptors.push_back(cd);
2759 sequential_ids[col_id] = col_ids.size();
2760 col_ids.push_back(col_id);
2765 auto table_key = boost::hash_value(table_chunk_key_prefix);
2769 size_t start_row = 0;
2770 size_t rows_left = rows_number;
2771 while (rows_left != 0) {
2773 for (
const auto& kv : col_buffers) {
2774 memset(kv.second.get(), 0, max_number_of_rows_per_package);
2776 for (
auto& kv : str_col_buffers) {
2779 for (
auto& kv : arr_col_buffers) {
2783 auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2788 for (
size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2789 const auto& values_list = values_lists[row_idx + start_row];
2790 for (
size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
2791 CHECK(values_list.size() == col_descriptors.size());
2796 dynamic_cast<const Analyzer::UOper*
>(values_list[col_idx]->get_expr());
2802 const auto cd = col_descriptors[col_idx];
2803 auto col_datum = col_cv->get_constval();
2804 auto col_type = cd->columnType.get_type();
2805 uint8_t* col_data_bytes{
nullptr};
2806 if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2807 (!cd->columnType.is_string() ||
2809 const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2810 CHECK(col_data_bytes_it != col_buffers.end());
2811 col_data_bytes = col_data_bytes_it->second.get();
2815 auto col_data =
reinterpret_cast<int8_t*
>(col_data_bytes);
2816 auto null_bool_val =
2818 col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2820 : (col_datum.boolval ? 1 : 0);
2824 auto col_data =
reinterpret_cast<int8_t*
>(col_data_bytes);
2825 col_data[row_idx] = col_cv->get_is_null()
2827 : col_datum.tinyintval;
2831 auto col_data =
reinterpret_cast<int16_t*
>(col_data_bytes);
2832 col_data[row_idx] = col_cv->get_is_null()
2834 : col_datum.smallintval;
2838 auto col_data =
reinterpret_cast<int32_t*
>(col_data_bytes);
2839 col_data[row_idx] = col_cv->get_is_null()
2847 auto col_data =
reinterpret_cast<int64_t*
>(col_data_bytes);
2848 col_data[row_idx] = col_cv->get_is_null()
2850 : col_datum.bigintval;
2854 auto col_data =
reinterpret_cast<float*
>(col_data_bytes);
2855 col_data[row_idx] = col_datum.floatval;
2859 auto col_data =
reinterpret_cast<double*
>(col_data_bytes);
2860 col_data[row_idx] = col_datum.doubleval;
2866 switch (cd->columnType.get_compression()) {
2868 str_col_buffers[col_ids[col_idx]].push_back(
2869 col_datum.stringval ? *col_datum.stringval :
"");
2872 switch (cd->columnType.get_size()) {
2875 &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
2882 &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
2889 &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
2907 auto col_data =
reinterpret_cast<int64_t*
>(col_data_bytes);
2908 col_data[row_idx] = col_cv->get_is_null()
2910 : col_datum.bigintval;
2914 const auto is_null = col_cv->get_is_null();
2915 const auto size = cd->columnType.get_size();
2918 const auto is_point_coords =
2920 if (
is_null && !is_point_coords) {
2924 for (int8_t* p = buf + elem_ti.
get_size(); (p - buf) < size;
2926 put_null(static_cast<void*>(p), elem_ti,
"");
2928 arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf,
is_null);
2930 arr_col_buffers[col_ids[col_idx]].emplace_back(0,
nullptr,
is_null);
2934 const auto l = col_cv->get_value_list();
2935 size_t len = l.size() * elem_ti.
get_size();
2936 if (size > 0 && static_cast<size_t>(size) != len) {
2937 throw std::runtime_error(
"Array column " + cd->columnName +
" expects " +
2939 " values, " +
"received " +
2947 int32_t* p =
reinterpret_cast<int32_t*
>(buf);
2954 &p[elemIndex], cd->columnName, elem_ti, c.get(),
cat_);
2976 str_col_buffers[col_ids[col_idx]].push_back(
2977 col_datum.stringval ? *col_datum.stringval :
"");
2984 start_row += package_size;
2985 rows_left -= package_size;
2989 insert_data.
tableId = table_id;
2990 insert_data.
data.resize(col_ids.size());
2992 for (
const auto& kv : col_buffers) {
2994 p.
numbersPtr =
reinterpret_cast<int8_t*
>(kv.second.get());
2995 insert_data.
data[sequential_ids[kv.first]] = p;
2997 for (
auto& kv : str_col_buffers) {
3000 insert_data.
data[sequential_ids[kv.first]] = p;
3002 for (
auto& kv : arr_col_buffers) {
3005 insert_data.
data[sequential_ids[kv.first]] = p;
3007 insert_data.
numRows = package_size;
3019 std::vector<TargetMetaInfo> empty_targets;
3020 return {rs, empty_targets};
3026 const auto aggregate =
dynamic_cast<const RelAggregate*
>(ra);
3030 const auto compound =
dynamic_cast<const RelCompound*
>(ra);
3031 return (compound && compound->isAggregate()) ? 0 : limit;
3035 return !order_entries.empty() && order_entries.front().is_desc;
3044 const int64_t queue_time_ms) {
3047 const auto source = sort->
getInput(0);
3054 executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3057 auto& aggregated_result = it->second;
3058 auto& result_rows = aggregated_result.rs;
3059 const size_t limit = sort->
getLimit();
3060 const size_t offset = sort->
getOffset();
3061 if (limit || offset) {
3062 if (!order_entries.empty()) {
3063 result_rows->sort(order_entries, limit + offset,
executor_);
3065 result_rows->dropFirstN(offset);
3067 result_rows->keepFirstN(limit);
3077 source_work_unit.exe_unit.target_exprs,
3078 aggregated_result.targets_meta);
3087 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3088 bool is_desc{
false};
3089 bool use_speculative_top_n_sort{
false};
3091 auto execute_sort_query = [
this,
3103 const size_t limit = sort->
getLimit();
3104 const size_t offset = sort->
getOffset();
3106 auto source_node = sort->
getInput(0);
3109 auto source_query_plan_dag = source_node->getQueryPlanDagHash();
3113 if (
auto cached_resultset =
3114 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
3115 source_query_plan_dag)) {
3116 CHECK(cached_resultset->canUseSpeculativeTopNSort());
3117 VLOG(1) <<
"recycle resultset of the root node " << source_node->getRelNodeDagId()
3118 <<
" from resultset cache";
3120 ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3124 use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3126 source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3130 if (!source_result.getDataPtr()) {
3138 eo.allow_loop_joins,
3142 eo.with_dynamic_watchdog,
3143 eo.dynamic_watchdog_time_limit,
3144 eo.find_push_down_candidates,
3145 eo.just_calcite_explain,
3146 eo.gpu_input_mem_limit_percent,
3147 eo.allow_runtime_query_interrupt,
3148 eo.running_query_interrupt_freq,
3149 eo.pending_query_interrupt_freq,
3153 groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3155 source->getOutputMetainfo(),
3161 use_speculative_top_n_sort =
3162 source_result.
getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3164 source_result.getRows()->getQueryMemDesc());
3166 if (render_info && render_info->isPotentialInSituRender()) {
3167 return source_result;
3169 if (source_result.isFilterPushDownEnabled()) {
3170 return source_result;
3172 auto rows_to_sort = source_result.getRows();
3173 if (eo.just_explain) {
3174 return {rows_to_sort, {}};
3176 if (sort->
collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3177 !use_speculative_top_n_sort) {
3178 const size_t top_n = limit == 0 ? 0 : limit + offset;
3179 rows_to_sort->sort(order_entries, top_n,
executor_);
3181 if (limit || offset) {
3183 if (offset >= rows_to_sort->rowCount()) {
3184 rows_to_sort->dropFirstN(offset);
3186 rows_to_sort->keepFirstN(limit + offset);
3189 rows_to_sort->dropFirstN(offset);
3191 rows_to_sort->keepFirstN(limit);
3195 return {rows_to_sort, source_result.getTargetsMeta()};
3199 return execute_sort_query();
3201 CHECK_EQ(
size_t(1), groupby_exprs.size());
3202 CHECK(groupby_exprs.front());
3204 return execute_sort_query();
3210 std::list<Analyzer::OrderEntry>& order_entries,
3212 const auto source = sort->
getInput(0);
3213 const size_t limit = sort->
getLimit();
3214 const size_t offset = sort->
getOffset();
3216 const size_t scan_total_limit =
3218 size_t max_groups_buffer_entry_guess{
3224 const auto& source_exe_unit = source_work_unit.exe_unit;
3227 for (
auto order_entry : order_entries) {
3229 const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3231 if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3232 throw std::runtime_error(
3233 "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3237 if (source_exe_unit.groupby_exprs.size() == 1) {
3238 if (!source_exe_unit.groupby_exprs.front()) {
3252 std::move(source_exe_unit.input_col_descs),
3253 source_exe_unit.simple_quals,
3254 source_exe_unit.quals,
3255 source_exe_unit.join_quals,
3256 source_exe_unit.groupby_exprs,
3257 source_exe_unit.target_exprs,
3259 {sort_info.order_entries,
3263 sort_info.limit_delivered},
3265 source_exe_unit.query_hint,
3266 source_exe_unit.query_plan_dag_hash,
3267 source_exe_unit.hash_table_build_plan_dag,
3268 source_exe_unit.table_id_to_node_map,
3269 source_exe_unit.use_bump_allocator,
3270 source_exe_unit.union_all,
3271 source_exe_unit.query_state},
3273 max_groups_buffer_entry_guess,
3274 std::move(source_work_unit.query_rewriter),
3275 source_work_unit.input_permutation,
3276 source_work_unit.left_deep_join_input_sizes};
3288 CHECK(!table_infos.empty());
3289 const auto& first_table = table_infos.front();
3290 size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3291 for (
const auto& table_info : table_infos) {
3292 if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3293 max_num_groups = table_info.info.getNumTuplesUpperBound();
3296 return std::max(max_num_groups,
size_t(1));
3316 for (
const auto& target_expr : ra_exe_unit.
target_exprs) {
3319 if (target_expr->get_type_info().is_varlen()) {
3323 if (
auto top_project = dynamic_cast<const RelProject*>(body)) {
3324 if (top_project->isRowwiseOutputForced()) {
3343 for (
const auto target_expr : ra_exe_unit.
target_exprs) {
3344 if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3356 return !(ra_exe_unit.
quals.empty() && ra_exe_unit.
join_quals.empty() &&
3362 const std::vector<InputTableInfo>& table_infos,
3363 const Executor* executor,
3365 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3367 for (
size_t i = 0; i < ra_exe_unit.
target_exprs.size(); ++i) {
3373 CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3376 const auto& arg_ti = arg->get_type_info();
3382 if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3383 (arg_ti.is_string() && arg_ti.get_compression() ==
kENCODING_DICT))) {
3394 const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3395 const auto sub_bitmap_count =
3397 int64_t approx_bitmap_sz_bits{0};
3398 const auto error_rate =
static_cast<Analyzer::AggExpr*
>(target_expr)->get_arg1();
3400 CHECK(error_rate->get_type_info().get_type() ==
kINT);
3401 CHECK_GE(error_rate->get_constval().intval, 1);
3407 arg_range.getIntMin(),
3408 approx_bitmap_sz_bits,
3413 arg_range.getIntMin(),
3418 if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3419 precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3420 auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3422 target_exprs_owned.push_back(precise_count_distinct);
3423 ra_exe_unit.
target_exprs[i] = precise_count_distinct.get();
3440 const std::vector<TargetMetaInfo>& targets_meta,
3445 const int64_t queue_time_ms,
3446 const std::optional<size_t> previous_count) {
3459 throw std::runtime_error(
"Window functions support is disabled");
3463 computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3475 const auto body = work_unit.
body;
3478 VLOG(3) <<
"body->getId()=" << body->getId()
3480 <<
" it==leaf_results_.end()=" << (it ==
leaf_results_.end());
3484 auto& aggregated_result = it->second;
3485 auto& result_rows = aggregated_result.rs;
3487 body->setOutputMetainfo(aggregated_result.targets_meta);
3501 auto candidate =
query_dag_->getQueryHint(body);
3503 ra_exe_unit.query_hint = *candidate;
3508 CHECK_EQ(table_infos.size(), size_t(1));
3509 CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3510 max_groups_buffer_entry_guess =
3511 table_infos.front().info.fragments.front().getNumTuples();
3512 ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3515 ra_exe_unit.scan_limit = *previous_count;
3519 ra_exe_unit.scan_limit = 0;
3520 ra_exe_unit.use_bump_allocator =
true;
3522 ra_exe_unit.scan_limit = 0;
3525 if (filter_count_all) {
3526 ra_exe_unit.scan_limit = std::max(*filter_count_all,
size_t(1));
3537 VLOG(1) <<
"Using columnar layout for projection as output size of "
3538 << ra_exe_unit.scan_limit <<
" rows exceeds threshold of "
3555 auto execute_and_handle_errors = [&](
const auto max_groups_buffer_entry_guess_in,
3556 const bool has_cardinality_estimation,
3561 auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3563 return {
executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3571 has_cardinality_estimation,
3580 {ra_exe_unit, work_unit.
body, local_groups_buffer_entry_guess},
3592 for (
const auto& table_info : table_infos) {
3594 if (td && (td->isTemporaryTable() || td->isView)) {
3595 use_resultset_cache =
false;
3597 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has either "
3598 "temporary table or view";
3605 auto cached_cardinality =
executor_->getCachedCardinality(cache_key);
3606 auto card = cached_cardinality.second;
3607 if (cached_cardinality.first && card >= 0) {
3608 result = execute_and_handle_errors(
3611 result = execute_and_handle_errors(
3612 max_groups_buffer_entry_guess,
3618 auto cached_cardinality =
executor_->getCachedCardinality(cache_key);
3619 auto card = cached_cardinality.second;
3620 if (cached_cardinality.first && card >= 0) {
3621 result = execute_and_handle_errors(card,
true,
true);
3623 const auto ndv_groups_estimation =
3625 const auto estimated_groups_buffer_entry_guess =
3626 ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3629 CHECK_GT(estimated_groups_buffer_entry_guess,
size_t(0));
3630 result = execute_and_handle_errors(
3631 estimated_groups_buffer_entry_guess,
true,
true);
3633 executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3638 result.setQueueTime(queue_time_ms);
3643 return {std::make_shared<ResultSet>(
3647 ?
executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3653 for (
auto& target_info :
result.getTargetsMeta()) {
3654 if (target_info.get_type_info().is_string() &&
3655 !target_info.get_type_info().is_dict_encoded_string()) {
3657 use_resultset_cache =
false;
3659 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has non-encoded "
3660 "string column projection";
3666 auto allow_auto_caching_resultset =
3669 if (use_resultset_cache && (eo.
keep_result || allow_auto_caching_resultset) &&
3671 auto query_exec_time =
timer_stop(query_exec_time_begin);
3672 res->setExecTime(query_exec_time);
3673 res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3674 res->setTargetMetaInfo(body->getOutputMetainfo());
3676 res->setInputTableKeys(std::move(input_table_keys));
3677 if (allow_auto_caching_resultset) {
3678 VLOG(1) <<
"Automatically keep query resultset to recycler";
3680 res->setUseSpeculativeTopNSort(
3682 executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
3683 ra_exe_unit.query_plan_dag_hash,
3684 res->getInputTableKeys(),
3691 VLOG(1) <<
"Query hint \'keep_result\' is ignored since we do not support "
3692 "resultset recycling on distributed mode";
3694 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has union-(all) "
3697 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query is classified as "
3698 "a in-situ rendering query";
3700 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query is either "
3701 "validate or explain query";
3703 VLOG(1) <<
"Query hint \'keep_result\' is ignored";
3721 const auto count_all_exe_unit =
3743 }
catch (
const std::exception& e) {
3744 LOG(
WARNING) <<
"Failed to run pre-flight filtered count with error " << e.what();
3745 return std::nullopt;
3747 const auto count_row = count_all_result->getNextRow(
false,
false);
3748 CHECK_EQ(
size_t(1), count_row.size());
3749 const auto& count_tv = count_row.front();
3750 const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
3751 CHECK(count_scalar_tv);
3752 const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
3755 auto count_upper_bound =
static_cast<size_t>(*count_ptr);
3756 return std::max(count_upper_bound,
size_t(1));
3760 const auto& ra_exe_unit = work_unit.
exe_unit;
3761 if (ra_exe_unit.input_descs.size() != 1) {
3764 const auto& table_desc = ra_exe_unit.
input_descs.front();
3768 const int table_id = table_desc.getTableId();
3769 for (
const auto& simple_qual : ra_exe_unit.simple_quals) {
3770 const auto comp_expr =
3772 if (!comp_expr || comp_expr->get_optype() !=
kEQ) {
3777 if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3780 const auto rhs = comp_expr->get_right_operand();
3786 if (cd->isVirtualCol) {
3796 const std::vector<TargetMetaInfo>& targets_meta,
3801 const bool was_multifrag_kernel_launch,
3802 const int64_t queue_time_ms) {
3807 auto ra_exe_unit_in = work_unit.
exe_unit;
3840 if (was_multifrag_kernel_launch) {
3844 LOG(
WARNING) <<
"Multifrag query ran out of memory, retrying with multifragment "
3845 "kernels disabled.";
3860 result.setQueueTime(queue_time_ms);
3863 LOG(
WARNING) <<
"Kernel per fragment query ran out of memory, retrying on CPU.";
3877 VLOG(1) <<
"Resetting max groups buffer entry guess.";
3878 max_groups_buffer_entry_guess = 0;
3880 int iteration_ctr = -1;
3904 CHECK(max_groups_buffer_entry_guess);
3908 throw std::runtime_error(
"Query ran out of output slots in the result");
3910 max_groups_buffer_entry_guess *= 2;
3911 LOG(
WARNING) <<
"Query ran out of slots in the output buffer, retrying with max "
3912 "groups buffer entry "
3914 << max_groups_buffer_entry_guess;
3920 result.setQueueTime(queue_time_ms);
3927 LOG(
ERROR) <<
"Query execution failed with error "
3933 LOG(
INFO) <<
"Query ran out of GPU memory, attempting punt to CPU";
3935 throw std::runtime_error(
3936 "Query ran out of GPU memory, unable to automatically retry on CPU");
3945 const char* code{
nullptr};
3946 const char* description{
nullptr};
3951 switch (error_code) {
3953 return {
"ERR_DIV_BY_ZERO",
"Division by zero"};
3955 return {
"ERR_OUT_OF_GPU_MEM",
3957 "Query couldn't keep the entire working set of columns in GPU memory"};
3959 return {
"ERR_UNSUPPORTED_SELF_JOIN",
"Self joins not supported yet"};
3961 return {
"ERR_OUT_OF_CPU_MEM",
"Not enough host memory to execute the query"};
3963 return {
"ERR_OVERFLOW_OR_UNDERFLOW",
"Overflow or underflow"};
3965 return {
"ERR_OUT_OF_TIME",
"Query execution has exceeded the time limit"};
3967 return {
"ERR_INTERRUPTED",
"Query execution has been interrupted"};
3969 return {
"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
3970 "Columnar conversion not supported for variable length types"};
3972 return {
"ERR_TOO_MANY_LITERALS",
"Too many literals in the query"};
3974 return {
"ERR_STRING_CONST_IN_RESULTSET",
3976 "NONE ENCODED String types are not supported as input result set."};
3978 return {
"ERR_OUT_OF_RENDER_MEM",
3980 "Insufficient GPU memory for query results in render output buffer "
3981 "sized by render-mem-bytes"};
3983 return {
"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
3984 "Streaming-Top-N not supported in Render Query"};
3986 return {
"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
3987 "Multiple distinct values encountered"};
3989 return {
"ERR_GEOS",
"ERR_GEOS"};
3991 return {
"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
3993 "Arguments of WIDTH_BUCKET function does not satisfy the condition"};
3995 return {
nullptr,
nullptr};
4002 if (error_code < 0) {
4003 return "Ran out of slots in the query output buffer";
4007 if (errorInfo.code) {
4008 return errorInfo.code +
": "s + errorInfo.description;
4016 VLOG(1) <<
"Running post execution callback.";
4017 (*post_execution_callback_)();
4024 const auto compound =
dynamic_cast<const RelCompound*
>(node);
4028 const auto project =
dynamic_cast<const RelProject*
>(node);
4032 const auto aggregate =
dynamic_cast<const RelAggregate*
>(node);
4036 const auto filter =
dynamic_cast<const RelFilter*
>(node);
4040 LOG(
FATAL) <<
"Unhandled node type: "
4049 if (
auto join = dynamic_cast<const RelJoin*>(sink)) {
4050 return join->getJoinType();
4052 if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
4060 const auto condition =
dynamic_cast<const RexOperator*
>(scalar);
4061 if (!condition || condition->getOperator() !=
kOR || condition->size() != 2) {
4064 const auto equi_join_condition =
4065 dynamic_cast<const RexOperator*
>(condition->getOperand(0));
4066 if (!equi_join_condition || equi_join_condition->getOperator() !=
kEQ) {
4069 const auto both_are_null_condition =
4070 dynamic_cast<const RexOperator*
>(condition->getOperand(1));
4071 if (!both_are_null_condition || both_are_null_condition->getOperator() !=
kAND ||
4072 both_are_null_condition->size() != 2) {
4075 const auto lhs_is_null =
4076 dynamic_cast<const RexOperator*
>(both_are_null_condition->getOperand(0));
4077 const auto rhs_is_null =
4078 dynamic_cast<const RexOperator*
>(both_are_null_condition->getOperand(1));
4079 if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() !=
kISNULL ||
4080 rhs_is_null->getOperator() !=
kISNULL) {
4083 CHECK_EQ(
size_t(1), lhs_is_null->size());
4084 CHECK_EQ(
size_t(1), rhs_is_null->size());
4085 CHECK_EQ(
size_t(2), equi_join_condition->size());
4086 const auto eq_lhs =
dynamic_cast<const RexInput*
>(equi_join_condition->getOperand(0));
4087 const auto eq_rhs =
dynamic_cast<const RexInput*
>(equi_join_condition->getOperand(1));
4088 const auto is_null_lhs =
dynamic_cast<const RexInput*
>(lhs_is_null->getOperand(0));
4089 const auto is_null_rhs =
dynamic_cast<const RexInput*
>(rhs_is_null->getOperand(0));
4090 if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4093 std::vector<std::unique_ptr<const RexScalar>> eq_operands;
4094 if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4096 auto lhs_op_copy = deep_copy_visitor.
visit(equi_join_condition->getOperand(0));
4097 auto rhs_op_copy = deep_copy_visitor.
visit(equi_join_condition->getOperand(1));
4098 eq_operands.emplace_back(lhs_op_copy.release());
4099 eq_operands.emplace_back(rhs_op_copy.release());
4100 return boost::make_unique<const RexOperator>(
4101 kBW_EQ, eq_operands, equi_join_condition->getType());
4108 const auto condition =
dynamic_cast<const RexOperator*
>(scalar);
4109 if (condition && condition->getOperator() ==
kAND) {
4110 CHECK_GE(condition->size(), size_t(2));
4115 for (
size_t i = 1; i < condition->size(); ++i) {
4116 std::vector<std::unique_ptr<const RexScalar>> and_operands;
4117 and_operands.emplace_back(std::move(acc));
4120 boost::make_unique<const RexOperator>(
kAND, and_operands, condition->getType());
4130 for (
size_t nesting_level = 1; nesting_level <= left_deep_join->
inputCount() - 1;
4135 auto cur_level_join_type = left_deep_join->
getJoinType(nesting_level);
4137 join_types[nesting_level - 1] = cur_level_join_type;
4145 std::vector<InputDescriptor>& input_descs,
4146 std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4148 std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4150 const std::vector<InputTableInfo>& query_infos,
4151 const Executor* executor) {
4157 const auto&
cat = *executor->getCatalog();
4158 for (
const auto& table_info : query_infos) {
4159 if (table_info.table_id < 0) {
4162 const auto td =
cat.getMetadataForTable(table_info.table_id);
4168 const auto input_permutation =
4171 std::tie(input_descs, input_col_descs, std::ignore) =
4173 return input_permutation;
4178 std::vector<size_t> input_sizes;
4179 for (
size_t i = 0; i < left_deep_join->
inputCount(); ++i) {
4181 input_sizes.push_back(inputs.size());
4187 const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4188 std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4189 for (
const auto& qual : quals) {
4191 rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4193 return rewritten_quals;
4202 std::vector<InputDescriptor> input_descs;
4203 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4205 std::tie(input_descs, input_col_descs, std::ignore) =
4210 const auto left_deep_join =
4215 std::vector<size_t> input_permutation;
4216 std::vector<size_t> left_deep_join_input_sizes;
4217 std::optional<unsigned> left_deep_tree_id;
4218 if (left_deep_join) {
4219 left_deep_tree_id = left_deep_join->getId();
4222 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4224 std::find(join_types.begin(), join_types.end(),
JoinType::LEFT) ==
4228 left_deep_join_quals,
4229 input_to_nest_level,
4234 std::tie(input_descs, input_col_descs, std::ignore) =
4237 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4243 input_to_nest_level,
4247 const auto scalar_sources =
4260 auto candidate =
query_dag_->getQueryHint(compound);
4262 query_hint = *candidate;
4270 left_deep_join_quals,
4283 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
4284 auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4285 const auto targets_meta =
get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4288 if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4289 left_deep_trees_info.emplace(left_deep_tree_id.value(),
4290 rewritten_exe_unit.join_quals);
4294 compound, left_deep_tree_id, left_deep_trees_info,
executor_);
4295 rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4296 rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4298 return {rewritten_exe_unit,
4301 std::move(query_rewriter),
4303 left_deep_join_input_sizes};
4309 const auto left_deep_join =
4313 return std::make_shared<RelAlgTranslator>(
4321 const auto bin_oper =
dynamic_cast<const RexOperator*
>(qual_expr);
4322 if (!bin_oper || bin_oper->getOperator() !=
kAND) {
4325 CHECK_GE(bin_oper->size(), size_t(2));
4327 for (
size_t i = 1; i < bin_oper->size(); ++i) {
4329 lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4335 const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4337 CHECK(!factors.empty());
4338 auto acc = factors.front();
4339 for (
size_t i = 1; i < factors.size(); ++i) {
4345 template <
class QualsList>
4347 const std::shared_ptr<Analyzer::Expr>& needle) {
4348 for (
const auto& qual : haystack) {
4349 if (*qual == *needle) {
4360 const std::shared_ptr<Analyzer::Expr>& expr) {
4362 CHECK_GE(expr_terms.size(), size_t(1));
4363 const auto& first_term = expr_terms.front();
4365 std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4368 for (
const auto& first_term_factor : first_term_factors.quals) {
4370 expr_terms.size() > 1;
4371 for (
size_t i = 1; i < expr_terms.size(); ++i) {
4379 common_factors.push_back(first_term_factor);
4382 if (common_factors.empty()) {
4386 std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4387 for (
const auto& term : expr_terms) {
4389 std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4390 term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4391 for (
const auto& qual : term_cf.quals) {
4393 remaining_quals.push_back(qual);
4396 if (!remaining_quals.empty()) {
4402 if (remaining_terms.empty()) {
4413 const std::vector<JoinType>& join_types,
4414 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4415 const bool just_explain)
const {
4419 std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4420 for (
const auto rex_condition_component : rex_condition_cf) {
4422 const auto join_condition =
4424 bw_equals ? bw_equals.get() : rex_condition_component));
4427 auto append_folded_cf_quals = [&join_condition_quals](
const auto& cf_quals) {
4428 for (
const auto& cf_qual : cf_quals) {
4429 join_condition_quals.emplace_back(
fold_expr(cf_qual.get()));
4433 append_folded_cf_quals(join_condition_cf.quals);
4434 append_folded_cf_quals(join_condition_cf.simple_quals);
4444 const std::vector<InputDescriptor>& input_descs,
4445 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4446 const bool just_explain) {
4452 std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4453 for (
size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4455 if (outer_condition) {
4456 result[rte_idx - 1].quals =
4457 makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4458 CHECK_LE(rte_idx, join_types.size());
4463 for (
const auto& qual : join_condition_quals) {
4464 if (visited_quals.count(qual)) {
4467 const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4468 if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4469 const auto it_ok = visited_quals.emplace(qual);
4470 CHECK(it_ok.second);
4471 result[rte_idx - 1].quals.push_back(qual);
4474 CHECK_LE(rte_idx, join_types.size());
4478 result[rte_idx - 1].type = join_types[rte_idx - 1];
4487 const size_t nest_level,
4488 const std::vector<TargetMetaInfo>& in_metainfo,
4489 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4492 const auto input = ra_node->
getInput(nest_level);
4493 const auto it_rte_idx = input_to_nest_level.find(input);
4494 CHECK(it_rte_idx != input_to_nest_level.end());
4495 const int rte_idx = it_rte_idx->second;
4497 std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4498 const auto scan_ra =
dynamic_cast<const RelScan*
>(input);
4500 for (
const auto& input_meta : in_metainfo) {
4502 std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
4504 scan_ra ? input_idx + 1 : input_idx,
4512 std::vector<std::shared_ptr<Analyzer::Expr>>
const& input) {
4513 std::vector<Analyzer::Expr*> output(input.size());
4514 auto const raw_ptr = [](
auto& shared_ptr) {
return shared_ptr.get(); };
4515 std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4524 const bool just_explain) {
4525 std::vector<InputDescriptor> input_descs;
4526 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4527 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4529 std::tie(input_descs, input_col_descs, used_inputs_owned) =
4536 input_to_nest_level,
4541 const auto source = aggregate->
getInput(0);
4543 const auto scalar_sources =
4555 auto candidate =
query_dag_->getQueryHint(aggregate);
4557 query_hint = *candidate;
4574 join_info.hash_table_plan_dag,
4575 join_info.table_id_to_node_map,
4588 std::vector<InputDescriptor> input_descs;
4589 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4591 std::tie(input_descs, input_col_descs, std::ignore) =
4595 const auto left_deep_join =
4600 std::vector<size_t> input_permutation;
4601 std::vector<size_t> left_deep_join_input_sizes;
4602 std::optional<unsigned> left_deep_tree_id;
4603 if (left_deep_join) {
4604 left_deep_tree_id = left_deep_join->getId();
4608 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4612 left_deep_join_quals,
4613 input_to_nest_level,
4618 std::tie(input_descs, input_col_descs, std::ignore) =
4621 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4628 input_to_nest_level,
4632 const auto target_exprs_owned =
4640 auto candidate =
query_dag_->getQueryHint(project);
4642 query_hint = *candidate;
4649 left_deep_join_quals,
4662 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
4663 auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4664 const auto targets_meta =
get_targets_meta(project, rewritten_exe_unit.target_exprs);
4667 if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4668 left_deep_trees_info.emplace(left_deep_tree_id.value(),
4669 rewritten_exe_unit.join_quals);
4673 project, left_deep_tree_id, left_deep_trees_info,
executor_);
4674 rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4675 rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4677 return {rewritten_exe_unit,
4680 std::move(query_rewriter),
4682 left_deep_join_input_sizes};
4691 const int negative_node_id = -input_node->
getId();
4692 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4693 target_exprs.reserve(tmis.size());
4694 for (
size_t i = 0; i < tmis.size(); ++i) {
4695 target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4696 tmis[i].get_type_info(), negative_node_id, i, 0));
4698 return target_exprs;
4707 std::vector<InputDescriptor> input_descs;
4708 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4711 std::tie(input_descs, input_col_descs, std::ignore) =
4714 auto const max_num_tuples =
4718 [](
auto max,
auto const& query_info) {
4719 return std::max(max, query_info.info.getNumTuples());
4722 VLOG(3) <<
"input_to_nest_level.size()=" << input_to_nest_level.size() <<
" Pairs are:";
4723 for (
auto& pair : input_to_nest_level) {
4725 << pair.second <<
')';
4733 std::vector<Analyzer::Expr*> target_exprs_pair[2];
4734 for (
unsigned i = 0; i < 2; ++i) {
4736 CHECK(!input_exprs_owned.empty())
4737 <<
"No metainfo found for input node(" << i <<
") "
4739 VLOG(3) <<
"i(" << i <<
") input_exprs_owned.size()=" << input_exprs_owned.size();
4740 for (
auto& input_expr : input_exprs_owned) {
4741 VLOG(3) <<
" " << input_expr->toString();
4749 <<
" target_exprs.size()=" << target_exprs_pair[0].size()
4750 <<
" max_num_tuples=" << max_num_tuples;
4758 target_exprs_pair[0],
4767 logical_union->
isAll(),
4769 target_exprs_pair[1]};
4770 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
4771 const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4774 if (
auto const* node = dynamic_cast<const RelCompound*>(input0)) {
4777 }
else if (
auto const* node = dynamic_cast<const RelProject*>(input0)) {
4780 }
else if (
auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
4783 }
else if (
auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4786 }
else if (
auto const* node = dynamic_cast<const RelScan*>(input0)) {
4789 }
else if (
auto const* node = dynamic_cast<const RelFilter*>(input0)) {
4792 }
else if (dynamic_cast<const RelSort*>(input0)) {
4793 throw QueryNotSupported(
"LIMIT and OFFSET are not currently supported with UNION.");
4798 VLOG(3) <<
"logical_union->getOutputMetainfo()="
4800 <<
" rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
4801 << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
4803 return {rewritten_exe_unit,
4806 std::move(query_rewriter)};
4811 const bool just_explain,
4812 const bool is_gpu) {
4813 std::vector<InputDescriptor> input_descs;
4814 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4816 std::tie(input_descs, input_col_descs, std::ignore) =
4827 const auto table_function_impl_and_type_infos = [=]() {
4833 LOG(
WARNING) <<
"createTableFunctionWorkUnit[GPU]: " << e.what()
4835 <<
" step to run on CPU.";
4843 LOG(
WARNING) <<
"createTableFunctionWorkUnit[CPU]: " << e.what();
4848 const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4849 const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4851 size_t output_row_sizing_param = 0;
4852 if (table_function_impl
4853 .hasUserSpecifiedOutputSizeParameter()) {
4854 const auto parameter_index =
4855 table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
4856 CHECK_GT(parameter_index,
size_t(0));
4858 const auto parameter_expr =
4860 const auto parameter_expr_literal =
dynamic_cast<const RexLiteral*
>(parameter_expr);
4861 if (!parameter_expr_literal) {
4862 throw std::runtime_error(
4863 "Provided output buffer sizing parameter is not a literal. Only literal "
4864 "values are supported with output buffer sizing configured table "
4867 int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
4868 if (literal_val < 0) {
4869 throw std::runtime_error(
"Provided output sizing parameter " +
4871 " must be positive integer.");
4873 output_row_sizing_param =
static_cast<size_t>(literal_val);
4876 output_row_sizing_param = 1;
4878 static auto DEFAULT_ROW_MULTIPLIER_EXPR =
4879 makeExpr<Analyzer::Constant>(
kINT,
false, d);
4881 input_exprs.insert(input_exprs.begin() + parameter_index - 1,
4882 DEFAULT_ROW_MULTIPLIER_EXPR.get());
4884 }
else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
4885 output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
4890 std::vector<Analyzer::ColumnVar*> input_col_exprs;
4891 size_t input_index = 0;
4892 size_t arg_index = 0;
4893 const auto table_func_args = table_function_impl.getInputArgs();
4894 CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
4895 for (
const auto& ti : table_function_type_infos) {
4896 if (ti.is_column_list()) {
4897 for (
int i = 0; i < ti.get_dimension(); i++) {
4898 auto& input_expr = input_exprs[input_index];
4904 auto type_info = input_expr->get_type_info();
4905 type_info.set_subtype(type_info.get_type());
4906 type_info.set_type(ti.get_type());
4907 type_info.set_dimension(ti.get_dimension());
4908 input_expr->set_type_info(type_info);
4910 input_col_exprs.push_back(col_var);
4913 }
else if (ti.is_column()) {
4914 auto& input_expr = input_exprs[input_index];
4920 auto type_info = input_expr->get_type_info();
4921 type_info.set_subtype(type_info.get_type());
4922 type_info.set_type(ti.get_type());
4923 input_expr->set_type_info(type_info);
4925 input_col_exprs.push_back(col_var);
4928 auto input_expr = input_exprs[input_index];
4930 if (ext_func_arg_ti != input_expr->get_type_info()) {
4931 input_exprs[input_index] = input_expr->add_cast(ext_func_arg_ti).get();
4938 std::vector<Analyzer::Expr*> table_func_outputs;
4939 constexpr int32_t transient_pos{-1};
4940 for (
size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
4941 auto ti = table_function_impl.getOutputSQLType(i);
4942 if (ti.is_dict_encoded_string()) {
4943 auto p = table_function_impl.getInputID(i);
4945 int32_t input_pos = p.first;
4946 if (input_pos == transient_pos) {
4952 for (
int j = 0; j < input_pos; j++) {
4953 const auto ti = table_function_type_infos[j];
4954 offset += ti.is_column_list() ? ti.get_dimension() : 1;
4956 input_pos = offset + p.second;
4958 CHECK_LT(input_pos, input_exprs.size());
4959 int32_t comp_param =
4960 input_exprs_owned[input_pos]->get_type_info().get_comp_param();
4961 ti.set_comp_param(comp_param);
4973 output_row_sizing_param,
4974 table_function_impl};
4977 return {exe_unit, rel_table_func};
4982 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
4985 const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
4986 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4987 std::vector<TargetMetaInfo> in_metainfo;
4988 std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
4990 auto input_it = inputs_owned.begin();
4991 for (
size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
4992 const auto source = data_sink_node->getInput(nest_level);
4993 const auto scan_source =
dynamic_cast<const RelScan*
>(source);
4995 CHECK(source->getOutputMetainfo().empty());
4996 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
4997 for (
size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
5000 const auto source_metadata =
5003 in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5005 exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5007 const auto& source_metadata = source->getOutputMetainfo();
5008 input_it += source_metadata.size();
5010 in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5012 data_sink_node, nest_level, source_metadata, input_to_nest_level);
5014 exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5017 return std::make_pair(in_metainfo, exprs_owned);
5024 const bool just_explain) {
5026 std::vector<InputDescriptor> input_descs;
5027 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5028 std::vector<TargetMetaInfo> in_metainfo;
5029 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5030 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5033 std::tie(input_descs, input_col_descs, used_inputs_owned) =
5039 input_to_nest_level,
5043 std::tie(in_metainfo, target_exprs_owned) =
5044 get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
5045 const auto filter_expr = translator.translateScalarRex(filter->
getCondition());
5048 const auto qual =
fold_expr(filter_expr.get());
5057 auto candidate =
query_dag_->getQueryHint(filter);
5059 query_hint = *candidate;
5064 return {{input_descs,
5067 {rewritten_qual ? rewritten_qual : qual},
5076 join_info.hash_table_plan_dag,
5077 join_info.table_id_to_node_map},
5086 if (
auto foreign_storage_mgr =
5091 foreign_storage_mgr->setParallelismHints({});
5100 executor_->setupCaching(phys_inputs, phys_table_ids);
const size_t getGroupByCount() const
bool is_agg(const Analyzer::Expr *expr)
Analyzer::ExpressionPtr rewrite_array_elements(Analyzer::Expr const *expr)
std::vector< Analyzer::Expr * > target_exprs
SortField getCollation(const size_t i) const
static void invalidateCachesByTable(size_t table_key)
const foreign_storage::ForeignTable * getForeignTable(const std::string &tableName) const
TextEncodingCastCounts(const size_t text_decoding_casts, const size_t text_encoding_casts)
void collect_used_input_desc(std::vector< InputDescriptor > &input_descs, const Catalog_Namespace::Catalog &cat, std::unordered_set< std::shared_ptr< const InputColDescriptor >> &input_col_descs_unique, const RelAlgNode *ra_node, const std::unordered_set< const RexInput * > &source_used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::vector< int > ChunkKey
std::optional< std::function< void()> > post_execution_callback_
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
TextEncodingCastCounts visitUOper(const Analyzer::UOper *u_oper) const override
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
HOST DEVICE int get_size() const
int32_t getErrorCode() const
bool find_push_down_candidates
int8_t * append_datum(int8_t *buf, const Datum &d, const SQLTypeInfo &ti)
bool g_use_query_resultset_cache
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
void prepare_for_system_table_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog, const CompilationOptions &co)
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
const Rex * getTargetExpr(const size_t i) const
const Expr * get_escape_expr() const
bool has_valid_query_plan_dag(const RelAlgNode *node)
AggregatedColRange computeColRangesCache()
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
static const int32_t ERR_INTERRUPTED
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
class for a per-database catalog. also includes metadata for the current database and the current use...
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)