47 #include <boost/algorithm/cxx11/any_of.hpp>
48 #include <boost/range/adaptor/reversed.hpp>
// Fragment: the enclosing function's signature is not visible in this excerpt.
// Try both node kinds that can carry an aggregation; a failed dynamic_cast on a
// pointer yields nullptr.
72 const auto compound =
dynamic_cast<const RelCompound*
>(ra);
73 const auto aggregate =
dynamic_cast<const RelAggregate*
>(ra);
// Truthy for any RelAggregate, and for a RelCompound only when it reports
// isAggregate().
74 return ((compound && compound->isAggregate()) || aggregate);
80 std::unordered_set<PhysicalInput> phys_inputs2;
81 for (
auto& phi : phys_inputs) {
85 catalog->getColumnIdBySpi(phi.table_id, phi.col_id), phi.
table_id, phi.db_id});
91 std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
92 parallelism_hints_per_table;
94 const auto table_id = physical_input.table_id;
98 const auto table = catalog->getMetadataForTable(table_id,
true);
100 !table->is_in_memory_system_table) {
101 const auto col_id = catalog->getColumnIdBySpi(table_id, physical_input.col_id);
102 const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
103 const auto foreign_table = catalog->getForeignTable(table_id);
104 for (
const auto& fragment :
105 foreign_table->fragmenter->getFragmentsForQuery().fragments) {
108 physical_input.db_id, table_id, col_id, fragment.fragmentId};
117 if (!chunk.isChunkOnDevice(
119 parallelism_hints_per_table[{physical_input.db_id, table_id}].insert(
121 fragment.fragmentId});
126 if (!parallelism_hints_per_table.empty()) {
131 CHECK(foreign_storage_mgr);
132 foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
140 const auto table = catalog->getMetadataForTable(table_id,
false);
142 const auto spi_col_id = catalog->getColumnIdBySpi(table_id, col_id);
159 const auto info_schema_catalog =
161 CHECK(info_schema_catalog);
162 std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
164 if (info_schema_catalog->getDatabaseId() != physical_input.db_id) {
167 const auto table_id = physical_input.table_id;
168 const auto table = info_schema_catalog->getMetadataForTable(table_id,
false);
170 if (table->is_in_memory_system_table) {
171 const auto column_id =
172 info_schema_catalog->getColumnIdBySpi(table_id, physical_input.col_id);
173 system_table_columns_by_table_id[table_id].emplace_back(column_id);
177 if (!system_table_columns_by_table_id.empty() &&
182 for (
const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
185 info_schema_catalog->getDataMgr().deleteChunksWithPrefix(
186 ChunkKey{info_schema_catalog->getDatabaseId(), table_id},
195 const auto td = info_schema_catalog->getMetadataForTable(table_id);
197 CHECK(td->fragmenter);
198 auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
199 CHECK_LE(fragment_count, static_cast<size_t>(1))
200 <<
"In-memory system tables are expected to have a single fragment.";
201 if (fragment_count > 0) {
202 for (
auto column_id : column_ids) {
205 const auto cd = info_schema_catalog->getMetadataForColumn(table_id, column_id);
207 info_schema_catalog->getDatabaseId(), table_id, column_id, 0};
209 &(info_schema_catalog->getDataMgr()),
227 const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
228 const std::vector<TargetMetaInfo>& targets_meta) {
229 CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
231 for (
size_t i = 0; i < targets_meta.size(); ++i) {
232 render_info.
targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
233 targets_meta[i].get_resname(),
234 work_unit_target_exprs[i]->get_shared_ptr(),
247 return {left_deep_join_tree->
getId()};
252 const std::vector<unsigned>& aggregate,
253 const std::vector<unsigned>& next_result)
const override {
255 std::copy(next_result.begin(), next_result.end(), std::back_inserter(
result));
266 : text_decoding_casts(text_decoding_casts)
267 , text_encoding_casts(text_encoding_casts) {}
274 default_disregard_casts_to_none_encoding) {}
288 if (!operand_ti.is_string() && casted_ti.is_dict_encoded_string()) {
291 if (!casted_ti.is_string()) {
298 if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
301 if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
302 if (!disregard_casts_to_none_encoding) {
336 const auto prev_disregard_casts_to_none_encoding_state =
338 const auto left_u_oper =
340 if (left_u_oper && left_u_oper->get_optype() ==
kCAST) {
348 const auto right_u_oper =
350 if (right_u_oper && right_u_oper->get_optype() ==
kCAST) {
368 const auto prev_disregard_casts_to_none_encoding_state =
370 if (u_oper && u_oper->get_optype() ==
kCAST) {
409 auto check_node_for_text_casts = [&cast_counts](
411 const bool disregard_casts_to_none_encoding) {
416 const auto this_node_cast_counts = visitor.
visit(expr);
421 for (
const auto& qual : ra_exe_unit.
quals) {
422 check_node_for_text_casts(qual.get(),
false);
424 for (
const auto& simple_qual : ra_exe_unit.
simple_quals) {
425 check_node_for_text_casts(simple_qual.get(),
false);
428 check_node_for_text_casts(groupby_expr.get(),
false);
430 for (
const auto& target_expr : ra_exe_unit.
target_exprs) {
431 check_node_for_text_casts(target_expr,
false);
433 for (
const auto& join_condition : ra_exe_unit.
join_quals) {
434 for (
const auto& join_qual : join_condition.quals) {
440 check_node_for_text_casts(join_qual.get(),
450 const std::vector<InputTableInfo>& query_infos,
455 auto const tuples_upper_bound =
459 [](
auto max,
auto const& query_info) {
460 return std::max(max, query_info.info.getNumTuples());
467 const bool has_text_casts =
468 text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;
470 if (!has_text_casts) {
473 std::ostringstream oss;
474 oss <<
"Query requires one or more casts between none-encoded and dictionary-encoded "
475 <<
"strings, and the estimated table size (" << tuples_upper_bound <<
" rows) "
476 <<
"exceeds the configured watchdog none-encoded string translation limit of "
478 throw std::runtime_error(oss.str());
489 !query_for_partial_outer_frag &&
490 (!render_info || (render_info && !render_info->
isInSitu()));
508 auto lock =
executor_->acquireExecuteMutex();
519 CHECK(!ed_seq.empty());
520 if (ed_seq.size() > 1) {
528 auto exec_desc_ptr = ed_seq.getDescriptor(0);
529 CHECK(exec_desc_ptr);
530 auto& exec_desc = *exec_desc_ptr;
531 const auto body = exec_desc.getBody();
536 const auto project =
dynamic_cast<const RelProject*
>(body);
543 const auto compound =
dynamic_cast<const RelCompound*
>(body);
545 if (compound->isDeleteViaSelect()) {
547 }
else if (compound->isUpdateViaSelect()) {
550 if (compound->isAggregate()) {
566 const bool just_explain_plan,
567 const bool explain_verbose,
571 << static_cast<int>(
query_dag_->getBuildState());
578 co_in, eo, just_explain_plan, explain_verbose, render_info);
580 constexpr
bool vlog_result_set_summary{
false};
581 if constexpr (vlog_result_set_summary) {
582 VLOG(1) << execution_result.getRows()->summaryToString();
586 VLOG(1) <<
"Running post execution callback.";
587 (*post_execution_callback_)();
589 return execution_result;
599 LOG(
INFO) <<
"Query unable to run in GPU mode, retrying on CPU";
610 const bool just_explain_plan,
611 const bool explain_verbose,
615 auto timer_setup =
DEBUG_TIMER(
"Query pre-execution steps");
625 std::string query_session{
""};
626 std::string query_str{
"N/A"};
627 std::string query_submitted_time{
""};
630 query_session =
query_state_->getConstSessionInfo()->get_session_id();
632 query_submitted_time =
query_state_->getQuerySubmittedTime();
635 auto validate_or_explain_query =
637 auto interruptable = !render_info && !query_session.empty() &&
644 std::tie(query_session, query_str) =
executor_->attachExecutorToQuerySession(
645 query_session, query_str, query_submitted_time);
651 query_submitted_time,
652 QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
657 [
this, &query_session, &interruptable, &query_submitted_time] {
661 executor_->clearQuerySessionStatus(query_session, query_submitted_time);
665 auto acquire_execute_mutex = [](Executor * executor) ->
auto{
666 auto ret = executor->acquireExecuteMutex();
671 auto lock = acquire_execute_mutex(
executor_);
680 executor_->checkPendingQueryStatus(query_session);
683 throw std::runtime_error(
"Query execution has been interrupted (pending query)");
687 throw std::runtime_error(
"Checking pending query status failed: unknown error");
690 int64_t queue_time_ms =
timer_stop(clock_begin);
// Plan-only path: taken when the caller asked for the plan (just_explain_plan).
703 if (just_explain_plan) {
// Collect the body node of every descriptor in ed_seq, in sequence order.
706 std::vector<const RelAlgNode*> steps;
707 for (
size_t i = 0; i < ed_seq.size(); i++) {
708 steps.emplace_back(ed_seq.getDescriptor(i)->getBody());
// Step numbers are 1-based: descriptor i is labelled i + 1.
709 steps.back()->setStepNumber(i + 1);
// What gets written into ss happens on lines not visible in this excerpt.
711 std::stringstream ss;
// The accumulated text becomes the content of the ResultSet created here.
716 auto rs = std::make_shared<ResultSet>(ss.str());
724 ed_seq, co, eo, render_info, queue_time_ms);
731 const auto subquery_ra = subquery->getRelAlg();
733 if (subquery_ra->hasContextData()) {
740 if (global_hints || local_hints) {
741 const auto subquery_rel_alg_dag = subquery_executor.
getRelAlgDag();
746 subquery_rel_alg_dag->registerQueryHint(subquery_ra, *local_hints);
751 subquery->setExecutionResult(std::make_shared<ExecutionResult>(
result));
759 return executor_->computeColRangesCache(phys_inputs);
764 return executor_->computeStringDictionaryGenerations(phys_inputs);
769 return executor_->computeTableGenerations(phys_table_ids);
781 std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
783 auto sort_node =
dynamic_cast<const RelSort*
>(root_node);
790 auto left_deep_tree_ids = visitor.
visit(root_node);
798 const auto source = sort->
getInput(0);
799 if (dynamic_cast<const RelSort*>(source)) {
800 throw std::runtime_error(
"Sort node not supported as input to another sort");
808 const size_t step_idx,
816 const auto sort =
dynamic_cast<const RelSort*
>(exe_desc_ptr->getBody());
818 size_t shard_count{0};
825 auto order_entries =
sort->getOrderEntries();
832 const auto source =
sort->getInput(0);
835 CHECK_EQ(temp_seq.size(), size_t(1));
848 merge_type(exe_desc_ptr->getBody()),
849 exe_desc_ptr->getBody()->getId(),
853 seq, std::make_pair(step_idx, step_idx + 1), co, eo, render_info,
queue_time_ms_);
858 LOG(
INFO) <<
"Retry the query via CPU mode";
860 std::make_pair(step_idx, step_idx + 1),
867 VLOG(1) <<
"Running post execution callback.";
868 (*post_execution_callback_)();
870 return query_step_result;
883 executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(
885 executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
886 executor_->table_generations_ = table_generations;
887 executor_->agg_col_range_cache_ = agg_col_range;
894 const int64_t queue_time_ms,
895 const bool with_existing_temp_tables) {
898 if (!with_existing_temp_tables) {
908 auto get_descriptor_count = [&seq, &eo]() ->
size_t {
923 const auto exec_desc_count = get_descriptor_count();
937 const auto cached_res =
938 executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
// Index of the final step in the sequence; also used as the denominator in the
// progress log below.
944 const auto num_steps = exec_desc_count - 1;
// Steps are visited in index order, 0 .. exec_desc_count - 1.
945 for (
size_t i = 0; i < exec_desc_count; i++) {
946 VLOG(1) <<
"Executing query step " << i <<
" / " << num_steps;
// Only the final step receives render_info; every earlier step gets nullptr.
// (The callee name is on a line not visible in this excerpt.)
949 seq, i, co, eo_copied, (i == num_steps) ? render_info :
nullptr, queue_time_ms);
958 LOG(
INFO) <<
"Retrying current query step " << i <<
" / " << num_steps <<
" on CPU";
960 if (render_info && i == num_steps) {
968 (i == num_steps) ? render_info :
nullptr,
974 auto eo_extern = eo_copied;
975 eo_extern.executor_type = ::ExecutorType::Extern;
977 const auto body = exec_desc_ptr->
getBody();
978 const auto compound =
dynamic_cast<const RelCompound*
>(body);
979 if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
980 LOG(
INFO) <<
"Also failed to run the query using interoperability";
984 seq, i, co, eo_extern, (i == num_steps) ? render_info :
nullptr, queue_time_ms);
993 const std::pair<size_t, size_t> interval,
997 const int64_t queue_time_ms) {
1002 for (
size_t i = interval.first; i < interval.second; i++) {
1009 (i == interval.second - 1) ? render_info :
nullptr,
1019 LOG(
INFO) <<
"Retrying current query step " << i <<
" on CPU";
1021 if (render_info && i == interval.second - 1) {
1028 (i == interval.second - 1) ? render_info :
nullptr,
1040 auto columnar_output_hint_enabled =
false;
1041 auto rowwise_output_hint_enabled =
false;
1043 VLOG(1) <<
"A user forces to run the query on the CPU execution mode";
1048 VLOG(1) <<
"A user enables keeping query resultset but is skipped since data "
1049 "recycler is disabled";
1052 VLOG(1) <<
"A user enables keeping query resultset but is skipped since query "
1053 "resultset recycler is disabled";
1055 VLOG(1) <<
"A user enables keeping query resultset";
1062 VLOG(1) <<
"A user enables keeping table function's resultset but is skipped "
1063 "since data recycler is disabled";
1066 VLOG(1) <<
"A user enables keeping table function's resultset but is skipped "
1067 "since query resultset recycler is disabled";
1069 VLOG(1) <<
"A user enables keeping table function's resultset";
1075 VLOG(1) <<
"A user enables watchdog for this query";
1081 VLOG(1) <<
"A user disables watchdog for this query";
1087 VLOG(1) <<
"A user enables dynamic watchdog for this query";
1093 VLOG(1) <<
"A user disables dynamic watchdog for this query";
1098 std::ostringstream oss;
1099 oss <<
"A user sets query time limit to " << query_hints.
query_time_limit <<
" ms";
1103 oss <<
" (and system automatically enables dynamic watchdog to activate the "
1104 "given \"query_time_limit\" hint)";
1106 VLOG(1) << oss.str();
1109 VLOG(1) <<
"A user enables loop join";
1113 VLOG(1) <<
"A user disables loop join";
1118 VLOG(1) <<
"A user forces the maximum size of a join hash table as "
1124 VLOG(1) <<
"Skip query hint \"opt_cuda_grid_and_block_size\" when at least one "
1125 "of the following query hints are given simultaneously: "
1126 "\"cuda_block_size\" and \"cuda_grid_size_multiplier\"";
1128 VLOG(1) <<
"A user enables optimization of cuda block and grid sizes";
1134 VLOG(1) <<
"A user disables table reordering listed in the FROM clause";
1138 VLOG(1) <<
"A user forces the query to run with columnar output";
1139 columnar_output_hint_enabled =
true;
1141 VLOG(1) <<
"A user forces the query to run with rowwise output";
1142 rowwise_output_hint_enabled =
true;
1145 : columnar_output_hint_enabled;
1146 if (
g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
1147 LOG(
INFO) <<
"Currently, we do not support applying query hint to change query "
1148 "output layout in distributed mode.";
1155 const size_t step_idx,
1159 const int64_t queue_time_ms) {
1163 CHECK(exec_desc_ptr);
1164 auto& exec_desc = *exec_desc_ptr;
1165 const auto body = exec_desc.getBody();
1166 if (body->isNop()) {
1177 auto target_node = body;
1178 auto query_plan_dag_hash = body->getQueryPlanDagHash();
1179 if (
auto sort_body = dynamic_cast<const RelSort*>(body)) {
1180 target_node = sort_body->getInput(0);
1185 target_node->getQueryPlanDagHash(),
SortInfo());
1194 if (
auto cached_resultset =
1195 executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
1196 query_plan_dag_hash)) {
1197 VLOG(1) <<
"recycle resultset of the root node " << body->getRelNodeDagId()
1198 <<
" from resultset cache";
1199 body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
1201 std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
1202 executor_->getResultSetRecyclerHolder().getTargetExprs(query_plan_dag_hash);
1203 std::vector<Analyzer::Expr*> copied_target_exprs;
1204 for (
const auto& expr : cached_target_exprs) {
1205 copied_target_exprs.push_back(expr.get());
1208 *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
1210 exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
1216 const auto compound =
dynamic_cast<const RelCompound*
>(body);
1218 if (compound->isDeleteViaSelect()) {
1219 executeDelete(compound, co_copied, eo_copied, queue_time_ms);
1220 }
else if (compound->isUpdateViaSelect()) {
1221 executeUpdate(compound, co_copied, eo_copied, queue_time_ms);
1223 exec_desc.setResult(
1224 executeCompound(compound, co_copied, eo_copied, render_info, queue_time_ms));
1225 VLOG(3) <<
"Returned from executeCompound(), addTemporaryTable("
1226 <<
static_cast<int>(-compound->getId()) <<
", ...)"
1227 <<
" exec_desc.getResult().getDataPtr()->rowCount()="
1228 << exec_desc.getResult().getDataPtr()->rowCount();
1229 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1236 const auto project =
dynamic_cast<const RelProject*
>(body);
1238 if (project->isDeleteViaSelect()) {
1239 executeDelete(project, co_copied, eo_copied, queue_time_ms);
1240 }
else if (project->isUpdateViaSelect()) {
1241 executeUpdate(project, co_copied, eo_copied, queue_time_ms);
1243 std::optional<size_t> prev_count;
1249 if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
1254 const auto& prev_exe_result = prev_exec_desc->getResult();
1255 const auto prev_result = prev_exe_result.getRows();
1257 prev_count = prev_result->rowCount();
1258 VLOG(3) <<
"Setting output row count for projection node to previous node ("
1259 << prev_exec_desc->getBody()->toString(
1261 <<
") to " << *prev_count;
1267 project, co_copied, eo_copied, render_info, queue_time_ms, prev_count));
1268 VLOG(3) <<
"Returned from executeProject(), addTemporaryTable("
1269 <<
static_cast<int>(-project->getId()) <<
", ...)"
1270 <<
" exec_desc.getResult().getDataPtr()->rowCount()="
1271 << exec_desc.getResult().getDataPtr()->rowCount();
1272 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1279 const auto aggregate =
dynamic_cast<const RelAggregate*
>(body);
1281 exec_desc.setResult(
1282 executeAggregate(aggregate, co_copied, eo_copied, render_info, queue_time_ms));
1286 const auto filter =
dynamic_cast<const RelFilter*
>(body);
1288 exec_desc.setResult(
1289 executeFilter(filter, co_copied, eo_copied, render_info, queue_time_ms));
1293 const auto sort =
dynamic_cast<const RelSort*
>(body);
1295 exec_desc.setResult(
1297 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1304 if (logical_values) {
1306 addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
1309 const auto modify =
dynamic_cast<const RelModify*
>(body);
1314 const auto logical_union =
dynamic_cast<const RelLogicalUnion*
>(body);
1315 if (logical_union) {
1317 logical_union, seq, co_copied, eo_copied, render_info, queue_time_ms));
1323 exec_desc.setResult(
1328 LOG(
FATAL) <<
"Unhandled body type: "
// Preconditions for this path: body is a RelAggregate with exactly one input.
1335 CHECK(dynamic_cast<const RelAggregate*>(body));
1336 CHECK_EQ(
size_t(1), body->inputCount());
1337 const auto input = body->getInput(0);
// The node's output metainfo is copied unchanged from its single input.
1338 body->setOutputMetainfo(input->getOutputMetainfo());
// `it` is obtained on lines not visible in this excerpt; its mapped value is
// paired with the input's metainfo to form the step result.
// NOTE(review): presumably `it` is a lookup of the input's temporary table — confirm.
1344 ed.setResult({it->second, input->getOutputMetainfo()});
1354 return synthesized_physical_inputs_owned;
1358 const RexInput* rex_input)
const override {
1361 const auto scan_ra =
dynamic_cast<const RelScan*
>(input_ra);
1365 const auto col_id = rex_input->
getIndex();
1367 scan_ra->getCatalog().getMetadataForColumnBySpi(td->tableId, col_id + 1);
1368 if (cd && cd->columnType.get_physical_cols() > 0) {
1370 std::unordered_set<const RexInput*> synthesized_physical_inputs;
1371 for (
auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
1372 auto physical_input =
1374 synthesized_physical_inputs_owned.emplace_back(physical_input);
1375 synthesized_physical_inputs.insert(physical_input);
1377 return synthesized_physical_inputs;
1386 const std::unordered_set<const RexInput*>& aggregate,
1387 const std::unordered_set<const RexInput*>& next_result)
const override {
1389 result.insert(next_result.begin(), next_result.end());
1398 if (
auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
1401 if (
auto join = dynamic_cast<const RelJoin*>(ra_node)) {
1405 if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1408 auto only_src = ra_node->
getInput(0);
1409 const bool is_join =
dynamic_cast<const RelJoin*
>(only_src) ||
1410 dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
1411 return is_join ? only_src : ra_node;
1414 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1418 std::unordered_set<const RexInput*> used_inputs =
1419 filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
1421 for (
size_t i = 0; i < sources_size; ++i) {
1422 const auto source_inputs = visitor.visit(compound->
getScalarSource(i));
1423 used_inputs.insert(source_inputs.begin(), source_inputs.end());
1425 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1426 return std::make_pair(used_inputs, used_inputs_owned);
1429 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
// Fragment: the enclosing function's signature is not visible in this excerpt.
// used_inputs holds raw (non-owning) pointers; used_inputs_owned holds the
// shared_ptr owners of the very same RexInput objects.
1432 std::unordered_set<const RexInput*> used_inputs;
1433 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
// Every synthesized input below refers to the aggregate's first input node.
1434 const auto source = aggregate->
getInput(0);
// in_metainfo and group_count are defined on lines not visible in this excerpt.
1437 CHECK_GE(in_metainfo.size(), group_count);
// One synthesized RexInput per group-by key, addressing source columns
// 0 .. group_count - 1.
1438 for (
size_t i = 0; i < group_count; ++i) {
1439 auto synthesized_used_input =
new RexInput(source, i);
// emplace_back constructs a shared_ptr from the raw pointer, so ownership is
// transferred here, before the raw pointer is recorded in used_inputs.
1440 used_inputs_owned.emplace_back(synthesized_used_input);
1441 used_inputs.insert(synthesized_used_input);
// One synthesized RexInput per operand of every aggregate expression.
1443 for (
const auto& agg_expr : aggregate->
getAggExprs()) {
1444 for (
size_t i = 0; i < agg_expr->size(); ++i) {
1445 const auto operand_idx = agg_expr->getOperand(i);
// NOTE(review): CHECK_GE accepts operand_idx == in_metainfo.size(); if
// operand_idx is meant to be a valid index into in_metainfo, CHECK_GT would
// be the tighter bound — confirm.
1446 CHECK_GE(in_metainfo.size(),
static_cast<size_t>(operand_idx));
1447 auto synthesized_used_input =
new RexInput(source, operand_idx);
1448 used_inputs_owned.emplace_back(synthesized_used_input);
1449 used_inputs.insert(synthesized_used_input);
// Both containers are returned by copy as a pair (non-owning set, owning vector).
1452 return std::make_pair(used_inputs, used_inputs_owned);
1455 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1458 std::unordered_set<const RexInput*> used_inputs;
1459 for (
size_t i = 0; i < project->
size(); ++i) {
1460 const auto proj_inputs = visitor.visit(project->
getProjectAt(i));
1461 used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
1463 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1464 return std::make_pair(used_inputs, used_inputs_owned);
1467 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1470 std::unordered_set<const RexInput*> used_inputs;
1473 used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
1475 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1476 return std::make_pair(used_inputs, used_inputs_owned);
1479 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
// Fragment: the enclosing function's signature and the branch statements between
// the two inner loops are not visible in this excerpt.
// used_inputs holds raw (non-owning) pointers; used_inputs_owned holds the
// shared_ptr owners of the same RexInput objects.
1481 std::unordered_set<const RexInput*> used_inputs;
1482 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
// Visit every input of data_sink_node in index order.
1484 for (
size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
1485 const auto source = data_sink_node->getInput(nest_level);
// nullptr unless this input is a RelScan.
1486 const auto scan_source =
dynamic_cast<const RelScan*
>(source);
// NOTE(review): presumably the statements up to line 1492 run only when
// scan_source is non-null (the guarding `if` is on a line not shown) — confirm.
// On this path the source's output metainfo is required to be empty.
1488 CHECK(source->getOutputMetainfo().empty());
// One synthesized RexInput per column index reported by scan_source->size().
1489 for (
size_t i = 0; i < scan_source->size(); ++i) {
1490 auto synthesized_used_input =
new RexInput(scan_source, i);
// Ownership moves into the shared_ptr vector; the set keeps only the raw pointer.
1491 used_inputs_owned.emplace_back(synthesized_used_input);
1492 used_inputs.insert(synthesized_used_input);
// Second path: one synthesized RexInput per entry of the source's output metainfo.
1495 const auto& partial_in_metadata = source->getOutputMetainfo();
1496 for (
size_t i = 0; i < partial_in_metadata.size(); ++i) {
1497 auto synthesized_used_input =
new RexInput(source, i);
1498 used_inputs_owned.emplace_back(synthesized_used_input);
1499 used_inputs.insert(synthesized_used_input);
// Both containers are returned by copy as a pair (non-owning set, owning vector).
1503 return std::make_pair(used_inputs, used_inputs_owned);
1506 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
// Fragment: the enclosing function's signature is not visible in this excerpt.
// The size_t constructor argument of unordered_set is an initial bucket-count
// hint; the set still starts out empty.
1508 std::unordered_set<const RexInput*> used_inputs(logical_union->
inputCount());
1509 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
// Reserves one slot per union input; the loops below append input->size()
// entries per input, so this is only a lower bound on the final size.
1510 used_inputs_owned.reserve(logical_union->
inputCount());
1511 VLOG(3) <<
"logical_union->inputCount()=" << logical_union->
inputCount();
1512 auto const n_inputs = logical_union->
inputCount();
// For every union input, synthesize one RexInput per column index of that input.
1513 for (
size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
1514 auto input = logical_union->
getInput(nest_level);
1515 for (
size_t i = 0; i < input->size(); ++i) {
// The owning shared_ptr is created first; the set then records the raw pointer
// of the element just appended.
1516 used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
1517 used_inputs.insert(used_inputs_owned.back().get());
// Both containers are moved into the returned pair rather than copied.
1520 return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
1524 const auto scan_ra =
dynamic_cast<const RelScan*
>(ra_node);
1527 table_key.
db_id = scan_ra->getCatalog().getDatabaseId();
1528 const auto td = scan_ra->getTableDescriptor();
1530 table_key.table_id = td->tableId;
1537 const std::vector<size_t>& input_permutation) {
// Fragment: the enclosing function's signature and a few interior lines are not
// visible in this excerpt.
// Maps each input node of data_sink_node to the nest level assigned to it.
1539 std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1540 for (
size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
// An empty input_permutation means identity order; otherwise position input_idx
// is redirected to input_permutation[input_idx].
1541 const auto input_node_idx =
1542 input_permutation.empty() ? input_idx : input_permutation[input_idx];
1543 const auto input_ra = data_sink_node->getInput(input_node_idx);
// When ra_node is a RelLogicalUnion every input is stored with level 0;
// otherwise the level is the loop position.
1547 size_t const idx =
dynamic_cast<const RelLogicalUnion*
>(ra_node) ? 0 : input_idx;
// emplace reports false in .second when the key is already present, so the
// CHECK fails if the same input node is encountered twice.
1548 const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1549 CHECK(it_ok.second);
// NOTE(review): the log prints input_idx while the map stores idx; for a
// RelLogicalUnion these differ whenever input_idx > 0 — confirm this is intended.
1552 <<
" to nest level " << input_idx;
1554 return input_to_nest_level;
1557 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1560 if (
auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1562 const auto condition =
join->getCondition();
1564 auto condition_inputs = visitor.visit(condition);
1565 std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1567 return std::make_pair(condition_inputs, condition_inputs_owned);
1570 if (
auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1571 CHECK_GE(left_deep_join->inputCount(), 2u);
1572 const auto condition = left_deep_join->getInnerCondition();
1574 auto result = visitor.visit(condition);
1575 for (
size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1577 const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1578 if (outer_condition) {
1579 const auto outer_result = visitor.visit(outer_condition);
1580 result.insert(outer_result.begin(), outer_result.end());
1583 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1584 return std::make_pair(
result, used_inputs_owned);
1587 if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1590 }
else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1598 return std::make_pair(std::unordered_set<const RexInput*>{},
1599 std::vector<std::shared_ptr<RexInput>>{});
1603 std::vector<InputDescriptor>& input_descs,
1604 std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1606 const std::unordered_set<const RexInput*>& source_used_inputs,
1607 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1609 <<
" input_col_descs_unique.size()=" << input_col_descs_unique.size()
1610 <<
" source_used_inputs.size()=" << source_used_inputs.size();
1611 for (
const auto used_input : source_used_inputs) {
1612 const auto input_ra = used_input->getSourceNode();
1614 auto col_id = used_input->getIndex();
1615 auto it = input_to_nest_level.find(input_ra);
1616 if (it != input_to_nest_level.end()) {
1617 const int nest_level = it->second;
1618 if (
auto rel_scan = dynamic_cast<const RelScan*>(input_ra)) {
1619 const auto& catalog = rel_scan->getCatalog();
1620 col_id = catalog.getColumnIdBySpi(table_key.table_id, col_id + 1);
1622 input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1623 col_id, table_key.table_id, table_key.db_id, nest_level));
1624 }
else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1625 throw std::runtime_error(
"Bushy joins not supported");
1631 std::pair<std::vector<InputDescriptor>,
1632 std::list<std::shared_ptr<const InputColDescriptor>>>
1634 const std::unordered_set<const RexInput*>& used_inputs,
1635 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1636 const std::vector<size_t>& input_permutation) {
1637 std::vector<InputDescriptor> input_descs;
1639 for (
size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1640 const auto input_node_idx =
1641 input_permutation.empty() ? input_idx : input_permutation[input_idx];
1642 auto input_ra = data_sink_node->getInput(input_node_idx);
1644 input_descs.emplace_back(table_key.db_id, table_key.table_id, input_idx);
1646 std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1648 input_col_descs_unique,
1651 input_to_nest_level);
1652 std::unordered_set<const RexInput*> join_source_used_inputs;
1653 std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1654 std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1657 input_col_descs_unique,
1659 join_source_used_inputs,
1660 input_to_nest_level);
1661 std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1662 input_col_descs_unique.begin(), input_col_descs_unique.end());
1665 input_col_descs.end(),
1666 [](std::shared_ptr<const InputColDescriptor>
const& lhs,
1667 std::shared_ptr<const InputColDescriptor>
const& rhs) {
1668 return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1670 lhs->getScanDesc().getTableKey()) <
1671 std::make_tuple(rhs->getScanDesc().getNestLevel(),
1673 rhs->getScanDesc().getTableKey());
1675 return {input_descs,
1676 std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1677 input_col_descs.end())};
1681 std::tuple<std::vector<InputDescriptor>,
1682 std::list<std::shared_ptr<const InputColDescriptor>>,
1683 std::vector<std::shared_ptr<RexInput>>>
1685 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1686 const std::vector<size_t>& input_permutation) {
1687 std::unordered_set<const RexInput*> used_inputs;
1688 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1690 VLOG(3) <<
"used_inputs.size() = " << used_inputs.size();
1691 auto input_desc_pair =
1693 return std::make_tuple(
1694 input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1702 return project->
size();
1722 const std::shared_ptr<Analyzer::Expr> expr) {
1723 const auto& ti = expr->get_type_info();
1727 auto transient_dict_ti = ti;
1731 transient_dict_ti.set_fixed_size();
1732 return expr->add_cast(transient_dict_ti);
1736 std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1737 const std::shared_ptr<Analyzer::Expr>& expr) {
1741 scalar_sources.push_back(
fold_expr(expr.get()));
1746 const std::shared_ptr<Analyzer::Expr>& input) {
1747 const auto& input_ti = input->get_type_info();
1748 if (input_ti.is_string() && input_ti.get_compression() ==
kENCODING_DICT) {
1759 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1761 VLOG(3) <<
"get_scalar_sources_size("
1763 <<
") = " << scalar_sources_size;
1764 for (
size_t i = 0; i < scalar_sources_size; ++i) {
1765 const auto scalar_rex =
scalar_at(i, ra_node);
1766 if (dynamic_cast<const RexRef*>(scalar_rex)) {
1772 const auto scalar_expr =
1774 const auto rewritten_expr =
rewrite_expr(scalar_expr.get());
1778 scalar_sources.push_back(
fold_expr(rewritten_expr.get()));
1784 return scalar_sources;
1794 size_t starting_projection_column_idx) {
1795 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1797 const auto scalar_rex =
scalar_at(i, ra_node);
1798 if (dynamic_cast<const RexRef*>(scalar_rex)) {
1804 std::shared_ptr<Analyzer::Expr> translated_expr;
1806 translated_expr = cast_to_column_type(translator.
translate(scalar_rex),
1809 colNames[i - starting_projection_column_idx]);
1811 translated_expr = translator.
translate(scalar_rex);
1814 const auto rewritten_expr =
rewrite_expr(scalar_expr.get());
1818 return scalar_sources;
1823 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1827 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1828 for (
size_t group_idx = 0; group_idx < compound->
getGroupByCount(); ++group_idx) {
1831 return groupby_exprs;
1836 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1837 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1838 for (
size_t group_idx = 0; group_idx < aggregate->
getGroupByCount(); ++group_idx) {
1841 return groupby_exprs;
1847 const auto filter_expr = filter_rex ? translator.
translate(filter_rex) :
nullptr;
1856 size_t target_expr_idx,
1857 std::shared_ptr<Analyzer::Expr>& target_expr,
1858 std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos) {
1861 if (agg_expr->get_is_distinct()) {
1862 SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1864 target_exprs_type_infos.emplace(target_expr_idx, ti);
1865 target_expr = target_expr->deep_copy();
1871 target_exprs_type_infos.emplace(target_expr_idx, target_expr->get_type_info());
1876 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1877 std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1878 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1879 const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1883 std::vector<Analyzer::Expr*> target_exprs;
1884 for (
size_t i = 0; i < compound->
size(); ++i) {
1886 const auto target_rex_agg =
dynamic_cast<const RexAgg*
>(target_rex);
1887 std::shared_ptr<Analyzer::Expr> target_expr;
1888 if (target_rex_agg) {
1893 const auto target_rex_scalar =
dynamic_cast<const RexScalar*
>(target_rex);
1894 const auto target_rex_ref =
dynamic_cast<const RexRef*
>(target_rex_scalar);
1895 if (target_rex_ref) {
1896 const auto ref_idx = target_rex_ref->
getIndex();
1898 CHECK_LE(ref_idx, groupby_exprs.size());
1899 const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1905 target_expr =
fold_expr(rewritten_expr.get());
1916 target_exprs_type_infos.emplace(i, target_expr->get_type_info());
1919 target_exprs_owned.push_back(target_expr);
1920 target_exprs.push_back(target_expr.get());
1922 return target_exprs;
// Excerpt of a routine that builds the target expressions for an aggregate node
// (signature line and several body lines are elided).
// Visible parameters mirror the compound variant: owned targets, per-target type
// infos, scalar sources and group-by expressions.
// Returns raw pointers to the targets; ownership stays in target_exprs_owned.
1926 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1927 std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1928 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1929 const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1932 std::vector<Analyzer::Expr*> target_exprs;
// Counter starts at 1; how it is used inside the loop is elided here.
1933 size_t group_key_idx = 1;
// First, one target per group-by expression ...
1934 for (
const auto& groupby_expr : groupby_exprs) {
1937 target_exprs_owned.push_back(target_expr);
1938 target_exprs.push_back(target_expr.get());
// ... then one target per aggregate expression, each passed through fold_expr.
1941 for (
const auto& target_rex_agg : aggregate->
getAggExprs()) {
1945 target_expr =
fold_expr(target_expr.get());
1946 target_exprs_owned.push_back(target_expr);
1947 target_exprs.push_back(target_expr.get());
1949 return target_exprs;
1959 if (agg_expr && agg_expr->get_contains_agg()) {
1975 }
else if (
is_agg(expr)) {
// Excerpt of a routine that produces one TargetMetaInfo per output column of
// `ra_node` (signature line and the computation of `ti` are elided).
// Requires ra_node->size() == target_exprs.size() and every target to be non-null.
1984 const std::vector<Analyzer::Expr*>& target_exprs) {
1985 std::vector<TargetMetaInfo> targets_meta;
1986 CHECK_EQ(ra_node->size(), target_exprs.size());
1987 for (
size_t i = 0; i < ra_node->size(); ++i) {
1988 CHECK(target_exprs[i]);
// Each entry pairs the node's field name with `ti` (derived on an elided line)
// and the target expression's own type info.
1991 targets_meta.emplace_back(
1992 ra_node->getFieldName(i), ti, target_exprs[i]->get_type_info());
1994 return targets_meta;
// Excerpt of a routine that dispatches on the concrete type of `input0`
// (signature line and the body of every branch are elided).
// The visible chain tries, in order: RelCompound, RelProject, RelLogicalUnion,
// RelAggregate, RelScan and RelLogicalValues. What happens when none matches
// is not visible here.
2000 const std::vector<Analyzer::Expr*>& target_exprs) {
2002 if (
auto const* input = dynamic_cast<RelCompound const*>(input0)) {
2004 }
else if (
auto const* input = dynamic_cast<RelProject const*>(input0)) {
2006 }
else if (
auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
2008 }
else if (
auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
2010 }
else if (
auto const* input = dynamic_cast<RelScan const*>(input0)) {
2012 }
else if (
auto const* input = dynamic_cast<RelLogicalValues const*>(input0)) {
// Excerpt of the UPDATE execution path (signature line and many body lines are
// elided). Visible structure:
//   * a generic lambda `execute_update_for_node` that validates the modified
//     table, sets up update transaction parameters and runs the execution unit;
//   * a dispatch on the node type (RelCompound or RelProject) that invokes it.
// Throws std::runtime_error for the unsupported cases spelled out below.
2025 const int64_t queue_time_ms) {
2033 auto execute_update_for_node = [
this, &co, &eo_in](
const auto node,
2035 const bool is_aggregate) {
2036 auto table_descriptor = node->getModifiedTableDescriptor();
2037 CHECK(table_descriptor);
// Variable-length updates need the table to carry a deleted column.
2038 if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
2039 throw std::runtime_error(
2040 "UPDATE queries involving variable length columns are only supported on tables "
2041 "with the vacuum attribute set to 'delayed'");
2044 auto catalog = node->getModifiedTableCatalog();
2049 std::make_unique<UpdateTransactionParameters>(table_descriptor,
2051 node->getTargetColumns(),
2052 node->getOutputMetainfo(),
2053 node->isVarlenUpdateRequired());
// Inner lambda: runs one execution unit and finalizes the transaction.
2057 auto execute_update_ra_exe_unit =
2058 [
this, &co, &eo_in, &table_infos, &table_descriptor, &node, catalog](
// Temporary tables are processed with the columnar output hint enabled.
2063 if (dml_transaction_parameters_->tableIsTemporary()) {
2064 eo.output_columnar_hint =
true;
2071 dml_transaction_parameters_.get());
2073 CHECK(update_transaction_parameters);
2076 auto table_update_metadata =
2087 dml_transaction_parameters_->finalizeTransaction(*catalog);
2089 dml_transaction_parameters_->getTableDescriptor(),
executor_, *catalog};
2090 table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
// Temporary-table path: the execution unit is rewritten as a columnar update.
// Only a single updated column of non-varlen type is accepted.
2097 if (dml_transaction_parameters_->tableIsTemporary()) {
2099 auto query_rewrite = std::make_unique<QueryRewriter>(table_infos,
executor_);
2103 auto update_transaction_params =
2105 CHECK(update_transaction_params);
2106 const auto td = update_transaction_params->getTableDescriptor();
2108 const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2109 if (update_column_names.size() > 1) {
2110 throw std::runtime_error(
2111 "Multi-column update is not yet supported for temporary tables.");
2115 catalog->getMetadataForColumn(td->tableId, update_column_names.front());
2117 auto projected_column_to_update = makeExpr<Analyzer::ColumnVar>(
2121 const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2122 work_unit.exe_unit, projected_column_to_update);
2123 if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2124 throw std::runtime_error(
2125 "Variable length updates not yet supported on temporary tables.");
2127 execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
// Non-temporary path: the work unit's execution unit is used as is.
2129 execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
// Dispatch on the parent node type of the UPDATE.
2133 if (
auto compound = dynamic_cast<const RelCompound*>(node)) {
2136 execute_update_for_node(compound, work_unit, compound->isAggregate());
2137 }
else if (
auto project = dynamic_cast<const RelProject*>(node)) {
// A simple projection directly over a sort takes its scan limit from the
// sort's already-computed result.
2140 if (project->isSimple()) {
2141 CHECK_EQ(
size_t(1), project->inputCount());
2142 const auto input_ra = project->getInput(0);
2143 if (dynamic_cast<const RelSort*>(input_ra)) {
2144 const auto& input_table =
2147 work_unit.exe_unit.scan_limit = input_table->rowCount();
// Window functions are computed up front; the condition under which the
// exception below is thrown is elided (the message refers to distributed mode).
2150 if (project->hasWindowFunctionExpr() || project->hasPushedDownWindowExpr()) {
2165 throw std::runtime_error(
2166 "Update query having window function is not yet supported in distributed "
2171 computeWindow(work_unit, co, eo_in, column_cache, queue_time_ms);
2173 execute_update_for_node(project, work_unit,
false);
2175 throw std::runtime_error(
"Unsupported parent node for update: " +
// Excerpt of the DELETE execution path (signature line and many body lines are
// elided). Mirrors the UPDATE path: a generic lambda `execute_delete_for_node`
// validates the table and runs the execution unit, and a dispatch on the node
// type (RelCompound or RelProject) invokes it.
// Throws std::runtime_error when the table has no deleted column or the parent
// node type is unsupported.
2183 const int64_t queue_time_ms) {
2187 auto execute_delete_for_node = [
this, &co, &eo_in](
const auto node,
2189 const bool is_aggregate) {
2190 auto* table_descriptor = node->getModifiedTableDescriptor();
2191 CHECK(table_descriptor);
// Deletes are implemented through the table's deleted column, so it must exist.
2192 if (!table_descriptor->hasDeletedCol) {
2193 throw std::runtime_error(
2194 "DELETE queries are only supported on tables with the vacuum attribute set to "
2198 const auto catalog = node->getModifiedTableCatalog();
// Inner lambda: runs one execution unit under fresh delete transaction parameters.
2204 auto execute_delete_ra_exe_unit =
2205 [
this, &table_infos, &table_descriptor, &eo_in, &co, catalog](
2206 const auto& exe_unit,
const bool is_aggregate) {
2208 std::make_unique<DeleteTransactionParameters>(table_descriptor, *catalog);
2211 CHECK(delete_params);
2217 eo.output_columnar_hint =
true;
// A delete execution unit is expected to project exactly one target.
2221 CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2225 auto table_update_metadata =
2239 table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
// Rewrite path (its guarding condition is elided): the unit is rewritten as a
// columnar delete targeting the table's deleted column.
2247 auto query_rewrite = std::make_unique<QueryRewriter>(table_infos,
executor_);
2248 const auto cd = catalog->getDeletedColumn(table_descriptor);
2250 auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2253 catalog->getDatabaseId(), table_descriptor->tableId, cd->columnId},
2255 const auto rewritten_exe_unit =
2256 query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2257 execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2259 execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
// Dispatch on the parent node type of the DELETE.
2263 if (
auto compound = dynamic_cast<const RelCompound*>(node)) {
2265 execute_delete_for_node(compound, work_unit, compound->isAggregate());
2266 }
else if (
auto project = dynamic_cast<const RelProject*>(node)) {
// A simple projection directly over a sort takes its scan limit from the
// sort's already-computed result.
2268 if (project->isSimple()) {
2269 CHECK_EQ(
size_t(1), project->inputCount());
2270 const auto input_ra = project->getInput(0);
2271 if (dynamic_cast<const RelSort*>(input_ra)) {
2272 const auto& input_table =
2275 work_unit.exe_unit.scan_limit = input_table->rowCount();
2278 execute_delete_for_node(project, work_unit,
false);
2280 throw std::runtime_error(
"Unsupported parent node for delete: " +
2289 const int64_t queue_time_ms) {
2306 const int64_t queue_time_ms) {
// Excerpt of a projection execution routine (signature line and most of the body
// are elided). The trailing parameters are the queue time and an optional row
// count from a previous step.
2336 const int64_t queue_time_ms,
2337 const std::optional<size_t> previous_count) {
2343 const auto input_ra = project->
getInput(0);
// When the projection reads from a sort, the scan limit becomes the smaller of
// the sort result's limit and its actual row count.
2344 if (dynamic_cast<const RelSort*>(input_ra)) {
2346 const auto& input_table =
2349 work_unit.exe_unit.scan_limit =
2350 std::min(input_table->getLimit(), input_table->rowCount());
// Excerpt of the table-function execution path (signature line and many body
// lines are elided). Visible behaviour:
//   * rejects execution with std::runtime_error in the two situations named by
//     the messages below (their guarding conditions are elided);
//   * optionally serves the result from, and stores it into, the executor's
//     result-set recycler;
//   * logs why a 'keep_table_function_result' hint is ignored.
2366 const int64_t queue_time_ms) {
2373 throw std::runtime_error(
"Table functions not supported in distributed mode yet");
2376 throw std::runtime_error(
"Table function support is disabled");
2382 const auto body = table_func_work_unit.body;
2385 const auto table_infos =
// Cache lookup: a hit becomes the result together with its cached target metadata.
2397 auto const use_resultset_recycler =
2399 if (use_resultset_recycler) {
2400 auto cached_resultset =
2401 executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
2403 if (cached_resultset) {
2404 VLOG(1) <<
"recycle table function's resultset of the root node "
2406 result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2415 table_func_work_unit.exe_unit, table_infos, co, eo),
2416 body->getOutputMetainfo()};
2420 throw std::runtime_error(
"Table function ran out of memory during execution");
2422 auto query_exec_time =
timer_stop(query_exec_time_begin);
2423 result.setQueueTime(queue_time_ms);
2424 auto resultset_ptr =
result.getDataPtr();
// Annotate the result set so it can be stored in the recycler.
// NOTE(review): resultset_ptr is dereferenced on the next lines, yet it is
// null-tested further down when computing allow_auto_caching_resultset.
// Either the test is redundant or these dereferences need a guard -- confirm.
2425 if (use_resultset_recycler) {
2426 resultset_ptr->setExecTime(query_exec_time);
2427 resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2428 resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2430 resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2431 auto allow_auto_caching_resultset =
2432 resultset_ptr && resultset_ptr->hasValidBuffer() &&
2434 resultset_ptr->getBufferSizeBytes(co.device_type) <=
2437 allow_auto_caching_resultset) {
2438 if (allow_auto_caching_resultset) {
2439 VLOG(1) <<
"Automatically keep table function's query resultset to recycler";
2441 executor_->getResultSetRecyclerHolder().putQueryResultSetToCache(
2442 table_func_work_unit.exe_unit.query_plan_dag_hash,
2443 resultset_ptr->getInputTableKeys(),
2445 resultset_ptr->getBufferSizeBytes(co.device_type),
// Diagnostics for the cases in which the caching hint cannot be honoured.
2451 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since we do not "
2452 "support resultset recycling on distributed mode";
2454 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since a query "
2455 "has union-(all) operator";
2457 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since a query "
2458 "is either validate or explain query";
2460 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored";
// Excerpt of a helper that rewrites a window partition expression (signature line
// and several body lines are elided).
// Tuples are rebuilt from `transformed_tuple` (how it is filled is elided);
// anything that is not a column is rejected with std::runtime_error.
2477 std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2478 for (
const auto& element : tuple->getTuple()) {
2481 return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2485 throw std::runtime_error(
"Only columns supported in the window partition for now");
// The column is re-created with the same type and key but with 1 as the last
// constructor argument (presumably the nesting level -- verify against ColumnVar).
2487 return makeExpr<Analyzer::ColumnVar>(col->get_type_info(), col->getColumnKey(), 1);
// Excerpt of the routine that prepares window-function contexts for a work unit
// (signature line and many body lines are elided).
// Restrictions visible here: exactly one input table, and that table must consist
// of a single fragment (otherwise std::runtime_error).
2496 const int64_t queue_time_ms) {
2498 CHECK_EQ(query_infos.size(), size_t(1));
2499 if (query_infos.front().info.fragments.size() != 1) {
2500 throw std::runtime_error(
2501 "Only single fragment tables supported for window functions for now");
// The single table info is appended a second time; the reason is not visible in
// this excerpt -- TODO confirm.
2506 query_infos.push_back(query_infos.front());
// Caches shared by all window functions handled in this call.
2515 std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2516 std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2517 sorted_partition_cache;
2518 std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2519 std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2520 window_function_context_map;
2521 std::unordered_map<QueryPlanHash, AggregateTreeForWindowFraming> aggregate_tree_map;
// Build the partition condition: several keys are wrapped in an ExpressionTuple,
// a single key is used directly. With no keys the condition stays null.
2531 std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2532 if (partition_keys.size() >= 1) {
2533 std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2534 if (partition_keys.size() > 1) {
2535 partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2537 CHECK_EQ(partition_keys.size(), size_t(1));
2538 partition_key_tuple = partition_keys.front();
2541 partition_key_cond =
2542 makeExpr<Analyzer::BinOper>(
kBOOLEAN,
2545 partition_key_tuple,
2550 partition_key_cond ,
2552 sorted_partition_key_ref_count_map,
// Each target index may own at most one context; a duplicate trips the CHECK.
2558 CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
// Hand every prepared context over to the window project node context.
2561 for (
auto& kv : window_function_context_map) {
2563 sorted_partition_key_ref_count_map, sorted_partition_cache, aggregate_tree_map);
2564 window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
// Excerpt of the routine that builds one WindowFunctionContext (signature line
// and many body lines are elided). Visible steps:
//   1. with a partition condition, obtain the partition hash table (reused from
//      `partition_cache` or freshly built) and create a partitioned context;
//      without one, create an unpartitioned context;
//   2. register a sorted-partition cache key derived from the order keys;
//   3. attach the order-by column buffers;
//   4. for framing / missing-value-filling functions, attach the buffers of the
//      function's column arguments.
// Throws std::runtime_error when the hash table build reports a failure reason or
// an order key is not a plain column.
2570 const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2571 std::unordered_map<
QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2572 std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2574 const std::vector<InputTableInfo>& query_infos,
2577 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
// Row count of the first fragment of the first input table.
2578 const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2582 std::unique_ptr<WindowFunctionContext> context;
2588 if (partition_key_cond) {
2589 auto partition_cond_str = partition_key_cond->toString();
// The cache key combines a hash of the partition key with the partition type.
2591 boost::hash_combine(partition_cache_key, partition_key_hash);
2592 boost::hash_combine(partition_cache_key, static_cast<int>(window_partition_type));
2593 std::shared_ptr<HashJoin> partition_ptr;
2594 auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2595 if (cached_hash_table_it != partition_cache.end()) {
2596 partition_ptr = cached_hash_table_it->second;
2597 VLOG(1) <<
"Reuse a hash table to compute window function context (key: "
2598 << partition_cache_key <<
", partition condition: " << partition_cond_str
// Cache miss: build the hash table and remember it under the cache key.
2601 const auto hash_table_or_err =
executor_->buildHashTableForQualifier(
2605 window_partition_type,
2611 if (!hash_table_or_err.fail_reason.empty()) {
2612 throw std::runtime_error(hash_table_or_err.fail_reason);
2615 partition_ptr = hash_table_or_err.hash_table;
2616 CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2618 VLOG(1) <<
"Put a generated hash table for computing window function context to "
2620 << partition_cache_key <<
", partition condition: " << partition_cond_str
2623 CHECK(partition_ptr);
2627 VLOG(1) <<
"Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2629 context = std::make_unique<WindowFunctionContext>(window_func,
2630 partition_cache_key,
2635 aggregate_tree_fanout);
// No partition condition: context without a partition hash table.
2637 context = std::make_unique<WindowFunctionContext>(
2638 window_func, elem_count, co.
device_type, row_set_mem_owner);
2641 if (!order_keys.empty()) {
// Extend the partition cache key with every order key and the collation.
2642 auto sorted_partition_cache_key = partition_cache_key;
2643 for (
auto& order_key : order_keys) {
2644 boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2647 boost::hash_combine(sorted_partition_cache_key, collation.toString());
2649 context->setSortedPartitionCacheKey(sorted_partition_cache_key);
// Reference counting of the key: insert with count 1, or bump an existing count.
2650 auto cache_key_cnt_it =
2651 sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2652 if (!cache_key_cnt_it.second) {
2653 sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2654 cache_key_cnt_it.first->second + 1;
// Attach each order-by column; its element count must match the fragment's.
2657 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2658 for (
const auto& order_key : order_keys) {
2659 const auto order_col =
2662 throw std::runtime_error(
"Only order by columns supported for now");
2664 auto const [column, col_elem_count] =
2667 query_infos.front().info.fragments.front(),
2675 CHECK_EQ(col_elem_count, elem_count);
2676 context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
// Framing and missing-value-filling functions also need their argument columns.
2679 if (context->getWindowFunction()->hasFraming() ||
2680 context->getWindowFunction()->isMissingValueFillingFunction()) {
2684 auto& window_function_expression_args = window_func->
getArgs();
2685 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2686 for (
auto& expr : window_function_expression_args) {
// Only arguments that are plain column references are considered here.
2687 if (
const auto arg_col_var =
2688 std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2692 query_infos.front().info.fragments.front(),
2699 CHECK_EQ(col_elem_count, elem_count);
2700 context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2711 const int64_t queue_time_ms) {
2715 work_unit, filter->
getOutputMetainfo(),
false, co, eo, render_info, queue_time_ms);
// Excerpt of a comparison of two TargetMetaInfo lists (signature line, the `lhs`
// parameter and all return statements are elided).
// Visible logic: when the lists have equal length, entries are compared pairwise
// by type info; what is returned in each case is not visible here.
2719 std::vector<TargetMetaInfo>
const& rhs) {
2720 if (lhs.size() == rhs.size()) {
2721 for (
size_t i = 0; i < lhs.size(); ++i) {
2722 if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
// Excerpt of the UNION execution path (signature line and most of the body are
// elided). Visible restrictions, each reported with std::runtime_error:
//   * only UNION ALL is accepted;
//   * a second rejection concerns geo columns (its condition is elided).
2740 const int64_t queue_time_ms) {
2742 if (!logical_union->
isAll()) {
2743 throw std::runtime_error(
"UNION without ALL is not supported yet.");
2748 throw std::runtime_error(
"UNION does not support subqueries with geo-columns.");
// Excerpt of a routine that materialises a result set described by `tuple_type`
// (signature line and many body lines are elided).
// Returns the result set paired with `tuple_type` as its target metadata.
// First pass over the tuple components; kNULLT-typed entries get special
// handling on elided lines.
2768 for (
size_t i = 0; i < tuple_type.size(); ++i) {
2769 auto& target_meta_info = tuple_type[i];
2770 if (target_meta_info.get_type_info().get_type() ==
kNULLT) {
// Pairs the component's type size with the constant 8; the call receiving this
// argument is elided (presumably a column/slot registration -- TODO confirm).
2776 {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
// Second pass: one TargetInfo per tuple component.
2780 std::vector<TargetInfo> target_infos;
2781 for (
const auto& tuple_type_component : tuple_type) {
2784 tuple_type_component.get_type_info(),
2790 std::shared_ptr<ResultSet> rs{
2799 return {rs, tuple_type};
// Excerpt of a template helper (type parameter T) that dictionary-encodes one
// string for a column (signature line and several body lines are elided).
// Visible steps: add the string to the column's dictionary, checkpoint the
// dictionary unless it is temporary, then verify the resulting id fits in T.
// Throws std::runtime_error when the dictionary checkpoint fails.
2806 const std::string& columnName,
2817 CHECK(dd && dd->stringDict);
2818 int32_t str_id = dd->stringDict->getOrAdd(str);
// Temporary dictionaries are not checkpointed.
2819 if (!dd->dictIsTemp) {
2820 const auto checkpoint_ok = dd->stringDict->checkpoint();
2821 if (!checkpoint_ok) {
2822 throw std::runtime_error(
"Failed to checkpoint dictionary for column " +
// An id is unusable when it exceeds the largest valid value of T or collides
// with the 32-bit inline NULL sentinel; this is logged and, per the message,
// NULL is stored instead (the store itself is on an elided line).
2826 const bool invalid = str_id > max_valid_int_value<T>();
2827 if (invalid || str_id == inline_int_null_value<int32_t>()) {
2829 LOG(
ERROR) <<
"Could not encode string: " << str
2830 <<
", the encoded value doesn't fit in " <<
sizeof(
T) * 8
2831 <<
" bits. Will store NULL instead.";
2854 throw std::runtime_error(
"EXPLAIN not supported for ModifyTable");
2864 std::vector<TargetMetaInfo> empty_targets;
2865 return {rs, empty_targets};
// Excerpt of the literal-values INSERT path (signature line and many body lines
// are elided). Visible structure:
//   1. choose a package size (rows handed off per iteration);
//   2. allocate one buffer per target column: raw bytes for fixed-width values,
//      string vectors for strings/geometry, ArrayDatum vectors for arrays;
//   3. loop over packages: reset the buffers, convert every literal of every row
//      into its column buffer, then fill `insert_data` from the buffers;
//   4. return a result set with no target metadata.
// Throws std::runtime_error for array length mismatches and empty geospatial
// values.
2880 size_t rows_number = values_lists.size();
2885 size_t rows_per_leaf = rows_number;
// For unsharded tables the rows are split evenly over the leaves (the assignment
// target of this expression is on an elided line -- presumably rows_per_leaf).
2886 if (td->nShards == 0) {
2888 ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
// Package size is clamped to the range [1, 64 * 1024] rows.
2890 auto max_number_of_rows_per_package =
2891 std::max(
size_t(1), std::min(rows_per_leaf,
size_t(64 * 1024)));
2893 std::vector<const ColumnDescriptor*> col_descriptors;
2894 std::vector<int> col_ids;
2895 std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2896 std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2897 std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
// Maps a column id to its position in col_ids / insert_data.data.
2898 std::unordered_map<int, int> sequential_ids;
// Buffer allocation, one entry per column id; every insertion must be new.
2900 for (
const int col_id : col_id_list) {
2903 if (cd->columnType.is_string()) {
2907 str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2908 CHECK(it_ok.second);
2912 const auto dd = catalog.getMetadataForDict(cd->columnType.get_comp_param());
2914 const auto it_ok = col_buffers.emplace(
2916 std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2917 max_number_of_rows_per_package));
2918 CHECK(it_ok.second);
2924 }
else if (cd->columnType.is_geometry()) {
2926 str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2927 CHECK(it_ok.second);
2928 }
else if (cd->columnType.is_array()) {
2930 arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2931 CHECK(it_ok.second);
2933 const auto it_ok = col_buffers.emplace(
2935 std::unique_ptr<uint8_t[]>(
new uint8_t[cd->columnType.get_logical_size() *
2936 max_number_of_rows_per_package]()));
2937 CHECK(it_ok.second);
2939 col_descriptors.push_back(cd);
2940 sequential_ids[col_id] = col_ids.size();
2941 col_ids.push_back(col_id);
// Package loop: runs until every row has been handed off.
2946 size_t start_row = 0;
2947 size_t rows_left = rows_number;
2948 while (rows_left != 0) {
// NOTE(review): the byte count passed to memset is the row capacity, whereas the
// buffers above were allocated as element_size * row capacity. For element sizes
// above one byte only the leading part of each buffer is zeroed -- confirm that
// every slot that is later read is also written by the row loop below.
2950 for (
const auto& kv : col_buffers) {
2951 memset(kv.second.get(), 0, max_number_of_rows_per_package);
2953 for (
auto& kv : str_col_buffers) {
2956 for (
auto& kv : arr_col_buffers) {
2960 auto package_size = std::min(rows_left, max_number_of_rows_per_package);
// Convert each literal of each row in this package into its column buffer.
2965 for (
size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2966 const auto& values_list = values_lists[row_idx + start_row];
2967 for (
size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
// Every row must supply exactly one value per target column.
2968 CHECK(values_list.size() == col_descriptors.size());
2973 dynamic_cast<const Analyzer::UOper*
>(values_list[col_idx]->get_expr());
2979 const auto cd = col_descriptors[col_idx];
2980 auto col_datum = col_cv->get_constval();
2981 auto col_type = cd->columnType.get_type();
// Only columns backed by a raw byte buffer get col_data_bytes set; it stays null
// for arrays, geometry and the string case excluded by the elided sub-condition.
2982 uint8_t* col_data_bytes{
nullptr};
2983 if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2984 (!cd->columnType.is_string() ||
2986 const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2987 CHECK(col_data_bytes_it != col_buffers.end());
2988 col_data_bytes = col_data_bytes_it->second.get();
// Per-type stores (the enclosing switch/case labels are elided). Integer-like
// stores select between a NULL representation (elided) and the literal's value.
2992 auto col_data =
reinterpret_cast<int8_t*
>(col_data_bytes);
2993 auto null_bool_val =
2995 col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2997 : (col_datum.boolval ? 1 : 0);
3001 auto col_data =
reinterpret_cast<int8_t*
>(col_data_bytes);
3002 col_data[row_idx] = col_cv->get_is_null()
3004 : col_datum.tinyintval;
3008 auto col_data =
reinterpret_cast<int16_t*
>(col_data_bytes);
3009 col_data[row_idx] = col_cv->get_is_null()
3011 : col_datum.smallintval;
3015 auto col_data =
reinterpret_cast<int32_t*
>(col_data_bytes);
3016 col_data[row_idx] = col_cv->get_is_null()
3024 auto col_data =
reinterpret_cast<int64_t*
>(col_data_bytes);
3025 col_data[row_idx] = col_cv->get_is_null()
3027 : col_datum.bigintval;
// Floating-point values are copied straight from the literal's datum.
3031 auto col_data =
reinterpret_cast<float*
>(col_data_bytes);
3032 col_data[row_idx] = col_datum.floatval;
3036 auto col_data =
reinterpret_cast<double*
>(col_data_bytes);
3037 col_data[row_idx] = col_datum.doubleval;
// Strings: handling depends on the column's compression; a missing string value
// is appended as the empty string, and the remaining branch selects the slot
// width from the column's size.
3043 switch (cd->columnType.get_compression()) {
3045 str_col_buffers[col_ids[col_idx]].push_back(
3046 col_datum.stringval ? *col_datum.stringval :
"");
3049 switch (cd->columnType.get_size()) {
3052 &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3059 &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3066 &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3084 auto col_data =
reinterpret_cast<int64_t*
>(col_data_bytes);
3085 col_data[row_idx] = col_cv->get_is_null()
3087 : col_datum.bigintval;
// Arrays: NULL arrays and value lists are converted into ArrayDatum entries.
3091 const auto is_null = col_cv->get_is_null();
3092 const auto size = cd->columnType.get_size();
3095 const auto is_point_coords =
3097 if (
is_null && !is_point_coords) {
// Fills the elements after the first one with NULLs (the first element and the
// loop increment are handled on elided lines).
3101 for (int8_t* p = buf + elem_ti.
get_size(); (p - buf) < size;
3103 put_null(static_cast<void*>(p), elem_ti,
"");
3105 arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf,
is_null);
3107 arr_col_buffers[col_ids[col_idx]].emplace_back(0,
nullptr,
is_null);
3111 const auto l = col_cv->get_value_list();
3112 size_t len = l.size() * elem_ti.
get_size();
// A positive column size means a fixed-length array: the byte length of the
// supplied values must match it exactly.
3113 if (size > 0 && static_cast<size_t>(size) != len) {
3114 throw std::runtime_error(
"Array column " + cd->columnName +
" expects " +
3116 " values, " +
"received " +
3124 int32_t* p =
reinterpret_cast<int32_t*
>(buf);
3131 &p[elemIndex], cd->columnName, elem_ti, c.get(), catalog);
// Geospatial columns reject an explicitly empty string value.
3155 if (col_datum.stringval && col_datum.stringval->empty()) {
3156 throw std::runtime_error(
3157 "Empty values are not allowed for geospatial column \"" +
3158 cd->columnName +
"\"");
3160 str_col_buffers[col_ids[col_idx]].push_back(
3161 col_datum.stringval ? *col_datum.stringval :
"");
// Advance to the next package.
3169 start_row += package_size;
3170 rows_left -= package_size;
// Describe the package for insertion: one data block per column, placed at the
// column's position recorded in sequential_ids.
3173 insert_data.
databaseId = catalog.getCurrentDB().dbId;
3174 insert_data.
tableId = table_id;
3175 insert_data.
data.resize(col_ids.size());
3177 for (
const auto& kv : col_buffers) {
3179 p.
numbersPtr =
reinterpret_cast<int8_t*
>(kv.second.get());
3180 insert_data.
data[sequential_ids[kv.first]] = p;
3182 for (
auto& kv : str_col_buffers) {
3185 insert_data.
data[sequential_ids[kv.first]] = p;
3187 for (
auto& kv : arr_col_buffers) {
3190 insert_data.
data[sequential_ids[kv.first]] = p;
3192 insert_data.
numRows = package_size;
// The insert yields a result set without any target columns.
3203 std::vector<TargetMetaInfo> empty_targets;
3204 return {rs, empty_targets};
3210 return limit ? *limit : 0;
3214 const auto aggregate =
dynamic_cast<const RelAggregate*
>(ra);
3218 const auto compound =
dynamic_cast<const RelCompound*
>(ra);
3219 return (compound && compound->isAggregate()) ? 0 :
get_limit_value(limit);
3223 return !order_entries.empty() && order_entries.front().is_desc;
// Excerpt of the SORT execution path (signature line and many body lines are
// elided). Visible structure:
//   * a path operating on an already aggregated result (`it->second`), applying
//     offset/limit handling to it;
//   * a lambda `execute_sort_query` that obtains the source result (from the
//     result-set recycler when cached, otherwise by executing the source),
//     sorts it unless speculative top-n sort is used, and applies offset/limit;
//   * two call sites of that lambda (the control flow between them is elided).
3236 const int64_t queue_time_ms) {
3239 const auto source = sort->
getInput(0);
3246 executor_->addTransientStringLiterals(source_work_unit.exe_unit,
// Pre-aggregated result path.
3249 auto& aggregated_result = it->second;
3250 auto& result_rows = aggregated_result.rs;
3252 const size_t offset = sort->
getOffset();
3253 if (limit || offset) {
3254 if (!order_entries.empty()) {
3258 result_rows->dropFirstN(offset);
3270 source_work_unit.exe_unit.target_exprs,
3271 aggregated_result.targets_meta);
// State filled in by the lambda and inspected after it runs.
3280 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3281 bool is_desc{
false};
3282 bool use_speculative_top_n_sort{
false};
3284 auto execute_sort_query = [
this,
3296 std::optional<size_t> limit = sort->
getLimit();
3297 const size_t offset = sort->
getOffset();
3299 auto source_node = sort->
getInput(0);
3303 SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
3305 source_node->getQueryPlanDagHash(), sort_info);
// Recycler hit: reuse the cached result set and its target metadata.
3308 if (
auto cached_resultset =
3309 executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
3310 source_query_plan_dag)) {
// The CHECK guards the dereference of canUseSpeculativeTopNSort() further down.
3311 CHECK(cached_resultset->canUseSpeculativeTopNSort());
3312 VLOG(1) <<
"recycle resultset of the root node " << source_node->getRelNodeDagId()
3313 <<
" from resultset cache";
3315 ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3319 use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3321 source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
// No usable cached data: the source has to be executed.
3325 if (!source_result.getDataPtr()) {
3334 VLOG(1) <<
"Punt sort's input query to CPU: detect union(-all) of none-encoded "
3338 groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3340 source->getOutputMetainfo(),
3346 use_speculative_top_n_sort =
3347 source_result.
getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3349 source_result.getRows()->getQueryMemDesc());
// In-situ rendering and filter push-down results are returned unsorted.
3351 if (render_info && render_info->isInSitu()) {
3352 return source_result;
3354 if (source_result.isFilterPushDownEnabled()) {
3355 return source_result;
3357 auto rows_to_sort = source_result.getRows();
// EXPLAIN: return the rows with empty target metadata.
3358 if (eo.just_explain) {
3359 return {rows_to_sort, {}};
// Explicit sort is only needed with a collation, possibly non-empty rows, and
// when speculative top-n sort has not already ordered the result.
3362 if (sort->
collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3363 !use_speculative_top_n_sort) {
3364 const size_t top_n = limit_val + offset;
3365 rows_to_sort->sort(order_entries, top_n, co.device_type,
executor_);
// Offset/limit application; the conditions selecting between the drop/keep
// variants below are partly elided.
3367 if (limit || offset) {
3369 if (offset >= rows_to_sort->rowCount()) {
3370 rows_to_sort->dropFirstN(offset);
3372 rows_to_sort->keepFirstN(limit_val + offset);
3375 rows_to_sort->dropFirstN(offset);
3377 rows_to_sort->keepFirstN(limit_val);
3381 return {rows_to_sort, source_result.getTargetsMeta()};
3385 return execute_sort_query();
// Second call site: reached with exactly one non-null group-by expression
// (the surrounding control flow is elided -- presumably a retry path).
3387 CHECK_EQ(
size_t(1), groupby_exprs.size());
3388 CHECK(groupby_exprs.front());
3390 return execute_sort_query();
// Excerpt of the routine that derives the work unit feeding a sort from the
// sort's source node (signature line and several body lines are elided).
// Throws std::runtime_error when an ORDER BY entry refers to a geometry or array
// target. The returned aggregate copies most fields from the source execution
// unit and substitutes the sort info built from order_entries/limit/offset.
3396 std::list<Analyzer::OrderEntry>& order_entries,
3398 const auto source = sort->
getInput(0);
3400 const size_t offset = sort->
getOffset();
3402 const size_t scan_total_limit =
3404 size_t max_groups_buffer_entry_guess{
3407 SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
3409 const auto& source_exe_unit = source_work_unit.exe_unit;
// Validate every ORDER BY entry; tle_no is 1-based into the target list.
3412 for (
auto order_entry : order_entries) {
3414 const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3416 if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3417 throw std::runtime_error(
3418 "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3422 if (source_exe_unit.groupby_exprs.size() == 1) {
3423 if (!source_exe_unit.groupby_exprs.front()) {
// NOTE(review): source_exe_unit is bound as a const reference above, so unless
// the member type offers a const&& overload this std::move yields a copy rather
// than a move -- confirm whether a move was intended.
3437 std::move(source_exe_unit.input_col_descs),
3438 source_exe_unit.simple_quals,
3439 source_exe_unit.quals,
3440 source_exe_unit.join_quals,
3441 source_exe_unit.groupby_exprs,
3442 source_exe_unit.target_exprs,
3443 source_exe_unit.target_exprs_original_type_infos,
3445 {sort_info.order_entries, sort_algorithm, limit, offset},
3447 source_exe_unit.query_hint,
3448 source_exe_unit.query_plan_dag_hash,
3449 source_exe_unit.hash_table_build_plan_dag,
3450 source_exe_unit.table_id_to_node_map,
3451 source_exe_unit.use_bump_allocator,
3452 source_exe_unit.union_all,
3453 source_exe_unit.query_state},
3455 max_groups_buffer_entry_guess,
3456 std::move(source_work_unit.query_rewriter),
3457 source_work_unit.input_permutation,
3458 source_work_unit.left_deep_join_input_sizes};
// Excerpt of a helper that picks the input table with the largest tuple-count
// upper bound (signature line and closing braces are elided).
// Requires a non-empty `table_infos`.
// Returns a pair of (that upper bound, but at least 1) and the table's key.
// On ties the earliest table wins because the comparison is strict.
3470 const std::vector<InputTableInfo>& table_infos) {
3471 CHECK(!table_infos.empty());
3472 const auto& first_table = table_infos.front();
3473 size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3474 auto table_key = first_table.table_key;
3475 for (
const auto& table_info : table_infos) {
3476 if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3477 max_num_groups = table_info.info.getNumTuplesUpperBound();
3478 table_key = table_info.table_key;
// Never report zero groups.
3481 return std::make_pair(std::max(max_num_groups,
size_t(1)), table_key);
// Excerpt of a predicate over an execution unit's targets (signature line and all
// return statements are elided). Visible checks: in-situ rendering, FlatBuffer
// usage and variable-length targets, and a top-level projection that forces
// row-wise output.
// Throws std::runtime_error when row-wise output is forced while a FlatBuffer
// layout is in use.
3494 if (render_info && render_info->
isInSitu()) {
// Remember whether any target uses the FlatBuffer layout.
3501 bool flatbuffer_is_used =
false;
3502 for (
const auto& target_expr : ra_exe_unit.
target_exprs) {
3503 const auto ti = target_expr->get_type_info();
3505 if (ti.usesFlatBuffer()) {
3506 flatbuffer_is_used =
true;
3512 if (ti.is_varlen()) {
3516 if (
auto top_project = dynamic_cast<const RelProject*>(body)) {
3517 if (top_project->isRowwiseOutputForced()) {
3518 if (flatbuffer_is_used) {
3519 throw std::runtime_error(
3520 "Cannot force rowwise output when FlatBuffer layout is used.");
3529 for (
const auto& target_expr : ra_exe_unit.
target_exprs) {
3530 if (target_expr->get_type_info().usesFlatBuffer()) {
// Excerpt of a predicate involving the pre-flight count query threshold
// (signature line and all return statements are elided).
// Visible logic: targets are scanned for aggregate expressions, the threshold may
// be overridden (logged as coming from a query hint), and the unit's scan_limit
// is compared against it.
3546 for (
const auto target_expr : ra_exe_unit.
target_exprs) {
3547 if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3553 preflight_count_query_threshold =
3555 VLOG(1) <<
"Set the pre-flight count query's threshold as "
3556 << preflight_count_query_threshold <<
" by a query hint";
3560 ra_exe_unit.
scan_limit > preflight_count_query_threshold)) {
3567 return !(ra_exe_unit.
quals.empty() && ra_exe_unit.
join_quals.empty() &&
// Walks every target expression of ra_exe_unit. For the aggregate targets it
// handles, it compares the padded bitmap size of an approximate count-distinct
// descriptor with that of a precise one and, when the approximate bitmap is not
// smaller, replaces the target with a newly built AggExpr whose ownership is
// kept in target_exprs_owned.
// NOTE(review): several statements are not visible in this view, so the
// early-exit conditions and the descriptor arguments are not described.
3573 const std::vector<InputTableInfo>& table_infos,
3574 const Executor* executor,
3576 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3578 for (
size_t i = 0; i < ra_exe_unit.
target_exprs.size(); ++i) {
3584 CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3587 const auto& arg_ti = arg->get_type_info();
// The branch below is taken when the argument is none of: number, boolean,
// time, or dictionary-encoded string.
3593 if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3594 (arg_ti.is_string() && arg_ti.get_compression() ==
kENCODING_DICT))) {
// Number of bits needed to cover the argument's integer range, inclusive.
3605 const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3606 const auto sub_bitmap_count =
3608 int64_t approx_bitmap_sz_bits{0};
// Optional second aggregate argument: when present it must be of type kINT,
// and the error-rate constant is required to be at least 1.
3609 const auto error_rate_expr =
static_cast<Analyzer::AggExpr*
>(target_expr)->get_arg1();
3610 if (error_rate_expr) {
3611 CHECK(error_rate_expr->get_type_info().get_type() ==
kINT);
3612 auto const error_rate =
3615 CHECK_GE(error_rate->get_constval().intval, 1);
3621 arg_range.getIntMin(),
3623 approx_bitmap_sz_bits,
3628 arg_range.getIntMin(),
// The approximate bitmap would be at least as large as the precise one, so the
// target is swapped for the newly built aggregate expression.
3634 if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3635 precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3636 auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3638 target_exprs_owned.push_back(precise_count_distinct);
3639 ra_exe_unit.
target_exprs[i] = precise_count_distinct.get();
// NOTE(review): many statements of this routine are not visible in this view;
// the comments below describe only what the visible lines do.
// Runs one work unit: optional window computation and filter push-down search,
// CUDA block/grid-size hint handling, scan-limit selection, execution (with a
// cached or estimated cardinality where applicable) and result-set caching.
3656 const std::vector<TargetMetaInfo>& targets_meta,
3661 const int64_t queue_time_ms,
3662 const std::optional<size_t> previous_count) {
3673 ScopeGuard clearWindowContextIfNecessary = [&]() {
3680 throw std::runtime_error(
"Window functions support is disabled");
// Lazy fetch is turned off before the window computation runs.
3683 co.allow_lazy_fetch =
false;
3684 computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
// Filter push-down search: when candidates were selected (or only a calcite
// explain was requested) they are returned right away instead of executing.
3686 if (!eo.just_explain && eo.find_push_down_candidates) {
3688 VLOG(1) <<
"Try to find filter predicate push-down candidate.";
3690 if (!selected_filters.empty() || eo.just_calcite_explain) {
3691 VLOG(1) <<
"Found " << selected_filters.size()
3692 <<
" filter(s) to be pushed down. Re-create a query plan based on pushed "
3693 "filter predicate(s).";
3694 return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3696 VLOG(1) <<
"Continue with the current query plan";
// In-situ rendering also disables lazy fetch.
3698 if (render_info && render_info->
isInSitu()) {
3699 co.allow_lazy_fetch =
false;
3701 const auto body = work_unit.
body;
3704 VLOG(3) <<
"body->getId()=" << body->getId()
3706 <<
" it==leaf_results_.end()=" << (it ==
leaf_results_.end());
// An entry found in leaf_results_ supplies the result rows and the output
// meta info for this body.
3710 auto& aggregated_result = it->second;
3711 auto& result_rows = aggregated_result.rs;
3713 body->setOutputMetainfo(aggregated_result.targets_meta);
// Attach the query hint registered for this body, when there is one.
3727 auto candidate =
query_dag_->getQueryHint(body);
3729 ra_exe_unit.query_hint = *candidate;
3733 const auto& query_hints = ra_exe_unit.query_hint;
// Captures the executor's current block and grid sizes so the (partially
// visible) callback can re-apply them when a CUDA manager is present.
3735 orig_block_size =
executor_->blockSize(),
3736 orig_grid_size =
executor_->gridSize()]() {
3737 if (
executor_->getDataMgr()->getCudaMgr()) {
3739 if (orig_block_size) {
3740 executor_->setBlockSize(orig_block_size);
3746 if (orig_grid_size) {
// Grid-size hint: needs a CUDA device; the candidate size is
// max(1, round(#SMs * cuda_grid_size_multiplier)).
3757 if (!
executor_->getDataMgr()->getCudaMgr()) {
3758 VLOG(1) <<
"Skip CUDA grid size query hint: cannot detect CUDA device";
3760 const auto num_sms =
executor_->cudaMgr()->getMinNumMPsForAllDevices();
3761 const auto new_grid_size =
static_cast<unsigned>(
3762 std::max(1.0, std::round(num_sms * query_hints.cuda_grid_size_multiplier)));
3763 const auto default_grid_size =
executor_->gridSize();
3764 if (new_grid_size != default_grid_size) {
3765 VLOG(1) <<
"Change CUDA grid size: " << default_grid_size
3766 <<
" (default_grid_size) -> " << new_grid_size <<
" (# SMs * "
3767 << query_hints.cuda_grid_size_multiplier <<
")";
3771 VLOG(1) <<
"Skip CUDA grid size query hint: invalid grid size";
// Block-size hint: needs a CUDA device; a requested size of at least one warp
// is rounded up to the next multiple of warp_size before being applied.
3776 if (!
executor_->getDataMgr()->getCudaMgr()) {
3777 VLOG(1) <<
"Skip CUDA block size query hint: cannot detect CUDA device";
3779 int cuda_block_size = query_hints.cuda_block_size;
3781 if (cuda_block_size >= warp_size) {
3782 cuda_block_size = (cuda_block_size + warp_size - 1) / warp_size * warp_size;
3783 VLOG(1) <<
"Change CUDA block size w.r.t warp size (" << warp_size
3784 <<
"): " <<
executor_->blockSize() <<
" -> " << cuda_block_size;
3786 VLOG(1) <<
"Change CUDA block size: " <<
executor_->blockSize() <<
" -> "
3789 executor_->setBlockSize(cuda_block_size);
// Scan-limit selection. In this first case exactly one table with exactly one
// fragment is expected; its tuple count becomes both the buffer-entry guess
// and the scan limit.
3796 CHECK_EQ(table_infos.size(), size_t(1));
3797 CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3798 max_groups_buffer_entry_guess =
3799 table_infos.front().info.fragments.front().getNumTuples();
3800 ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
// Reuse the count passed in by the caller.
3803 ra_exe_unit.scan_limit = *previous_count;
3807 ra_exe_unit.scan_limit = 0;
3808 ra_exe_unit.use_bump_allocator =
true;
3810 ra_exe_unit.scan_limit = 0;
3811 }
else if (!eo.just_explain) {
// A filtered row count, when available, becomes the scan limit (at least 1).
3813 if (filter_count_all) {
3814 ra_exe_unit.scan_limit = std::max(*filter_count_all,
size_t(1));
3815 VLOG(1) <<
"Set a new scan limit from filtered_count_all: "
3816 << ra_exe_unit.scan_limit;
// With a LIMIT but no ORDER BY entries (i.e. not a top-k sort), the scan limit
// is capped at the LIMIT value.
3817 auto const has_limit_value = ra_exe_unit.sort_info.limit.has_value();
3818 auto const top_k_sort_query =
3819 has_limit_value && !ra_exe_unit.sort_info.order_entries.empty();
3822 if (has_limit_value && !top_k_sort_query &&
3823 ra_exe_unit.scan_limit > ra_exe_unit.sort_info.limit.value()) {
3824 ra_exe_unit.scan_limit = ra_exe_unit.sort_info.limit.value();
3825 VLOG(1) <<
"Override scan limit to LIMIT value: " << ra_exe_unit.scan_limit;
3837 VLOG(1) <<
"Using columnar layout for projection as output size of "
3838 << ra_exe_unit.scan_limit <<
" rows exceeds threshold of "
3840 <<
" or some target uses FlatBuffer memory layout.";
3841 eo.output_columnar_hint =
true;
3844 eo.output_columnar_hint =
false;
// Helper that forwards to executor_->executeWorkUnit using a local copy of
// the entry-count guess it was given.
3855 auto execute_and_handle_errors = [&](
const auto max_groups_buffer_entry_guess_in,
3856 const bool has_cardinality_estimation,
3861 auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3863 return {
executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3870 has_cardinality_estimation,
3879 {ra_exe_unit, work_unit.
body, local_groups_buffer_entry_guess},
// Any temporary table or view among the inputs turns result-set caching off.
3892 for (
const auto& table_info : table_infos) {
3893 const auto db_id = table_info.table_key.db_id;
3896 if (td && (td->isTemporaryTable() || td->isView)) {
3897 use_resultset_cache =
false;
3898 if (eo.keep_result) {
3899 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has either "
3900 "temporary table or view";
// Prefer a previously cached, non-negative cardinality over the default guess.
3908 auto cached_cardinality =
executor_->getCachedCardinality(cache_key);
3909 auto card = cached_cardinality.second;
3910 if (cached_cardinality.first && card >= 0) {
3911 VLOG(1) <<
"Use cached cardinality for max_groups_buffer_entry_guess: " << card;
3912 result = execute_and_handle_errors(
3915 VLOG(1) <<
"Use default cardinality for max_groups_buffer_entry_guess: "
3916 << max_groups_buffer_entry_guess;
3917 result = execute_and_handle_errors(
3918 max_groups_buffer_entry_guess,
// Same cache lookup on the path that requires a cardinality estimate.
3924 auto cached_cardinality =
executor_->getCachedCardinality(cache_key);
3925 auto card = cached_cardinality.second;
3926 if (cached_cardinality.first && card >= 0) {
3927 VLOG(1) <<
"CardinalityEstimationRequired, Use cached cardinality for "
3928 "max_groups_buffer_entry_guess: "
3930 result = execute_and_handle_errors(card,
true,
true);
// No cached value: a positive NDV estimate is scaled by the multiplier (which
// a query hint can replace) and used as the entry-count guess.
3932 const auto ndv_groups_estimation =
3936 ndv_groups_estimator_multiplier = query_hints.ndv_groups_estimator_multiplier;
3937 VLOG(1) <<
"Modify NDV groups estimator multiplier: "
3939 << ndv_groups_estimator_multiplier;
3941 const auto estimated_groups_buffer_entry_guess =
3942 ndv_groups_estimation > 0
3943 ?
static_cast<size_t>(
static_cast<double>(ndv_groups_estimation) *
3944 ndv_groups_estimator_multiplier)
3947 CHECK_GT(estimated_groups_buffer_entry_guess,
size_t(0));
3948 VLOG(1) <<
"CardinalityEstimationRequired, Use ndv_estimation: "
3949 << ndv_groups_estimation
3950 <<
", cardinality for estimated_groups_buffer_entry_guess: "
3951 << estimated_groups_buffer_entry_guess;
3952 result = execute_and_handle_errors(
3953 estimated_groups_buffer_entry_guess,
true,
true);
// Remember the estimate for later queries unless this was a validate/explain run.
3954 if (!(eo.just_validate || eo.just_explain)) {
3955 executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3960 result.setQueueTime(queue_time_ms);
3965 return {std::make_shared<ResultSet>(
3969 ?
executor_->row_set_mem_owner_->cloneStrDictDataOnly()
// A string output column that is not dictionary-encoded also turns result-set
// caching off.
3975 for (
auto& target_info :
result.getTargetsMeta()) {
3976 if (target_info.get_type_info().is_string() &&
3977 !target_info.get_type_info().is_dict_encoded_string()) {
3979 use_resultset_cache =
false;
3980 if (eo.keep_result) {
3981 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has non-encoded "
3982 "string column projection";
// Stamp timing, plan hash, target metadata and input table keys on the result set.
3989 auto query_exec_time =
timer_stop(query_exec_time_begin);
3990 res->setExecTime(query_exec_time);
3991 res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3992 res->setTargetMetaInfo(body->getOutputMetainfo());
3994 res->setInputTableKeys(std::move(input_table_keys));
3995 auto allow_auto_caching_resultset =
// Hand the result set to the recycler when keep_result is set or automatic
// caching is allowed.
3998 if (eo.keep_result || allow_auto_caching_resultset) {
3999 if (allow_auto_caching_resultset) {
4000 VLOG(1) <<
"Automatically keep query resultset to recycler";
4002 res->setUseSpeculativeTopNSort(
4004 executor_->getResultSetRecyclerHolder().putQueryResultSetToCache(
4005 ra_exe_unit.query_plan_dag_hash,
4006 res->getInputTableKeys(),
4008 res->getBufferSizeBytes(co.device_type),
// keep_result was requested but is not honored; the visible branches log why.
4012 if (eo.keep_result) {
4014 VLOG(1) <<
"Query hint \'keep_result\' is ignored since we do not support "
4015 "resultset recycling on distributed mode";
4017 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has union-(all) "
4019 }
else if (render_info && render_info->
isInSitu()) {
4020 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query is classified as "
4021 "a in-situ rendering query";
4023 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query is either "
4024 "validate or explain query";
4026 VLOG(1) <<
"Query hint \'keep_result\' is ignored";
// Tests the input tables with a per-table lambda: for a table with a positive
// db_id it fetches the table's metadata and asks the catalog whether that
// table has a deleted column with deleted rows.
// NOTE(review): the algorithm the lambda is passed to and the lambda's return
// statements are on lines not visible in this view.
4035 std::vector<InputTableInfo>
const& input_tables_info)
const {
4037 input_tables_info.begin(), input_tables_info.end(), [](
InputTableInfo const& info) {
4038 auto const& table_key = info.table_key;
4039 if (table_key.db_id > 0) {
4043 auto td = catalog->getMetadataForTable(table_key.table_id);
4045 if (catalog->getDeletedColumnIfRowsDeleted(td)) {
4055 std::string table_name{
""};
4056 if (table_key.
db_id > 0) {
4059 auto td = catalog->getMetadataForTable(table_key.
table_id);
4061 table_name = td->tableName;
// NOTE(review): the start of this routine and the conditions guarding each
// step are not visible in this view.
// Either short-circuits by returning a table cardinality, or runs a count
// query and returns its single int64 result clamped to at least 1. A
// std::exception thrown by the count query is logged and yields std::nullopt.
4077 auto const num_rows = max_row_info.first;
4079 VLOG(1) <<
"Short-circuiting filtered count query for the projection query "
4080 "containing input table "
4081 << table_name <<
": return its table cardinality " << num_rows <<
" instead";
4082 return std::make_optional<size_t>(num_rows);
4094 VLOG(1) <<
"Try to execute pre-flight counts query";
4098 count_all_result =
executor_->executeWorkUnit(one,
4113 }
catch (
const std::exception& e) {
4114 LOG(
WARNING) <<
"Failed to run pre-flight filtered count with error " << e.what();
4115 return std::nullopt;
// The count result must be one row with one scalar column.
4117 const auto count_row = count_all_result->getNextRow(
false,
false);
4118 CHECK_EQ(
size_t(1), count_row.size());
4119 const auto& count_tv = count_row.front();
4120 const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
4121 CHECK(count_scalar_tv);
4122 const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
// Never report a count of zero.
4125 auto count_upper_bound =
static_cast<size_t>(*count_ptr);
4126 return std::max(count_upper_bound,
size_t(1));
// Inspects a work unit's simple_quals. The visible tests look for an equality
// comparison (kEQ) whose left operand is a column with a non-zero table id and
// rte index 0, and then check whether the catalog marks that column as virtual.
// NOTE(review): the value returned from each branch is on lines not visible here.
4130 const auto& ra_exe_unit = work_unit.
exe_unit;
// This branch is taken when the unit does not have exactly one input.
4131 if (ra_exe_unit.input_descs.size() != 1) {
4134 const auto& table_desc = ra_exe_unit.
input_descs.front();
4138 const auto& table_key = table_desc.getTableKey();
4139 for (
const auto& simple_qual : ra_exe_unit.simple_quals) {
4140 const auto comp_expr =
4142 if (!comp_expr || comp_expr->get_optype() !=
kEQ) {
4147 if (!lhs_col || !lhs_col->getTableKey().table_id || lhs_col->get_rte_idx()) {
4150 const auto rhs = comp_expr->get_right_operand();
4156 {table_key.db_id, table_key.table_id, lhs_col->getColumnKey().column_id});
4157 if (cd->isVirtualCol) {
// NOTE(review): only scattered statements of this routine are visible here;
// the conditions that select each retry step are not shown.
// Visible retry steps, per the log messages: first with multi-fragment kernels
// (and the push-down candidate search) disabled, then on CPU, and, when the
// output buffer runs out of slots, with max_groups_buffer_entry_guess doubled.
4168 const std::vector<TargetMetaInfo>& targets_meta,
4174 const int64_t queue_time_ms) {
4179 auto ra_exe_unit_in = work_unit.
exe_unit;
// Work on a copy of the execution options so the caller's eo is untouched.
4192 auto eo_no_multifrag = eo;
4194 eo_no_multifrag.allow_multifrag =
false;
4195 eo_no_multifrag.find_push_down_candidates =
false;
4200 LOG(
WARNING) <<
"Multifrag query ran out of memory, retrying with multifragment "
4201 "kernels disabled.";
4214 result.setQueueTime(queue_time_ms);
4217 LOG(
WARNING) <<
"Kernel per fragment query ran out of memory, retrying on CPU.";
4231 VLOG(1) <<
"Resetting max groups buffer entry guess.";
4232 max_groups_buffer_entry_guess = 0;
4235 int iteration_ctr = -1;
// The guess must be non-zero before it is doubled for the next attempt.
4257 CHECK(max_groups_buffer_entry_guess);
4261 throw std::runtime_error(
"Query ran out of output slots in the result");
4263 max_groups_buffer_entry_guess *= 2;
4264 LOG(
WARNING) <<
"Query ran out of slots in the output buffer, retrying with max "
4265 "groups buffer entry "
4267 << max_groups_buffer_entry_guess;
4273 result.setQueueTime(queue_time_ms);
// Logs the failure at ERROR level. An OUT_OF_GPU_MEM error code gets special
// handling: the visible lines either log an attempt to fall back to CPU or
// throw a std::runtime_error stating that no automatic CPU retry is possible.
// NOTE(review): the condition choosing between those two outcomes is on lines
// not visible here.
4280 LOG(
ERROR) <<
"Query execution failed with error "
4282 if (error_code == int32_t(ErrorCode::OUT_OF_GPU_MEM)) {
4286 LOG(
INFO) <<
"Query ran out of GPU memory, attempting punt to CPU";
4288 throw std::runtime_error(
4289 "Query ran out of GPU memory, unable to automatically retry on CPU");
4298 const char* code{
nullptr};
4299 const char* description{
nullptr};
// Maps a numeric error code to a {code string, description} pair.
// Codes strictly between 0 and ErrorCode::N_ are cast to ErrorCode and looked
// up with to_string()/to_description(); any other value yields
// {nullptr, nullptr}.
4304 if (0 < error_code && error_code < int32_t(ErrorCode::N_)) {
4305 auto const ec =
static_cast<ErrorCode
>(
error_code);
4306 return {
to_string(ec), to_description(ec)};
4308 return {
nullptr,
nullptr};
// Builds a human-readable message for an error code.
// Negative codes map to a fixed "ran out of slots" message; when errorInfo has
// a non-null code string the message is "<code>: <description>".
// NOTE(review): the fallback for an unknown code is on lines not visible here.
4313 if (error_code < 0) {
4314 return "Ran out of slots in the query output buffer";
4318 if (errorInfo.code) {
4319 return errorInfo.code +
": "s + errorInfo.description;
4327 VLOG(1) <<
"Running post execution callback.";
4328 (*post_execution_callback_)();
4335 const auto compound =
dynamic_cast<const RelCompound*
>(node);
4339 const auto project =
dynamic_cast<const RelProject*
>(node);
4343 const auto aggregate =
dynamic_cast<const RelAggregate*
>(node);
4347 const auto filter =
dynamic_cast<const RelFilter*
>(node);
4351 LOG(
FATAL) <<
"Unhandled node type: "
4360 if (
auto join = dynamic_cast<const RelJoin*>(sink)) {
4361 return join->getJoinType();
4363 if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
// Recognizes the pattern  (a = b) OR (a IS NULL AND b IS NULL)  where a and b
// are RexInput operands, and builds a single kBW_EQ RexOperator over deep
// copies of a and b, typed like the original equality.
// The operands must appear in the same order in the equality and in the two
// IS NULL tests (a first, b second).
// NOTE(review): what is returned when the pattern does not match is on lines
// not visible in this view.
4371 const auto condition =
dynamic_cast<const RexOperator*
>(scalar);
// Top level must be a binary OR.
4372 if (!condition || condition->getOperator() !=
kOR || condition->size() != 2) {
// First OR operand must be an equality.
4375 const auto equi_join_condition =
4376 dynamic_cast<const RexOperator*
>(condition->getOperand(0));
4377 if (!equi_join_condition || equi_join_condition->getOperator() !=
kEQ) {
// Second OR operand must be a binary AND ...
4380 const auto both_are_null_condition =
4381 dynamic_cast<const RexOperator*
>(condition->getOperand(1));
4382 if (!both_are_null_condition || both_are_null_condition->getOperator() !=
kAND ||
4383 both_are_null_condition->size() != 2) {
// ... whose two operands are both IS NULL tests.
4386 const auto lhs_is_null =
4387 dynamic_cast<const RexOperator*
>(both_are_null_condition->getOperand(0));
4388 const auto rhs_is_null =
4389 dynamic_cast<const RexOperator*
>(both_are_null_condition->getOperand(1));
4390 if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() !=
kISNULL ||
4391 rhs_is_null->getOperator() !=
kISNULL) {
4394 CHECK_EQ(
size_t(1), lhs_is_null->size());
4395 CHECK_EQ(
size_t(1), rhs_is_null->size());
4396 CHECK_EQ(
size_t(2), equi_join_condition->size());
// All four leaf operands must be plain inputs.
4397 const auto eq_lhs =
dynamic_cast<const RexInput*
>(equi_join_condition->getOperand(0));
4398 const auto eq_rhs =
dynamic_cast<const RexInput*
>(equi_join_condition->getOperand(1));
4399 const auto is_null_lhs =
dynamic_cast<const RexInput*
>(lhs_is_null->getOperand(0));
4400 const auto is_null_rhs =
dynamic_cast<const RexInput*
>(rhs_is_null->getOperand(0));
4401 if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4404 std::vector<std::unique_ptr<const RexScalar>> eq_operands;
// The IS NULL tests must refer to the same inputs as the equality.
4405 if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4407 auto lhs_op_copy = deep_copy_visitor.
visit(equi_join_condition->getOperand(0));
4408 auto rhs_op_copy = deep_copy_visitor.
visit(equi_join_condition->getOperand(1));
// Ownership of the copies moves into eq_operands.
4409 eq_operands.emplace_back(lhs_op_copy.release());
4410 eq_operands.emplace_back(rhs_op_copy.release());
4411 return boost::make_unique<const RexOperator>(
4412 kBW_EQ, eq_operands, equi_join_condition->getType());
// For an AND condition (which must have at least two operands) the visible
// loop visits operands 1..n-1 and, on each step, moves the running accumulator
// into a fresh operand list used to build a new kAND RexOperator.
// NOTE(review): how the accumulator is seeded, what else is added to
// and_operands, and the non-AND path are on lines not visible here.
4419 const auto condition =
dynamic_cast<const RexOperator*
>(scalar);
4420 if (condition && condition->getOperator() ==
kAND) {
4421 CHECK_GE(condition->size(), size_t(2));
4426 for (
size_t i = 1; i < condition->size(); ++i) {
4427 std::vector<std::unique_ptr<const RexScalar>> and_operands;
4428 and_operands.emplace_back(std::move(acc));
4431 boost::make_unique<const RexOperator>(
kAND, and_operands, condition->getType());
4441 for (
size_t nesting_level = 1; nesting_level <= left_deep_join->
inputCount() - 1;
4446 auto cur_level_join_type = left_deep_join->
getJoinType(nesting_level);
4448 join_types[nesting_level - 1] = cur_level_join_type;
4456 std::vector<InputDescriptor>& input_descs,
4457 std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4459 std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4461 const std::vector<InputTableInfo>& query_infos,
4462 const Executor* executor) {
4468 if (node->isUpdateViaSelect() || node->isDeleteViaSelect()) {
4473 for (
const auto& table_info : query_infos) {
4474 if (table_info.table_key.table_id < 0) {
4483 const auto input_permutation =
4486 std::tie(input_descs, input_col_descs, std::ignore) =
4488 return input_permutation;
4493 std::vector<size_t> input_sizes;
4494 for (
size_t i = 0; i < left_deep_join->
inputCount(); ++i) {
4496 input_sizes.push_back(inputs.size());
// Returns a new list, in the same order as quals, in which each qual is
// replaced by its rewritten form when one was produced (rewritten_qual is
// non-null) and kept unchanged otherwise.
// NOTE(review): the statement that computes rewritten_qual is on a line not
// visible in this view.
4502 const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4503 std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4504 for (
const auto& qual : quals) {
4506 rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4508 return rewritten_quals;
// NOTE(review): many statements of this routine are not visible in this view;
// the comments describe only the visible lines.
// Builds a work unit for a compound node: derives input descriptors, handles a
// left-deep join (join quals, optional input re-ordering, bbox-intersect
// rewrite), applies a registered query hint, rewrites the execution unit and
// returns it together with the rewriter and the left-deep-join input sizes.
4517 std::vector<InputDescriptor> input_descs;
4518 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4520 std::tie(input_descs, input_col_descs, std::ignore) =
4526 const auto left_deep_join =
4531 std::vector<size_t> input_permutation;
4532 std::vector<size_t> left_deep_join_input_sizes;
4533 std::optional<unsigned> left_deep_tree_id;
4534 if (left_deep_join) {
4535 left_deep_tree_id = left_deep_join->getId();
4538 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4540 std::find(join_types.begin(), join_types.end(),
JoinType::LEFT) ==
4544 left_deep_join_quals,
4545 input_to_nest_level,
// Input descriptors are recomputed with the chosen input_permutation.
4550 std::tie(input_descs, input_col_descs, std::ignore) =
4551 get_input_desc(compound, input_to_nest_level, input_permutation);
4553 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4558 input_to_nest_level,
4562 if (bbox_intersect_qual_info.is_reordered) {
// Adopt the join quals produced by the bbox-intersect rewrite.
4567 if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4568 left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4574 const auto scalar_sources =
4577 std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4579 target_exprs_type_infos,
// Use the query hint registered for this node, when there is one.
4588 auto candidate =
query_dag_->getQueryHint(compound);
4590 query_hint = *candidate;
4598 left_deep_join_quals,
4601 target_exprs_type_infos,
4613 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
4614 auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4615 const auto targets_meta =
get_targets_meta(compound, rewritten_exe_unit.target_exprs);
// NOTE(review): both operands of this test check the same thing (a
// std::optional converts to bool via has_value()); one would suffice.
4618 if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4619 left_deep_trees_info.emplace(left_deep_tree_id.value(),
4620 rewritten_exe_unit.join_quals);
4624 compound, left_deep_tree_id, left_deep_trees_info,
executor_);
// Copy the hash-table plan and table-id map from join_info into the unit.
4625 rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4626 rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4628 return {rewritten_exe_unit,
4631 std::move(query_rewriter),
4633 left_deep_join_input_sizes};
4639 const auto left_deep_join =
4643 return std::make_shared<RelAlgTranslator>(
4651 const auto bin_oper =
dynamic_cast<const RexOperator*
>(qual_expr);
4652 if (!bin_oper || bin_oper->getOperator() !=
kAND) {
4655 CHECK_GE(bin_oper->size(), size_t(2));
4657 for (
size_t i = 1; i < bin_oper->size(); ++i) {
4659 lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4665 const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4667 CHECK(!factors.empty());
4668 auto acc = factors.front();
4669 for (
size_t i = 1; i < factors.size(); ++i) {
4675 template <
class QualsList>
4677 const std::shared_ptr<Analyzer::Expr>& needle) {
4678 for (
const auto& qual : haystack) {
4679 if (*qual == *needle) {
// Factors an expression made of one or more terms: candidate common factors
// are drawn from the first term's quals and checked against terms 1..n-1, then
// each term is rebuilt from its simple_quals plus the quals that remain.
// NOTE(review): the tests deciding whether a factor is common or remaining,
// and every return value, are on lines not visible in this view.
4690 const std::shared_ptr<Analyzer::Expr>& expr) {
4692 CHECK_GE(expr_terms.size(), size_t(1));
4693 const auto& first_term = expr_terms.front();
4695 std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4698 for (
const auto& first_term_factor : first_term_factors.quals) {
4700 expr_terms.size() > 1;
4701 for (
size_t i = 1; i < expr_terms.size(); ++i) {
4709 common_factors.push_back(first_term_factor);
// This branch is taken when no factor was collected.
4712 if (common_factors.empty()) {
4716 std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4717 for (
const auto& term : expr_terms) {
// Start from the term's simple quals, then append selected regular quals.
4719 std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4720 term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4721 for (
const auto& qual : term_cf.quals) {
4723 remaining_quals.push_back(qual);
4726 if (!remaining_quals.empty()) {
4732 if (remaining_terms.empty()) {
// Builds the list of join-condition quals. Each component of rex_condition_cf
// is passed to translator.translate(), using its bw_equals replacement when
// one exists; the quals and simple_quals of join_condition_cf are then
// appended to join_condition_quals after being run through fold_expr().
// NOTE(review): where the translated components are stored and what is
// returned are on lines not visible in this view.
4743 const std::vector<JoinType>& join_types,
4744 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4745 const bool just_explain)
const {
4749 std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4750 for (
const auto rex_condition_component : rex_condition_cf) {
4753 translator.
translate(bw_equals ? bw_equals.get() : rex_condition_component));
// Appends fold_expr() of every element of the given list.
4756 auto append_folded_cf_quals = [&join_condition_quals](
const auto& cf_quals) {
4757 for (
const auto& cf_qual : cf_quals) {
4758 join_condition_quals.emplace_back(
fold_expr(cf_qual.get()));
4762 append_folded_cf_quals(join_condition_cf.quals);
4763 append_folded_cf_quals(join_condition_cf.simple_quals);
// Assigns join quals to each join level. For level rte_idx (1-based over
// input_descs) the result slot rte_idx - 1 receives either quals built from an
// outer condition, or every not-yet-used inner qual whose visitor-computed rte
// index does not exceed rte_idx. Each inner qual is placed at most once
// (tracked in visited_quals). The slot's type is join_types[rte_idx - 1].
// NOTE(review): how outer_condition and join_condition_quals are obtained is
// on lines not visible in this view.
4773 const std::vector<InputDescriptor>& input_descs,
4774 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4775 const bool just_explain) {
4781 std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4782 for (
size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4784 if (outer_condition) {
4785 result[rte_idx - 1].quals =
4786 makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4787 CHECK_LE(rte_idx, join_types.size());
4792 for (
const auto& qual : join_condition_quals) {
// Skip quals already assigned to an earlier level.
4793 if (visited_quals.count(qual)) {
4796 const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4797 if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4798 const auto it_ok = visited_quals.emplace(qual);
4799 CHECK(it_ok.second);
4800 result[rte_idx - 1].quals.push_back(qual);
4803 CHECK_LE(rte_idx, join_types.size());
4807 result[rte_idx - 1].type = join_types[rte_idx - 1];
// Creates one Analyzer::ColumnVar per entry of in_metainfo for the input found
// at position nest_level of ra_node. That input must be registered in
// input_to_nest_level (CHECK-enforced); its mapped value is the rte index.
// NOTE(review): the remaining ColumnVar constructor arguments and the return
// statement are on lines not visible in this view.
4816 const size_t nest_level,
4817 const std::vector<TargetMetaInfo>& in_metainfo,
4818 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4821 const auto input = ra_node->
getInput(nest_level);
4822 const auto it_rte_idx = input_to_nest_level.find(input);
4823 CHECK(it_rte_idx != input_to_nest_level.end());
4824 const int rte_idx = it_rte_idx->second;
4826 std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
// Non-null only when the input is a table scan.
4827 const auto scan_ra =
dynamic_cast<const RelScan*
>(input);
4829 for (
const auto& input_meta : in_metainfo) {
4830 inputs.push_back(std::make_shared<Analyzer::ColumnVar>(
4831 input_meta.get_type_info(),
// Converts a vector of shared_ptr<Analyzer::Expr> into a same-sized vector of
// raw pointers, where element i is input[i].get(). The raw pointers do not
// share ownership, so they are only valid while the shared_ptrs keep the
// expressions alive.
4840 std::vector<std::shared_ptr<Analyzer::Expr>>
const& input) {
// Pre-size the output so std::transform can write through output.begin().
4841 std::vector<Analyzer::Expr*> output(input.size());
4842 auto const raw_ptr = [](
auto& shared_ptr) {
return shared_ptr.get(); };
4843 std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4852 const bool just_explain) {
4853 std::vector<InputDescriptor> input_descs;
4854 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4855 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4857 std::tie(input_descs, input_col_descs, used_inputs_owned) =
4864 const auto source = aggregate->
getInput(0);
4866 const auto scalar_sources =
4869 std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4871 target_exprs_type_infos,
4883 auto candidate =
query_dag_->getQueryHint(aggregate);
4885 query_hint = *candidate;
4897 target_exprs_type_infos,
4904 join_info.hash_table_plan_dag,
4905 join_info.table_id_to_node_map,
// NOTE(review): many statements of this routine are not visible in this view;
// the comments describe only the visible lines.
// Builds a work unit for a project node: derives input descriptors, handles a
// left-deep join (join quals, optional input re-ordering, bbox-intersect
// rewrite), applies a registered query hint, rewrites the execution unit and
// returns it together with the rewriter and the left-deep-join input sizes.
4918 std::vector<InputDescriptor> input_descs;
4919 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4921 std::tie(input_descs, input_col_descs, std::ignore) =
4926 const auto left_deep_join =
4931 std::vector<size_t> input_permutation;
4932 std::vector<size_t> left_deep_join_input_sizes;
4933 std::optional<unsigned> left_deep_tree_id;
4934 if (left_deep_join) {
4935 left_deep_tree_id = left_deep_join->getId();
4938 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4942 left_deep_join_quals,
4943 input_to_nest_level,
4948 std::tie(input_descs, input_col_descs, std::ignore) =
4951 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4956 input_to_nest_level,
4960 if (bbox_intersect_qual_info.is_reordered) {
// Adopt the join quals produced by the bbox-intersect rewrite.
4965 if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4966 left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4970 const auto target_exprs_owned =
// Use the query hint registered for this node, when there is one.
4978 auto candidate =
query_dag_->getQueryHint(project);
4980 query_hint = *candidate;
4987 left_deep_join_quals,
5002 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
5003 auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5004 const auto targets_meta =
get_targets_meta(project, rewritten_exe_unit.target_exprs);
// NOTE(review): both operands of this test check the same thing (a
// std::optional converts to bool via has_value()); one would suffice.
5007 if (left_deep_tree_id && left_deep_tree_id.has_value()) {
5008 left_deep_trees_info.emplace(left_deep_tree_id.value(),
5009 rewritten_exe_unit.join_quals);
5013 project, left_deep_tree_id, left_deep_trees_info,
executor_);
// Copy the hash-table plan and table-id map from join_info into the unit.
5014 rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
5015 rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
5017 return {rewritten_exe_unit,
5020 std::move(query_rewriter),
5022 left_deep_join_input_sizes};
// Builds one Analyzer::ColumnVar per entry of tmis, typed from
// tmis[i].get_type_info(), and returns them in order. The negated id of
// input_node is computed up front, and db_id is taken from the scan's catalog
// when input_node is a RelScan.
// NOTE(review): how negative_node_id and db_id feed the ColumnVar constructor
// is on lines not visible in this view.
5031 const int negative_node_id = -input_node->
getId();
5033 if (
auto rel_scan = dynamic_cast<const RelScan*>(input_node)) {
5034 db_id = rel_scan->getCatalog().getDatabaseId();
5036 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
// One allocation for all targets.
5037 target_exprs.reserve(tmis.size());
5038 for (
size_t i = 0; i < tmis.size(); ++i) {
5039 target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
5040 tmis[i].get_type_info(),
5044 return target_exprs;
// NOTE(review): many statements of this routine are not visible in this view;
// the comments describe only the visible lines.
// Builds a work unit for a logical union of two inputs: gathers target
// expressions for both inputs, rewrites the execution unit, dispatches on the
// concrete type of the first input, and returns the rewritten unit with its
// rewriter. A RelSort first input is rejected with QueryNotSupported.
5053 std::vector<InputDescriptor> input_descs;
5054 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5057 std::tie(input_descs, input_col_descs, std::ignore) =
// Folds the query infos down to the largest getNumTuples() value.
5060 auto const max_num_tuples =
5064 [](
auto max,
auto const& query_info) {
5065 return std::max(max, query_info.info.getNumTuples());
5068 VLOG(3) <<
"input_to_nest_level.size()=" << input_to_nest_level.size() <<
" Pairs are:";
5069 for (
auto& pair : input_to_nest_level) {
5071 << pair.second <<
')';
// One target-expression list per union input (index 0 and 1); each input must
// yield at least one expression.
5076 std::vector<Analyzer::Expr*> target_exprs_pair[2];
5077 for (
unsigned i = 0; i < 2; ++i) {
5079 CHECK(!input_exprs_owned.empty())
5080 <<
"No metainfo found for input node(" << i <<
") "
5082 VLOG(3) <<
"i(" << i <<
") input_exprs_owned.size()=" << input_exprs_owned.size();
5083 for (
auto& input_expr : input_exprs_owned) {
5084 VLOG(3) <<
" " << input_expr->toString();
5092 <<
" target_exprs.size()=" << target_exprs_pair[0].size()
5093 <<
" max_num_tuples=" << max_num_tuples;
5101 target_exprs_pair[0],
5111 logical_union->
isAll(),
5113 target_exprs_pair[1]};
5114 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
5115 const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
// Dispatch on the concrete node type of the first input; the per-type bodies
// are on lines not visible here.
5118 if (
auto const* node = dynamic_cast<const RelCompound*>(input0)) {
5121 }
else if (
auto const* node = dynamic_cast<const RelProject*>(input0)) {
5124 }
else if (
auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
5127 }
else if (
auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
5130 }
else if (
auto const* node = dynamic_cast<const RelScan*>(input0)) {
5133 }
else if (
auto const* node = dynamic_cast<const RelFilter*>(input0)) {
5136 }
else if (
auto const* node = dynamic_cast<const RelLogicalValues*>(input0)) {
5139 }
else if (dynamic_cast<const RelSort*>(input0)) {
5140 throw QueryNotSupported(
"LIMIT and OFFSET are not currently supported with UNION.");
5145 VLOG(3) <<
"logical_union->getOutputMetainfo()="
5147 <<
" rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey()="
5148 << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey();
5150 return {rewritten_exe_unit,
5153 std::move(query_rewriter)};
5158 const bool just_explain,
5159 const bool is_gpu) {
5160 std::vector<InputDescriptor> input_descs;
5161 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5163 std::tie(input_descs, input_col_descs, std::ignore) =
5173 const auto table_function_impl_and_type_infos = [=]() {
5179 LOG(
WARNING) <<
"createTableFunctionWorkUnit[GPU]: " << e.what()
5181 <<
" step to run on CPU.";
5189 LOG(
WARNING) <<
"createTableFunctionWorkUnit[CPU]: " << e.what();
5194 const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
5195 const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
5196 size_t output_row_sizing_param = 0;
5197 if (table_function_impl
5198 .hasUserSpecifiedOutputSizeParameter()) {
5199 const auto parameter_index =
5200 table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
5201 CHECK_GT(parameter_index,
size_t(0));
5203 const auto parameter_expr =
5205 const auto parameter_expr_literal =
dynamic_cast<const RexLiteral*
>(parameter_expr);
5206 if (!parameter_expr_literal) {
5207 throw std::runtime_error(
5208 "Provided output buffer sizing parameter is not a literal. Only literal "
5209 "values are supported with output buffer sizing configured table "
5212 int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
5213 if (literal_val < 0) {
5214 throw std::runtime_error(
"Provided output sizing parameter " +
5216 " must be positive integer.");
5218 output_row_sizing_param =
static_cast<size_t>(literal_val);
5221 output_row_sizing_param = 1;
5224 makeExpr<Analyzer::Constant>(
kINT,
false, d);
5226 input_exprs_owned.insert(input_exprs_owned.begin() + parameter_index - 1,
5227 DEFAULT_ROW_MULTIPLIER_EXPR);
5229 }
else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5230 output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
5235 std::vector<Analyzer::ColumnVar*> input_col_exprs;
5236 size_t input_index = 0;
5237 size_t arg_index = 0;
5238 const auto table_func_args = table_function_impl.getInputArgs();
5239 CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5240 for (
const auto& ti : table_function_type_infos) {
5241 if (ti.is_column_list()) {
5242 for (
int i = 0; i < ti.get_dimension(); i++) {
5243 auto& input_expr = input_exprs_owned[input_index];
5249 auto type_info = input_expr->get_type_info();
5250 if (ti.is_column_array()) {