46 #include <boost/algorithm/cxx11/any_of.hpp>
47 #include <boost/range/adaptor/reversed.hpp>
69 const auto compound =
dynamic_cast<const RelCompound*
>(ra);
70 const auto aggregate =
dynamic_cast<const RelAggregate*
>(ra);
71 return ((compound && compound->isAggregate()) || aggregate);
77 std::unordered_set<PhysicalInput> phys_inputs2;
78 for (
auto& phi : phys_inputs) {
82 catalog->getColumnIdBySpi(phi.table_id, phi.col_id), phi.
table_id, phi.db_id});
88 std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
89 parallelism_hints_per_table;
91 const auto table_id = physical_input.table_id;
95 const auto table = catalog->getMetadataForTable(table_id,
true);
97 !table->is_in_memory_system_table) {
98 const auto col_id = catalog->getColumnIdBySpi(table_id, physical_input.col_id);
99 const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
100 const auto foreign_table = catalog->getForeignTable(table_id);
101 for (
const auto& fragment :
102 foreign_table->fragmenter->getFragmentsForQuery().fragments) {
105 physical_input.db_id, table_id, col_id, fragment.fragmentId};
114 if (!chunk.isChunkOnDevice(
116 parallelism_hints_per_table[{physical_input.db_id, table_id}].insert(
118 fragment.fragmentId});
123 if (!parallelism_hints_per_table.empty()) {
128 CHECK(foreign_storage_mgr);
129 foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
137 const auto table = catalog->getMetadataForTable(table_id,
false);
139 const auto spi_col_id = catalog->getColumnIdBySpi(table_id, col_id);
156 const auto info_schema_catalog =
158 CHECK(info_schema_catalog);
159 std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
161 if (info_schema_catalog->getDatabaseId() != physical_input.db_id) {
164 const auto table_id = physical_input.table_id;
165 const auto table = info_schema_catalog->getMetadataForTable(table_id,
false);
167 if (table->is_in_memory_system_table) {
168 const auto column_id =
169 info_schema_catalog->getColumnIdBySpi(table_id, physical_input.col_id);
170 system_table_columns_by_table_id[table_id].emplace_back(column_id);
174 if (!system_table_columns_by_table_id.empty() &&
179 for (
const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
182 info_schema_catalog->getDataMgr().deleteChunksWithPrefix(
183 ChunkKey{info_schema_catalog->getDatabaseId(), table_id},
192 const auto td = info_schema_catalog->getMetadataForTable(table_id);
194 CHECK(td->fragmenter);
195 auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
196 CHECK_LE(fragment_count, static_cast<size_t>(1))
197 <<
"In-memory system tables are expected to have a single fragment.";
198 if (fragment_count > 0) {
199 for (
auto column_id : column_ids) {
202 const auto cd = info_schema_catalog->getMetadataForColumn(table_id, column_id);
204 info_schema_catalog->getDatabaseId(), table_id, column_id, 0};
206 &(info_schema_catalog->getDataMgr()),
225 std::list<Analyzer::OrderEntry>
result;
228 result.emplace_back(sort_field.getField() + 1,
236 const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
237 const std::vector<TargetMetaInfo>& targets_meta) {
238 CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
240 for (
size_t i = 0; i < targets_meta.size(); ++i) {
241 render_info.
targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
242 targets_meta[i].get_resname(),
243 work_unit_target_exprs[i]->get_shared_ptr(),
256 return {left_deep_join_tree->
getId()};
261 const std::vector<unsigned>& aggregate,
262 const std::vector<unsigned>& next_result)
const override {
264 std::copy(next_result.begin(), next_result.end(), std::back_inserter(
result));
274 const size_t text_encoding_casts)
275 : text_decoding_casts(text_decoding_casts)
276 , text_encoding_casts(text_encoding_casts) {}
282 : default_disregard_casts_to_none_encoding_(
283 default_disregard_casts_to_none_encoding) {}
290 const bool disregard_casts_to_none_encoding = disregard_casts_to_none_encoding_;
291 result = aggregateResult(result, visit(u_oper->
get_operand()));
297 if (!operand_ti.is_string() && casted_ti.is_dict_encoded_string()) {
300 if (!casted_ti.is_string()) {
307 if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
310 if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
311 if (!disregard_casts_to_none_encoding) {
324 result = aggregateResult(result, visit(string_oper->
getArg(0)));
345 const auto prev_disregard_casts_to_none_encoding_state =
346 disregard_casts_to_none_encoding_;
347 const auto left_u_oper =
349 if (left_u_oper && left_u_oper->get_optype() ==
kCAST) {
350 disregard_casts_to_none_encoding_ =
false;
351 result = aggregateResult(result, visitUOper(left_u_oper));
353 disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
357 const auto right_u_oper =
359 if (right_u_oper && right_u_oper->get_optype() ==
kCAST) {
360 disregard_casts_to_none_encoding_ =
false;
361 result = aggregateResult(result, visitUOper(right_u_oper));
363 disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
366 disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
377 const auto prev_disregard_casts_to_none_encoding_state =
378 disregard_casts_to_none_encoding_;
379 if (u_oper && u_oper->get_optype() ==
kCAST) {
380 disregard_casts_to_none_encoding_ =
true;
381 result = aggregateResult(result, visitUOper(u_oper));
382 disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
384 result = aggregateResult(result, visit(like->
get_arg()));
386 result = aggregateResult(result, visit(like->
get_like_expr()));
403 disregard_casts_to_none_encoding_ = default_disregard_casts_to_none_encoding_;
411 mutable bool disregard_casts_to_none_encoding_ =
false;
418 auto check_node_for_text_casts = [&cast_counts](
420 const bool disregard_casts_to_none_encoding) {
425 const auto this_node_cast_counts = visitor.
visit(expr);
430 for (
const auto& qual : ra_exe_unit.
quals) {
431 check_node_for_text_casts(qual.get(),
false);
433 for (
const auto& simple_qual : ra_exe_unit.
simple_quals) {
434 check_node_for_text_casts(simple_qual.get(),
false);
437 check_node_for_text_casts(groupby_expr.get(),
false);
439 for (
const auto& target_expr : ra_exe_unit.
target_exprs) {
440 check_node_for_text_casts(target_expr,
false);
442 for (
const auto& join_condition : ra_exe_unit.
join_quals) {
443 for (
const auto& join_qual : join_condition.quals) {
449 check_node_for_text_casts(join_qual.get(),
457 const std::vector<InputTableInfo>& query_infos,
462 auto const tuples_upper_bound =
466 [](
auto max,
auto const& query_info) {
467 return std::max(max, query_info.info.getNumTuples());
474 const bool has_text_casts =
475 text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;
477 if (!has_text_casts) {
480 std::ostringstream oss;
481 oss <<
"Query requires one or more casts between none-encoded and dictionary-encoded "
482 <<
"strings, and the estimated table size (" << tuples_upper_bound <<
" rows) "
483 <<
"exceeds the configured watchdog none-encoded string translation limit of "
485 throw std::runtime_error(oss.str());
496 !query_for_partial_outer_frag &&
497 (!render_info || (render_info && !render_info->
isInSitu()));
515 auto lock =
executor_->acquireExecuteMutex();
526 CHECK(!ed_seq.empty());
527 if (ed_seq.size() > 1) {
535 auto exec_desc_ptr = ed_seq.getDescriptor(0);
536 CHECK(exec_desc_ptr);
537 auto& exec_desc = *exec_desc_ptr;
538 const auto body = exec_desc.getBody();
543 const auto project =
dynamic_cast<const RelProject*
>(body);
551 const auto compound =
dynamic_cast<const RelCompound*
>(body);
553 if (compound->isDeleteViaSelect()) {
555 }
else if (compound->isUpdateViaSelect()) {
558 if (compound->isAggregate()) {
562 const auto work_unit =
575 const bool just_explain_plan,
579 << static_cast<int>(
query_dag_->getBuildState());
585 auto execution_result =
588 constexpr
bool vlog_result_set_summary{
false};
589 if constexpr (vlog_result_set_summary) {
590 VLOG(1) << execution_result.getRows()->summaryToString();
594 VLOG(1) <<
"Running post execution callback.";
595 (*post_execution_callback_)();
597 return execution_result;
607 LOG(
INFO) <<
"Query unable to run in GPU mode, retrying on CPU";
618 const bool just_explain_plan,
622 auto timer_setup =
DEBUG_TIMER(
"Query pre-execution steps");
632 std::string query_session{
""};
633 std::string query_str{
"N/A"};
634 std::string query_submitted_time{
""};
637 query_session =
query_state_->getConstSessionInfo()->get_session_id();
639 query_submitted_time =
query_state_->getQuerySubmittedTime();
642 auto validate_or_explain_query =
644 auto interruptable = !render_info && !query_session.empty() &&
651 std::tie(query_session, query_str) =
executor_->attachExecutorToQuerySession(
652 query_session, query_str, query_submitted_time);
658 query_submitted_time,
659 QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
664 [
this, &query_session, &interruptable, &query_submitted_time] {
668 executor_->clearQuerySessionStatus(query_session, query_submitted_time);
672 auto acquire_execute_mutex = [](Executor * executor) ->
auto {
673 auto ret = executor->acquireExecuteMutex();
678 auto lock = acquire_execute_mutex(
executor_);
687 executor_->checkPendingQueryStatus(query_session);
690 throw std::runtime_error(
"Query execution has been interrupted (pending query)");
694 throw std::runtime_error(
"Checking pending query status failed: unknown error");
697 int64_t queue_time_ms =
timer_stop(clock_begin);
710 if (just_explain_plan) {
711 std::stringstream ss;
712 std::vector<const RelAlgNode*> nodes;
713 for (
size_t i = 0; i < ed_seq.size(); i++) {
714 nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
716 size_t ctr_node_id_in_plan = nodes.size();
720 auto node_id_in_plan_tree = ctr_node_id_in_plan--;
721 body->setIdInPlanTree(node_id_in_plan_tree);
723 size_t ctr = nodes.size();
728 const auto index = ctr--;
729 const auto tabs = std::string(tab_ctr++,
'\t');
731 ss << tabs <<
std::to_string(index) <<
" : " << body->toString(config) <<
"\n";
732 if (
auto sort = dynamic_cast<const RelSort*>(body)) {
733 ss << tabs <<
" : " <<
sort->getInput(0)->toString(config) <<
"\n";
735 if (dynamic_cast<const RelProject*>(body) ||
736 dynamic_cast<const RelCompound*>(body)) {
737 if (
auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
738 ss << tabs <<
" : " <<
join->toString(config) <<
"\n";
743 if (!subqueries.empty()) {
746 for (
const auto& subquery : subqueries) {
747 const auto ra = subquery->getRelAlg();
748 ss <<
"\t" << ra->toString(config) <<
"\n";
751 auto rs = std::make_shared<ResultSet>(ss.str());
759 ed_seq, co, eo, render_info, queue_time_ms);
766 const auto subquery_ra = subquery->getRelAlg();
768 if (subquery_ra->hasContextData()) {
775 if (global_hints || local_hints) {
776 const auto subquery_rel_alg_dag = subquery_executor.
getRelAlgDag();
781 subquery_rel_alg_dag->registerQueryHint(subquery_ra, *local_hints);
786 subquery->setExecutionResult(std::make_shared<ExecutionResult>(
result));
794 return executor_->computeColRangesCache(phys_inputs);
799 return executor_->computeStringDictionaryGenerations(phys_inputs);
804 return executor_->computeTableGenerations(phys_table_ids);
816 std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
818 auto sort_node =
dynamic_cast<const RelSort*
>(root_node);
824 RelLeftDeepTreeIdsCollector visitor;
825 auto left_deep_tree_ids = visitor.visit(root_node);
833 const auto source = sort->
getInput(0);
834 if (dynamic_cast<const RelSort*>(source)) {
835 throw std::runtime_error(
"Sort node not supported as input to another sort");
843 const size_t step_idx,
851 const auto sort =
dynamic_cast<const RelSort*
>(exe_desc_ptr->getBody());
853 size_t shard_count{0};
867 const auto source =
sort->getInput(0);
870 CHECK_EQ(temp_seq.size(), size_t(1));
883 merge_type(exe_desc_ptr->getBody()),
884 exe_desc_ptr->getBody()->getId(),
888 seq, std::make_pair(step_idx, step_idx + 1), co, eo, render_info,
queue_time_ms_);
893 LOG(
INFO) <<
"Retry the query via CPU mode";
895 std::make_pair(step_idx, step_idx + 1),
902 VLOG(1) <<
"Running post execution callback.";
903 (*post_execution_callback_)();
905 return query_step_result;
920 executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
921 executor_->table_generations_ = table_generations;
922 executor_->agg_col_range_cache_ = agg_col_range;
929 const int64_t queue_time_ms,
930 const bool with_existing_temp_tables) {
933 if (!with_existing_temp_tables) {
943 auto get_descriptor_count = [&seq, &eo]() ->
size_t {
958 const auto exec_desc_count = get_descriptor_count();
972 const auto cached_res =
973 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
979 const auto num_steps = exec_desc_count - 1;
980 for (
size_t i = 0; i < exec_desc_count; i++) {
981 VLOG(1) <<
"Executing query step " << i <<
" / " << num_steps;
984 seq, i, co, eo_copied, (i == num_steps) ? render_info :
nullptr, queue_time_ms);
993 LOG(
INFO) <<
"Retrying current query step " << i <<
" / " << num_steps <<
" on CPU";
995 if (render_info && i == num_steps) {
1003 (i == num_steps) ? render_info :
nullptr,
1009 auto eo_extern = eo_copied;
1010 eo_extern.executor_type = ::ExecutorType::Extern;
1012 const auto body = exec_desc_ptr->
getBody();
1013 const auto compound =
dynamic_cast<const RelCompound*
>(body);
1014 if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
1015 LOG(
INFO) <<
"Also failed to run the query using interoperability";
1019 seq, i, co, eo_extern, (i == num_steps) ? render_info :
nullptr, queue_time_ms);
1028 const std::pair<size_t, size_t> interval,
1032 const int64_t queue_time_ms) {
1037 for (
size_t i = interval.first; i < interval.second; i++) {
1044 (i == interval.second - 1) ? render_info :
nullptr,
1054 LOG(
INFO) <<
"Retrying current query step " << i <<
" on CPU";
1056 if (render_info && i == interval.second - 1) {
1063 (i == interval.second - 1) ? render_info :
nullptr,
1072 const size_t step_idx,
1076 const int64_t queue_time_ms) {
1080 CHECK(exec_desc_ptr);
1081 auto& exec_desc = *exec_desc_ptr;
1082 const auto body = exec_desc.getBody();
1083 if (body->isNop()) {
1093 auto handle_hint = [co,
1096 this]() -> std::pair<CompilationOptions, ExecutionOptions> {
1099 auto target_node = body;
1100 if (
auto sort_body = dynamic_cast<const RelSort*>(body)) {
1101 target_node = sort_body->getInput(0);
1104 auto columnar_output_hint_enabled =
false;
1105 auto rowwise_output_hint_enabled =
false;
1108 VLOG(1) <<
"A user forces to run the query on the CPU execution mode";
1113 VLOG(1) <<
"A user enables keeping query resultset but is skipped since data "
1114 "recycler is disabled";
1117 VLOG(1) <<
"A user enables keeping query resultset but is skipped since query "
1118 "resultset recycler is disabled";
1120 VLOG(1) <<
"A user enables keeping query resultset";
1127 VLOG(1) <<
"A user enables keeping table function's resultset but is skipped "
1128 "since data recycler is disabled";
1131 VLOG(1) <<
"A user enables keeping table function's resultset but is skipped "
1132 "since query resultset recycler is disabled";
1134 VLOG(1) <<
"A user enables keeping table function's resultset";
1140 VLOG(1) <<
"A user enables watchdog for this query";
1146 VLOG(1) <<
"A user disables watchdog for this query";
1152 VLOG(1) <<
"A user enables dynamic watchdog for this query";
1158 VLOG(1) <<
"A user disables dynamic watchdog for this query";
1163 std::ostringstream oss;
1164 oss <<
"A user sets query time limit to " << query_hints->query_time_limit
1169 oss <<
" (and system automatically enables dynamic watchdog to activate the "
1170 "given \"query_time_limit\" hint)";
1172 VLOG(1) << oss.str();
1175 VLOG(1) <<
"A user enables loop join";
1179 VLOG(1) <<
"A user disables loop join";
1184 VLOG(1) <<
"A user forces the maximum size of a join hash table as "
1190 VLOG(1) <<
"Skip query hint \"opt_cuda_grid_and_block_size\" when at least one "
1191 "of the following query hints are given simultaneously: "
1192 "\"cuda_block_size\" and \"cuda_grid_size_multiplier\"";
1194 VLOG(1) <<
"A user enables optimization of cuda block and grid sizes";
1199 VLOG(1) <<
"A user forces the query to run with columnar output";
1200 columnar_output_hint_enabled =
true;
1202 VLOG(1) <<
"A user forces the query to run with rowwise output";
1203 rowwise_output_hint_enabled =
true;
1206 auto columnar_output_enabled = eo_work_unit.output_columnar_hint
1207 ? !rowwise_output_hint_enabled
1208 : columnar_output_hint_enabled;
1209 if (
g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
1210 LOG(
INFO) <<
"Currently, we do not support applying query hint to change query "
1211 "output layout in distributed mode.";
1214 return std::make_pair(co_hint_applied, eo_hint_applied);
1217 auto hint_applied = handle_hint();
1221 if (
auto cached_resultset =
1222 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
1223 body->getQueryPlanDagHash())) {
1224 VLOG(1) <<
"recycle resultset of the root node " << body->getRelNodeDagId()
1225 <<
" from resultset cache";
1226 body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
1228 std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
1229 executor_->getRecultSetRecyclerHolder().getTargetExprs(
1230 body->getQueryPlanDagHash());
1231 std::vector<Analyzer::Expr*> copied_target_exprs;
1232 for (
const auto& expr : cached_target_exprs) {
1233 copied_target_exprs.push_back(expr.get());
1236 *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
1238 exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
1244 const auto compound =
dynamic_cast<const RelCompound*
>(body);
1246 if (compound->isDeleteViaSelect()) {
1247 executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1248 }
else if (compound->isUpdateViaSelect()) {
1249 executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1252 compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1253 VLOG(3) <<
"Returned from executeCompound(), addTemporaryTable("
1254 <<
static_cast<int>(-compound->getId()) <<
", ...)"
1255 <<
" exec_desc.getResult().getDataPtr()->rowCount()="
1256 << exec_desc.getResult().getDataPtr()->rowCount();
1257 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1264 const auto project =
dynamic_cast<const RelProject*
>(body);
1266 if (project->isDeleteViaSelect()) {
1267 executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
1268 }
else if (project->isUpdateViaSelect()) {
1269 executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
1271 std::optional<size_t> prev_count;
1277 if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
1282 const auto& prev_exe_result = prev_exec_desc->getResult();
1283 const auto prev_result = prev_exe_result.getRows();
1285 prev_count = prev_result->rowCount();
1286 VLOG(3) <<
"Setting output row count for projection node to previous node ("
1287 << prev_exec_desc->getBody()->toString(
1289 <<
") to " << *prev_count;
1296 hint_applied.second,
1300 VLOG(3) <<
"Returned from executeProject(), addTemporaryTable("
1301 <<
static_cast<int>(-project->getId()) <<
", ...)"
1302 <<
" exec_desc.getResult().getDataPtr()->rowCount()="
1303 << exec_desc.getResult().getDataPtr()->rowCount();
1304 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1311 const auto aggregate =
dynamic_cast<const RelAggregate*
>(body);
1314 aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1318 const auto filter =
dynamic_cast<const RelFilter*
>(body);
1321 filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1325 const auto sort =
dynamic_cast<const RelSort*
>(body);
1328 sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1329 if (exec_desc.getResult().isFilterPushDownEnabled()) {
1336 if (logical_values) {
1338 addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
1341 const auto modify =
dynamic_cast<const RelModify*
>(body);
1343 exec_desc.setResult(
executeModify(modify, hint_applied.second));
1346 const auto logical_union =
dynamic_cast<const RelLogicalUnion*
>(body);
1347 if (logical_union) {
1351 hint_applied.second,
1360 table_func, hint_applied.first, hint_applied.second, queue_time_ms));
1364 LOG(
FATAL) <<
"Unhandled body type: "
1371 CHECK(dynamic_cast<const RelAggregate*>(body));
1372 CHECK_EQ(
size_t(1), body->inputCount());
1373 const auto input = body->getInput(0);
1374 body->setOutputMetainfo(input->getOutputMetainfo());
1380 ed.setResult({it->second, input->getOutputMetainfo()});
1390 return synthesized_physical_inputs_owned;
1394 const RexInput* rex_input)
const override {
1397 const auto scan_ra =
dynamic_cast<const RelScan*
>(input_ra);
1401 const auto col_id = rex_input->
getIndex();
1403 scan_ra->getCatalog().getMetadataForColumnBySpi(td->tableId, col_id + 1);
1404 if (cd && cd->columnType.get_physical_cols() > 0) {
1406 std::unordered_set<const RexInput*> synthesized_physical_inputs;
1407 for (
auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
1408 auto physical_input =
1410 synthesized_physical_inputs_owned.emplace_back(physical_input);
1411 synthesized_physical_inputs.insert(physical_input);
1413 return synthesized_physical_inputs;
1422 const std::unordered_set<const RexInput*>& aggregate,
1423 const std::unordered_set<const RexInput*>& next_result)
const override {
1425 result.insert(next_result.begin(), next_result.end());
1434 if (
auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
1437 if (
auto join = dynamic_cast<const RelJoin*>(ra_node)) {
1441 if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1444 auto only_src = ra_node->
getInput(0);
1445 const bool is_join =
dynamic_cast<const RelJoin*
>(only_src) ||
1446 dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
1447 return is_join ? only_src : ra_node;
1450 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1454 std::unordered_set<const RexInput*> used_inputs =
1455 filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
1457 for (
size_t i = 0; i < sources_size; ++i) {
1458 const auto source_inputs = visitor.visit(compound->
getScalarSource(i));
1459 used_inputs.insert(source_inputs.begin(), source_inputs.end());
1461 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1462 return std::make_pair(used_inputs, used_inputs_owned);
1465 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1468 std::unordered_set<const RexInput*> used_inputs;
1469 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1470 const auto source = aggregate->
getInput(0);
1473 CHECK_GE(in_metainfo.size(), group_count);
1474 for (
size_t i = 0; i < group_count; ++i) {
1475 auto synthesized_used_input =
new RexInput(source, i);
1476 used_inputs_owned.emplace_back(synthesized_used_input);
1477 used_inputs.insert(synthesized_used_input);
1479 for (
const auto& agg_expr : aggregate->
getAggExprs()) {
1480 for (
size_t i = 0; i < agg_expr->size(); ++i) {
1481 const auto operand_idx = agg_expr->getOperand(i);
1482 CHECK_GE(in_metainfo.size(),
static_cast<size_t>(operand_idx));
1483 auto synthesized_used_input =
new RexInput(source, operand_idx);
1484 used_inputs_owned.emplace_back(synthesized_used_input);
1485 used_inputs.insert(synthesized_used_input);
1488 return std::make_pair(used_inputs, used_inputs_owned);
1491 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1494 std::unordered_set<const RexInput*> used_inputs;
1495 for (
size_t i = 0; i < project->
size(); ++i) {
1496 const auto proj_inputs = visitor.visit(project->
getProjectAt(i));
1497 used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
1499 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1500 return std::make_pair(used_inputs, used_inputs_owned);
1503 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1506 std::unordered_set<const RexInput*> used_inputs;
1509 used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
1511 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1512 return std::make_pair(used_inputs, used_inputs_owned);
1515 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1517 std::unordered_set<const RexInput*> used_inputs;
1518 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1520 for (
size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
1521 const auto source = data_sink_node->getInput(nest_level);
1522 const auto scan_source =
dynamic_cast<const RelScan*
>(source);
1524 CHECK(source->getOutputMetainfo().empty());
1525 for (
size_t i = 0; i < scan_source->size(); ++i) {
1526 auto synthesized_used_input =
new RexInput(scan_source, i);
1527 used_inputs_owned.emplace_back(synthesized_used_input);
1528 used_inputs.insert(synthesized_used_input);
1531 const auto& partial_in_metadata = source->getOutputMetainfo();
1532 for (
size_t i = 0; i < partial_in_metadata.size(); ++i) {
1533 auto synthesized_used_input =
new RexInput(source, i);
1534 used_inputs_owned.emplace_back(synthesized_used_input);
1535 used_inputs.insert(synthesized_used_input);
1539 return std::make_pair(used_inputs, used_inputs_owned);
1542 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1544 std::unordered_set<const RexInput*> used_inputs(logical_union->
inputCount());
1545 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1546 used_inputs_owned.reserve(logical_union->
inputCount());
1547 VLOG(3) <<
"logical_union->inputCount()=" << logical_union->
inputCount();
1548 auto const n_inputs = logical_union->
inputCount();
1549 for (
size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
1550 auto input = logical_union->
getInput(nest_level);
1551 for (
size_t i = 0; i < input->size(); ++i) {
1552 used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
1553 used_inputs.insert(used_inputs_owned.back().get());
1556 return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
1560 const auto scan_ra =
dynamic_cast<const RelScan*
>(ra_node);
1563 table_key.
db_id = scan_ra->getCatalog().getDatabaseId();
1564 const auto td = scan_ra->getTableDescriptor();
1566 table_key.table_id = td->tableId;
1573 const std::vector<size_t>& input_permutation) {
1575 std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1576 for (
size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1577 const auto input_node_idx =
1578 input_permutation.empty() ? input_idx : input_permutation[input_idx];
1579 const auto input_ra = data_sink_node->getInput(input_node_idx);
1583 size_t const idx =
dynamic_cast<const RelLogicalUnion*
>(ra_node) ? 0 : input_idx;
1584 const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1585 CHECK(it_ok.second);
1588 <<
" to nest level " << input_idx;
1590 return input_to_nest_level;
1593 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1596 if (
auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1598 const auto condition =
join->getCondition();
1600 auto condition_inputs = visitor.visit(condition);
1601 std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1603 return std::make_pair(condition_inputs, condition_inputs_owned);
1606 if (
auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1607 CHECK_GE(left_deep_join->inputCount(), 2u);
1608 const auto condition = left_deep_join->getInnerCondition();
1610 auto result = visitor.visit(condition);
1611 for (
size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1613 const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1614 if (outer_condition) {
1615 const auto outer_result = visitor.visit(outer_condition);
1616 result.insert(outer_result.begin(), outer_result.end());
1619 std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.
get_inputs_owned());
1620 return std::make_pair(
result, used_inputs_owned);
1623 if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1626 }
else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1634 return std::make_pair(std::unordered_set<const RexInput*>{},
1635 std::vector<std::shared_ptr<RexInput>>{});
1639 std::vector<InputDescriptor>& input_descs,
1640 std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1642 const std::unordered_set<const RexInput*>& source_used_inputs,
1643 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1645 <<
" input_col_descs_unique.size()=" << input_col_descs_unique.size()
1646 <<
" source_used_inputs.size()=" << source_used_inputs.size();
1647 for (
const auto used_input : source_used_inputs) {
1648 const auto input_ra = used_input->getSourceNode();
1650 auto col_id = used_input->getIndex();
1651 auto it = input_to_nest_level.find(input_ra);
1652 if (it != input_to_nest_level.end()) {
1653 const int nest_level = it->second;
1654 if (
auto rel_scan = dynamic_cast<const RelScan*>(input_ra)) {
1655 const auto& catalog = rel_scan->getCatalog();
1656 col_id = catalog.getColumnIdBySpi(table_key.table_id, col_id + 1);
1658 input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1659 col_id, table_key.table_id, table_key.db_id, nest_level));
1660 }
else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1661 throw std::runtime_error(
"Bushy joins not supported");
1667 std::pair<std::vector<InputDescriptor>,
1668 std::list<std::shared_ptr<const InputColDescriptor>>>
1670 const std::unordered_set<const RexInput*>& used_inputs,
1671 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1672 const std::vector<size_t>& input_permutation) {
1673 std::vector<InputDescriptor> input_descs;
1675 for (
size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1676 const auto input_node_idx =
1677 input_permutation.empty() ? input_idx : input_permutation[input_idx];
1678 auto input_ra = data_sink_node->getInput(input_node_idx);
1680 input_descs.emplace_back(table_key.db_id, table_key.table_id, input_idx);
1682 std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1684 input_col_descs_unique,
1687 input_to_nest_level);
1688 std::unordered_set<const RexInput*> join_source_used_inputs;
1689 std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1690 std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1693 input_col_descs_unique,
1695 join_source_used_inputs,
1696 input_to_nest_level);
1697 std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1698 input_col_descs_unique.begin(), input_col_descs_unique.end());
1701 input_col_descs.end(),
1702 [](std::shared_ptr<const InputColDescriptor>
const& lhs,
1703 std::shared_ptr<const InputColDescriptor>
const& rhs) {
1704 return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1706 lhs->getScanDesc().getTableKey()) <
1707 std::make_tuple(rhs->getScanDesc().getNestLevel(),
1709 rhs->getScanDesc().getTableKey());
1711 return {input_descs,
1712 std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1713 input_col_descs.end())};
1717 std::tuple<std::vector<InputDescriptor>,
1718 std::list<std::shared_ptr<const InputColDescriptor>>,
1719 std::vector<std::shared_ptr<RexInput>>>
1721 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1722 const std::vector<size_t>& input_permutation) {
1723 std::unordered_set<const RexInput*> used_inputs;
1724 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1726 VLOG(3) <<
"used_inputs.size() = " << used_inputs.size();
1727 auto input_desc_pair =
1729 return std::make_tuple(
1730 input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1738 return project->
size();
1758 const std::shared_ptr<Analyzer::Expr> expr) {
1759 const auto& ti = expr->get_type_info();
1763 auto transient_dict_ti = ti;
1766 transient_dict_ti.set_fixed_size();
1767 return expr->add_cast(transient_dict_ti);
1771 std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1772 const std::shared_ptr<Analyzer::Expr>& expr) {
1776 scalar_sources.push_back(
fold_expr(expr.get()));
1781 const std::shared_ptr<Analyzer::Expr>& input) {
1782 const auto& input_ti = input->get_type_info();
1783 if (input_ti.is_string() && input_ti.get_compression() ==
kENCODING_DICT) {
1794 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1796 VLOG(3) <<
"get_scalar_sources_size("
1798 <<
") = " << scalar_sources_size;
1799 for (
size_t i = 0; i < scalar_sources_size; ++i) {
1800 const auto scalar_rex =
scalar_at(i, ra_node);
1801 if (dynamic_cast<const RexRef*>(scalar_rex)) {
1807 const auto scalar_expr =
1809 const auto rewritten_expr =
rewrite_expr(scalar_expr.get());
1813 scalar_sources.push_back(
fold_expr(rewritten_expr.get()));
1819 return scalar_sources;
1829 size_t starting_projection_column_idx) {
1830 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1832 const auto scalar_rex =
scalar_at(i, ra_node);
1833 if (dynamic_cast<const RexRef*>(scalar_rex)) {
1839 std::shared_ptr<Analyzer::Expr> translated_expr;
1841 translated_expr = cast_to_column_type(translator.
translate(scalar_rex),
1844 colNames[i - starting_projection_column_idx]);
1846 translated_expr = translator.
translate(scalar_rex);
1849 const auto rewritten_expr =
rewrite_expr(scalar_expr.get());
1853 return scalar_sources;
1858 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1862 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1863 for (
size_t group_idx = 0; group_idx < compound->
getGroupByCount(); ++group_idx) {
1866 return groupby_exprs;
1871 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1872 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1873 for (
size_t group_idx = 0; group_idx < aggregate->
getGroupByCount(); ++group_idx) {
1876 return groupby_exprs;
1882 const auto filter_expr = filter_rex ? translator.
translate(filter_rex) :
nullptr;
1891 size_t target_expr_idx,
1892 std::shared_ptr<Analyzer::Expr>& target_expr,
1893 std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos) {
1896 if (agg_expr->get_is_distinct()) {
1897 SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1899 target_exprs_type_infos.emplace(target_expr_idx, ti);
1900 target_expr = target_expr->deep_copy();
1906 target_exprs_type_infos.emplace(target_expr_idx, target_expr->get_type_info());
1911 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1912 std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1913 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1914 const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1918 std::vector<Analyzer::Expr*> target_exprs;
1919 for (
size_t i = 0; i < compound->
size(); ++i) {
1921 const auto target_rex_agg =
dynamic_cast<const RexAgg*
>(target_rex);
1922 std::shared_ptr<Analyzer::Expr> target_expr;
1923 if (target_rex_agg) {
1928 const auto target_rex_scalar =
dynamic_cast<const RexScalar*
>(target_rex);
1929 const auto target_rex_ref =
dynamic_cast<const RexRef*
>(target_rex_scalar);
1930 if (target_rex_ref) {
1931 const auto ref_idx = target_rex_ref->
getIndex();
1933 CHECK_LE(ref_idx, groupby_exprs.size());
1934 const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1940 target_expr =
fold_expr(rewritten_expr.get());
1951 target_exprs_type_infos.emplace(i, target_expr->get_type_info());
1954 target_exprs_owned.push_back(target_expr);
1955 target_exprs.push_back(target_expr.get());
1957 return target_exprs;
1961 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1962 std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1963 const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1964 const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1967 std::vector<Analyzer::Expr*> target_exprs;
1968 size_t group_key_idx = 1;
1969 for (
const auto& groupby_expr : groupby_exprs) {
1972 target_exprs_owned.push_back(target_expr);
1973 target_exprs.push_back(target_expr.get());
1976 for (
const auto& target_rex_agg : aggregate->
getAggExprs()) {
1980 target_expr =
fold_expr(target_expr.get());
1981 target_exprs_owned.push_back(target_expr);
1982 target_exprs.push_back(target_expr.get());
1984 return target_exprs;
1994 if (agg_expr && agg_expr->get_contains_agg()) {
2008 }
else if (
is_agg(&expr)) {
2017 const std::vector<Analyzer::Expr*>& target_exprs) {
2018 std::vector<TargetMetaInfo> targets_meta;
2019 CHECK_EQ(ra_node->size(), target_exprs.size());
2020 for (
size_t i = 0; i < ra_node->size(); ++i) {
2021 CHECK(target_exprs[i]);
2023 targets_meta.emplace_back(ra_node->getFieldName(i),
2025 target_exprs[i]->get_type_info());
2027 return targets_meta;
2033 const std::vector<Analyzer::Expr*>& target_exprs) {
2035 if (
auto const* input = dynamic_cast<RelCompound const*>(input0)) {
2037 }
else if (
auto const* input = dynamic_cast<RelProject const*>(input0)) {
2039 }
else if (
auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
2041 }
else if (
auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
2043 }
else if (
auto const* input = dynamic_cast<RelScan const*>(input0)) {
2045 }
else if (
auto const* input = dynamic_cast<RelLogicalValues const*>(input0)) {
2058 const int64_t queue_time_ms) {
2066 auto execute_update_for_node = [
this, &co, &eo_in](
const auto node,
2068 const bool is_aggregate) {
2069 auto table_descriptor = node->getModifiedTableDescriptor();
2070 CHECK(table_descriptor);
2071 if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
2072 throw std::runtime_error(
2073 "UPDATE queries involving variable length columns are only supported on tables "
2074 "with the vacuum attribute set to 'delayed'");
2077 auto catalog = node->getModifiedTableCatalog();
2082 std::make_unique<UpdateTransactionParameters>(table_descriptor,
2084 node->getTargetColumns(),
2085 node->getOutputMetainfo(),
2086 node->isVarlenUpdateRequired());
2090 auto execute_update_ra_exe_unit =
2091 [
this, &co, &eo_in, &table_infos, &table_descriptor, &node, catalog](
2096 if (dml_transaction_parameters_->tableIsTemporary()) {
2097 eo.output_columnar_hint =
true;
2104 dml_transaction_parameters_.get());
2106 CHECK(update_transaction_parameters);
2109 auto table_update_metadata =
2120 dml_transaction_parameters_->finalizeTransaction(*catalog);
2122 dml_transaction_parameters_->getTableDescriptor(),
executor_, *catalog};
2123 table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2130 if (dml_transaction_parameters_->tableIsTemporary()) {
2132 auto query_rewrite = std::make_unique<QueryRewriter>(table_infos,
executor_);
2136 auto update_transaction_params =
2138 CHECK(update_transaction_params);
2139 const auto td = update_transaction_params->getTableDescriptor();
2141 const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2142 if (update_column_names.size() > 1) {
2143 throw std::runtime_error(
2144 "Multi-column update is not yet supported for temporary tables.");
2148 catalog->getMetadataForColumn(td->tableId, update_column_names.front());
2150 auto projected_column_to_update = makeExpr<Analyzer::ColumnVar>(
2154 const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2155 work_unit.exe_unit, projected_column_to_update);
2156 if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2157 throw std::runtime_error(
2158 "Variable length updates not yet supported on temporary tables.");
2160 execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2162 execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2166 if (
auto compound = dynamic_cast<const RelCompound*>(node)) {
2170 execute_update_for_node(compound, work_unit, compound->isAggregate());
2171 }
else if (
auto project = dynamic_cast<const RelProject*>(node)) {
2175 if (project->isSimple()) {
2176 CHECK_EQ(
size_t(1), project->inputCount());
2177 const auto input_ra = project->getInput(0);
2178 if (dynamic_cast<const RelSort*>(input_ra)) {
2179 const auto& input_table =
2182 work_unit.exe_unit.scan_limit = input_table->rowCount();
2185 if (project->hasWindowFunctionExpr() || project->hasPushedDownWindowExpr()) {
2200 throw std::runtime_error(
2201 "Update query having window function is not yet supported in distributed "
2206 computeWindow(work_unit, co, eo_in, column_cache, queue_time_ms);
2208 execute_update_for_node(project, work_unit,
false);
2210 throw std::runtime_error(
"Unsupported parent node for update: " +
2218 const int64_t queue_time_ms) {
2222 auto execute_delete_for_node = [
this, &co, &eo_in](
const auto node,
2224 const bool is_aggregate) {
2225 auto* table_descriptor = node->getModifiedTableDescriptor();
2226 CHECK(table_descriptor);
2227 if (!table_descriptor->hasDeletedCol) {
2228 throw std::runtime_error(
2229 "DELETE queries are only supported on tables with the vacuum attribute set to "
2233 const auto catalog = node->getModifiedTableCatalog();
2239 auto execute_delete_ra_exe_unit =
2240 [
this, &table_infos, &table_descriptor, &eo_in, &co, catalog](
2241 const auto& exe_unit,
const bool is_aggregate) {
2243 std::make_unique<DeleteTransactionParameters>(table_descriptor, *catalog);
2246 CHECK(delete_params);
2252 eo.output_columnar_hint =
true;
2256 CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2260 auto table_update_metadata =
2274 table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2282 auto query_rewrite = std::make_unique<QueryRewriter>(table_infos,
executor_);
2283 const auto cd = catalog->getDeletedColumn(table_descriptor);
2285 auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2288 catalog->getDatabaseId(), table_descriptor->tableId, cd->columnId},
2290 const auto rewritten_exe_unit =
2291 query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2292 execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2294 execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2298 if (
auto compound = dynamic_cast<const RelCompound*>(node)) {
2299 const auto work_unit =
2301 execute_delete_for_node(compound, work_unit, compound->isAggregate());
2302 }
else if (
auto project = dynamic_cast<const RelProject*>(node)) {
2305 if (project->isSimple()) {
2306 CHECK_EQ(
size_t(1), project->inputCount());
2307 const auto input_ra = project->getInput(0);
2308 if (dynamic_cast<const RelSort*>(input_ra)) {
2309 const auto& input_table =
2312 work_unit.exe_unit.scan_limit = input_table->rowCount();
2315 execute_delete_for_node(project, work_unit,
false);
2317 throw std::runtime_error(
"Unsupported parent node for delete: " +
2326 const int64_t queue_time_ms) {
2328 const auto work_unit =
2344 const int64_t queue_time_ms) {
2375 const int64_t queue_time_ms,
2376 const std::optional<size_t> previous_count) {
2382 const auto input_ra = project->
getInput(0);
2383 if (dynamic_cast<const RelSort*>(input_ra)) {
2385 const auto& input_table =
2388 work_unit.exe_unit.scan_limit =
2389 std::min(input_table->getLimit(), input_table->rowCount());
2405 const int64_t queue_time_ms) {
2412 throw std::runtime_error(
"Table functions not supported in distributed mode yet");
2415 throw std::runtime_error(
"Table function support is disabled");
2421 const auto body = table_func_work_unit.body;
2424 const auto table_infos =
2438 auto cached_resultset =
2439 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
2441 if (cached_resultset) {
2442 VLOG(1) <<
"recycle table function's resultset of the root node "
2444 result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2453 table_func_work_unit.exe_unit, table_infos, co, eo),
2454 body->getOutputMetainfo()};
2458 throw std::runtime_error(
"Table function ran out of memory during execution");
2460 auto query_exec_time =
timer_stop(query_exec_time_begin);
2461 result.setQueueTime(queue_time_ms);
2462 auto resultset_ptr =
result.getDataPtr();
2463 auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2465 resultset_ptr->getBufferSizeBytes(co.device_type) <=
2468 if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2470 resultset_ptr->setExecTime(query_exec_time);
2471 resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2472 resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2474 resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2475 if (allow_auto_caching_resultset) {
2476 VLOG(1) <<
"Automatically keep table function's query resultset to recycler";
2478 executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
2479 table_func_work_unit.exe_unit.query_plan_dag_hash,
2480 resultset_ptr->getInputTableKeys(),
2482 resultset_ptr->getBufferSizeBytes(co.device_type),
2487 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since we do not "
2488 "support resultset recycling on distributed mode";
2490 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since a query "
2491 "has union-(all) operator";
2493 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored since a query "
2494 "is either validate or explain query";
2496 VLOG(1) <<
"Query hint \'keep_table_function_result\' is ignored";
2513 std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2514 for (
const auto& element : tuple->getTuple()) {
2517 return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2521 throw std::runtime_error(
"Only columns supported in the window partition for now");
2523 return makeExpr<Analyzer::ColumnVar>(col->get_type_info(), col->getColumnKey(), 1);
2532 const int64_t queue_time_ms) {
2534 CHECK_EQ(query_infos.size(), size_t(1));
2535 if (query_infos.front().info.fragments.size() != 1) {
2536 throw std::runtime_error(
2537 "Only single fragment tables supported for window functions for now");
2542 query_infos.push_back(query_infos.front());
2551 std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2552 std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2553 sorted_partition_cache;
2554 std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2555 std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2556 window_function_context_map;
2557 std::unordered_map<QueryPlanHash, AggregateTreeForWindowFraming> aggregate_tree_map;
2567 std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2568 if (partition_keys.size() >= 1) {
2569 std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2570 if (partition_keys.size() > 1) {
2571 partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2573 CHECK_EQ(partition_keys.size(), size_t(1));
2574 partition_key_tuple = partition_keys.front();
2577 partition_key_cond =
2578 makeExpr<Analyzer::BinOper>(
kBOOLEAN,
2581 partition_key_tuple,
2586 partition_key_cond ,
2588 sorted_partition_key_ref_count_map,
2594 CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2597 for (
auto& kv : window_function_context_map) {
2599 sorted_partition_key_ref_count_map, sorted_partition_cache, aggregate_tree_map);
2600 window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2606 const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2607 std::unordered_map<
QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2608 std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2610 const std::vector<InputTableInfo>& query_infos,
2613 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2614 const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2618 std::unique_ptr<WindowFunctionContext> context;
2620 if (partition_key_cond) {
2621 auto partition_cond_str = partition_key_cond->toString();
2622 auto partition_key_hash = boost::hash_value(partition_cond_str);
2623 boost::hash_combine(partition_cache_key, partition_key_hash);
2624 std::shared_ptr<HashJoin> partition_ptr;
2625 auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2626 if (cached_hash_table_it != partition_cache.end()) {
2627 partition_ptr = cached_hash_table_it->second;
2628 VLOG(1) <<
"Reuse a hash table to compute window function context (key: "
2629 << partition_cache_key <<
", partition condition: " << partition_cond_str
2635 const auto hash_table_or_err =
executor_->buildHashTableForQualifier(
2639 window_partition_type,
2645 if (!hash_table_or_err.fail_reason.empty()) {
2646 throw std::runtime_error(hash_table_or_err.fail_reason);
2649 partition_ptr = hash_table_or_err.hash_table;
2650 CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2652 VLOG(1) <<
"Put a generated hash table for computing window function context to "
2654 << partition_cache_key <<
", partition condition: " << partition_cond_str
2657 CHECK(partition_ptr);
2661 VLOG(1) <<
"Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2663 context = std::make_unique<WindowFunctionContext>(window_func,
2664 partition_cache_key,
2669 aggregate_tree_fanout);
2671 context = std::make_unique<WindowFunctionContext>(
2672 window_func, elem_count, co.
device_type, row_set_mem_owner);
2675 if (!order_keys.empty()) {
2676 auto sorted_partition_cache_key = partition_cache_key;
2677 for (
auto& order_key : order_keys) {
2678 boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2681 boost::hash_combine(sorted_partition_cache_key, collation.toString());
2683 context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2684 auto cache_key_cnt_it =
2685 sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2686 if (!cache_key_cnt_it.second) {
2687 sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2688 cache_key_cnt_it.first->second + 1;
2691 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2692 for (
const auto& order_key : order_keys) {
2693 const auto order_col =
2696 throw std::runtime_error(
"Only order by columns supported for now");
2698 auto const [column, col_elem_count] =
2701 query_infos.front().info.fragments.front(),
2709 CHECK_EQ(col_elem_count, elem_count);
2710 context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2713 if (context->getWindowFunction()->hasFraming()) {
2717 auto& window_function_expression_args = window_func->
getArgs();
2718 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2719 for (
auto& expr : window_function_expression_args) {
2720 if (
const auto arg_col_var =
2721 std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2725 query_infos.front().info.fragments.front(),
2732 CHECK_EQ(col_elem_count, elem_count);
2733 context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2744 const int64_t queue_time_ms) {
2746 const auto work_unit =
2749 work_unit, filter->
getOutputMetainfo(),
false, co, eo, render_info, queue_time_ms);
2753 std::vector<TargetMetaInfo>
const& rhs) {
2754 if (lhs.size() == rhs.size()) {
2755 for (
size_t i = 0; i < lhs.size(); ++i) {
2756 if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2774 const int64_t queue_time_ms) {
2776 if (!logical_union->
isAll()) {
2777 throw std::runtime_error(
"UNION without ALL is not supported yet.");
2782 throw std::runtime_error(
"UNION does not support subqueries with geo-columns.");
2805 for (
size_t i = 0; i < tuple_type.size(); ++i) {
2806 auto& target_meta_info = tuple_type[i];
2807 if (target_meta_info.get_type_info().get_type() ==
kNULLT) {
2813 {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2817 std::vector<TargetInfo> target_infos;
2818 for (
const auto& tuple_type_component : tuple_type) {
2821 tuple_type_component.get_type_info(),
2828 std::shared_ptr<ResultSet> rs{
2837 return {rs, tuple_type};
2844 const std::string& columnName,
2855 CHECK(dd && dd->stringDict);
2856 int32_t str_id = dd->stringDict->getOrAdd(str);
2857 if (!dd->dictIsTemp) {
2858 const auto checkpoint_ok = dd->stringDict->checkpoint();
2859 if (!checkpoint_ok) {
2860 throw std::runtime_error(
"Failed to checkpoint dictionary for column " +
2864 const bool invalid = str_id > max_valid_int_value<T>();
2865 if (invalid || str_id == inline_int_null_value<int32_t>()) {
2867 LOG(
ERROR) <<
"Could not encode string: " << str
2868 <<
", the encoded value doesn't fit in " <<
sizeof(
T) * 8
2869 <<
" bits. Will store NULL instead.";
2892 throw std::runtime_error(
"EXPLAIN not supported for ModifyTable");
2902 std::vector<TargetMetaInfo> empty_targets;
2903 return {rs, empty_targets};
2918 size_t rows_number = values_lists.size();
2923 size_t rows_per_leaf = rows_number;
2924 if (td->nShards == 0) {
2926 ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2928 auto max_number_of_rows_per_package =
2929 std::max(
size_t(1), std::min(rows_per_leaf,
size_t(64 * 1024)));
2931 std::vector<const ColumnDescriptor*> col_descriptors;
2932 std::vector<int> col_ids;
2933 std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2934 std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2935 std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2936 std::unordered_map<int, int> sequential_ids;
2938 for (
const int col_id : col_id_list) {
2941 if (cd->columnType.is_string()) {
2945 str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2946 CHECK(it_ok.second);
2950 const auto dd = catalog.getMetadataForDict(cd->columnType.get_comp_param());
2952 const auto it_ok = col_buffers.emplace(
2954 std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2955 max_number_of_rows_per_package));
2956 CHECK(it_ok.second);
2962 }
else if (cd->columnType.is_geometry()) {
2964 str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2965 CHECK(it_ok.second);
2966 }
else if (cd->columnType.is_array()) {
2968 arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2969 CHECK(it_ok.second);
2971 const auto it_ok = col_buffers.emplace(
2973 std::unique_ptr<uint8_t[]>(
new uint8_t[cd->columnType.get_logical_size() *
2974 max_number_of_rows_per_package]()));
2975 CHECK(it_ok.second);
2977 col_descriptors.push_back(cd);
2978 sequential_ids[col_id] = col_ids.size();
2979 col_ids.push_back(col_id);
2983 std::vector<int> table_chunk_key_prefix{catalog.getCurrentDB().dbId, table_id};
2984 auto table_key = boost::hash_value(table_chunk_key_prefix);
2988 size_t start_row = 0;
2989 size_t rows_left = rows_number;
2990 while (rows_left != 0) {
2992 for (
const auto& kv : col_buffers) {
2993 memset(kv.second.get(), 0, max_number_of_rows_per_package);
2995 for (
auto& kv : str_col_buffers) {
2998 for (
auto& kv : arr_col_buffers) {
3002 auto package_size = std::min(rows_left, max_number_of_rows_per_package);
3007 for (
size_t row_idx = 0; row_idx < package_size; ++row_idx) {
3008 const auto& values_list = values_lists[row_idx + start_row];
3009 for (
size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
3010 CHECK(values_list.size() == col_descriptors.size());
3015 dynamic_cast<const Analyzer::UOper*
>(values_list[col_idx]->get_expr());
3021 const auto cd = col_descriptors[col_idx];
3022 auto col_datum = col_cv->get_constval();
3023 auto col_type = cd->columnType.get_type();
3024 uint8_t* col_data_bytes{
nullptr};
3025 if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
3026 (!cd->columnType.is_string() ||
3028 const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
3029 CHECK(col_data_bytes_it != col_buffers.end());
3030 col_data_bytes = col_data_bytes_it->second.get();
3034 auto col_data =
reinterpret_cast<int8_t*
>(col_data_bytes);
3035 auto null_bool_val =
3037 col_data[row_idx] = col_cv->get_is_null() || null_bool_val
3039 : (col_datum.boolval ? 1 : 0);
3043 auto col_data =
reinterpret_cast<int8_t*
>(col_data_bytes);
3044 col_data[row_idx] = col_cv->get_is_null()
3046 : col_datum.tinyintval;
3050 auto col_data =
reinterpret_cast<int16_t*
>(col_data_bytes);
3051 col_data[row_idx] = col_cv->get_is_null()
3053 : col_datum.smallintval;
3057 auto col_data =
reinterpret_cast<int32_t*
>(col_data_bytes);
3058 col_data[row_idx] = col_cv->get_is_null()
3066 auto col_data =
reinterpret_cast<int64_t*
>(col_data_bytes);
3067 col_data[row_idx] = col_cv->get_is_null()
3069 : col_datum.bigintval;
3073 auto col_data =
reinterpret_cast<float*
>(col_data_bytes);
3074 col_data[row_idx] = col_datum.floatval;
3078 auto col_data =
reinterpret_cast<double*
>(col_data_bytes);
3079 col_data[row_idx] = col_datum.doubleval;
3085 switch (cd->columnType.get_compression()) {
3087 str_col_buffers[col_ids[col_idx]].push_back(
3088 col_datum.stringval ? *col_datum.stringval :
"");
3091 switch (cd->columnType.get_size()) {
3094 &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3101 &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3108 &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3126 auto col_data =
reinterpret_cast<int64_t*
>(col_data_bytes);
3127 col_data[row_idx] = col_cv->get_is_null()
3129 : col_datum.bigintval;
3133 const auto is_null = col_cv->get_is_null();
3134 const auto size = cd->columnType.get_size();
3137 const auto is_point_coords =
3139 if (
is_null && !is_point_coords) {
3143 for (int8_t* p = buf + elem_ti.
get_size(); (p - buf) < size;
3145 put_null(static_cast<void*>(p), elem_ti,
"");
3147 arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf,
is_null);
3149 arr_col_buffers[col_ids[col_idx]].emplace_back(0,
nullptr,
is_null);
3153 const auto l = col_cv->get_value_list();
3154 size_t len = l.size() * elem_ti.
get_size();
3155 if (size > 0 && static_cast<size_t>(size) != len) {
3156 throw std::runtime_error(
"Array column " + cd->columnName +
" expects " +
3158 " values, " +
"received " +
3166 int32_t* p =
reinterpret_cast<int32_t*
>(buf);
3173 &p[elemIndex], cd->columnName, elem_ti, c.get(), catalog);
3197 str_col_buffers[col_ids[col_idx]].push_back(
3198 col_datum.stringval ? *col_datum.stringval :
"");
3205 start_row += package_size;
3206 rows_left -= package_size;
3209 insert_data.
databaseId = catalog.getCurrentDB().dbId;
3210 insert_data.
tableId = table_id;
3211 insert_data.
data.resize(col_ids.size());
3213 for (
const auto& kv : col_buffers) {
3215 p.
numbersPtr =
reinterpret_cast<int8_t*
>(kv.second.get());
3216 insert_data.
data[sequential_ids[kv.first]] = p;
3218 for (
auto& kv : str_col_buffers) {
3221 insert_data.
data[sequential_ids[kv.first]] = p;
3223 for (
auto& kv : arr_col_buffers) {
3226 insert_data.
data[sequential_ids[kv.first]] = p;
3228 insert_data.
numRows = package_size;
3239 std::vector<TargetMetaInfo> empty_targets;
3240 return {rs, empty_targets};
3246 const auto aggregate =
dynamic_cast<const RelAggregate*
>(ra);
3250 const auto compound =
dynamic_cast<const RelCompound*
>(ra);
3251 return (compound && compound->isAggregate()) ? 0 : limit;
3255 return !order_entries.empty() && order_entries.front().is_desc;
3264 const int64_t queue_time_ms) {
3267 const auto source = sort->
getInput(0);
3274 executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3277 auto& aggregated_result = it->second;
3278 auto& result_rows = aggregated_result.rs;
3279 const size_t limit = sort->
getLimit();
3280 const size_t offset = sort->
getOffset();
3281 if (limit || offset) {
3282 if (!order_entries.empty()) {
3283 result_rows->sort(order_entries, limit + offset,
executor_);
3285 result_rows->dropFirstN(offset);
3287 result_rows->keepFirstN(limit);
3297 source_work_unit.exe_unit.target_exprs,
3298 aggregated_result.targets_meta);
3307 std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3308 bool is_desc{
false};
3309 bool use_speculative_top_n_sort{
false};
3311 auto execute_sort_query = [
this,
3323 const size_t limit = sort->
getLimit();
3324 const size_t offset = sort->
getOffset();
3326 auto source_node = sort->
getInput(0);
3329 auto source_query_plan_dag = source_node->getQueryPlanDagHash();
3333 if (
auto cached_resultset =
3334 executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
3335 source_query_plan_dag)) {
3336 CHECK(cached_resultset->canUseSpeculativeTopNSort());
3337 VLOG(1) <<
"recycle resultset of the root node " << source_node->getRelNodeDagId()
3338 <<
" from resultset cache";
3340 ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3344 use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3346 source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3350 if (!source_result.getDataPtr()) {
3358 eo.allow_loop_joins,
3362 eo.with_dynamic_watchdog,
3363 eo.dynamic_watchdog_time_limit,
3364 eo.find_push_down_candidates,
3365 eo.just_calcite_explain,
3366 eo.gpu_input_mem_limit_percent,
3367 eo.allow_runtime_query_interrupt,
3368 eo.running_query_interrupt_freq,
3369 eo.pending_query_interrupt_freq,
3370 eo.optimize_cuda_block_and_grid_sizes,
3371 eo.max_join_hash_table_size,
3375 groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3377 source->getOutputMetainfo(),
3383 use_speculative_top_n_sort =
3384 source_result.
getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3386 source_result.getRows()->getQueryMemDesc());
3388 if (render_info && render_info->isInSitu()) {
3389 return source_result;
3391 if (source_result.isFilterPushDownEnabled()) {
3392 return source_result;
3394 auto rows_to_sort = source_result.getRows();
3395 if (eo.just_explain) {
3396 return {rows_to_sort, {}};
3398 if (sort->
collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3399 !use_speculative_top_n_sort) {
3400 const size_t top_n = limit == 0 ? 0 : limit + offset;
3401 rows_to_sort->sort(order_entries, top_n,
executor_);
3403 if (limit || offset) {
3405 if (offset >= rows_to_sort->rowCount()) {
3406 rows_to_sort->dropFirstN(offset);
3408 rows_to_sort->keepFirstN(limit + offset);
3411 rows_to_sort->dropFirstN(offset);
3413 rows_to_sort->keepFirstN(limit);
3417 return {rows_to_sort, source_result.getTargetsMeta()};
3421 return execute_sort_query();
3423 CHECK_EQ(
size_t(1), groupby_exprs.size());
3424 CHECK(groupby_exprs.front());
3426 return execute_sort_query();
3432 std::list<Analyzer::OrderEntry>& order_entries,
3434 const auto source = sort->
getInput(0);
3435 const size_t limit = sort->
getLimit();
3436 const size_t offset = sort->
getOffset();
3438 const size_t scan_total_limit =
3440 size_t max_groups_buffer_entry_guess{
3446 const auto& source_exe_unit = source_work_unit.exe_unit;
3449 for (
auto order_entry : order_entries) {
3451 const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3453 if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3454 throw std::runtime_error(
3455 "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3459 if (source_exe_unit.groupby_exprs.size() == 1) {
3460 if (!source_exe_unit.groupby_exprs.front()) {
3474 std::move(source_exe_unit.input_col_descs),
3475 source_exe_unit.simple_quals,
3476 source_exe_unit.quals,
3477 source_exe_unit.join_quals,
3478 source_exe_unit.groupby_exprs,
3479 source_exe_unit.target_exprs,
3480 source_exe_unit.target_exprs_original_type_infos,
3482 {sort_info.order_entries,
3486 sort_info.limit_delivered},
3488 source_exe_unit.query_hint,
3489 source_exe_unit.query_plan_dag_hash,
3490 source_exe_unit.hash_table_build_plan_dag,
3491 source_exe_unit.table_id_to_node_map,
3492 source_exe_unit.use_bump_allocator,
3493 source_exe_unit.union_all,
3494 source_exe_unit.query_state},
3496 max_groups_buffer_entry_guess,
3497 std::move(source_work_unit.query_rewriter),
3498 source_work_unit.input_permutation,
3499 source_work_unit.left_deep_join_input_sizes};
3511 CHECK(!table_infos.empty());
3512 const auto& first_table = table_infos.front();
3513 size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3514 for (
const auto& table_info : table_infos) {
3515 if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3516 max_num_groups = table_info.info.getNumTuplesUpperBound();
3519 return std::max(max_num_groups,
size_t(1));
3532 if (render_info && render_info->
isInSitu()) {
3539 for (
const auto& target_expr : ra_exe_unit.
target_exprs) {
3542 if (target_expr->get_type_info().is_varlen()) {
3546 if (
auto top_project = dynamic_cast<const RelProject*>(body)) {
3547 if (top_project->isRowwiseOutputForced()) {
3566 for (
const auto target_expr : ra_exe_unit.
target_exprs) {
3567 if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3579 return !(ra_exe_unit.
quals.empty() && ra_exe_unit.
join_quals.empty() &&
3585 const std::vector<InputTableInfo>& table_infos,
3586 const Executor* executor,
3588 std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3590 for (
size_t i = 0; i < ra_exe_unit.
target_exprs.size(); ++i) {
3596 CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3599 const auto& arg_ti = arg->get_type_info();
3605 if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3606 (arg_ti.is_string() && arg_ti.get_compression() ==
kENCODING_DICT))) {
3617 const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3618 const auto sub_bitmap_count =
3620 int64_t approx_bitmap_sz_bits{0};
3621 const auto error_rate_expr =
static_cast<Analyzer::AggExpr*
>(target_expr)->get_arg1();
3622 if (error_rate_expr) {
3623 CHECK(error_rate_expr->get_type_info().get_type() ==
kINT);
3624 auto const error_rate =
3627 CHECK_GE(error_rate->get_constval().intval, 1);
3633 arg_range.getIntMin(),
3634 approx_bitmap_sz_bits,
3639 arg_range.getIntMin(),
3644 if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3645 precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3646 auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3648 target_exprs_owned.push_back(precise_count_distinct);
3649 ra_exe_unit.
target_exprs[i] = precise_count_distinct.get();
3666 const std::vector<TargetMetaInfo>& targets_meta,
3671 const int64_t queue_time_ms,
3672 const std::optional<size_t> previous_count) {
3683 ScopeGuard clearWindowContextIfNecessary = [&]() {
3690 throw std::runtime_error(
"Window functions support is disabled");
3693 co.allow_lazy_fetch =
false;
3694 computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3696 if (!eo.just_explain && eo.find_push_down_candidates) {
3699 if (!selected_filters.empty() || eo.just_calcite_explain) {
3700 return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3703 if (render_info && render_info->
isInSitu()) {
3704 co.allow_lazy_fetch =
false;
3706 const auto body = work_unit.
body;
3709 VLOG(3) <<
"body->getId()=" << body->getId()
3711 <<
" it==leaf_results_.end()=" << (it ==
leaf_results_.end());
3715 auto& aggregated_result = it->second;
3716 auto& result_rows = aggregated_result.rs;
3718 body->setOutputMetainfo(aggregated_result.targets_meta);
3732 auto candidate =
query_dag_->getQueryHint(body);
3734 ra_exe_unit.query_hint = *candidate;
3738 const auto& query_hints = ra_exe_unit.query_hint;
3740 orig_block_size =
executor_->blockSize(),
3741 orig_grid_size =
executor_->gridSize()]() {
3742 if (
executor_->getDataMgr()->getCudaMgr()) {
3744 if (orig_block_size) {
3745 executor_->setBlockSize(orig_block_size);
3751 if (orig_grid_size) {
3762 if (!
executor_->getDataMgr()->getCudaMgr()) {
3763 VLOG(1) <<
"Skip CUDA grid size query hint: cannot detect CUDA device";
3765 const auto num_sms =
executor_->cudaMgr()->getMinNumMPsForAllDevices();
3766 const auto new_grid_size =
static_cast<unsigned>(
3767 std::max(1.0, std::round(num_sms * query_hints.cuda_grid_size_multiplier)));
3768 const auto default_grid_size =
executor_->gridSize();
3769 if (new_grid_size != default_grid_size) {
3770 VLOG(1) <<
"Change CUDA grid size: " << default_grid_size
3771 <<
" (default_grid_size) -> " << new_grid_size <<
" (# SMs * "
3772 << query_hints.cuda_grid_size_multiplier <<
")";
3776 VLOG(1) <<
"Skip CUDA grid size query hint: invalid grid size";
3781 if (!
executor_->getDataMgr()->getCudaMgr()) {
3782 VLOG(1) <<
"Skip CUDA block size query hint: cannot detect CUDA device";
3784 int cuda_block_size = query_hints.cuda_block_size;
3786 if (cuda_block_size >= warp_size) {
3787 cuda_block_size = (cuda_block_size + warp_size - 1) / warp_size * warp_size;
3788 VLOG(1) <<
"Change CUDA block size w.r.t warp size (" << warp_size
3789 <<
"): " <<
executor_->blockSize() <<
" -> " << cuda_block_size;
3791 VLOG(1) <<
"Change CUDA block size: " <<
executor_->blockSize() <<
" -> "
3794 executor_->setBlockSize(cuda_block_size);
3801 CHECK_EQ(table_infos.size(), size_t(1));
3802 CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3803 max_groups_buffer_entry_guess =
3804 table_infos.front().info.fragments.front().getNumTuples();
3805 ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3808 ra_exe_unit.scan_limit = *previous_count;
3812 ra_exe_unit.scan_limit = 0;
3813 ra_exe_unit.use_bump_allocator =
true;
3815 ra_exe_unit.scan_limit = 0;
3816 }
else if (!eo.just_explain) {
3818 if (filter_count_all) {
3819 ra_exe_unit.scan_limit = std::max(*filter_count_all,
size_t(1));
3830 VLOG(1) <<
"Using columnar layout for projection as output size of "
3831 << ra_exe_unit.scan_limit <<
" rows exceeds threshold of "
3833 eo.output_columnar_hint =
true;
3836 eo.output_columnar_hint =
false;
3847 auto execute_and_handle_errors = [&](
const auto max_groups_buffer_entry_guess_in,
3848 const bool has_cardinality_estimation,
3853 auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3855 return {
executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3862 has_cardinality_estimation,
3871 {ra_exe_unit, work_unit.
body, local_groups_buffer_entry_guess},
3883 for (
const auto& table_info : table_infos) {
3884 const auto db_id = table_info.table_key.db_id;
3887 if (td && (td->isTemporaryTable() || td->isView)) {
3888 use_resultset_cache =
false;
3889 if (eo.keep_result) {
3890 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has either "
3891 "temporary table or view";
3899 auto cached_cardinality =
executor_->getCachedCardinality(cache_key);
3900 auto card = cached_cardinality.second;
3901 if (cached_cardinality.first && card >= 0) {
3902 result = execute_and_handle_errors(
3905 result = execute_and_handle_errors(
3906 max_groups_buffer_entry_guess,
3912 auto cached_cardinality =
executor_->getCachedCardinality(cache_key);
3913 auto card = cached_cardinality.second;
3914 if (cached_cardinality.first && card >= 0) {
3915 result = execute_and_handle_errors(card,
true,
true);
3917 const auto ndv_groups_estimation =
3919 const auto estimated_groups_buffer_entry_guess =
3920 ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3923 CHECK_GT(estimated_groups_buffer_entry_guess,
size_t(0));
3924 result = execute_and_handle_errors(
3925 estimated_groups_buffer_entry_guess,
true,
true);
3926 if (!(eo.just_validate || eo.just_explain)) {
3927 executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3932 result.setQueueTime(queue_time_ms);
3937 return {std::make_shared<ResultSet>(
3941 ?
executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3947 for (
auto& target_info :
result.getTargetsMeta()) {
3948 if (target_info.get_type_info().is_string() &&
3949 !target_info.get_type_info().is_dict_encoded_string()) {
3951 use_resultset_cache =
false;
3952 if (eo.keep_result) {
3953 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has non-encoded "
3954 "string column projection";
3960 auto allow_auto_caching_resultset =
3963 if (use_resultset_cache && (eo.keep_result || allow_auto_caching_resultset) &&
3965 auto query_exec_time =
timer_stop(query_exec_time_begin);
3966 res->setExecTime(query_exec_time);
3967 res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3968 res->setTargetMetaInfo(body->getOutputMetainfo());
3970 res->setInputTableKeys(std::move(input_table_keys));
3971 if (allow_auto_caching_resultset) {
3972 VLOG(1) <<
"Automatically keep query resultset to recycler";
3974 res->setUseSpeculativeTopNSort(
3976 executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
3977 ra_exe_unit.query_plan_dag_hash,
3978 res->getInputTableKeys(),
3980 res->getBufferSizeBytes(co.device_type),
3983 if (eo.keep_result) {
3985 VLOG(1) <<
"Query hint \'keep_result\' is ignored since we do not support "
3986 "resultset recycling on distributed mode";
3988 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query has union-(all) "
3990 }
else if (render_info && render_info->
isInSitu()) {
3991 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query is classified as "
3992 "a in-situ rendering query";
3994 VLOG(1) <<
"Query hint \'keep_result\' is ignored since a query is either "
3995 "validate or explain query";
3997 VLOG(1) <<
"Query hint \'keep_result\' is ignored";
4015 const auto count_all_exe_unit =
4036 }
catch (
const std::exception& e) {
4037 LOG(
WARNING) <<
"Failed to run pre-flight filtered count with error " << e.what();
4038 return std::nullopt;
4040 const auto count_row = count_all_result->getNextRow(
false,
false);
4041 CHECK_EQ(
size_t(1), count_row.size());
4042 const auto& count_tv = count_row.front();
4043 const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
4044 CHECK(count_scalar_tv);
4045 const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
4048 auto count_upper_bound =
static_cast<size_t>(*count_ptr);
4049 return std::max(count_upper_bound,
size_t(1));
4053 const auto& ra_exe_unit = work_unit.
exe_unit;
4054 if (ra_exe_unit.input_descs.size() != 1) {
4057 const auto& table_desc = ra_exe_unit.
input_descs.front();
4061 const auto& table_key = table_desc.getTableKey();
4062 for (
const auto& simple_qual : ra_exe_unit.simple_quals) {
4063 const auto comp_expr =
4065 if (!comp_expr || comp_expr->get_optype() !=
kEQ) {
4070 if (!lhs_col || !lhs_col->getTableKey().table_id || lhs_col->get_rte_idx()) {
4073 const auto rhs = comp_expr->get_right_operand();
4079 {table_key.db_id, table_key.table_id, lhs_col->getColumnKey().column_id});
4080 if (cd->isVirtualCol) {
4090 const std::vector<TargetMetaInfo>& targets_meta,
4095 const bool was_multifrag_kernel_launch,
4096 const int64_t queue_time_ms) {
4101 auto ra_exe_unit_in = work_unit.
exe_unit;
4114 auto eo_no_multifrag = eo;
4116 eo_no_multifrag.allow_multifrag =
false;
4117 eo_no_multifrag.find_push_down_candidates =
false;
4118 if (was_multifrag_kernel_launch) {
4122 LOG(
WARNING) <<
"Multifrag query ran out of memory, retrying with multifragment "
4123 "kernels disabled.";
4137 result.setQueueTime(queue_time_ms);
4140 LOG(
WARNING) <<
"Kernel per fragment query ran out of memory, retrying on CPU.";
4154 VLOG(1) <<
"Resetting max groups buffer entry guess.";
4155 max_groups_buffer_entry_guess = 0;
4157 int iteration_ctr = -1;
4180 CHECK(max_groups_buffer_entry_guess);
4184 throw std::runtime_error(
"Query ran out of output slots in the result");
4186 max_groups_buffer_entry_guess *= 2;
4187 LOG(
WARNING) <<
"Query ran out of slots in the output buffer, retrying with max "
4188 "groups buffer entry "
4190 << max_groups_buffer_entry_guess;
4196 result.setQueueTime(queue_time_ms);
4203 LOG(
ERROR) <<
"Query execution failed with error "
4209 LOG(
INFO) <<
"Query ran out of GPU memory, attempting punt to CPU";
4211 throw std::runtime_error(
4212 "Query ran out of GPU memory, unable to automatically retry on CPU");
4221 const char* code{
nullptr};
4222 const char* description{
nullptr};
4227 switch (error_code) {
4229 return {
"ERR_DIV_BY_ZERO",
"Division by zero"};
4231 return {
"ERR_OUT_OF_GPU_MEM",
4233 "Query couldn't keep the entire working set of columns in GPU memory"};
4235 return {
"ERR_UNSUPPORTED_SELF_JOIN",
"Self joins not supported yet"};
4237 return {
"ERR_OUT_OF_CPU_MEM",
"Not enough host memory to execute the query"};
4239 return {
"ERR_OVERFLOW_OR_UNDERFLOW",
"Overflow or underflow"};
4241 return {
"ERR_OUT_OF_TIME",
"Query execution has exceeded the time limit"};
4243 return {
"ERR_INTERRUPTED",
"Query execution has been interrupted"};
4245 return {
"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
4246 "Columnar conversion not supported for variable length types"};
4248 return {
"ERR_TOO_MANY_LITERALS",
"Too many literals in the query"};
4250 return {
"ERR_STRING_CONST_IN_RESULTSET",
4252 "NONE ENCODED String types are not supported as input result set."};
4254 return {
"ERR_OUT_OF_RENDER_MEM",
4256 "Insufficient GPU memory for query results in render output buffer "
4257 "sized by render-mem-bytes"};
4259 return {
"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
4260 "Streaming-Top-N not supported in Render Query"};
4262 return {
"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
4263 "Multiple distinct values encountered"};
4265 return {
"ERR_GEOS",
"ERR_GEOS"};
4267 return {
"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
4269 "Arguments of WIDTH_BUCKET function does not satisfy the condition"};
4271 return {
nullptr,
nullptr};
4278 if (error_code < 0) {
4279 return "Ran out of slots in the query output buffer";
4283 if (errorInfo.code) {
4284 return errorInfo.code +
": "s + errorInfo.description;
4292 VLOG(1) <<
"Running post execution callback.";
4293 (*post_execution_callback_)();
4300 const auto compound =
dynamic_cast<const RelCompound*
>(node);
4304 const auto project =
dynamic_cast<const RelProject*
>(node);
4308 const auto aggregate =
dynamic_cast<const RelAggregate*
>(node);
4312 const auto filter =
dynamic_cast<const RelFilter*
>(node);
4316 LOG(
FATAL) <<
"Unhandled node type: "
4325 if (
auto join = dynamic_cast<const RelJoin*>(sink)) {
4326 return join->getJoinType();
4328 if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
4336 const auto condition =
dynamic_cast<const RexOperator*
>(scalar);
4337 if (!condition || condition->getOperator() !=
kOR || condition->size() != 2) {
4340 const auto equi_join_condition =
4341 dynamic_cast<const RexOperator*
>(condition->getOperand(0));
4342 if (!equi_join_condition || equi_join_condition->getOperator() !=
kEQ) {
4345 const auto both_are_null_condition =
4346 dynamic_cast<const RexOperator*
>(condition->getOperand(1));
4347 if (!both_are_null_condition || both_are_null_condition->getOperator() !=
kAND ||
4348 both_are_null_condition->size() != 2) {
4351 const auto lhs_is_null =
4352 dynamic_cast<const RexOperator*
>(both_are_null_condition->getOperand(0));
4353 const auto rhs_is_null =
4354 dynamic_cast<const RexOperator*
>(both_are_null_condition->getOperand(1));
4355 if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() !=
kISNULL ||
4356 rhs_is_null->getOperator() !=
kISNULL) {
4359 CHECK_EQ(
size_t(1), lhs_is_null->size());
4360 CHECK_EQ(
size_t(1), rhs_is_null->size());
4361 CHECK_EQ(
size_t(2), equi_join_condition->size());
4362 const auto eq_lhs =
dynamic_cast<const RexInput*
>(equi_join_condition->getOperand(0));
4363 const auto eq_rhs =
dynamic_cast<const RexInput*
>(equi_join_condition->getOperand(1));
4364 const auto is_null_lhs =
dynamic_cast<const RexInput*
>(lhs_is_null->getOperand(0));
4365 const auto is_null_rhs =
dynamic_cast<const RexInput*
>(rhs_is_null->getOperand(0));
4366 if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4369 std::vector<std::unique_ptr<const RexScalar>> eq_operands;
4370 if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4372 auto lhs_op_copy = deep_copy_visitor.
visit(equi_join_condition->getOperand(0));
4373 auto rhs_op_copy = deep_copy_visitor.
visit(equi_join_condition->getOperand(1));
4374 eq_operands.emplace_back(lhs_op_copy.release());
4375 eq_operands.emplace_back(rhs_op_copy.release());
4376 return boost::make_unique<const RexOperator>(
4377 kBW_EQ, eq_operands, equi_join_condition->getType());
4384 const auto condition =
dynamic_cast<const RexOperator*
>(scalar);
4385 if (condition && condition->getOperator() ==
kAND) {
4386 CHECK_GE(condition->size(), size_t(2));
4391 for (
size_t i = 1; i < condition->size(); ++i) {
4392 std::vector<std::unique_ptr<const RexScalar>> and_operands;
4393 and_operands.emplace_back(std::move(acc));
4396 boost::make_unique<const RexOperator>(
kAND, and_operands, condition->getType());
4406 for (
size_t nesting_level = 1; nesting_level <= left_deep_join->
inputCount() - 1;
4411 auto cur_level_join_type = left_deep_join->
getJoinType(nesting_level);
4413 join_types[nesting_level - 1] = cur_level_join_type;
4421 std::vector<InputDescriptor>& input_descs,
4422 std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4424 std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4426 const std::vector<InputTableInfo>& query_infos,
4427 const Executor* executor) {
4433 for (
const auto& table_info : query_infos) {
4434 if (table_info.table_key.table_id < 0) {
4443 const auto input_permutation =
4446 std::tie(input_descs, input_col_descs, std::ignore) =
4448 return input_permutation;
4453 std::vector<size_t> input_sizes;
4454 for (
size_t i = 0; i < left_deep_join->
inputCount(); ++i) {
4456 input_sizes.push_back(inputs.size());
4462 const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4463 std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4464 for (
const auto& qual : quals) {
4466 rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4468 return rewritten_quals;
4477 std::vector<InputDescriptor> input_descs;
4478 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4480 std::tie(input_descs, input_col_descs, std::ignore) =
4485 const auto left_deep_join =
4490 std::vector<size_t> input_permutation;
4491 std::vector<size_t> left_deep_join_input_sizes;
4492 std::optional<unsigned> left_deep_tree_id;
4493 if (left_deep_join) {
4494 left_deep_tree_id = left_deep_join->getId();
4497 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4499 std::find(join_types.begin(), join_types.end(),
JoinType::LEFT) ==
4503 left_deep_join_quals,
4504 input_to_nest_level,
4509 std::tie(input_descs, input_col_descs, std::ignore) =
4510 get_input_desc(compound, input_to_nest_level, input_permutation);
4512 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4517 const auto scalar_sources =
4521 std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4523 target_exprs_type_infos,
4532 auto candidate =
query_dag_->getQueryHint(compound);
4534 query_hint = *candidate;
4542 left_deep_join_quals,
4545 target_exprs_type_infos,
4556 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
4557 auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4558 const auto targets_meta =
get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4561 if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4562 left_deep_trees_info.emplace(left_deep_tree_id.value(),
4563 rewritten_exe_unit.join_quals);
4567 compound, left_deep_tree_id, left_deep_trees_info,
executor_);
4568 rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4569 rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4571 return {rewritten_exe_unit,
4574 std::move(query_rewriter),
4576 left_deep_join_input_sizes};
4582 const auto left_deep_join =
4586 return std::make_shared<RelAlgTranslator>(
4594 const auto bin_oper =
dynamic_cast<const RexOperator*
>(qual_expr);
4595 if (!bin_oper || bin_oper->getOperator() !=
kAND) {
4598 CHECK_GE(bin_oper->size(), size_t(2));
4600 for (
size_t i = 1; i < bin_oper->size(); ++i) {
4602 lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4608 const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4610 CHECK(!factors.empty());
4611 auto acc = factors.front();
4612 for (
size_t i = 1; i < factors.size(); ++i) {
4618 template <
class QualsList>
4620 const std::shared_ptr<Analyzer::Expr>& needle) {
4621 for (
const auto& qual : haystack) {
4622 if (*qual == *needle) {
4633 const std::shared_ptr<Analyzer::Expr>& expr) {
4635 CHECK_GE(expr_terms.size(), size_t(1));
4636 const auto& first_term = expr_terms.front();
4638 std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4641 for (
const auto& first_term_factor : first_term_factors.quals) {
4643 expr_terms.size() > 1;
4644 for (
size_t i = 1; i < expr_terms.size(); ++i) {
4652 common_factors.push_back(first_term_factor);
4655 if (common_factors.empty()) {
4659 std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4660 for (
const auto& term : expr_terms) {
4662 std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4663 term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4664 for (
const auto& qual : term_cf.quals) {
4666 remaining_quals.push_back(qual);
4669 if (!remaining_quals.empty()) {
4675 if (remaining_terms.empty()) {
4686 const std::vector<JoinType>& join_types,
4687 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4688 const bool just_explain)
const {
4692 std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4693 for (
const auto rex_condition_component : rex_condition_cf) {
4696 translator.
translate(bw_equals ? bw_equals.get() : rex_condition_component));
4699 auto append_folded_cf_quals = [&join_condition_quals](
const auto& cf_quals) {
4700 for (
const auto& cf_qual : cf_quals) {
4701 join_condition_quals.emplace_back(
fold_expr(cf_qual.get()));
4705 append_folded_cf_quals(join_condition_cf.quals);
4706 append_folded_cf_quals(join_condition_cf.simple_quals);
4716 const std::vector<InputDescriptor>& input_descs,
4717 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4718 const bool just_explain) {
4724 std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4725 for (
size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4727 if (outer_condition) {
4728 result[rte_idx - 1].quals =
4729 makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4730 CHECK_LE(rte_idx, join_types.size());
4735 for (
const auto& qual : join_condition_quals) {
4736 if (visited_quals.count(qual)) {
4739 const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4740 if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4741 const auto it_ok = visited_quals.emplace(qual);
4742 CHECK(it_ok.second);
4743 result[rte_idx - 1].quals.push_back(qual);
4746 CHECK_LE(rte_idx, join_types.size());
4750 result[rte_idx - 1].type = join_types[rte_idx - 1];
4759 const size_t nest_level,
4760 const std::vector<TargetMetaInfo>& in_metainfo,
4761 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4764 const auto input = ra_node->
getInput(nest_level);
4765 const auto it_rte_idx = input_to_nest_level.find(input);
4766 CHECK(it_rte_idx != input_to_nest_level.end());
4767 const int rte_idx = it_rte_idx->second;
4769 std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4770 const auto scan_ra =
dynamic_cast<const RelScan*
>(input);
4772 for (
const auto& input_meta : in_metainfo) {
4773 inputs.push_back(std::make_shared<Analyzer::ColumnVar>(
4774 input_meta.get_type_info(),
4783 std::vector<std::shared_ptr<Analyzer::Expr>>
const& input) {
4784 std::vector<Analyzer::Expr*> output(input.size());
4785 auto const raw_ptr = [](
auto& shared_ptr) {
return shared_ptr.get(); };
4786 std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4795 const bool just_explain) {
4796 std::vector<InputDescriptor> input_descs;
4797 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4798 std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4800 std::tie(input_descs, input_col_descs, used_inputs_owned) =
4807 const auto source = aggregate->
getInput(0);
4809 const auto scalar_sources =
4812 std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4814 target_exprs_type_infos,
4826 auto candidate =
query_dag_->getQueryHint(aggregate);
4828 query_hint = *candidate;
4840 target_exprs_type_infos,
4846 join_info.hash_table_plan_dag,
4847 join_info.table_id_to_node_map,
4860 std::vector<InputDescriptor> input_descs;
4861 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4863 std::tie(input_descs, input_col_descs, std::ignore) =
4867 const auto left_deep_join =
4872 std::vector<size_t> input_permutation;
4873 std::vector<size_t> left_deep_join_input_sizes;
4874 std::optional<unsigned> left_deep_tree_id;
4875 if (left_deep_join) {
4876 left_deep_tree_id = left_deep_join->getId();
4880 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4884 left_deep_join_quals,
4885 input_to_nest_level,
4890 std::tie(input_descs, input_col_descs, std::ignore) =
4893 left_deep_join, input_descs, input_to_nest_level, eo.
just_explain);
4899 const auto target_exprs_owned =
4907 auto candidate =
query_dag_->getQueryHint(project);
4909 query_hint = *candidate;
4916 left_deep_join_quals,
4930 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
4931 auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4932 const auto targets_meta =
get_targets_meta(project, rewritten_exe_unit.target_exprs);
4935 if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4936 left_deep_trees_info.emplace(left_deep_tree_id.value(),
4937 rewritten_exe_unit.join_quals);
4941 project, left_deep_tree_id, left_deep_trees_info,
executor_);
4942 rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4943 rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4945 return {rewritten_exe_unit,
4948 std::move(query_rewriter),
4950 left_deep_join_input_sizes};
4959 const int negative_node_id = -input_node->
getId();
4961 if (
auto rel_scan = dynamic_cast<const RelScan*>(input_node)) {
4962 db_id = rel_scan->getCatalog().getDatabaseId();
4964 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4965 target_exprs.reserve(tmis.size());
4966 for (
size_t i = 0; i < tmis.size(); ++i) {
4967 target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4968 tmis[i].get_type_info(),
4972 return target_exprs;
// NOTE(review): fragment of a UNION work-unit builder; the function header and
// many interior lines are elided (embedded line numbers are non-contiguous).
// Code below is left byte-identical; comments only.
// Gather input descriptors for the union's source nodes; the third tuple
// element of the (elided) helper call is discarded via std::ignore.
4981 std::vector<InputDescriptor> input_descs;
4982 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4985 std::tie(input_descs, input_col_descs, std::ignore) =
// Fold over the per-input query infos to find the largest tuple count —
// presumably via std::accumulate with this lambda (TODO confirm; the call
// site is elided).
4988 auto const max_num_tuples =
4992 [](
auto max,
auto const& query_info) {
4993 return std::max(max, query_info.info.getNumTuples());
// Verbose logging of the input-node -> nest-level mapping.
4996 VLOG(3) <<
"input_to_nest_level.size()=" << input_to_nest_level.size() <<
" Pairs are:";
4997 for (
auto& pair : input_to_nest_level) {
4999 << pair.second <<
')';
// Two target-expression lists are collected (indices 0 and 1); the code
// that fills them between the CHECK and the VLOG lines is elided. The
// CHECK guards against an input node with no translated expressions.
5004 std::vector<Analyzer::Expr*> target_exprs_pair[2];
5005 for (
unsigned i = 0; i < 2; ++i) {
5007 CHECK(!input_exprs_owned.empty())
5008 <<
"No metainfo found for input node(" << i <<
") "
5010 VLOG(3) <<
"i(" << i <<
") input_exprs_owned.size()=" << input_exprs_owned.size();
5011 for (
auto& input_expr : input_exprs_owned) {
5012 VLOG(3) <<
" " << input_expr->toString();
5020 <<
" target_exprs.size()=" << target_exprs_pair[0].size()
5021 <<
" max_num_tuples=" << max_num_tuples;
// Assemble the execution unit (most initializer fields elided); isAll()
// presumably distinguishes UNION ALL from UNION — TODO confirm.
5029 target_exprs_pair[0],
5039 logical_union->
isAll(),
5041 target_exprs_pair[1]};
// Run the standard query-rewrite pass over the freshly built unit.
5042 auto query_rewriter = std::make_unique<QueryRewriter>(query_infos,
executor_);
5043 const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
// Dispatch on the dynamic type of the first input node; the per-type
// bodies are elided. RelSort as a union input is explicitly rejected.
5046 if (
auto const* node = dynamic_cast<const RelCompound*>(input0)) {
5049 }
else if (
auto const* node = dynamic_cast<const RelProject*>(input0)) {
5052 }
else if (
auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
5055 }
else if (
auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
5058 }
else if (
auto const* node = dynamic_cast<const RelScan*>(input0)) {
5061 }
else if (
auto const* node = dynamic_cast<const RelFilter*>(input0)) {
5064 }
else if (
auto const* node = dynamic_cast<const RelLogicalValues*>(input0)) {
5067 }
else if (dynamic_cast<const RelSort*>(input0)) {
5068 throw QueryNotSupported(
"LIMIT and OFFSET are not currently supported with UNION.");
5073 VLOG(3) <<
"logical_union->getOutputMetainfo()="
5075 <<
" rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey()="
5076 << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey();
// Return the rewritten unit, keeping the rewriter alive (it may own
// rewritten expression state) by moving it into the result.
5078 return {rewritten_exe_unit,
5081 std::move(query_rewriter)};
// NOTE(review): fragment of a table-function work-unit builder
// ("createTableFunctionWorkUnit" per the log strings below); the function
// header and many interior lines are elided. Code left byte-identical.
5086 const bool just_explain,
5087 const bool is_gpu) {
5088 std::vector<InputDescriptor> input_descs;
5089 std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5091 std::tie(input_descs, input_col_descs, std::ignore) =
// Resolve the table-function implementation + argument type infos; on GPU
// resolution failure the (elided) catch logs and presumably retries on
// CPU — TODO confirm against full source.
5101 const auto table_function_impl_and_type_infos = [=]() {
5107 LOG(
WARNING) <<
"createTableFunctionWorkUnit[GPU]: " << e.what()
5109 <<
" step to run on CPU.";
5117 LOG(
WARNING) <<
"createTableFunctionWorkUnit[CPU]: " << e.what();
5122 const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
5123 const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
// Determine output sizing: either a user-specified literal parameter
// (must be a positive integer literal) or a fixed non-user size.
5124 size_t output_row_sizing_param = 0;
5125 if (table_function_impl
5126 .hasUserSpecifiedOutputSizeParameter()) {
5127 const auto parameter_index =
5128 table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
5129 CHECK_GT(parameter_index,
size_t(0));
5131 const auto parameter_expr =
5133 const auto parameter_expr_literal =
dynamic_cast<const RexLiteral*
>(parameter_expr);
5134 if (!parameter_expr_literal) {
5135 throw std::runtime_error(
5136 "Provided output buffer sizing parameter is not a literal. Only literal "
5137 "values are supported with output buffer sizing configured table "
5140 int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
5141 if (literal_val < 0) {
5142 throw std::runtime_error(
"Provided output sizing parameter " +
5144 " must be positive integer.");
5146 output_row_sizing_param =
static_cast<size_t>(literal_val);
// Elided branch: when no sizing parameter is given, a default row
// multiplier constant (kINT Datum `d`) is inserted at the parameter slot.
5149 output_row_sizing_param = 1;
5152 makeExpr<Analyzer::Constant>(
kINT,
false, d);
5154 input_exprs_owned.insert(input_exprs_owned.begin() + parameter_index - 1,
5155 DEFAULT_ROW_MULTIPLIER_EXPR);
5157 }
else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5158 output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
// Walk the declared argument types, rewriting each input expression's
// type info for column / column-list arguments and collecting the
// resulting ColumnVar pointers. Scalar args get a cast when the declared
// extension-function type differs from the expression's type.
5163 std::vector<Analyzer::ColumnVar*> input_col_exprs;
5164 size_t input_index = 0;
5165 size_t arg_index = 0;
5166 const auto table_func_args = table_function_impl.getInputArgs();
5167 CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5168 for (
const auto& ti : table_function_type_infos) {
5169 if (ti.is_column_list()) {
// A column list spans get_dimension() consecutive input expressions.
5170 for (
int i = 0; i < ti.get_dimension(); i++) {
5171 auto& input_expr = input_exprs_owned[input_index];
5177 auto type_info = input_expr->get_type_info();
5178 if (ti.is_column_array()) {
5180 type_info.set_subtype(type_info.get_subtype());
5182 type_info.set_subtype(type_info.get_type());
5184 type_info.set_type(ti.get_type());
5185 type_info.set_dimension(ti.get_dimension());
5186 input_expr->set_type_info(type_info);
5188 input_col_exprs.push_back(col_var);
5191 }
else if (ti.is_column()) {
5192 auto& input_expr = input_exprs_owned[input_index];
5197 auto type_info = input_expr->get_type_info();
5198 if (ti.is_column_array()) {
5200 type_info.set_subtype(type_info.get_subtype());
5202 type_info.set_subtype(type_info.get_type());
5204 type_info.set_type(ti.get_type());
5205 input_expr->set_type_info(type_info);
5206 input_col_exprs.push_back(col_var);
5209 auto input_expr = input_exprs_owned[input_index];
5211 if (ext_func_arg_ti != input_expr->get_type_info()) {
5212 input_exprs_owned[input_index] = input_expr->add_cast(ext_func_arg_ti);
// Build output target expressions. Dictionary-encoded string outputs
// inherit their string-dictionary key from the input column identified by
// getInputID(); column-list args occupy get_dimension() flattened slots
// when translating the (arg, element) pair to a flat input position.
5219 std::vector<Analyzer::Expr*> table_func_outputs;
5220 constexpr int32_t transient_pos{-1};
5221 for (
size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5222 auto ti = table_function_impl.getOutputSQLType(i);
5223 if (ti.is_dict_encoded_string() || ti.is_text_encoding_dict_array()) {
5224 auto p = table_function_impl.getInputID(i);
5226 int32_t input_pos = p.first;
5227 if (input_pos == transient_pos) {
5233 for (
int j = 0; j < input_pos; j++) {
5234 const auto ti = table_function_type_infos[j];
5235 offset += ti.is_column_list() ? ti.get_dimension() : 1;
5237 input_pos = offset + p.second;
5239 CHECK_LT(input_pos, input_exprs_owned.size());
5240 const auto& dict_key =
5241 input_exprs_owned[input_pos]->get_type_info().getStringDictKey();
5242 ti.set_comp_param(dict_key.dict_id);
5243 ti.setStringDictKey(dict_key);
// Assemble and return the table-function execution unit (most initializer
// fields elided in this view).
5257 output_row_sizing_param,
5258 table_function_impl};
5261 return {exe_unit, rel_table_func};
// NOTE(review): fragment of a helper returning (target meta infos, owned
// expressions) for a data-sink node's inputs; the line carrying the function
// name and some interior lines are elided. Code left byte-identical.
5266 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
5269 const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
5270 const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
5271 std::vector<TargetMetaInfo> in_metainfo;
5272 std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
// Walk each input of the sink at its nest level, consuming entries of
// `inputs_owned` via `input_it` as we go.
5274 auto input_it = inputs_owned.begin();
5275 for (
size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
5276 const auto source = data_sink_node->getInput(nest_level);
5277 const auto scan_source =
dynamic_cast<const RelScan*
>(source);
// Scan sources have no precomputed output metainfo: translate each
// RexInput to an Analyzer expression and derive metadata from those.
5279 CHECK(source->getOutputMetainfo().empty());
5280 std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
5281 for (
size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
5282 scalar_sources_owned.push_back(translator.
translate(input_it->get()));
5284 const auto source_metadata =
5287 in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5289 exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
// Non-scan sources (elided else-branch): reuse the node's existing
// output metainfo and skip the corresponding RexInputs.
5291 const auto& source_metadata = source->getOutputMetainfo();
5292 input_it += source_metadata.size();
5294 in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5296 data_sink_node, nest_level, source_metadata, input_to_nest_level);
5298 exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5301 return std::make_pair(in_metainfo, exprs_owned);