OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RelAlgExecutor.h"
21 #include "Parser/ParserNode.h"
34 #include "QueryEngine/RelAlgDag.h"
38 #include "QueryEngine/RexVisitor.h"
43 #include "Shared/measure.h"
44 #include "Shared/misc.h"
45 #include "Shared/shard_key.h"
46 
47 #include <boost/algorithm/cxx11/any_of.hpp>
48 #include <boost/range/adaptor/reversed.hpp>
49 
50 #include <algorithm>
51 #include <functional>
52 #include <numeric>
53 
55 bool g_enable_interop{false};
56 bool g_enable_union{true}; // DEPRECATED
61 
62 extern bool g_enable_watchdog;
65 extern bool g_enable_bump_allocator;
67 extern bool g_enable_system_tables;
68 
69 namespace {
70 
71 bool node_is_aggregate(const RelAlgNode* ra) {
72  const auto compound = dynamic_cast<const RelCompound*>(ra);
73  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
74  return ((compound && compound->isAggregate()) || aggregate);
75 }
76 
77 std::unordered_set<PhysicalInput> get_physical_inputs_with_spi_col_id(
78  const RelAlgNode* ra) {
79  const auto phys_inputs = get_physical_inputs(ra);
80  std::unordered_set<PhysicalInput> phys_inputs2;
81  for (auto& phi : phys_inputs) {
82  const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(phi.db_id);
83  CHECK(catalog);
84  phys_inputs2.insert(PhysicalInput{
85  catalog->getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id, phi.db_id});
86  }
87  return phys_inputs2;
88 }
89 
90 void set_parallelism_hints(const RelAlgNode& ra_node) {
91  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
92  parallelism_hints_per_table;
93  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
94  const auto table_id = physical_input.table_id;
95  const auto catalog =
97  CHECK(catalog);
98  const auto table = catalog->getMetadataForTable(table_id, true);
99  if (table && table->storageType == StorageType::FOREIGN_TABLE &&
100  !table->is_in_memory_system_table) {
101  const auto col_id = catalog->getColumnIdBySpi(table_id, physical_input.col_id);
102  const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
103  const auto foreign_table = catalog->getForeignTable(table_id);
104  for (const auto& fragment :
105  foreign_table->fragmenter->getFragmentsForQuery().fragments) {
106  Chunk_NS::Chunk chunk{col_desc};
107  ChunkKey chunk_key = {
108  physical_input.db_id, table_id, col_id, fragment.fragmentId};
109 
110  // Parallelism hints should not include fragments that are not mapped to the
111  // current node, otherwise we will try to prefetch them and run into trouble.
113  continue;
114  }
115 
116  // do not include chunk hints that are in CPU memory
117  if (!chunk.isChunkOnDevice(
118  &catalog->getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
119  parallelism_hints_per_table[{physical_input.db_id, table_id}].insert(
121  fragment.fragmentId});
122  }
123  }
124  }
125  }
126  if (!parallelism_hints_per_table.empty()) {
127  auto foreign_storage_mgr = Catalog_Namespace::SysCatalog::instance()
128  .getDataMgr()
131  CHECK(foreign_storage_mgr);
132  foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
133  }
134 }
135 
137  for (const auto [col_id, table_id, db_id] : get_physical_inputs(&ra_node)) {
138  const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
139  CHECK(catalog);
140  const auto table = catalog->getMetadataForTable(table_id, false);
141  if (table && table->storageType == StorageType::FOREIGN_TABLE) {
142  const auto spi_col_id = catalog->getColumnIdBySpi(table_id, col_id);
143  foreign_storage::populate_string_dictionary(table_id, spi_col_id, db_id);
144  }
145  }
146 }
147 
149  // Iterate through ra_node inputs for types that need to be loaded pre-execution
150  // If they do not have valid metadata, load them into CPU memory to generate
151  // the metadata and leave them ready to be used by the query
152  set_parallelism_hints(ra_node);
154 }
155 
157  const CompilationOptions& co) {
159  const auto info_schema_catalog =
161  CHECK(info_schema_catalog);
162  std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
163  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
164  if (info_schema_catalog->getDatabaseId() != physical_input.db_id) {
165  continue;
166  }
167  const auto table_id = physical_input.table_id;
168  const auto table = info_schema_catalog->getMetadataForTable(table_id, false);
169  CHECK(table);
170  if (table->is_in_memory_system_table) {
171  const auto column_id =
172  info_schema_catalog->getColumnIdBySpi(table_id, physical_input.col_id);
173  system_table_columns_by_table_id[table_id].emplace_back(column_id);
174  }
175  }
176  // Execute on CPU for queries involving system tables
177  if (!system_table_columns_by_table_id.empty() &&
179  throw QueryMustRunOnCpu();
180  }
181 
182  for (const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
183  // Clear any previously cached data, since system tables depend on point in
184  // time data snapshots.
185  info_schema_catalog->getDataMgr().deleteChunksWithPrefix(
186  ChunkKey{info_schema_catalog->getDatabaseId(), table_id},
188 
189  // TODO(Misiu): This prefetching can be removed if we can add support for
190  // ExpressionRanges to reduce invalid with valid ranges (right now prefetching
191  // causes us to fetch the chunks twice). Right now if we do not prefetch (i.e. if
192  // we remove the code below) some nodes will return valid ranges and others will
193  // return unknown because they only use placeholder metadata and the LeafAggregator
194  // has no idea how to reduce the two.
195  const auto td = info_schema_catalog->getMetadataForTable(table_id);
196  CHECK(td);
197  CHECK(td->fragmenter);
198  auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
199  CHECK_LE(fragment_count, static_cast<size_t>(1))
200  << "In-memory system tables are expected to have a single fragment.";
201  if (fragment_count > 0) {
202  for (auto column_id : column_ids) {
203  // Prefetch system table chunks in order to force chunk statistics metadata
204  // computation.
205  const auto cd = info_schema_catalog->getMetadataForColumn(table_id, column_id);
206  const ChunkKey chunk_key{
207  info_schema_catalog->getDatabaseId(), table_id, column_id, 0};
209  &(info_schema_catalog->getDataMgr()),
210  chunk_key,
212  0,
213  0,
214  0);
215  }
216  }
217  }
218  }
219 }
220 
223 }
224 } // namespace
225 
227  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
228  const std::vector<TargetMetaInfo>& targets_meta) {
229  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
230  render_info.targets.clear();
231  for (size_t i = 0; i < targets_meta.size(); ++i) {
232  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
233  targets_meta[i].get_resname(),
234  work_unit_target_exprs[i]->get_shared_ptr(),
235  false));
236  }
237 }
238 
240  return eo.just_validate || eo.just_explain || eo.just_calcite_explain;
241 }
242 
243 class RelLeftDeepTreeIdsCollector : public RelAlgVisitor<std::vector<unsigned>> {
244  public:
245  std::vector<unsigned> visitLeftDeepInnerJoin(
246  const RelLeftDeepInnerJoin* left_deep_join_tree) const override {
247  return {left_deep_join_tree->getId()};
248  }
249 
250  protected:
251  std::vector<unsigned> aggregateResult(
252  const std::vector<unsigned>& aggregate,
253  const std::vector<unsigned>& next_result) const override {
254  auto result = aggregate;
255  std::copy(next_result.begin(), next_result.end(), std::back_inserter(result));
256  return result;
257  }
258 };
259 
265  const size_t text_encoding_casts)
266  : text_decoding_casts(text_decoding_casts)
267  , text_encoding_casts(text_encoding_casts) {}
268 };
269 
270 class TextEncodingCastCountVisitor : public ScalarExprVisitor<TextEncodingCastCounts> {
271  public:
272  TextEncodingCastCountVisitor(const bool default_disregard_casts_to_none_encoding)
274  default_disregard_casts_to_none_encoding) {}
275 
276  protected:
277  TextEncodingCastCounts visitUOper(const Analyzer::UOper* u_oper) const override {
279  // Save state of input disregard_casts_to_none_encoding_ as child node traversal
280  // will reset it
281  const bool disregard_casts_to_none_encoding = disregard_casts_to_none_encoding_;
282  result = aggregateResult(result, visit(u_oper->get_operand()));
283  if (u_oper->get_optype() != kCAST) {
284  return result;
285  }
286  const auto& operand_ti = u_oper->get_operand()->get_type_info();
287  const auto& casted_ti = u_oper->get_type_info();
288  if (!operand_ti.is_string() && casted_ti.is_dict_encoded_string()) {
289  return aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
290  }
291  if (!casted_ti.is_string()) {
292  return result;
293  }
294  const bool literals_only = u_oper->get_operand()->get_num_column_vars(true) == 0UL;
295  if (literals_only) {
296  return result;
297  }
298  if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
299  return aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
300  }
301  if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
302  if (!disregard_casts_to_none_encoding) {
303  return aggregateResult(result, TextEncodingCastCounts(1UL, 0UL));
304  } else {
305  return result;
306  }
307  }
308  return result;
309  }
310 
312  const Analyzer::StringOper* string_oper) const override {
314  if (string_oper->getArity() > 0) {
315  result = aggregateResult(result, visit(string_oper->getArg(0)));
316  }
317  if (string_op_returns_string(string_oper->get_kind()) &&
318  string_oper->hasNoneEncodedTextArg()) {
319  result = aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
320  }
321  return result;
322  }
323 
324  TextEncodingCastCounts visitBinOper(const Analyzer::BinOper* bin_oper) const override {
326  // Currently the join framework handles casts between string types, and
327  // casts to none-encoded strings should be considered spurious, except
328  // when the join predicate is not a =/<> operator, in which case
329  // for both join predicates and all other instances we have to decode
330  // to a none-encoded string to do the comparison. The logic below essentially
331  // overrides the logic such as to always count none-encoded casts on strings
332  // that are children of binary operators other than =/<>
333  if (bin_oper->get_optype() != kEQ && bin_oper->get_optype() != kNE) {
334  // Override the global override so that join opers don't skip
335  // the check when there is an actual cast to none-encoded string
336  const auto prev_disregard_casts_to_none_encoding_state =
338  const auto left_u_oper =
339  dynamic_cast<const Analyzer::UOper*>(bin_oper->get_left_operand());
340  if (left_u_oper && left_u_oper->get_optype() == kCAST) {
342  result = aggregateResult(result, visitUOper(left_u_oper));
343  } else {
344  disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
345  result = aggregateResult(result, visit(bin_oper->get_left_operand()));
346  }
347 
348  const auto right_u_oper =
349  dynamic_cast<const Analyzer::UOper*>(bin_oper->get_left_operand());
350  if (right_u_oper && right_u_oper->get_optype() == kCAST) {
352  result = aggregateResult(result, visitUOper(right_u_oper));
353  } else {
354  disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
355  result = aggregateResult(result, visit(bin_oper->get_right_operand()));
356  }
357  disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
358  } else {
359  result = aggregateResult(result, visit(bin_oper->get_left_operand()));
360  result = aggregateResult(result, visit(bin_oper->get_right_operand()));
361  }
362  return result;
363  }
364 
367  const auto u_oper = dynamic_cast<const Analyzer::UOper*>(like->get_arg());
368  const auto prev_disregard_casts_to_none_encoding_state =
370  if (u_oper && u_oper->get_optype() == kCAST) {
372  result = aggregateResult(result, visitUOper(u_oper));
373  disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
374  } else {
375  result = aggregateResult(result, visit(like->get_arg()));
376  }
377  result = aggregateResult(result, visit(like->get_like_expr()));
378  if (like->get_escape_expr()) {
379  result = aggregateResult(result, visit(like->get_escape_expr()));
380  }
381  return result;
382  }
383 
385  const TextEncodingCastCounts& aggregate,
386  const TextEncodingCastCounts& next_result) const override {
387  auto result = aggregate;
388  result.text_decoding_casts += next_result.text_decoding_casts;
389  result.text_encoding_casts += next_result.text_encoding_casts;
390  return result;
391  }
392 
393  void visitBegin() const override {
395  }
396 
398  return TextEncodingCastCounts();
399  }
400 
401  private:
402  mutable bool disregard_casts_to_none_encoding_ = false;
404 };
405 
407  TextEncodingCastCounts cast_counts;
408 
409  auto check_node_for_text_casts = [&cast_counts](
410  const Analyzer::Expr* expr,
411  const bool disregard_casts_to_none_encoding) {
412  if (!expr) {
413  return;
414  }
415  TextEncodingCastCountVisitor visitor(disregard_casts_to_none_encoding);
416  const auto this_node_cast_counts = visitor.visit(expr);
417  cast_counts.text_encoding_casts += this_node_cast_counts.text_encoding_casts;
418  cast_counts.text_decoding_casts += this_node_cast_counts.text_decoding_casts;
419  };
420 
421  for (const auto& qual : ra_exe_unit.quals) {
422  check_node_for_text_casts(qual.get(), false);
423  }
424  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
425  check_node_for_text_casts(simple_qual.get(), false);
426  }
427  for (const auto& groupby_expr : ra_exe_unit.groupby_exprs) {
428  check_node_for_text_casts(groupby_expr.get(), false);
429  }
430  for (const auto& target_expr : ra_exe_unit.target_exprs) {
431  check_node_for_text_casts(target_expr, false);
432  }
433  for (const auto& join_condition : ra_exe_unit.join_quals) {
434  for (const auto& join_qual : join_condition.quals) {
435  // We currently need to not count casts to none-encoded strings for join quals,
436  // as analyzer will generate these but our join framework disregards them.
437  // Some investigation was done on having analyzer issue the correct inter-string
438  // dictionary casts, but this actually causes them to get executed and so the same
439  // work gets done twice.
440  check_node_for_text_casts(join_qual.get(),
441  true /* disregard_casts_to_none_encoding */);
442  }
443  }
444  return cast_counts;
445 }
446 
447 namespace {
448 
450  const std::vector<InputTableInfo>& query_infos,
451  const RelAlgExecutionUnit& ra_exe_unit) {
452  if (!g_enable_watchdog) {
453  return;
454  }
455  auto const tuples_upper_bound =
456  std::accumulate(query_infos.cbegin(),
457  query_infos.cend(),
458  size_t(0),
459  [](auto max, auto const& query_info) {
460  return std::max(max, query_info.info.getNumTuples());
461  });
462  if (tuples_upper_bound <= g_watchdog_none_encoded_string_translation_limit) {
463  return;
464  }
465 
466  const auto& text_cast_counts = get_text_cast_counts(ra_exe_unit);
467  const bool has_text_casts =
468  text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;
469 
470  if (!has_text_casts) {
471  return;
472  }
473  std::ostringstream oss;
474  oss << "Query requires one or more casts between none-encoded and dictionary-encoded "
475  << "strings, and the estimated table size (" << tuples_upper_bound << " rows) "
476  << "exceeds the configured watchdog none-encoded string translation limit of "
478  throw std::runtime_error(oss.str());
479 }
480 
481 } // namespace
482 
484  RenderInfo* render_info) const {
485  auto validate_or_explain_query = is_validate_or_explain_query(eo);
486  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
488  !validate_or_explain_query && !hasStepForUnion() &&
489  !query_for_partial_outer_frag &&
490  (!render_info || (render_info && !render_info->isInSitu()));
491 }
492 
494  const ExecutionOptions& eo) {
495  if (eo.find_push_down_candidates) {
496  return 0;
497  }
498 
499  if (eo.just_explain) {
500  return 0;
501  }
502 
503  CHECK(query_dag_);
504 
505  query_dag_->resetQueryExecutionState();
506  const auto& ra = query_dag_->getRootNode();
507 
508  auto lock = executor_->acquireExecuteMutex();
509  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
510  setupCaching(&ra);
511 
512  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
513  auto ed_seq = RaExecutionSequence(&ra, executor_);
514 
515  if (!getSubqueries().empty()) {
516  return 0;
517  }
518 
519  CHECK(!ed_seq.empty());
520  if (ed_seq.size() > 1) {
521  return 0;
522  }
523 
526  executor_->temporary_tables_ = &temporary_tables_;
527 
528  auto exec_desc_ptr = ed_seq.getDescriptor(0);
529  CHECK(exec_desc_ptr);
530  auto& exec_desc = *exec_desc_ptr;
531  const auto body = exec_desc.getBody();
532  if (body->isNop()) {
533  return 0;
534  }
535 
536  const auto project = dynamic_cast<const RelProject*>(body);
537  if (project) {
538  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo);
539  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableKey(),
540  executor_);
541  }
542 
543  const auto compound = dynamic_cast<const RelCompound*>(body);
544  if (compound) {
545  if (compound->isDeleteViaSelect()) {
546  return 0;
547  } else if (compound->isUpdateViaSelect()) {
548  return 0;
549  } else {
550  if (compound->isAggregate()) {
551  return 0;
552  }
553 
554  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo);
555 
556  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableKey(),
557  executor_);
558  }
559  }
560 
561  return 0;
562 }
563 
565  const ExecutionOptions& eo,
566  const bool just_explain_plan,
567  const bool explain_verbose,
568  RenderInfo* render_info) {
569  CHECK(query_dag_);
571  << static_cast<int>(query_dag_->getBuildState());
572 
573  auto timer = DEBUG_TIMER(__func__);
575 
576  auto run_query = [&](const CompilationOptions& co_in) {
577  auto execution_result = executeRelAlgQueryNoRetry(
578  co_in, eo, just_explain_plan, explain_verbose, render_info);
579 
580  constexpr bool vlog_result_set_summary{false};
581  if constexpr (vlog_result_set_summary) {
582  VLOG(1) << execution_result.getRows()->summaryToString();
583  }
584 
586  VLOG(1) << "Running post execution callback.";
587  (*post_execution_callback_)();
588  }
589  return execution_result;
590  };
591 
592  try {
593  return run_query(co);
594  } catch (const QueryMustRunOnCpu&) {
595  if (!g_allow_cpu_retry) {
596  throw;
597  }
598  }
599  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
600  auto co_cpu = CompilationOptions::makeCpuOnly(co);
601 
602  if (render_info) {
603  render_info->forceNonInSitu();
604  }
605  return run_query(co_cpu);
606 }
607 
609  const ExecutionOptions& eo,
610  const bool just_explain_plan,
611  const bool explain_verbose,
612  RenderInfo* render_info) {
614  auto timer = DEBUG_TIMER(__func__);
615  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
616 
617  query_dag_->resetQueryExecutionState();
618  const auto& ra = query_dag_->getRootNode();
619 
620  // capture the lock acquisition time
621  auto clock_begin = timer_start();
623  executor_->resetInterrupt();
624  }
625  std::string query_session{""};
626  std::string query_str{"N/A"};
627  std::string query_submitted_time{""};
628  // gather necessary query's info
629  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
630  query_session = query_state_->getConstSessionInfo()->get_session_id();
631  query_str = query_state_->getQueryStr();
632  query_submitted_time = query_state_->getQuerySubmittedTime();
633  }
634 
635  auto validate_or_explain_query =
636  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
637  auto interruptable = !render_info && !query_session.empty() &&
638  eo.allow_runtime_query_interrupt && !validate_or_explain_query;
639  if (interruptable) {
640  // if we reach here, the current query which was waiting an idle executor
641  // within the dispatch queue is now scheduled to the specific executor
642  // (not UNITARY_EXECUTOR)
643  // so we update the query session's status with the executor that takes this query
644  std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
645  query_session, query_str, query_submitted_time);
646 
647  // now the query is going to be executed, so update the status as
648  // "RUNNING_QUERY_KERNEL"
649  executor_->updateQuerySessionStatus(
650  query_session,
651  query_submitted_time,
652  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
653  }
654 
655  // so it should do cleanup session info after finishing its execution
656  ScopeGuard clearQuerySessionInfo =
657  [this, &query_session, &interruptable, &query_submitted_time] {
658  // reset the runtime query interrupt status after the end of query execution
659  if (interruptable) {
660  // cleanup running session's info
661  executor_->clearQuerySessionStatus(query_session, query_submitted_time);
662  }
663  };
664 
665  auto acquire_execute_mutex = [](Executor * executor) -> auto{
666  auto ret = executor->acquireExecuteMutex();
667  return ret;
668  };
669  // now we acquire executor lock in here to make sure that this executor holds
670  // all necessary resources and at the same time protect them against other executor
671  auto lock = acquire_execute_mutex(executor_);
672 
673  if (interruptable) {
674  // check whether this query session is "already" interrupted
675  // this case occurs when there is very short gap between being interrupted and
676  // taking the execute lock
677  // if so we have to remove "all" queries initiated by this session and we do in here
678  // without running the query
679  try {
680  executor_->checkPendingQueryStatus(query_session);
681  } catch (QueryExecutionError& e) {
682  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
683  throw std::runtime_error("Query execution has been interrupted (pending query)");
684  }
685  throw e;
686  } catch (...) {
687  throw std::runtime_error("Checking pending query status failed: unknown error");
688  }
689  }
690  int64_t queue_time_ms = timer_stop(clock_begin);
691 
693 
694  // Notify foreign tables to load prior to caching
696 
697  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
698  setupCaching(&ra);
699 
700  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
701  auto ed_seq = RaExecutionSequence(&ra, executor_);
702 
703  if (just_explain_plan) {
704  const auto& ra = getRootRelAlgNode();
705  auto ed_seq = RaExecutionSequence(&ra, executor_);
706  std::vector<const RelAlgNode*> steps;
707  for (size_t i = 0; i < ed_seq.size(); i++) {
708  steps.emplace_back(ed_seq.getDescriptor(i)->getBody());
709  steps.back()->setStepNumber(i + 1);
710  }
711  std::stringstream ss;
712  auto& nodes = getRelAlgDag()->getNodes();
713  RelAlgDagViewer dagv(false, explain_verbose);
714  dagv.handleQueryEngineVector(nodes);
715  ss << dagv;
716  auto rs = std::make_shared<ResultSet>(ss.str());
717  return {rs, {}};
718  }
719 
720  if (eo.find_push_down_candidates) {
721  // this extra logic is mainly due to current limitations on multi-step queries
722  // and/or subqueries.
724  ed_seq, co, eo, render_info, queue_time_ms);
725  }
726  timer_setup.stop();
727 
728  // Dispatch the subqueries first
729  const auto global_hints = getGlobalQueryHint();
730  for (auto subquery : getSubqueries()) {
731  const auto subquery_ra = subquery->getRelAlg();
732  CHECK(subquery_ra);
733  if (subquery_ra->hasContextData()) {
734  continue;
735  }
736  // Execute the subquery and cache the result.
737  RelAlgExecutor subquery_executor(executor_, query_state_);
738  // Propagate global and local query hint if necessary
739  const auto local_hints = getParsedQueryHint(subquery_ra);
740  if (global_hints || local_hints) {
741  const auto subquery_rel_alg_dag = subquery_executor.getRelAlgDag();
742  if (global_hints) {
743  subquery_rel_alg_dag->setGlobalQueryHints(*global_hints);
744  }
745  if (local_hints) {
746  subquery_rel_alg_dag->registerQueryHint(subquery_ra, *local_hints);
747  }
748  }
749  RaExecutionSequence subquery_seq(subquery_ra, executor_);
750  auto result = subquery_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
751  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
752  }
753  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
754 }
755 
757  AggregatedColRange agg_col_range_cache;
758  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
759  return executor_->computeColRangesCache(phys_inputs);
760 }
761 
763  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
764  return executor_->computeStringDictionaryGenerations(phys_inputs);
765 }
766 
768  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
769  return executor_->computeTableGenerations(phys_table_ids);
770 }
771 
772 Executor* RelAlgExecutor::getExecutor() const {
773  return executor_;
774 }
775 
777  CHECK(executor_);
778  executor_->row_set_mem_owner_ = nullptr;
779 }
780 
781 std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
783  auto sort_node = dynamic_cast<const RelSort*>(root_node);
784  if (sort_node) {
785  // we assume that test query that needs join info does not contain any sort node
786  return {};
787  }
788  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
790  auto left_deep_tree_ids = visitor.visit(root_node);
791  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
792 }
793 
794 namespace {
795 
797  CHECK_EQ(size_t(1), sort->inputCount());
798  const auto source = sort->getInput(0);
799  if (dynamic_cast<const RelSort*>(source)) {
800  throw std::runtime_error("Sort node not supported as input to another sort");
801  }
802 }
803 
804 } // namespace
805 
807  const RaExecutionSequence& seq,
808  const size_t step_idx,
809  const CompilationOptions& co,
810  const ExecutionOptions& eo,
811  RenderInfo* render_info) {
812  INJECT_TIMER(executeRelAlgQueryStep);
813 
814  auto exe_desc_ptr = seq.getDescriptor(step_idx);
815  CHECK(exe_desc_ptr);
816  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
817 
818  size_t shard_count{0};
819  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
820  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
821  };
822 
823  if (sort) {
825  auto order_entries = sort->getOrderEntries();
826  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
827  shard_count =
828  GroupByAndAggregate::shard_count_for_top_groups(source_work_unit.exe_unit);
829  if (!shard_count) {
830  // No point in sorting on the leaf, only execute the input to the sort node.
831  CHECK_EQ(size_t(1), sort->inputCount());
832  const auto source = sort->getInput(0);
833  if (sort->collationCount() || node_is_aggregate(source)) {
834  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
835  CHECK_EQ(temp_seq.size(), size_t(1));
836  ExecutionOptions eo_copy = eo;
837  eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
838  // Use subseq to avoid clearing existing temporary tables
839  return {
840  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
841  merge_type(source),
842  source->getId(),
843  false};
844  }
845  }
846  }
847  QueryStepExecutionResult query_step_result{ExecutionResult{},
848  merge_type(exe_desc_ptr->getBody()),
849  exe_desc_ptr->getBody()->getId(),
850  false};
851  try {
852  query_step_result.result = executeRelAlgSubSeq(
853  seq, std::make_pair(step_idx, step_idx + 1), co, eo, render_info, queue_time_ms_);
854  } catch (QueryMustRunOnCpu const& e) {
856  auto copied_co = co;
858  LOG(INFO) << "Retry the query via CPU mode";
859  query_step_result.result = executeRelAlgSubSeq(seq,
860  std::make_pair(step_idx, step_idx + 1),
861  copied_co,
862  eo,
863  render_info,
865  }
867  VLOG(1) << "Running post execution callback.";
868  (*post_execution_callback_)();
869  }
870  return query_step_result;
871 }
872 
874  const AggregatedColRange& agg_col_range,
875  const StringDictionaryGenerations& string_dictionary_generations,
876  const TableGenerations& table_generations) {
877  // capture the lock acquisition time
878  auto clock_begin = timer_start();
880  executor_->resetInterrupt();
881  }
882  queue_time_ms_ = timer_stop(clock_begin);
883  executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(
884  Executor::getArenaBlockSize(), executor_->executor_id_);
885  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
886  executor_->table_generations_ = table_generations;
887  executor_->agg_col_range_cache_ = agg_col_range;
888 }
889 
891  const CompilationOptions& co,
892  const ExecutionOptions& eo,
893  RenderInfo* render_info,
894  const int64_t queue_time_ms,
895  const bool with_existing_temp_tables) {
  // Executes the steps of `seq` one after another through executeRelAlgStep() and
  // returns the result held by the descriptor of the last executed step.
  // `render_info` is only forwarded to the last step; all earlier steps get nullptr.
  // NOTE(review): this listing is missing several source lines (890, 896, 899,
  // 901-902, 935, 948, 954-955, 983), including the line that names the function and
  // its first parameter (presumably `const RaExecutionSequence& seq` - confirm).
897  auto timer = DEBUG_TIMER(__func__);
898  if (!with_existing_temp_tables) {
900  }
903  executor_->temporary_tables_ = &temporary_tables_;
904 
905  time(&now_);
906  CHECK(!seq.empty());
907 
  // Number of descriptors to run: everything for a normal query; for "just explain"
  // only the first one, or the first two when the first body is a RelLogicalValues.
908  auto get_descriptor_count = [&seq, &eo]() -> size_t {
909  if (eo.just_explain) {
910  if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
911  // run the logical values descriptor to generate the result set, then the next
912  // descriptor to generate the explain
913  CHECK_GE(seq.size(), size_t(2));
914  return 2;
915  } else {
916  return 1;
917  }
918  } else {
919  return seq.size();
920  }
921  };
922 
923  const auto exec_desc_count = get_descriptor_count();
  // Work on a private copy so the caller's options are not modified.
924  auto eo_copied = eo;
925  if (seq.hasQueryStepForUnion()) {
926  // we currently do not support resultset recycling when an input query
927  // contains union (all) operation
928  eo_copied.keep_result = false;
929  }
930 
931  // we have to register resultset(s) of the skipped query step(s) as temporary table
932  // before executing the remaining query steps
933  // since they may be required during the query processing
934  // i.e., get metadata of the target expression from the skipped query step
  // NOTE(review): the statement opening the scope closed at line 942 is not visible here.
936  for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
937  const auto cached_res =
938  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
939  CHECK(cached_res);
940  addTemporaryTable(kv.first, cached_res);
941  }
942  }
943 
  // `num_steps` is the index of the last step, not a count.
944  const auto num_steps = exec_desc_count - 1;
945  for (size_t i = 0; i < exec_desc_count; i++) {
946  VLOG(1) << "Executing query step " << i << " / " << num_steps;
947  try {
949  seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
950  } catch (const QueryMustRunOnCpu&) {
951  // Do not allow per-step retry if flag is off or in distributed mode
952  // TODO(todd): Determine if and when we can relax this restriction
953  // for distributed
  // NOTE(review): the condition guarding this rethrow is not visible in this listing.
956  throw;
957  }
958  LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
959  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
960  if (render_info && i == num_steps) {
961  // only render on the last step
962  render_info->forceNonInSitu();
963  }
964  executeRelAlgStep(seq,
965  i,
966  co_cpu,
967  eo_copied,
968  (i == num_steps) ? render_info : nullptr,
969  queue_time_ms);
970  } catch (const NativeExecutionError&) {
  // Without interop enabled the native failure is propagated unchanged.
971  if (!g_enable_interop) {
972  throw;
973  }
974  auto eo_extern = eo_copied;
975  eo_extern.executor_type = ::ExecutorType::Extern;
976  auto exec_desc_ptr = seq.getDescriptor(i);
977  const auto body = exec_desc_ptr->getBody();
978  const auto compound = dynamic_cast<const RelCompound*>(body);
  // Compound steps that group or aggregate are not retried with the Extern executor.
979  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
980  LOG(INFO) << "Also failed to run the query using interoperability";
981  throw;
982  }
984  seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
985  }
986  }
987 
988  return seq.getDescriptor(num_steps)->getResult();
989 }
990 
992  const RaExecutionSequence& seq,
993  const std::pair<size_t, size_t> interval,
994  const CompilationOptions& co,
995  const ExecutionOptions& eo,
996  RenderInfo* render_info,
997  const int64_t queue_time_ms) {
  // Executes the steps of `seq` whose index lies in [interval.first, interval.second)
  // and returns the result of step `interval.second - 1`. `render_info` is forwarded
  // only to that last step.
  // NOTE(review): source lines 991, 998, 1000 and 1015-1016 are absent from this
  // listing, including the line that names the function.
  // NOTE(review): `interval.second - 1` is unsigned arithmetic; an interval with
  // second == 0 would wrap - presumably callers never pass one, confirm.
999  executor_->temporary_tables_ = &temporary_tables_;
1001  time(&now_);
1002  for (size_t i = interval.first; i < interval.second; i++) {
1003  // only render on the last step
1004  try {
1005  executeRelAlgStep(seq,
1006  i,
1007  co,
1008  eo,
1009  (i == interval.second - 1) ? render_info : nullptr,
1010  queue_time_ms);
1011  } catch (const QueryMustRunOnCpu&) {
1012  // Do not allow per-step retry if flag is off or in distributed mode
1013  // TODO(todd): Determine if and when we can relax this restriction
1014  // for distributed
  // NOTE(review): the condition guarding this rethrow is not visible in this listing.
1017  throw;
1018  }
1019  LOG(INFO) << "Retrying current query step " << i << " on CPU";
1020  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
1021  if (render_info && i == interval.second - 1) {
1022  render_info->forceNonInSitu();
1023  }
1024  executeRelAlgStep(seq,
1025  i,
1026  co_cpu,
1027  eo,
1028  (i == interval.second - 1) ? render_info : nullptr,
1029  queue_time_ms);
1030  }
1031  }
1032 
1033  return seq.getDescriptor(interval.second - 1)->getResult();
1034 }
1035 
1036 namespace {
// Applies the hints registered in `query_hints` to the per-step execution options
// `eo` (modified in place); `co` is also passed by mutable reference.
// NOTE(review): several source lines of this function are absent from this listing
// (1044, 1051, 1059, 1065, 1091, 1100, 1116-1117, 1121, 1123, 1129, 1132), so some
// branches below appear without their opening condition or their assignment.
1037 void handle_query_hint(RegisteredQueryHint const& query_hints,
1038  ExecutionOptions& eo,
1039  CompilationOptions& co) {
1040  auto columnar_output_hint_enabled = false;
1041  auto rowwise_output_hint_enabled = false;
1042  if (query_hints.isHintRegistered(QueryHint::kCpuMode)) {
1043  VLOG(1) << "A user forces to run the query on the CPU execution mode";
1045  }
1046  if (query_hints.isHintRegistered(QueryHint::kKeepResult)) {
1047  if (!g_enable_data_recycler) {
1048  VLOG(1) << "A user enables keeping query resultset but is skipped since data "
1049  "recycler is disabled";
1050  }
1052  VLOG(1) << "A user enables keeping query resultset but is skipped since query "
1053  "resultset recycler is disabled";
1054  } else {
1055  VLOG(1) << "A user enables keeping query resultset";
1056  eo.keep_result = true;
1057  }
1058  }
1060  // we use this hint within the function 'executeTableFunction`
1061  if (!g_enable_data_recycler) {
1062  VLOG(1) << "A user enables keeping table function's resultset but is skipped "
1063  "since data recycler is disabled";
1064  }
1066  VLOG(1) << "A user enables keeping table function's resultset but is skipped "
1067  "since query resultset recycler is disabled";
1068  } else {
1069  VLOG(1) << "A user enables keeping table function's resultset";
1070  eo.keep_result = true;
1071  }
1072  }
1073  if (query_hints.isHintRegistered(QueryHint::kWatchdog)) {
1074  if (!eo.with_watchdog) {
1075  VLOG(1) << "A user enables watchdog for this query";
1076  eo.with_watchdog = true;
1077  }
1078  }
1079  if (query_hints.isHintRegistered(QueryHint::kWatchdogOff)) {
1080  if (eo.with_watchdog) {
1081  VLOG(1) << "A user disables watchdog for this query";
1082  eo.with_watchdog = false;
1083  }
1084  }
  // NOTE(review): the two dynamic-watchdog branches below test
  // `eo.with_dynamic_watchdog` but assign `eo.with_watchdog`. That looks like a
  // copy/paste slip (expected target: `eo.with_dynamic_watchdog`) - confirm and fix.
1085  if (query_hints.isHintRegistered(QueryHint::kDynamicWatchdog)) {
1086  if (!eo.with_dynamic_watchdog) {
1087  VLOG(1) << "A user enables dynamic watchdog for this query";
1088  eo.with_watchdog = true;
1089  }
1090  }
1092  if (eo.with_dynamic_watchdog) {
1093  VLOG(1) << "A user disables dynamic watchdog for this query";
1094  eo.with_watchdog = false;
1095  }
1096  }
1097  if (query_hints.isHintRegistered(QueryHint::kQueryTimeLimit)) {
1098  std::ostringstream oss;
1099  oss << "A user sets query time limit to " << query_hints.query_time_limit << " ms";
  // A time limit turns the dynamic watchdog on when it is not already enabled.
1101  if (!eo.with_dynamic_watchdog) {
1102  eo.with_dynamic_watchdog = true;
1103  oss << " (and system automatically enables dynamic watchdog to activate the "
1104  "given \"query_time_limit\" hint)";
1105  }
1106  VLOG(1) << oss.str();
1107  }
  // The loop-join "disable" hint is checked after "enable", so it wins when both
  // hints are registered.
1108  if (query_hints.isHintRegistered(QueryHint::kAllowLoopJoin)) {
1109  VLOG(1) << "A user enables loop join";
1110  eo.allow_loop_joins = true;
1111  }
1112  if (query_hints.isHintRegistered(QueryHint::kDisableLoopJoin)) {
1113  VLOG(1) << "A user disables loop join";
1114  eo.allow_loop_joins = false;
1115  }
1118  VLOG(1) << "A user forces the maximum size of a join hash table as "
1119  << eo.max_join_hash_table_size << " bytes";
1120  }
1122  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize) ||
1124  VLOG(1) << "Skip query hint \"opt_cuda_grid_and_block_size\" when at least one "
1125  "of the following query hints are given simultaneously: "
1126  "\"cuda_block_size\" and \"cuda_grid_size_multiplier\"";
1127  } else {
1128  VLOG(1) << "A user enables optimization of cuda block and grid sizes";
1130  }
1131  }
1133  // disable table reordering if `table_reordering_off` is enabled
1134  VLOG(1) << "A user disables table reordering listed in the FROM clause";
1135  eo.table_reordering = false;
1136  }
  // Columnar and rowwise hints are mutually exclusive here: rowwise is only
  // considered when the columnar hint is not registered.
1137  if (query_hints.isHintRegistered(QueryHint::kColumnarOutput)) {
1138  VLOG(1) << "A user forces the query to run with columnar output";
1139  columnar_output_hint_enabled = true;
1140  } else if (query_hints.isHintRegistered(QueryHint::kRowwiseOutput)) {
1141  VLOG(1) << "A user forces the query to run with rowwise output";
1142  rowwise_output_hint_enabled = true;
1143  }
  // If columnar output is already requested in `eo`, only a rowwise hint turns it
  // off; otherwise only a columnar hint turns it on.
1144  auto columnar_output_enabled = eo.output_columnar_hint ? !rowwise_output_hint_enabled
1145  : columnar_output_hint_enabled;
  // NOTE(review): in distributed mode this only logs; the assignment below still
  // applies the hint-derived value - confirm that is intended.
1146  if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
1147  LOG(INFO) << "Currently, we do not support applying query hint to change query "
1148  "output layout in distributed mode.";
1149  }
1150  eo.output_columnar_hint = columnar_output_enabled;
1151 }
1152 } // namespace
1153 
1155  const size_t step_idx,
1156  const CompilationOptions& co,
1157  const ExecutionOptions& eo,
1158  RenderInfo* render_info,
1159  const int64_t queue_time_ms) {
  // Executes the single step `step_idx` of `seq`: dispatches on the dynamic type of
  // the step's body node, stores the outcome on the step's descriptor and, for most
  // node types, registers the result as a temporary table keyed by the negated node
  // id. Ends in LOG(FATAL) when the body matches none of the handled node types.
  // NOTE(review): source lines 1154, 1160, 1181, 1184, 1192, 1207 and 1260 are absent
  // from this listing, including the line that names the function and its first
  // parameter (`seq`).
1161  auto timer = DEBUG_TIMER(__func__);
1162  auto exec_desc_ptr = seq.getDescriptor(step_idx);
1163  CHECK(exec_desc_ptr);
1164  auto& exec_desc = *exec_desc_ptr;
1165  const auto body = exec_desc.getBody();
1166  if (body->isNop()) {
1167  handleNop(exec_desc);
1168  return;
1169  }
  // Per-step copies, so hints applied below do not leak into the caller's options.
1170  ExecutionOptions eo_copied = eo;
1171  CompilationOptions co_copied = co;
  // The watchdog stays enabled only for the first step or for projection bodies.
1172  eo_copied.with_watchdog =
1173  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body));
  // Outer fragment indices are only passed on to the first step.
1174  eo_copied.outer_fragment_indices =
1175  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>();
1176 
  // For a sort body the hints are looked up on the sort's input node.
1177  auto target_node = body;
1178  auto query_plan_dag_hash = body->getQueryPlanDagHash();
1179  if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
1180  target_node = sort_body->getInput(0);
  // NOTE(review): the call receiving the two argument lists below is not visible.
1182  target_node->getQueryPlanDagHash(), SortInfo::createFromSortNode(sort_body));
1183  } else {
1185  target_node->getQueryPlanDagHash(), SortInfo());
1186  }
1187  auto query_hints = getParsedQueryHint(target_node);
1188  if (query_hints) {
1189  handle_query_hint(*query_hints, eo_copied, co_copied);
1190  }
1191 
  // Resultset cache lookup: on a hit the cached result becomes this step's result
  // and the step returns without executing anything.
  // NOTE(review): this check uses the incoming `eo`, not the hint-adjusted
  // `eo_copied` - confirm that is intended.
1193  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
1194  if (auto cached_resultset =
1195  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
1196  query_plan_dag_hash)) {
1197  VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
1198  << " from resultset cache";
1199  body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
1200  if (render_info) {
1201  std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
1202  executor_->getResultSetRecyclerHolder().getTargetExprs(query_plan_dag_hash);
  // Non-owning raw pointers; ownership stays with the cached shared_ptrs.
1203  std::vector<Analyzer::Expr*> copied_target_exprs;
1204  for (const auto& expr : cached_target_exprs) {
1205  copied_target_exprs.push_back(expr.get());
1206  }
1208  *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
1209  }
1210  exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
1211  addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
1212  return;
1213  }
1214  }
1215 
  // Compound node: delete-via-select, update-via-select, or a regular query.
1216  const auto compound = dynamic_cast<const RelCompound*>(body);
1217  if (compound) {
1218  if (compound->isDeleteViaSelect()) {
1219  executeDelete(compound, co_copied, eo_copied, queue_time_ms);
1220  } else if (compound->isUpdateViaSelect()) {
1221  executeUpdate(compound, co_copied, eo_copied, queue_time_ms);
1222  } else {
1223  exec_desc.setResult(
1224  executeCompound(compound, co_copied, eo_copied, render_info, queue_time_ms));
1225  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
1226  << static_cast<int>(-compound->getId()) << ", ...)"
1227  << " exec_desc.getResult().getDataPtr()->rowCount()="
1228  << exec_desc.getResult().getDataPtr()->rowCount();
  // A result flagged for filter push-down is not registered as a temporary table.
1229  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1230  return;
1231  }
1232  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
1233  }
1234  return;
1235  }
1236  const auto project = dynamic_cast<const RelProject*>(body);
1237  if (project) {
1238  if (project->isDeleteViaSelect()) {
1239  executeDelete(project, co_copied, eo_copied, queue_time_ms);
1240  } else if (project->isUpdateViaSelect()) {
1241  executeUpdate(project, co_copied, eo_copied, queue_time_ms);
1242  } else {
  // Row count of the previous step, when it can be reused; passed to executeProject().
1243  std::optional<size_t> prev_count;
1244  // Disabling the intermediate count optimization in distributed, as the previous
1245  // execution descriptor will likely not hold the aggregated result.
1246  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
1247  // If the previous node produced a reliable count, skip the pre-flight count.
1248  RelAlgNode const* const prev_body = project->getInput(0);
1249  if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
  // Prefer the descriptor attached to the node; otherwise look it up in `seq`.
1250  if (RaExecutionDesc const* const prev_exec_desc =
1251  prev_body->hasContextData()
1252  ? prev_body->getContextData()
1253  : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
1254  const auto& prev_exe_result = prev_exec_desc->getResult();
1255  const auto prev_result = prev_exe_result.getRows();
1256  if (prev_result) {
1257  prev_count = prev_result->rowCount();
1258  VLOG(3) << "Setting output row count for projection node to previous node ("
1259  << prev_exec_desc->getBody()->toString(
1261  << ") to " << *prev_count;
1262  }
1263  }
1264  }
1265  }
1266  exec_desc.setResult(executeProject(
1267  project, co_copied, eo_copied, render_info, queue_time_ms, prev_count));
1268  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
1269  << static_cast<int>(-project->getId()) << ", ...)"
1270  << " exec_desc.getResult().getDataPtr()->rowCount()="
1271  << exec_desc.getResult().getDataPtr()->rowCount();
1272  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1273  return;
1274  }
1275  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
1276  }
1277  return;
1278  }
1279  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
1280  if (aggregate) {
1281  exec_desc.setResult(
1282  executeAggregate(aggregate, co_copied, eo_copied, render_info, queue_time_ms));
1283  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
1284  return;
1285  }
1286  const auto filter = dynamic_cast<const RelFilter*>(body);
1287  if (filter) {
1288  exec_desc.setResult(
1289  executeFilter(filter, co_copied, eo_copied, render_info, queue_time_ms));
1290  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
1291  return;
1292  }
1293  const auto sort = dynamic_cast<const RelSort*>(body);
1294  if (sort) {
1295  exec_desc.setResult(
1296  executeSort(sort, co_copied, eo_copied, render_info, queue_time_ms));
1297  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1298  return;
1299  }
1300  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
1301  return;
1302  }
1303  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
1304  if (logical_values) {
1305  exec_desc.setResult(executeLogicalValues(logical_values, eo_copied));
1306  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
1307  return;
1308  }
  // A modify step stores its result but registers no temporary table.
1309  const auto modify = dynamic_cast<const RelModify*>(body);
1310  if (modify) {
1311  exec_desc.setResult(executeModify(modify, eo_copied));
1312  return;
1313  }
1314  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
1315  if (logical_union) {
1316  exec_desc.setResult(executeUnion(
1317  logical_union, seq, co_copied, eo_copied, render_info, queue_time_ms));
1318  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
1319  return;
1320  }
1321  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
1322  if (table_func) {
1323  exec_desc.setResult(
1324  executeTableFunction(table_func, co_copied, eo_copied, queue_time_ms));
1325  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
1326  return;
1327  }
1328  LOG(FATAL) << "Unhandled body type: "
1329  << body->toString(RelRexToStringConfig::defaults());
1330 }
1331 
1333  // just set the result of the previous node as the result of no op
  // The body of descriptor `ed` must be a RelAggregate with exactly one input. The
  // input's output metainfo is copied onto the body, and the input's temporary table
  // (which must already be registered under the negated input id) is reused as both
  // the body's temporary table and the descriptor's result.
  // NOTE(review): the signature line is absent from this listing; executeRelAlgStep
  // calls this as handleNop(exec_desc).
1334  auto body = ed.getBody();
1335  CHECK(dynamic_cast<const RelAggregate*>(body));
1336  CHECK_EQ(size_t(1), body->inputCount());
1337  const auto input = body->getInput(0);
1338  body->setOutputMetainfo(input->getOutputMetainfo());
1339  const auto it = temporary_tables_.find(-input->getId());
1340  CHECK(it != temporary_tables_.end());
1341  // set up temp table as it could be used by the outer query or next step
1342  addTemporaryTable(-body->getId(), it->second);
1343 
1344  ed.setResult({it->second, input->getOutputMetainfo()});
1345 }
1346 
1347 namespace {
1348 
1349 class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
1350  public:
1351  RexUsedInputsVisitor() : RexVisitor() {}
1352 
1353  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
1354  return synthesized_physical_inputs_owned;
1355  }
1356 
1357  std::unordered_set<const RexInput*> visitInput(
1358  const RexInput* rex_input) const override {
1359  const auto input_ra = rex_input->getSourceNode();
1360  CHECK(input_ra);
1361  const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
1362  if (scan_ra) {
1363  const auto td = scan_ra->getTableDescriptor();
1364  if (td) {
1365  const auto col_id = rex_input->getIndex();
1366  const auto cd =
1367  scan_ra->getCatalog().getMetadataForColumnBySpi(td->tableId, col_id + 1);
1368  if (cd && cd->columnType.get_physical_cols() > 0) {
1369  CHECK(IS_GEO(cd->columnType.get_type()));
1370  std::unordered_set<const RexInput*> synthesized_physical_inputs;
1371  for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
1372  auto physical_input =
1373  new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
1374  synthesized_physical_inputs_owned.emplace_back(physical_input);
1375  synthesized_physical_inputs.insert(physical_input);
1376  }
1377  return synthesized_physical_inputs;
1378  }
1379  }
1380  }
1381  return {rex_input};
1382  }
1383 
1384  protected:
1385  std::unordered_set<const RexInput*> aggregateResult(
1386  const std::unordered_set<const RexInput*>& aggregate,
1387  const std::unordered_set<const RexInput*>& next_result) const override {
1388  auto result = aggregate;
1389  result.insert(next_result.begin(), next_result.end());
1390  return result;
1391  }
1392 
1393  private:
1394  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
1395 };
1396 
1397 const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
1398  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
1399  return table_func;
1400  }
1401  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
1402  CHECK_EQ(size_t(2), join->inputCount());
1403  return join;
1404  }
1405  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1406  CHECK_EQ(size_t(1), ra_node->inputCount());
1407  }
1408  auto only_src = ra_node->getInput(0);
1409  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
1410  dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
1411  return is_join ? only_src : ra_node;
1412 }
1413 
1414 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1415 get_used_inputs(const RelCompound* compound) {
1416  RexUsedInputsVisitor visitor;
1417  const auto filter_expr = compound->getFilterExpr();
1418  std::unordered_set<const RexInput*> used_inputs =
1419  filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
1420  const auto sources_size = compound->getScalarSourcesSize();
1421  for (size_t i = 0; i < sources_size; ++i) {
1422  const auto source_inputs = visitor.visit(compound->getScalarSource(i));
1423  used_inputs.insert(source_inputs.begin(), source_inputs.end());
1424  }
1425  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
1426  return std::make_pair(used_inputs, used_inputs_owned);
1427 }
1428 
1429 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1430 get_used_inputs(const RelAggregate* aggregate) {
1431  CHECK_EQ(size_t(1), aggregate->inputCount());
1432  std::unordered_set<const RexInput*> used_inputs;
1433  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1434  const auto source = aggregate->getInput(0);
1435  const auto& in_metainfo = source->getOutputMetainfo();
1436  const auto group_count = aggregate->getGroupByCount();
1437  CHECK_GE(in_metainfo.size(), group_count);
1438  for (size_t i = 0; i < group_count; ++i) {
1439  auto synthesized_used_input = new RexInput(source, i);
1440  used_inputs_owned.emplace_back(synthesized_used_input);
1441  used_inputs.insert(synthesized_used_input);
1442  }
1443  for (const auto& agg_expr : aggregate->getAggExprs()) {
1444  for (size_t i = 0; i < agg_expr->size(); ++i) {
1445  const auto operand_idx = agg_expr->getOperand(i);
1446  CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
1447  auto synthesized_used_input = new RexInput(source, operand_idx);
1448  used_inputs_owned.emplace_back(synthesized_used_input);
1449  used_inputs.insert(synthesized_used_input);
1450  }
1451  }
1452  return std::make_pair(used_inputs, used_inputs_owned);
1453 }
1454 
1455 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1456 get_used_inputs(const RelProject* project) {
1457  RexUsedInputsVisitor visitor;
1458  std::unordered_set<const RexInput*> used_inputs;
1459  for (size_t i = 0; i < project->size(); ++i) {
1460  const auto proj_inputs = visitor.visit(project->getProjectAt(i));
1461  used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
1462  }
1463  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
1464  return std::make_pair(used_inputs, used_inputs_owned);
1465 }
1466 
1467 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1468 get_used_inputs(const RelTableFunction* table_func) {
1469  RexUsedInputsVisitor visitor;
1470  std::unordered_set<const RexInput*> used_inputs;
1471  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
1472  const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
1473  used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
1474  }
1475  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
1476  return std::make_pair(used_inputs, used_inputs_owned);
1477 }
1478 
1479 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1480 get_used_inputs(const RelFilter* filter) {
1481  std::unordered_set<const RexInput*> used_inputs;
1482  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1483  const auto data_sink_node = get_data_sink(filter);
1484  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
1485  const auto source = data_sink_node->getInput(nest_level);
1486  const auto scan_source = dynamic_cast<const RelScan*>(source);
1487  if (scan_source) {
1488  CHECK(source->getOutputMetainfo().empty());
1489  for (size_t i = 0; i < scan_source->size(); ++i) {
1490  auto synthesized_used_input = new RexInput(scan_source, i);
1491  used_inputs_owned.emplace_back(synthesized_used_input);
1492  used_inputs.insert(synthesized_used_input);
1493  }
1494  } else {
1495  const auto& partial_in_metadata = source->getOutputMetainfo();
1496  for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
1497  auto synthesized_used_input = new RexInput(source, i);
1498  used_inputs_owned.emplace_back(synthesized_used_input);
1499  used_inputs.insert(synthesized_used_input);
1500  }
1501  }
1502  }
1503  return std::make_pair(used_inputs, used_inputs_owned);
1504 }
1505 
1506 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1507 get_used_inputs(const RelLogicalUnion* logical_union) {
1508  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
1509  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1510  used_inputs_owned.reserve(logical_union->inputCount());
1511  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
1512  auto const n_inputs = logical_union->inputCount();
1513  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
1514  auto input = logical_union->getInput(nest_level);
1515  for (size_t i = 0; i < input->size(); ++i) {
1516  used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
1517  used_inputs.insert(used_inputs_owned.back().get());
1518  }
1519  }
1520  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
1521 }
1522 
// Builds the table key for `ra_node`. By default the key is {db 0, negated node id},
// the same negated-id convention used for temporary tables elsewhere in this file.
// For a scan node the key is replaced with the catalog's database id and the id of
// the scanned table.
// NOTE(review): the signature line is absent from this listing; callers in this file
// invoke it as table_key_from_ra(input_ra).
1524  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
1525  shared::TableKey table_key{0, int32_t(-ra_node->getId())};
1526  if (scan_ra) {
1527  table_key.db_id = scan_ra->getCatalog().getDatabaseId();
1528  const auto td = scan_ra->getTableDescriptor();
1529  CHECK(td);
1530  table_key.table_id = td->tableId;
1531  }
1532  return table_key;
1533 }
1534 
1535 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
1536  const RelAlgNode* ra_node,
1537  const std::vector<size_t>& input_permutation) {
1538  const auto data_sink_node = get_data_sink(ra_node);
1539  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1540  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1541  const auto input_node_idx =
1542  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1543  const auto input_ra = data_sink_node->getInput(input_node_idx);
1544  // Having a non-zero mapped value (input_idx) results in the query being interpretted
1545  // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
1546  // mapped value (input_idx) which originates here. This would be incorrect for UNION.
1547  size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
1548  const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1549  CHECK(it_ok.second);
1550  LOG_IF(DEBUG1, !input_permutation.empty())
1551  << "Assigned input " << input_ra->toString(RelRexToStringConfig::defaults())
1552  << " to nest level " << input_idx;
1553  }
1554  return input_to_nest_level;
1555 }
1556 
// Collects the inputs referenced by the join condition(s) of `ra_node`'s data sink.
// For a binary join this is the join condition; for a left-deep inner join it is the
// inner condition plus every non-null outer condition of nesting levels
// 1 .. inputCount()-1. For any other sink an empty result is returned after the
// input count of `ra_node` has been sanity-checked.
// Returns the referenced inputs together with owning pointers for inputs the visitor
// synthesized.
// NOTE(review): source lines 1558 (function name and parameter), 1589, 1593 and 1596
// are absent from this listing; the three CHECK statements near the end therefore
// appear without their trailing message/terminator. The name is taken from the call
// in get_input_desc_impl.
1557 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1559  const auto data_sink_node = get_data_sink(ra_node);
1560  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1561  CHECK_EQ(join->inputCount(), 2u);
1562  const auto condition = join->getCondition();
1563  RexUsedInputsVisitor visitor;
1564  auto condition_inputs = visitor.visit(condition);
  // Copy the owners out of the visitor before it goes out of scope.
1565  std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1566  visitor.get_inputs_owned());
1567  return std::make_pair(condition_inputs, condition_inputs_owned);
1568  }
1569 
1570  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1571  CHECK_GE(left_deep_join->inputCount(), 2u);
1572  const auto condition = left_deep_join->getInnerCondition();
1573  RexUsedInputsVisitor visitor;
1574  auto result = visitor.visit(condition);
1575  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1576  ++nesting_level) {
1577  const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1578  if (outer_condition) {
1579  const auto outer_result = visitor.visit(outer_condition);
1580  result.insert(outer_result.begin(), outer_result.end());
1581  }
1582  }
1583  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
1584  return std::make_pair(result, used_inputs_owned);
1585  }
1586 
  // Not a join sink: only validate the expected number of inputs.
1587  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1588  CHECK_GT(ra_node->inputCount(), 1u)
1590  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1591  // no-op
1592  CHECK_GE(ra_node->inputCount(), 0u)
1594  } else {
1595  CHECK_EQ(ra_node->inputCount(), 1u)
1597  }
1598  return std::make_pair(std::unordered_set<const RexInput*>{},
1599  std::vector<std::shared_ptr<RexInput>>{});
1600 }
1601 
1603  std::vector<InputDescriptor>& input_descs,
1604  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1605  const RelAlgNode* ra_node,
1606  const std::unordered_set<const RexInput*>& source_used_inputs,
1607  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
  // Adds one InputColDescriptor to `input_col_descs_unique` for every input in
  // `source_used_inputs` whose source node has an entry in `input_to_nest_level`.
  // An input whose source node has no nest level raises
  // std::runtime_error("Bushy joins not supported"), unless `ra_node` is a union, in
  // which case the input is silently skipped.
  // `input_descs` is not read or written by the visible code.
  // NOTE(review): the signature line (1602) is absent from this listing; the name is
  // taken from the calls in get_input_desc_impl.
  // NOTE(review): the set holds shared_ptrs, so with the default hash/equality
  // descriptors would be deduplicated by pointer identity, not by value - confirm
  // whether a custom std::hash / operator== for this type exists elsewhere.
1608  VLOG(3) << "ra_node=" << ra_node->toString(RelRexToStringConfig::defaults())
1609  << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
1610  << " source_used_inputs.size()=" << source_used_inputs.size();
1611  for (const auto used_input : source_used_inputs) {
1612  const auto input_ra = used_input->getSourceNode();
1613  const auto table_key = table_key_from_ra(input_ra);
1614  auto col_id = used_input->getIndex();
1615  auto it = input_to_nest_level.find(input_ra);
1616  if (it != input_to_nest_level.end()) {
1617  const int nest_level = it->second;
1618  if (auto rel_scan = dynamic_cast<const RelScan*>(input_ra)) {
  // For a scan, translate the input's 0-based index into a catalog column id; the
  // lookup takes a 1-based position.
1619  const auto& catalog = rel_scan->getCatalog();
1620  col_id = catalog.getColumnIdBySpi(table_key.table_id, col_id + 1);
1621  }
1622  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1623  col_id, table_key.table_id, table_key.db_id, nest_level));
1624  } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1625  throw std::runtime_error("Bushy joins not supported");
1626  }
1627  }
1628 }
1629 
1630 template <class RA>
1631 std::pair<std::vector<InputDescriptor>,
1632  std::list<std::shared_ptr<const InputColDescriptor>>>
1633 get_input_desc_impl(const RA* ra_node,
1634  const std::unordered_set<const RexInput*>& used_inputs,
1635  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1636  const std::vector<size_t>& input_permutation) {
1637  std::vector<InputDescriptor> input_descs;
1638  const auto data_sink_node = get_data_sink(ra_node);
1639  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1640  const auto input_node_idx =
1641  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1642  auto input_ra = data_sink_node->getInput(input_node_idx);
1643  const auto table_key = table_key_from_ra(input_ra);
1644  input_descs.emplace_back(table_key.db_id, table_key.table_id, input_idx);
1645  }
1646  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1647  collect_used_input_desc(input_descs,
1648  input_col_descs_unique, // modified
1649  ra_node,
1650  used_inputs,
1651  input_to_nest_level);
1652  std::unordered_set<const RexInput*> join_source_used_inputs;
1653  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1654  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1655  get_join_source_used_inputs(ra_node);
1656  collect_used_input_desc(input_descs,
1657  input_col_descs_unique, // modified
1658  ra_node,
1659  join_source_used_inputs,
1660  input_to_nest_level);
1661  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1662  input_col_descs_unique.begin(), input_col_descs_unique.end());
1663 
1664  std::sort(input_col_descs.begin(),
1665  input_col_descs.end(),
1666  [](std::shared_ptr<const InputColDescriptor> const& lhs,
1667  std::shared_ptr<const InputColDescriptor> const& rhs) {
1668  return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1669  lhs->getColId(),
1670  lhs->getScanDesc().getTableKey()) <
1671  std::make_tuple(rhs->getScanDesc().getNestLevel(),
1672  rhs->getColId(),
1673  rhs->getScanDesc().getTableKey());
1674  });
1675  return {input_descs,
1676  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1677  input_col_descs.end())};
1678 }
1679 
1680 template <class RA>
1681 std::tuple<std::vector<InputDescriptor>,
1682  std::list<std::shared_ptr<const InputColDescriptor>>,
1683  std::vector<std::shared_ptr<RexInput>>>
1684 get_input_desc(const RA* ra_node,
1685  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1686  const std::vector<size_t>& input_permutation) {
1687  std::unordered_set<const RexInput*> used_inputs;
1688  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1689  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node);
1690  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1691  auto input_desc_pair =
1692  get_input_desc_impl(ra_node, used_inputs, input_to_nest_level, input_permutation);
1693  return std::make_tuple(
1694  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1695 }
1696 
1697 size_t get_scalar_sources_size(const RelCompound* compound) {
1698  return compound->getScalarSourcesSize();
1699 }
1700 
1701 size_t get_scalar_sources_size(const RelProject* project) {
1702  return project->size();
1703 }
1704 
1705 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1706  return table_func->getTableFuncInputsSize();
1707 }
1708 
1709 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1710  return compound->getScalarSource(i);
1711 }
1712 
1713 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1714  return project->getProjectAt(i);
1715 }
1716 
1717 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1718  return table_func->getTableFuncInputAt(i);
1719 }
1720 
1721 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1722  const std::shared_ptr<Analyzer::Expr> expr) {
1723  const auto& ti = expr->get_type_info();
1724  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1725  return expr;
1726  }
1727  auto transient_dict_ti = ti;
1728  transient_dict_ti.set_compression(kENCODING_DICT);
1729  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1730  transient_dict_ti.setStringDictKey(shared::StringDictKey::kTransientDictKey);
1731  transient_dict_ti.set_fixed_size();
1732  return expr->add_cast(transient_dict_ti);
1733 }
1734 
// NOTE(review): the embedded listing numbers jump from 1734 to 1736, so the line
// carrying this function's return type and name is not visible in this view; the
// call sites in this file invoke it as set_transient_dict_maybe(scalar_sources, expr).
//
// Folds `expr` and appends the result to `scalar_sources`. The folded expression
// is first passed through set_transient_dict(); if anything in that path throws,
// the expression is folded again and appended without the transient-dict cast.
1736  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1737  const std::shared_ptr<Analyzer::Expr>& expr) {
1738  try {
1739  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1740  } catch (...) {
// Any exception type lands here; fall back to the plain folded expression.
1741  scalar_sources.push_back(fold_expr(expr.get()));
1742  }
1743 }
1744 
1745 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1746  const std::shared_ptr<Analyzer::Expr>& input) {
1747  const auto& input_ti = input->get_type_info();
1748  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1749  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1750  }
1751  return input;
1752 }
1753 
1754 template <class RA>
1755 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1756  const RA* ra_node,
1757  const RelAlgTranslator& translator,
1758  const ::ExecutorType executor_type) {
1759  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1760  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1761  VLOG(3) << "get_scalar_sources_size("
1762  << ra_node->toString(RelRexToStringConfig::defaults())
1763  << ") = " << scalar_sources_size;
1764  for (size_t i = 0; i < scalar_sources_size; ++i) {
1765  const auto scalar_rex = scalar_at(i, ra_node);
1766  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1767  // RexRef are synthetic scalars we append at the end of the real ones
1768  // for the sake of taking memory ownership, no real work needed here.
1769  continue;
1770  }
1771 
1772  const auto scalar_expr =
1773  rewrite_array_elements(translator.translate(scalar_rex).get());
1774  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1775  if (executor_type == ExecutorType::Native) {
1776  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1777  } else if (executor_type == ExecutorType::TableFunctions) {
1778  scalar_sources.push_back(fold_expr(rewritten_expr.get()));
1779  } else {
1780  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1781  }
1782  }
1783 
1784  return scalar_sources;
1785 }
1786 
// Translates the scalar sources of `ra_node` for an UPDATE-style work unit.
//
// Sources whose index lies in [starting_projection_column_idx, size - 1) are
// translated and then cast with cast_to_column_type() against the column named
// colNames[i - starting_projection_column_idx]; every other source (including the
// last one) is translated without that cast. All results then go through
// rewrite_array_elements(), rewrite_expr() and set_transient_dict_maybe().
//
// NOTE(review): the embedded listing numbers jump from 1791 to 1793, so one
// parameter line is not visible here; the body uses a name `cat` that is
// presumably declared on that line -- confirm against the full file.
1787 template <class RA>
1788 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1789  const RA* ra_node,
1790  const RelAlgTranslator& translator,
1791  int32_t tableId,
1793  const ColumnNameList& colNames,
1794  size_t starting_projection_column_idx) {
1795  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1796  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1797  const auto scalar_rex = scalar_at(i, ra_node);
1798  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1799  // RexRef are synthetic scalars we append at the end of the real ones
1800  // for the sake of taking memory ownership, no real work needed here.
1801  continue;
1802  }
1803 
1804  std::shared_ptr<Analyzer::Expr> translated_expr;
// Only sources from the starting projection index up to, but excluding, the last
// source are cast to the target column's type.
1805  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1806  translated_expr = cast_to_column_type(translator.translate(scalar_rex),
1807  tableId,
1808  cat,
1809  colNames[i - starting_projection_column_idx]);
1810  } else {
1811  translated_expr = translator.translate(scalar_rex);
1812  }
1813  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1814  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1815  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1816  }
1817 
1818  return scalar_sources;
1819 }
1820 
1821 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1822  const RelCompound* compound,
1823  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1824  if (!compound->isAggregate()) {
1825  return {nullptr};
1826  }
1827  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1828  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1829  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1830  }
1831  return groupby_exprs;
1832 }
1833 
1834 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1835  const RelAggregate* aggregate,
1836  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1837  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1838  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1839  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1840  }
1841  return groupby_exprs;
1842 }
1843 
// NOTE(review): the embedded listing numbers skip 1844 and 1849, so the first
// signature line (return type, name, first parameter) and the false branch of
// the conditional return are not visible in this view.
//
// Translates the filter expression of `compound`, when one exists, and returns
// it folded and converted with qual_to_conjunctive_form().
1845  const RelAlgTranslator& translator) {
1846  const auto filter_rex = compound->getFilterExpr();
// No filter on the node yields a null filter_expr.
1847  const auto filter_expr = filter_rex ? translator.translate(filter_rex) : nullptr;
1848  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1850 }
1851 
1852 namespace {
1853 // If an encoded type is used in the context of COUNT(DISTINCT ...) then don't
1854 // bother decoding it. This is done by changing the sql type to an integer.
// NOTE(review): the embedded listing numbers skip 1855 and 1867, so the line with
// the return type and function name, and the statement that uses `arg` after the
// deep copy, are not visible in this view.
//
// Records the type info for target `target_expr_idx` in `target_exprs_type_infos`.
// For a distinct aggregate whose argument is a non-array type with
// kENCODING_DATE_IN_DAYS compression, the argument's type info is recorded and
// `target_expr` is replaced by a deep copy before returning early; in every other
// case the target expression's own type info is recorded.
1856  size_t target_expr_idx,
1857  std::shared_ptr<Analyzer::Expr>& target_expr,
1858  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos) {
1859  auto* agg_expr = dynamic_cast<Analyzer::AggExpr*>(target_expr.get());
1860  CHECK(agg_expr);
1861  if (agg_expr->get_is_distinct()) {
1862  SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1863  if (ti.get_type() != kARRAY && ti.get_compression() == kENCODING_DATE_IN_DAYS) {
1864  target_exprs_type_infos.emplace(target_expr_idx, ti);
// Work on a private copy so the expression handed in by the caller is not modified.
1865  target_expr = target_expr->deep_copy();
1866  auto* arg = dynamic_cast<Analyzer::AggExpr*>(target_expr.get())->get_arg();
1868  return;
1869  }
1870  }
1871  target_exprs_type_infos.emplace(target_expr_idx, target_expr->get_type_info());
1872 }
1873 } // namespace
1874 
1875 std::vector<Analyzer::Expr*> translate_targets(
1876  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1877  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1878  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1879  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1880  const RelCompound* compound,
1881  const RelAlgTranslator& translator,
1882  const ExecutorType executor_type) {
1883  std::vector<Analyzer::Expr*> target_exprs;
1884  for (size_t i = 0; i < compound->size(); ++i) {
1885  const auto target_rex = compound->getTargetExpr(i);
1886  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1887  std::shared_ptr<Analyzer::Expr> target_expr;
1888  if (target_rex_agg) {
1889  target_expr =
1890  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1891  conditionally_change_arg_to_int_type(i, target_expr, target_exprs_type_infos);
1892  } else {
1893  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1894  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1895  if (target_rex_ref) {
1896  const auto ref_idx = target_rex_ref->getIndex();
1897  CHECK_GE(ref_idx, size_t(1));
1898  CHECK_LE(ref_idx, groupby_exprs.size());
1899  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1900  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1901  } else {
1902  target_expr =
1903  rewrite_array_elements(translator.translate(target_rex_scalar).get());
1904  auto rewritten_expr = rewrite_expr(target_expr.get());
1905  target_expr = fold_expr(rewritten_expr.get());
1906  if (executor_type == ExecutorType::Native) {
1907  try {
1908  target_expr = set_transient_dict(target_expr);
1909  } catch (...) {
1910  // noop
1911  }
1912  } else {
1913  target_expr = cast_dict_to_none(target_expr);
1914  }
1915  }
1916  target_exprs_type_infos.emplace(i, target_expr->get_type_info());
1917  }
1918  CHECK(target_expr);
1919  target_exprs_owned.push_back(target_expr);
1920  target_exprs.push_back(target_expr.get());
1921  }
1922  return target_exprs;
1923 }
1924 
1925 std::vector<Analyzer::Expr*> translate_targets(
1926  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1927  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1928  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1929  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1930  const RelAggregate* aggregate,
1931  const RelAlgTranslator& translator) {
1932  std::vector<Analyzer::Expr*> target_exprs;
1933  size_t group_key_idx = 1;
1934  for (const auto& groupby_expr : groupby_exprs) {
1935  auto target_expr =
1936  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1937  target_exprs_owned.push_back(target_expr);
1938  target_exprs.push_back(target_expr.get());
1939  }
1940 
1941  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1942  auto target_expr =
1943  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1944  CHECK(target_expr);
1945  target_expr = fold_expr(target_expr.get());
1946  target_exprs_owned.push_back(target_expr);
1947  target_exprs.push_back(target_expr.get());
1948  }
1949  return target_exprs;
1950 }
1951 
// NOTE(review): the embedded listing numbers jump from 1951 to 1953, so the
// signature line is not visible in this view; a later caller in this file invokes
// this predicate as is_count_distinct(expr).
//
// True when `expr` is an Analyzer::AggExpr flagged as distinct.
1953  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1954  return agg_expr && agg_expr->get_is_distinct();
1955 }
1956 
// Returns true when `expr` is an Analyzer::AggExpr that reports
// get_contains_agg() and whose aggregate type matches one of the SQLAgg kinds
// listed in the check below.
//
// NOTE(review): the embedded listing numbers skip 1961 and 1964, so the start of
// the `if` condition and one entry of the aggregate-kind list are not visible in
// this view; only kMAX, kSUM and kAVG can be confirmed from here.
1957 bool is_agg(const Analyzer::Expr* expr) {
1958  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1959  if (agg_expr && agg_expr->get_contains_agg()) {
1960  auto agg_type = agg_expr->get_aggtype();
1962  SQLAgg::kMAX,
1963  SQLAgg::kSUM,
1965  SQLAgg::kAVG>(agg_type)) {
1966  return true;
1967  }
1968  }
1969  return false;
1970 }
1971 
// NOTE(review): the embedded listing numbers skip 1972 and 1976, so the signature
// line and the return statement of the is_agg() branch are not visible in this
// view. The caller in get_targets_meta() refers to a helper named
// get_logical_type_for_expr, which is presumably this function -- confirm.
//
// Picks the logical type reported for a target expression: COUNT(DISTINCT) style
// expressions are reported as a kBIGINT constructed with `true` as the second
// SQLTypeInfo argument; everything that is neither count-distinct nor an
// aggregate gets get_logical_type_info() of its own type.
1973  if (is_count_distinct(expr)) {
1974  return SQLTypeInfo(kBIGINT, true);
1975  } else if (is_agg(expr)) {
1977  }
1978  return get_logical_type_info(expr->get_type_info());
1979 }
1980 
1981 template <class RA>
1982 std::vector<TargetMetaInfo> get_targets_meta(
1983  const RA* ra_node,
1984  const std::vector<Analyzer::Expr*>& target_exprs) {
1985  std::vector<TargetMetaInfo> targets_meta;
1986  CHECK_EQ(ra_node->size(), target_exprs.size());
1987  for (size_t i = 0; i < ra_node->size(); ++i) {
1988  CHECK(target_exprs[i]);
1989  // TODO(alex): remove the count distinct type fixup.
1990  auto ti = get_logical_type_for_expr(target_exprs[i]);
1991  targets_meta.emplace_back(
1992  ra_node->getFieldName(i), ti, target_exprs[i]->get_type_info());
1993  }
1994  return targets_meta;
1995 }
1996 
// Specialization for RelFilter: the metadata is derived from the filter's first
// input. The input is probed against each supported node type in turn and the
// generic get_targets_meta() is invoked with the matching concrete type.
// Reaching the end without a match hits UNREACHABLE().
//
// NOTE(review): the embedded listing numbers jump from 2015 to 2017, so the rest
// of the UNREACHABLE() message expression is not visible in this view.
1997 template <>
1998 std::vector<TargetMetaInfo> get_targets_meta(
1999  const RelFilter* filter,
2000  const std::vector<Analyzer::Expr*>& target_exprs) {
2001  RelAlgNode const* input0 = filter->getInput(0);
2002  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
2003  return get_targets_meta(input, target_exprs);
2004  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
2005  return get_targets_meta(input, target_exprs);
2006  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
2007  return get_targets_meta(input, target_exprs);
2008  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
2009  return get_targets_meta(input, target_exprs);
2010  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
2011  return get_targets_meta(input, target_exprs);
2012  } else if (auto const* input = dynamic_cast<RelLogicalValues const*>(input0)) {
2013  return get_targets_meta(input, target_exprs);
2014  }
2015  UNREACHABLE() << "Unhandled node type: "
2017  return {};
2018 }
2019 
2020 } // namespace
2021 
// NOTE(review): the embedded listing numbers skip 2022, 2048 and 2060, so the
// first signature line, the left-hand side of the make_unique assignment, and the
// declaration of `co_project` are not visible in this view. The error text at the
// bottom ("Unsupported parent node for update") suggests this is the UPDATE entry
// point -- confirm the exact name against the full file.
//
// Executes an update whose root node is a RelCompound or a RelProject; any other
// node type throws std::runtime_error. Literal hoisting is disabled on a local
// copy of the compilation options. The actual execution is delegated to
// executor_->executeUpdate(); a post-execution callback is installed that
// finalizes the transaction and vacuums fragments above the minimum selectivity.
2023  const CompilationOptions& co_in,
2024  const ExecutionOptions& eo_in,
2025  const int64_t queue_time_ms) {
2026  CHECK(node);
2027  auto timer = DEBUG_TIMER(__func__);
2028 
2029  auto co = co_in;
2030  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
2031  // encoded string updates
2032 
// Shared driver for both supported node types: validates the target table, sets
// up the transaction parameters and runs the (possibly rewritten) execution unit.
2033  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
2034  auto& work_unit,
2035  const bool is_aggregate) {
2036  auto table_descriptor = node->getModifiedTableDescriptor();
2037  CHECK(table_descriptor);
2038  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
2039  throw std::runtime_error(
2040  "UPDATE queries involving variable length columns are only supported on tables "
2041  "with the vacuum attribute set to 'delayed'");
2042  }
2043 
2044  auto catalog = node->getModifiedTableCatalog();
2045  CHECK(catalog);
2046  Executor::clearExternalCaches(true, table_descriptor, catalog->getDatabaseId());
2047 
2049  std::make_unique<UpdateTransactionParameters>(table_descriptor,
2050  *catalog,
2051  node->getTargetColumns(),
2052  node->getOutputMetainfo(),
2053  node->isVarlenUpdateRequired());
2054 
2055  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2056 
// Runs one execution unit through executor_->executeUpdate() and registers the
// post-execution callback. QueryExecutionError is rethrown as std::runtime_error
// carrying the message for its error code.
2057  auto execute_update_ra_exe_unit =
2058  [this, &co, &eo_in, &table_infos, &table_descriptor, &node, catalog](
2059  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
2061 
2062  auto eo = eo_in;
2063  if (dml_transaction_parameters_->tableIsTemporary()) {
2064  eo.output_columnar_hint = true;
2065  co_project.allow_lazy_fetch = false;
2066  co_project.filter_on_deleted_column =
2067  false; // project the entire delete column for columnar update
2068  }
2069 
2070  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
2071  dml_transaction_parameters_.get());
// NOTE(review): the cast result is dereferenced on the next line before the
// CHECK that follows it; the CHECK would only be effective if it came first.
2072  update_transaction_parameters->setInputSourceNode(node);
2073  CHECK(update_transaction_parameters);
2074  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
2075  try {
2076  auto table_update_metadata =
2077  executor_->executeUpdate(ra_exe_unit,
2078  table_infos,
2079  table_descriptor,
2080  co_project,
2081  eo,
2082  *catalog,
2083  executor_->row_set_mem_owner_,
2084  update_callback,
2085  is_aggregate);
2086  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2087  dml_transaction_parameters_->finalizeTransaction(*catalog);
2088  TableOptimizer table_optimizer{
2089  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2090  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2091  };
2092  } catch (const QueryExecutionError& e) {
2093  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2094  }
2095  };
2096 
// Temporary tables take a rewritten, columnar execution unit; only single-column,
// non-varlen updates are accepted on that path.
2097  if (dml_transaction_parameters_->tableIsTemporary()) {
2098  // hold owned target exprs during execution if rewriting
2099  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2100  // rewrite temp table updates to generate the full column by moving the where
2101  // clause into a case if such a rewrite is not possible, bail on the update
2102  // operation build an expr for the update target
2103  auto update_transaction_params =
2104  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
2105  CHECK(update_transaction_params);
2106  const auto td = update_transaction_params->getTableDescriptor();
2107  CHECK(td);
2108  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2109  if (update_column_names.size() > 1) {
2110  throw std::runtime_error(
2111  "Multi-column update is not yet supported for temporary tables.");
2112  }
2113 
2114  const auto cd =
2115  catalog->getMetadataForColumn(td->tableId, update_column_names.front());
2116  CHECK(cd);
2117  auto projected_column_to_update = makeExpr<Analyzer::ColumnVar>(
2118  cd->columnType,
2119  shared::ColumnKey{catalog->getDatabaseId(), td->tableId, cd->columnId},
2120  0);
2121  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2122  work_unit.exe_unit, projected_column_to_update);
2123  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2124  throw std::runtime_error(
2125  "Variable length updates not yet supported on temporary tables.");
2126  }
2127  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2128  } else {
2129  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2130  }
2131  };
2132 
// Dispatch on the concrete node type.
2133  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2134  auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo_in);
2135 
2136  execute_update_for_node(compound, work_unit, compound->isAggregate());
2137  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2138  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo_in);
2139 
// A simple projection over a sort result scans at most as many rows as the
// temporary table produced for that sort holds.
2140  if (project->isSimple()) {
2141  CHECK_EQ(size_t(1), project->inputCount());
2142  const auto input_ra = project->getInput(0);
2143  if (dynamic_cast<const RelSort*>(input_ra)) {
2144  const auto& input_table =
2145  get_temporary_table(&temporary_tables_, -input_ra->getId());
2146  CHECK(input_table);
2147  work_unit.exe_unit.scan_limit = input_table->rowCount();
2148  }
2149  }
2150  if (project->hasWindowFunctionExpr() || project->hasPushedDownWindowExpr()) {
2151  // the first condition means this project node has at least one window function
2152  // and the second condition indicates that this project node falls into
2153  // one of the following cases:
2154  // 1) window function expression on a multi-fragmented table
2155  // 2) window function expression is too complex to evaluate without codegen:
2156  // i.e., sum(x+y+z) instead of sum(x) -> we currently do not support codegen to
2157  // evaluate such a complex window function expression
2158  // 3) nested window function expression
2159  // but currently we do not support update on a multi-fragmented table having
2160  // window function, so the second condition only refers to non-fragmented table with
2161  // cases 2) or 3)
2162  // if at least one of two conditions satisfy, we must compute corresponding window
2163  // context before entering `execute_update_for_node` to properly update the table
2164  if (!leaf_results_.empty()) {
2165  throw std::runtime_error(
2166  "Update query having window function is not yet supported in distributed "
2167  "mode.");
2168  }
2169  ColumnCacheMap column_cache;
2170  co.device_type = ExecutorDeviceType::CPU;
2171  computeWindow(work_unit, co, eo_in, column_cache, queue_time_ms);
2172  }
2173  execute_update_for_node(project, work_unit, false);
2174  } else {
2175  throw std::runtime_error("Unsupported parent node for update: " +
2176  node->toString(RelRexToStringConfig::defaults()));
2177  }
2178 }
2179 
// NOTE(review): the embedded listing numbers skip 2180, 2207, 2210, 2213 and
// 2252, so the first signature line, the left-hand side of the make_unique
// assignment, the argument of the dynamic_cast, the declaration of `co_delete`
// and the opening of the column key expression are not visible in this view. The
// error text at the bottom ("Unsupported parent node for delete") suggests this
// is the DELETE entry point -- confirm the exact name against the full file.
//
// Executes a delete whose root node is a RelCompound or a RelProject; any other
// node type throws std::runtime_error. The target table must have a deleted
// column. Execution is delegated to executor_->executeUpdate() with a delete
// callback, and a post-execution callback is installed that finalizes the
// transaction and vacuums fragments above the minimum selectivity.
2181  const CompilationOptions& co,
2182  const ExecutionOptions& eo_in,
2183  const int64_t queue_time_ms) {
2184  CHECK(node);
2185  auto timer = DEBUG_TIMER(__func__);
2186 
// Shared driver for both supported node types.
2187  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2188  auto& work_unit,
2189  const bool is_aggregate) {
2190  auto* table_descriptor = node->getModifiedTableDescriptor();
2191  CHECK(table_descriptor);
2192  if (!table_descriptor->hasDeletedCol) {
2193  throw std::runtime_error(
2194  "DELETE queries are only supported on tables with the vacuum attribute set to "
2195  "'delayed'");
2196  }
2197 
2198  const auto catalog = node->getModifiedTableCatalog();
2199  CHECK(catalog);
2200  Executor::clearExternalCaches(false, table_descriptor, catalog->getDatabaseId());
2201 
2202  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2203 
// Runs one execution unit with a delete callback. QueryExecutionError is rethrown
// as std::runtime_error carrying the message for its error code.
2204  auto execute_delete_ra_exe_unit =
2205  [this, &table_infos, &table_descriptor, &eo_in, &co, catalog](
2206  const auto& exe_unit, const bool is_aggregate) {
2208  std::make_unique<DeleteTransactionParameters>(table_descriptor, *catalog);
2209  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2211  CHECK(delete_params);
2212  auto delete_callback = yieldDeleteCallback(*delete_params);
2214 
2215  auto eo = eo_in;
2216  if (dml_transaction_parameters_->tableIsTemporary()) {
2217  eo.output_columnar_hint = true;
2218  co_delete.filter_on_deleted_column =
2219  false; // project the entire delete column for columnar update
2220  } else {
// Non-temporary tables are expected to carry exactly one target expression.
2221  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2222  }
2223 
2224  try {
2225  auto table_update_metadata =
2226  executor_->executeUpdate(exe_unit,
2227  table_infos,
2228  table_descriptor,
2229  co_delete,
2230  eo,
2231  *catalog,
2232  executor_->row_set_mem_owner_,
2233  delete_callback,
2234  is_aggregate);
2235  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2236  dml_transaction_parameters_->finalizeTransaction(*catalog);
2237  TableOptimizer table_optimizer{
2238  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2239  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2240  };
2241  } catch (const QueryExecutionError& e) {
2242  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2243  }
2244  };
2245 
// Temporary tables run a rewritten execution unit built around the table's
// deleted column.
2246  if (table_is_temporary(table_descriptor)) {
2247  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2248  const auto cd = catalog->getDeletedColumn(table_descriptor);
2249  CHECK(cd);
2250  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2251  cd->columnType,
2253  catalog->getDatabaseId(), table_descriptor->tableId, cd->columnId},
2254  0);
2255  const auto rewritten_exe_unit =
2256  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2257  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2258  } else {
2259  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2260  }
2261  };
2262 
// Dispatch on the concrete node type.
2263  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2264  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo_in);
2265  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2266  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2267  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo_in);
// A simple projection over a sort result scans at most as many rows as the
// temporary table produced for that sort holds.
2268  if (project->isSimple()) {
2269  CHECK_EQ(size_t(1), project->inputCount());
2270  const auto input_ra = project->getInput(0);
2271  if (dynamic_cast<const RelSort*>(input_ra)) {
2272  const auto& input_table =
2273  get_temporary_table(&temporary_tables_, -input_ra->getId());
2274  CHECK(input_table);
2275  work_unit.exe_unit.scan_limit = input_table->rowCount();
2276  }
2277  }
2278  execute_delete_for_node(project, work_unit, false);
2279  } else {
2280  throw std::runtime_error("Unsupported parent node for delete: " +
2281  node->toString(RelRexToStringConfig::defaults()));
2282  }
2283 }
2284 
// NOTE(review): the embedded listing numbers jump from 2284 to 2286, so the first
// signature line (return type, name, the `compound` parameter) is not visible in
// this view.
//
// Builds a work unit for `compound` with createCompoundWorkUnit() using a default
// SortInfo, and runs it through executeWorkUnit() with the node's output metadata
// and aggregate flag. The compilation options are passed on as an unmodified copy.
2286  const CompilationOptions& co,
2287  const ExecutionOptions& eo,
2288  RenderInfo* render_info,
2289  const int64_t queue_time_ms) {
2290  auto timer = DEBUG_TIMER(__func__);
2291  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo);
2292  CompilationOptions co_compound = co;
2293  return executeWorkUnit(work_unit,
2294  compound->getOutputMetainfo(),
2295  compound->isAggregate(),
2296  co_compound,
2297  eo,
2298  render_info,
2299  queue_time_ms);
2300 }
2301 
// NOTE(review): the embedded listing numbers jump from 2301 to 2303, so the first
// signature line (return type, name, the `aggregate` parameter) is not visible in
// this view.
//
// Builds a work unit for `aggregate` with createAggregateWorkUnit() using a
// default SortInfo and the just_explain flag, and runs it through
// executeWorkUnit() with the aggregate flag always set to true.
2303  const CompilationOptions& co,
2304  const ExecutionOptions& eo,
2305  RenderInfo* render_info,
2306  const int64_t queue_time_ms) {
2307  auto timer = DEBUG_TIMER(__func__);
2308  const auto work_unit = createAggregateWorkUnit(aggregate, SortInfo(), eo.just_explain);
2309  return executeWorkUnit(work_unit,
2310  aggregate->getOutputMetainfo(),
2311  true,
2312  co,
2313  eo,
2314  render_info,
2315  queue_time_ms);
2316 }
2317 
2318 namespace {
2319 
2320 // Returns true iff the execution unit contains window functions.
// More precisely: true when at least one entry of ra_exe_unit.target_exprs is
// itself an Analyzer::WindowFunction (checked with dynamic_cast on each target).
// NOTE(review): the embedded listing numbers jump from 2320 to 2322, so the
// signature line is not visible in this view.
2322  return std::any_of(ra_exe_unit.target_exprs.begin(),
2323  ra_exe_unit.target_exprs.end(),
2324  [](const Analyzer::Expr* expr) {
2325  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
2326  });
2327 }
2328 
2329 } // namespace
2330 
// NOTE(review): the embedded listing numbers jump from 2330 to 2332, so the line
// carrying the return type and function name is not visible in this view.
//
// Builds a work unit for `project` with createProjectWorkUnit() and runs it
// through executeWorkUnit() as a non-aggregate query, forwarding
// `previous_count`. When the projection is simple and its single input is a
// RelSort, execution is forced onto the CPU and the scan limit is capped at the
// smaller of the sort result's limit and its row count.
2332  const RelProject* project,
2333  const CompilationOptions& co,
2334  const ExecutionOptions& eo,
2335  RenderInfo* render_info,
2336  const int64_t queue_time_ms,
2337  const std::optional<size_t> previous_count) {
2338  auto timer = DEBUG_TIMER(__func__);
2339  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo);
2340  CompilationOptions co_project = co;
2341  if (project->isSimple()) {
2342  CHECK_EQ(size_t(1), project->inputCount());
2343  const auto input_ra = project->getInput(0);
2344  if (dynamic_cast<const RelSort*>(input_ra)) {
2345  co_project.device_type = ExecutorDeviceType::CPU;
// The sort result is looked up in the temporary tables under the negated node id.
2346  const auto& input_table =
2347  get_temporary_table(&temporary_tables_, -input_ra->getId());
2348  CHECK(input_table);
2349  work_unit.exe_unit.scan_limit =
2350  std::min(input_table->getLimit(), input_table->rowCount());
2351  }
2352  }
2353  return executeWorkUnit(work_unit,
2354  project->getOutputMetainfo(),
2355  false,
2356  co_project,
2357  eo,
2358  render_info,
2359  queue_time_ms,
2360  previous_count);
2361 }
2362 
// NOTE(review): the embedded listing numbers skip 2363, 2367, 2390, 2418, 2433,
// 2435 and 2446, so the first signature line, one statement before the timer, one
// ResultSet constructor argument, the first statement of the catch block, two
// terms of the auto-caching condition and the last argument of
// putQueryResultSetToCache() are not visible in this view.
//
// Executes a table function node. Throws std::runtime_error when g_cluster is set
// or table functions are disabled. When the result set cache may be used and
// holds a result for the node's query plan DAG hash, that cached result is
// registered as a temporary table and returned without executing anything.
// Otherwise the table function is executed, timed, and -- when the recycler is
// in use and either the kKeepTableFuncResult hint is registered or the
// auto-caching condition holds -- stored in the result set cache.
2364  const CompilationOptions& co_in,
2365  const ExecutionOptions& eo,
2366  const int64_t queue_time_ms) {
2368  auto timer = DEBUG_TIMER(__func__);
2369 
2370  auto co = co_in;
2371 
2372  if (g_cluster) {
2373  throw std::runtime_error("Table functions not supported in distributed mode yet");
2374  }
2375  if (!g_enable_table_functions) {
2376  throw std::runtime_error("Table function support is disabled");
2377  }
2378  auto table_func_work_unit = createTableFunctionWorkUnit(
2379  table_func,
2380  eo.just_explain,
2381  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2382  const auto body = table_func_work_unit.body;
2383  CHECK(body);
2384 
2385  const auto table_infos =
2386  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2387 
// Placeholder result; replaced by either the cached or the freshly computed one.
2388  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2389  co.device_type,
2391  nullptr,
2392  executor_->blockSize(),
2393  executor_->gridSize()),
2394  {}};
2395 
2396  auto const global_hint = getGlobalQueryHint();
2397  auto const use_resultset_recycler =
2398  canUseResultsetCache(eo, nullptr) && has_valid_query_plan_dag(table_func);
// Fast path: reuse a cached result set keyed by the query plan DAG hash.
2399  if (use_resultset_recycler) {
2400  auto cached_resultset =
2401  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
2402  table_func->getQueryPlanDagHash());
2403  if (cached_resultset) {
2404  VLOG(1) << "recycle table function's resultset of the root node "
2405  << table_func->getRelNodeDagId() << " from resultset cache";
2406  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2407  addTemporaryTable(-body->getId(), result.getDataPtr());
2408  return result;
2409  }
2410  }
2411 
2412  auto query_exec_time_begin = timer_start();
2413  try {
2414  result = {executor_->executeTableFunction(
2415  table_func_work_unit.exe_unit, table_infos, co, eo),
2416  body->getOutputMetainfo()};
2417  } catch (const QueryExecutionError& e) {
2419  CHECK(e.hasErrorCode(ErrorCode::OUT_OF_GPU_MEM)) << e.getErrorCode();
2420  throw std::runtime_error("Table function ran out of memory during execution");
2421  }
2422  auto query_exec_time = timer_stop(query_exec_time_begin);
2423  result.setQueueTime(queue_time_ms);
2424  auto resultset_ptr = result.getDataPtr();
2425  if (use_resultset_recycler) {
// NOTE(review): resultset_ptr is dereferenced on the following lines before the
// null test that is part of allow_auto_caching_resultset below; if a null result
// pointer is possible here, the test comes too late -- confirm.
2426  resultset_ptr->setExecTime(query_exec_time);
2427  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2428  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2429  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2430  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2431  auto allow_auto_caching_resultset =
2432  resultset_ptr && resultset_ptr->hasValidBuffer() &&
2434  resultset_ptr->getBufferSizeBytes(co.device_type) <=
2436  if (global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult) ||
2437  allow_auto_caching_resultset) {
2438  if (allow_auto_caching_resultset) {
2439  VLOG(1) << "Automatically keep table function's query resultset to recycler";
2440  }
2441  executor_->getResultSetRecyclerHolder().putQueryResultSetToCache(
2442  table_func_work_unit.exe_unit.query_plan_dag_hash,
2443  resultset_ptr->getInputTableKeys(),
2444  resultset_ptr,
2445  resultset_ptr->getBufferSizeBytes(co.device_type),
2447  }
2448  } else {
// The recycler is not in use: only log why a keep-result request is being ignored.
2449  if (eo.keep_result) {
2450  if (g_cluster) {
2451  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2452  "support resultset recycling on distributed mode";
2453  } else if (hasStepForUnion()) {
2454  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2455  "has union-(all) operator";
2456  } else if (is_validate_or_explain_query(eo)) {
2457  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2458  "is either validate or explain query";
2459  } else {
2460  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2461  }
2462  }
2463  }
2464 
2465  return result;
2466 }
2467 
2468 namespace {
2469 
2470 // Creates a new expression which has the range table index set to 1. This is needed to
2471 // reuse the hash join construction helpers to generate a hash table for the window
2472 // function partition: create an equals expression with left and right sides identical
2473 // except for the range table index.
2474 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
2475  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
2476  if (tuple) {
2477  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2478  for (const auto& element : tuple->getTuple()) {
2479  transformed_tuple.push_back(transform_to_inner(element.get()));
2480  }
2481  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2482  }
2483  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
2484  if (!col) {
2485  throw std::runtime_error("Only columns supported in the window partition for now");
2486  }
2487  return makeExpr<Analyzer::ColumnVar>(col->get_type_info(), col->getColumnKey(), 1);
2488 }
2489 
2490 } // namespace
2491 
// NOTE(review): the embedded listing numbers jump from 2491 to 2493, so the first
// signature line (return type, name, the `work_unit` parameter) is not visible in
// this view; callers in this file invoke it as
// computeWindow(work_unit, co, eo, column_cache, queue_time_ms).
//
// Creates and computes a WindowFunctionContext for every window function among
// the work unit's target expressions and registers each with the
// WindowProjectNodeContext created for executor_. Exactly one input table with
// exactly one fragment is required; otherwise the function CHECK-fails or throws
// std::runtime_error. Nothing is computed for the Extern executor type.
2493  const CompilationOptions& co,
2494  const ExecutionOptions& eo,
2495  ColumnCacheMap& column_cache_map,
2496  const int64_t queue_time_ms) {
2497  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2498  CHECK_EQ(query_infos.size(), size_t(1));
2499  if (query_infos.front().info.fragments.size() != 1) {
2500  throw std::runtime_error(
2501  "Only single fragment tables supported for window functions for now");
2502  }
2503  if (eo.executor_type == ::ExecutorType::Extern) {
2504  return;
2505  }
// Duplicate the single table info so there are two entries available.
2506  query_infos.push_back(query_infos.front());
2507  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2508  // a query may hold multiple window functions having the same partition by condition
2509  // then after building the first hash partition we can reuse it for the rest of
2510  // the window functions
2511  // here, a cached partition can be shared via multiple window function contexts as is
2512  // but sorted partition should be copied to reuse since we use it for (intermediate)
2513  // output buffer
2514  // todo (yoonmin) : support recycler for window function computation?
2515  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2516  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2517  sorted_partition_cache;
2518  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2519  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2520  window_function_context_map;
2521  std::unordered_map<QueryPlanHash, AggregateTreeForWindowFraming> aggregate_tree_map;
// First pass: build one context per window function target, keyed by target index.
2522  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2523  ++target_index) {
2524  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2525  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2526  if (!window_func) {
2527  continue;
2528  }
2529  // Always use baseline layout hash tables for now, make the expression a tuple.
2530  const auto& partition_keys = window_func->getPartitionKeys();
// Stays null when the window function has no partition keys.
2531  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2532  if (partition_keys.size() >= 1) {
2533  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2534  if (partition_keys.size() > 1) {
2535  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2536  } else {
2537  CHECK_EQ(partition_keys.size(), size_t(1));
2538  partition_key_tuple = partition_keys.front();
2539  }
2540  // Creates a tautology equality with the partition expression on both sides.
2541  partition_key_cond =
2542  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2543  kBW_EQ,
2544  kONE,
2545  partition_key_tuple,
2546  transform_to_inner(partition_key_tuple.get()));
2547  }
2548  auto context =
2549  createWindowFunctionContext(window_func,
2550  partition_key_cond /*nullptr if no partition key*/,
2551  partition_cache,
2552  sorted_partition_key_ref_count_map,
2553  work_unit,
2554  query_infos,
2555  co,
2556  column_cache_map,
2557  executor_->getRowSetMemoryOwner());
2558  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2559  }
2560 
// Second pass: compute every context, then hand ownership to the project node
// context under the same target index.
2561  for (auto& kv : window_function_context_map) {
2562  kv.second->compute(
2563  sorted_partition_key_ref_count_map, sorted_partition_cache, aggregate_tree_map);
2564  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2565  }
2566 }
2567 
// Creates the WindowFunctionContext for a single window function.
//
// When `partition_key_cond` is non-null, a hash table for that condition is
// looked up in `partition_cache` or built through the executor and then cached;
// otherwise the context is created without a partition hash table. Order-by
// columns and, for framed or missing-value-filling functions, column arguments
// are fetched from the first fragment of the first table in `query_infos` and
// attached to the context. `sorted_partition_key_ref_count_map` is updated with
// the number of contexts that share the same sorted-partition cache key.
//
// Throws std::runtime_error when the hash table build reports a failure reason
// or when an order key is not a plain column.
2568 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
2569  const Analyzer::WindowFunction* window_func,
2570  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2571  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2572  std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2573  const WorkUnit& work_unit,
2574  const std::vector<InputTableInfo>& query_infos,
2575  const CompilationOptions& co,
2576  ColumnCacheMap& column_cache_map,
2577  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
      // Row count of the (single) input fragment.
2578  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2579  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2582  std::unique_ptr<WindowFunctionContext> context;
      // Base cache key: the query plan DAG hash of the work unit's body combined
      // with its sort info.
2583  auto partition_cache_key = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
2584  work_unit.body->getQueryPlanDagHash(), work_unit.exe_unit.sort_info);
2585  JoinType window_partition_type = window_func->isFrameNavigateWindowFunction()
2588  if (partition_key_cond) {
      // Refine the cache key with the partition condition text and the partition
      // join type, so differing partitions or types do not share a hash table.
2589  auto partition_cond_str = partition_key_cond->toString();
2590  auto partition_key_hash = boost::hash_value(partition_cond_str);
2591  boost::hash_combine(partition_cache_key, partition_key_hash);
2592  boost::hash_combine(partition_cache_key, static_cast<int>(window_partition_type));
2593  std::shared_ptr<HashJoin> partition_ptr;
2594  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2595  if (cached_hash_table_it != partition_cache.end()) {
2596  partition_ptr = cached_hash_table_it->second;
2597  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2598  << partition_cache_key << ", partition condition: " << partition_cond_str
2599  << ")";
2600  } else {
2601  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2602  partition_key_cond,
2603  query_infos,
2604  memory_level,
2605  window_partition_type,
2607  column_cache_map,
2609  work_unit.exe_unit.query_hint,
2610  work_unit.exe_unit.table_id_to_node_map);
2611  if (!hash_table_or_err.fail_reason.empty()) {
2612  throw std::runtime_error(hash_table_or_err.fail_reason);
2613  }
      // The partition hash table is required to be one-to-many.
2614  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2615  partition_ptr = hash_table_or_err.hash_table;
2616  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2617  .second);
2618  VLOG(1) << "Put a generated hash table for computing window function context to "
2619  "cache (key: "
2620  << partition_cache_key << ", partition condition: " << partition_cond_str
2621  << ")";
2622  }
2623  CHECK(partition_ptr);
      // The fanout from the query hint replaces the global value whenever the two
      // differ.
2624  auto aggregate_tree_fanout = g_window_function_aggregation_tree_fanout;
2625  if (work_unit.exe_unit.query_hint.aggregate_tree_fanout != aggregate_tree_fanout) {
2626  aggregate_tree_fanout = work_unit.exe_unit.query_hint.aggregate_tree_fanout;
2627  VLOG(1) << "Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2628  }
2629  context = std::make_unique<WindowFunctionContext>(window_func,
2630  partition_cache_key,
2631  partition_ptr,
2632  elem_count,
2633  co.device_type,
2634  row_set_mem_owner,
2635  aggregate_tree_fanout);
2636  } else {
      // No partition key: create the context without a partition hash table.
2637  context = std::make_unique<WindowFunctionContext>(
2638  window_func, elem_count, co.device_type, row_set_mem_owner);
2639  }
2640  const auto& order_keys = window_func->getOrderKeys();
2641  if (!order_keys.empty()) {
      // The sorted-partition key extends the partition key with every order key
      // and collation, in order.
2642  auto sorted_partition_cache_key = partition_cache_key;
2643  for (auto& order_key : order_keys) {
2644  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2645  }
2646  for (auto& collation : window_func->getCollation()) {
2647  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2648  }
2649  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
      // Count the contexts sharing this key: start at 1, or bump the existing
      // count. NOTE(review): the bump looks the key up a second time through
      // operator[]; `++cache_key_cnt_it.first->second` would be equivalent.
2650  auto cache_key_cnt_it =
2651  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2652  if (!cache_key_cnt_it.second) {
2653  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2654  cache_key_cnt_it.first->second + 1;
2655  }
2656 
      // Filled while fetching the order columns and passed to the context with
      // each of them.
2657  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2658  for (const auto& order_key : order_keys) {
2659  const auto order_col =
2660  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2661  if (!order_col) {
2662  throw std::runtime_error("Only order by columns supported for now");
2663  }
2664  auto const [column, col_elem_count] =
2666  *order_col,
2667  query_infos.front().info.fragments.front(),
2668  memory_level,
2669  0,
2670  nullptr,
2671  /*thread_idx=*/0,
2672  chunks_owner,
2673  column_cache_map);
2674 
      // The fetched column must cover exactly the fragment's rows.
2675  CHECK_EQ(col_elem_count, elem_count);
2676  context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2677  }
2678  }
2679  if (context->getWindowFunction()->hasFraming() ||
2680  context->getWindowFunction()->isMissingValueFillingFunction()) {
2681  // todo (yoonmin) : if we try to support generic window function expression without
2682  // extra project node, we need to revisit here b/c the current logic assumes that
2683  // window function expression has a single input source
2684  auto& window_function_expression_args = window_func->getArgs();
2685  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
      // Only arguments that are plain column references get a buffer attached;
      // any other argument expression is skipped.
2686  for (auto& expr : window_function_expression_args) {
2687  if (const auto arg_col_var =
2688  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2689  auto const [column, col_elem_count] = ColumnFetcher::getOneColumnFragment(
2690  executor_,
2691  *arg_col_var,
2692  query_infos.front().info.fragments.front(),
2693  memory_level,
2694  0,
2695  nullptr,
2696  /*thread_idx=*/0,
2697  chunks_owner,
2698  column_cache_map);
2699  CHECK_EQ(col_elem_count, elem_count);
2700  context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2701  }
2702  }
2703  }
2704  return context;
2705 }
2706 
2708  const CompilationOptions& co,
2709  const ExecutionOptions& eo,
2710  RenderInfo* render_info,
2711  const int64_t queue_time_ms) {
      // Executes a filter node: builds its work unit with a default-constructed
      // SortInfo and runs it through executeWorkUnit, passing `false` for that
      // call's third argument and the filter's output metainfo as the target
      // metadata.
2712  auto timer = DEBUG_TIMER(__func__);
2713  const auto work_unit = createFilterWorkUnit(filter, SortInfo(), eo.just_explain);
2714  return executeWorkUnit(
2715  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2716 }
2717 
2718 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
2719  std::vector<TargetMetaInfo> const& rhs) {
2720  if (lhs.size() == rhs.size()) {
2721  for (size_t i = 0; i < lhs.size(); ++i) {
2722  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2723  return false;
2724  }
2725  }
2726  return true;
2727  }
2728  return false;
2729 }
2730 
2731 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2732  return target_meta_info.get_type_info().is_geometry();
2733 }
2734 
2736  const RaExecutionSequence& seq,
2737  const CompilationOptions& co,
2738  const ExecutionOptions& eo,
2739  RenderInfo* render_info,
2740  const int64_t queue_time_ms) {
      // Executes a logical union node. Only UNION ALL is accepted, and unions
      // whose output contains a geometry column are rejected; both cases throw
      // std::runtime_error. `seq` is not read in the visible part of this body.
2741  auto timer = DEBUG_TIMER(__func__);
2742  if (!logical_union->isAll()) {
2743  throw std::runtime_error("UNION without ALL is not supported yet.");
2744  }
2745  // Will throw a std::runtime_error if types don't match.
2746  logical_union->setOutputMetainfo(logical_union->getCompatibleMetainfoTypes());
2747  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2748  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2749  }
      // The union work unit is created with a default-constructed SortInfo.
2750  auto work_unit = createUnionWorkUnit(logical_union, SortInfo(), eo);
2751  return executeWorkUnit(work_unit,
2752  logical_union->getOutputMetainfo(),
2753  false,
2755  eo,
2756  render_info,
2757  queue_time_ms);
2758 }
2759 
2761  const RelLogicalValues* logical_values,
2762  const ExecutionOptions& eo) {
      // Materializes a logical values node into a ResultSet and returns it
      // together with the (possibly adjusted) tuple type. As a side effect the
      // adjusted tuple type is stored as the node's output metainfo. `eo` is not
      // read in the visible part of this body.
2763  auto timer = DEBUG_TIMER(__func__);
2766 
      // Work on a copy of the tuple type so kNULLT columns can be retyped.
2767  auto tuple_type = logical_values->getTupleType();
2768  for (size_t i = 0; i < tuple_type.size(); ++i) {
2769  auto& target_meta_info = tuple_type[i];
2770  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2771  // replace w/ bigint
2772  tuple_type[i] =
2773  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2774  }
      // One column slot per tuple component, described by the component's type
      // size and the constant 8. NOTE(review): the 8 presumably is the padded
      // slot width in bytes -- confirm.
2775  query_mem_desc.addColSlotInfo(
2776  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2777  }
2778  logical_values->setOutputMetainfo(tuple_type);
2779 
      // Build one TargetInfo per tuple component from its type info.
2780  std::vector<TargetInfo> target_infos;
2781  for (const auto& tuple_type_component : tuple_type) {
2782  target_infos.emplace_back(TargetInfo{false,
2783  kCOUNT,
2784  tuple_type_component.get_type_info(),
2785  SQLTypeInfo(kNULLT, false),
2786  false,
2787  false,
2788  /*is_varlen_projection=*/false});
2789  }
2790  std::shared_ptr<ResultSet> rs{
2791  ResultSetLogicalValuesBuilder{logical_values,
2792  target_infos,
2795  executor_->getRowSetMemoryOwner(),
2796  executor_}
2797  .build()};
2798 
2799  return {rs, tuple_type};
2800 }
2801 
2802 namespace {
2803 
2804 template <class T>
2805 int64_t insert_one_dict_str(T* col_data,
2806  const std::string& columnName,
2807  const SQLTypeInfo& columnType,
2808  const Analyzer::Constant* col_cv,
2809  const Catalog_Namespace::Catalog& catalog) {
2810  if (col_cv->get_is_null()) {
2811  *col_data = inline_fixed_encoding_null_val(columnType);
2812  } else {
2813  const int dict_id = columnType.get_comp_param();
2814  const auto col_datum = col_cv->get_constval();
2815  const auto& str = *col_datum.stringval;
2816  const auto dd = catalog.getMetadataForDict(dict_id);
2817  CHECK(dd && dd->stringDict);
2818  int32_t str_id = dd->stringDict->getOrAdd(str);
2819  if (!dd->dictIsTemp) {
2820  const auto checkpoint_ok = dd->stringDict->checkpoint();
2821  if (!checkpoint_ok) {
2822  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2823  columnName);
2824  }
2825  }
2826  const bool invalid = str_id > max_valid_int_value<T>();
2827  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2828  if (invalid) {
2829  LOG(ERROR) << "Could not encode string: " << str
2830  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2831  << " bits. Will store NULL instead.";
2832  }
2833  str_id = inline_fixed_encoding_null_val(columnType);
2834  }
2835  *col_data = str_id;
2836  }
2837  return *col_data;
2838 }
2839 
2840 template <class T>
2841 int64_t insert_one_dict_str(T* col_data,
2842  const ColumnDescriptor* cd,
2843  const Analyzer::Constant* col_cv,
2844  const Catalog_Namespace::Catalog& catalog) {
2845  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2846 }
2847 
2848 } // namespace
2849 
2851  const ExecutionOptions& eo) {
      // Rejects EXPLAIN for table modification with std::runtime_error and
      // otherwise returns a ResultSet built with an empty target list, paired with
      // empty target metadata. The visible body performs no modification itself.
2852  auto timer = DEBUG_TIMER(__func__);
2853  if (eo.just_explain) {
2854  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2855  }
2856 
2857  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2860  executor_->getRowSetMemoryOwner(),
2861  executor_->blockSize(),
2862  executor_->gridSize());
2863 
2864  std::vector<TargetMetaInfo> empty_targets;
2865  return {rs, empty_targets};
2866 }
2867 
2869  const Analyzer::Query& query,
2871  const Catalog_Namespace::SessionInfo& session) {
      // Inserts the literal rows of `query` (its values lists) into the query's
      // result table. Rows are converted column by column into per-column buffers
      // and handed to `inserter` in packages of at most 64K rows. Returns a
      // ResultSet built with an empty target list, paired with empty target
      // metadata.
      //
      // Throws std::runtime_error for an empty geospatial value or for an array
      // whose length does not match a fixed-size array column.
2872  // Note: We currently obtain an executor for this method, but we do not need it.
2873  // Therefore, we skip the executor state setup in the regular execution path. In the
2874  // future, we will likely want to use the executor to evaluate expressions in the insert
2875  // statement.
2876 
2877  const auto& values_lists = query.get_values_lists();
2878  const int table_id = query.get_result_table_id();
2879  const auto& col_id_list = query.get_result_col_list();
2880  size_t rows_number = values_lists.size();
2881  size_t leaf_count = inserter.getLeafCount();
2882  const auto& catalog = session.getCatalog();
2883  const auto td = catalog.getMetadataForTable(table_id);
2884  CHECK(td);
      // For unsharded tables the rows are divided evenly (rounded up) by the leaf
      // count; sharded tables keep the full row count.
2885  size_t rows_per_leaf = rows_number;
2886  if (td->nShards == 0) {
2887  rows_per_leaf =
2888  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2889  }
      // Package size is clamped to the range [1, 64K] rows.
2890  auto max_number_of_rows_per_package =
2891  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
2892 
2893  std::vector<const ColumnDescriptor*> col_descriptors;
2894  std::vector<int> col_ids;
      // Per-column staging buffers keyed by column id: raw bytes for fixed-width
      // and dictionary-encoded values, strings for none-encoded text and geometry,
      // ArrayDatum for arrays.
2895  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2896  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2897  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
      // Maps a column id to its position in `col_ids`.
2898  std::unordered_map<int, int> sequential_ids;
2899 
      // Allocate one staging buffer per target column, chosen by column type.
2900  for (const int col_id : col_id_list) {
2901  const auto cd = get_column_descriptor({catalog.getDatabaseId(), table_id, col_id});
2902  const auto col_enc = cd->columnType.get_compression();
2903  if (cd->columnType.is_string()) {
2904  switch (col_enc) {
2905  case kENCODING_NONE: {
2906  auto it_ok =
2907  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2908  CHECK(it_ok.second);
2909  break;
2910  }
2911  case kENCODING_DICT: {
2912  const auto dd = catalog.getMetadataForDict(cd->columnType.get_comp_param());
2913  CHECK(dd);
2914  const auto it_ok = col_buffers.emplace(
2915  col_id,
2916  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2917  max_number_of_rows_per_package));
2918  CHECK(it_ok.second);
2919  break;
2920  }
2921  default:
2922  CHECK(false);
2923  }
2924  } else if (cd->columnType.is_geometry()) {
2925  auto it_ok =
2926  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2927  CHECK(it_ok.second);
2928  } else if (cd->columnType.is_array()) {
2929  auto it_ok =
2930  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2931  CHECK(it_ok.second);
2932  } else {
2933  const auto it_ok = col_buffers.emplace(
2934  col_id,
2935  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2936  max_number_of_rows_per_package]()));
2937  CHECK(it_ok.second);
2938  }
2939  col_descriptors.push_back(cd);
2940  sequential_ids[col_id] = col_ids.size();
2941  col_ids.push_back(col_id);
2942  }
2943 
2944  Executor::clearExternalCaches(true, td, catalog.getDatabaseId());
2945 
      // Convert and insert the rows one package at a time.
2946  size_t start_row = 0;
2947  size_t rows_left = rows_number;
2948  while (rows_left != 0) {
2949  // clear the buffers
      // NOTE(review): this zeroes only max_number_of_rows_per_package bytes,
      // although the buffers were allocated with that many elements times the
      // column's element size. Every slot in [0, package_size) appears to be
      // overwritten below, so the uncleared tail is presumably never read --
      // confirm.
2950  for (const auto& kv : col_buffers) {
2951  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2952  }
2953  for (auto& kv : str_col_buffers) {
2954  kv.second.clear();
2955  }
2956  for (auto& kv : arr_col_buffers) {
2957  kv.second.clear();
2958  }
2959 
2960  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2961  // Note: if there will be use cases with batch inserts with lots of rows, it might be
2962  // more efficient to do the loops below column by column instead of row by row.
2963  // But for now I consider such a refactoring not worth investigating, as we have more
2964  // efficient ways to insert many rows anyway.
2965  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2966  const auto& values_list = values_lists[row_idx + start_row];
2967  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
      // NOTE(review): this check does not depend on col_idx and could be hoisted
      // out of the column loop.
2968  CHECK(values_list.size() == col_descriptors.size());
      // Each value must be a constant, or a CAST whose operand is a constant.
2969  auto col_cv =
2970  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
2971  if (!col_cv) {
2972  auto col_cast =
2973  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
2974  CHECK(col_cast);
2975  CHECK_EQ(kCAST, col_cast->get_optype());
2976  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2977  }
2978  CHECK(col_cv);
2979  const auto cd = col_descriptors[col_idx];
2980  auto col_datum = col_cv->get_constval();
2981  auto col_type = cd->columnType.get_type();
      // Only columns staged in `col_buffers` (fixed-width values and
      // dictionary-encoded strings) get a raw byte pointer; it stays null for
      // array, geometry and none-encoded string columns.
2982  uint8_t* col_data_bytes{nullptr};
2983  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2984  (!cd->columnType.is_string() ||
2985  cd->columnType.get_compression() == kENCODING_DICT)) {
2986  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2987  CHECK(col_data_bytes_it != col_buffers.end());
2988  col_data_bytes = col_data_bytes_it->second.get();
2989  }
2990  switch (col_type) {
2991  case kBOOLEAN: {
      // A boolean datum that already equals the null sentinel is stored as NULL
      // as well.
2992  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2993  auto null_bool_val =
2994  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2995  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2996  ? inline_fixed_encoding_null_val(cd->columnType)
2997  : (col_datum.boolval ? 1 : 0);
2998  break;
2999  }
3000  case kTINYINT: {
3001  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
3002  col_data[row_idx] = col_cv->get_is_null()
3003  ? inline_fixed_encoding_null_val(cd->columnType)
3004  : col_datum.tinyintval;
3005  break;
3006  }
3007  case kSMALLINT: {
3008  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
3009  col_data[row_idx] = col_cv->get_is_null()
3010  ? inline_fixed_encoding_null_val(cd->columnType)
3011  : col_datum.smallintval;
3012  break;
3013  }
3014  case kINT: {
3015  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
3016  col_data[row_idx] = col_cv->get_is_null()
3017  ? inline_fixed_encoding_null_val(cd->columnType)
3018  : col_datum.intval;
3019  break;
3020  }
3021  case kBIGINT:
3022  case kDECIMAL:
3023  case kNUMERIC: {
3024  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3025  col_data[row_idx] = col_cv->get_is_null()
3026  ? inline_fixed_encoding_null_val(cd->columnType)
3027  : col_datum.bigintval;
3028  break;
3029  }
      // NOTE(review): unlike the integer cases, the float and double cases do not
      // consult get_is_null(); presumably a NULL constant's datum already holds
      // the floating-point null sentinel -- confirm.
3030  case kFLOAT: {
3031  auto col_data = reinterpret_cast<float*>(col_data_bytes);
3032  col_data[row_idx] = col_datum.floatval;
3033  break;
3034  }
3035  case kDOUBLE: {
3036  auto col_data = reinterpret_cast<double*>(col_data_bytes);
3037  col_data[row_idx] = col_datum.doubleval;
3038  break;
3039  }
3040  case kTEXT:
3041  case kVARCHAR:
3042  case kCHAR: {
3043  switch (cd->columnType.get_compression()) {
3044  case kENCODING_NONE:
      // A missing string value is staged as an empty string.
3045  str_col_buffers[col_ids[col_idx]].push_back(
3046  col_datum.stringval ? *col_datum.stringval : "");
3047  break;
3048  case kENCODING_DICT: {
      // The element type used for the stored id follows the column's size in
      // bytes (1, 2 or 4).
3049  switch (cd->columnType.get_size()) {
3050  case 1:
3052  &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3053  cd,
3054  col_cv,
3055  catalog);
3056  break;
3057  case 2:
3059  &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3060  cd,
3061  col_cv,
3062  catalog);
3063  break;
3064  case 4:
3066  &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3067  cd,
3068  col_cv,
3069  catalog);
3070  break;
3071  default:
3072  CHECK(false);
3073  }
3074  break;
3075  }
3076  default:
3077  CHECK(false);
3078  }
3079  break;
3080  }
3081  case kTIME:
3082  case kTIMESTAMP:
3083  case kDATE: {
3084  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3085  col_data[row_idx] = col_cv->get_is_null()
3086  ? inline_fixed_encoding_null_val(cd->columnType)
3087  : col_datum.bigintval;
3088  break;
3089  }
3090  case kARRAY: {
3091  const auto is_null = col_cv->get_is_null();
3092  const auto size = cd->columnType.get_size();
3093  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
3094  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
3095  const auto is_point_coords =
3096  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
3097  if (is_null && !is_point_coords) {
      // NULL array: when `size` is positive, a buffer of `size` bytes is filled
      // with a null-array marker in the first element and null elements after
      // it; otherwise an empty NULL datum is staged.
      // NOTE(review): `buf` comes from checked_malloc and is handed to
      // ArrayDatum; ownership presumably transfers to the datum -- confirm.
3098  if (size > 0) {
3099  int8_t* buf = (int8_t*)checked_malloc(size);
3100  put_null_array(static_cast<void*>(buf), elem_ti, "");
3101  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
3102  p += elem_ti.get_size()) {
3103  put_null(static_cast<void*>(p), elem_ti, "");
3104  }
3105  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
3106  } else {
3107  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
3108  }
3109  break;
3110  }
3111  const auto l = col_cv->get_value_list();
3112  size_t len = l.size() * elem_ti.get_size();
      // When the column reports a positive size, the literal must supply exactly
      // that many bytes' worth of elements.
3113  if (size > 0 && static_cast<size_t>(size) != len) {
3114  throw std::runtime_error("Array column " + cd->columnName + " expects " +
3115  std::to_string(size / elem_ti.get_size()) +
3116  " values, " + "received " +
3117  std::to_string(l.size()));
3118  }
3119  if (elem_ti.is_string()) {
      // String arrays are required to be dictionary-encoded with 4-byte ids.
3120  CHECK(kENCODING_DICT == elem_ti.get_compression());
3121  CHECK(4 == elem_ti.get_size());
3122 
3123  int8_t* buf = (int8_t*)checked_malloc(len);
3124  int32_t* p = reinterpret_cast<int32_t*>(buf);
3125 
3126  int elemIndex = 0;
3127  for (auto& e : l) {
3128  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3129  CHECK(c);
3131  &p[elemIndex], cd->columnName, elem_ti, c.get(), catalog);
3132  elemIndex++;
3133  }
3134  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3135 
3136  } else {
      // Non-string elements are appended back to back into the buffer.
3137  int8_t* buf = (int8_t*)checked_malloc(len);
3138  int8_t* p = buf;
3139  for (auto& e : l) {
3140  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3141  CHECK(c);
3142  p = append_datum(p, c->get_constval(), elem_ti);
3143  CHECK(p);
3144  }
3145  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3146  }
3147  break;
3148  }
3149  case kPOINT:
3150  case kMULTIPOINT:
3151  case kLINESTRING:
3152  case kMULTILINESTRING:
3153  case kPOLYGON:
3154  case kMULTIPOLYGON:
      // Geometry is staged as its string value: a present but empty string is
      // rejected, a missing string is staged as an empty string.
3155  if (col_datum.stringval && col_datum.stringval->empty()) {
3156  throw std::runtime_error(
3157  "Empty values are not allowed for geospatial column \"" +
3158  cd->columnName + "\"");
3159  } else {
3160  str_col_buffers[col_ids[col_idx]].push_back(
3161  col_datum.stringval ? *col_datum.stringval : "");
3162  }
3163  break;
3164  default:
3165  CHECK(false);
3166  }
3167  }
3168  }
3169  start_row += package_size;
3170  rows_left -= package_size;
3171 
      // Describe the staged package and point each column's data block at its
      // staging buffer, at the column's position in `col_ids`.
3173  insert_data.databaseId = catalog.getCurrentDB().dbId;
3174  insert_data.tableId = table_id;
3175  insert_data.data.resize(col_ids.size());
3176  insert_data.columnIds = col_ids;
3177  for (const auto& kv : col_buffers) {
3178  DataBlockPtr p;
3179  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
3180  insert_data.data[sequential_ids[kv.first]] = p;
3181  }
3182  for (auto& kv : str_col_buffers) {
3183  DataBlockPtr p;
3184  p.stringsPtr = &kv.second;
3185  insert_data.data[sequential_ids[kv.first]] = p;
3186  }
3187  for (auto& kv : arr_col_buffers) {
3188  DataBlockPtr p;
3189  p.arraysPtr = &kv.second;
3190  insert_data.data[sequential_ids[kv.first]] = p;
3191  }
3192  insert_data.numRows = package_size;
      // NOTE(review): `data_memory_holder` is not read afterwards; presumably it
      // keeps the storage of the filled-in columns alive until insertData
      // returns -- confirm.
3193  auto data_memory_holder = import_export::fill_missing_columns(&catalog, insert_data);
3194  inserter.insertData(session, insert_data);
3195  }
3196 
3197  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3200  executor_->getRowSetMemoryOwner(),
3201  0,
3202  0);
3203  std::vector<TargetMetaInfo> empty_targets;
3204  return {rs, empty_targets};
3205 }
3206 
3207 namespace {
3208 
// Converts an optional LIMIT into a plain count: the contained value when one
// is present, 0 otherwise.
size_t get_limit_value(std::optional<size_t> limit) {
  return limit.value_or(0);
}
3212 
3213 size_t get_scan_limit(const RelAlgNode* ra, std::optional<size_t> limit) {
3214  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
3215  if (aggregate) {
3216  return 0;
3217  }
3218  const auto compound = dynamic_cast<const RelCompound*>(ra);
3219  return (compound && compound->isAggregate()) ? 0 : get_limit_value(limit);
3220 }
3221 
3222 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
3223  return !order_entries.empty() && order_entries.front().is_desc;
3224 }
3225 
3226 bool is_none_encoded_text(TargetMetaInfo const& target_meta_info) {
3227  return target_meta_info.get_type_info().is_none_encoded_string();
3228 }
3229 
3230 } // namespace
3231 
3233  const CompilationOptions& co,
3234  const ExecutionOptions& eo,
3235  RenderInfo* render_info,
3236  const int64_t queue_time_ms) {
      // Executes a sort node. If leaf results are already registered for the
      // node, they are sorted and trimmed directly. Otherwise the sort's input is
      // executed (or recycled from the resultset cache), then sorted and trimmed
      // by LIMIT/OFFSET. When that throws SpeculativeTopNFailed, the group-by
      // expression is added to the speculative top-n blacklist and the query is
      // run once more.
3237  auto timer = DEBUG_TIMER(__func__);
3239  const auto source = sort->getInput(0);
3240  const bool is_aggregate = node_is_aggregate(source);
3241  auto it = leaf_results_.find(sort->getId());
3242  auto order_entries = sort->getOrderEntries();
3243  if (it != leaf_results_.end()) {
3244  // Add any transient string literals to the sdp on the agg
3245  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3246  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3247  executor_->row_set_mem_owner_);
3248  // Handle push-down for LIMIT for multi-node
3249  auto& aggregated_result = it->second;
3250  auto& result_rows = aggregated_result.rs;
3251  auto limit = sort->getLimit();
3252  const size_t offset = sort->getOffset();
3253  if (limit || offset) {
      // Sort with a top-n of limit + offset, then drop the offset rows and keep
      // at most `limit` rows.
3254  if (!order_entries.empty()) {
3255  result_rows->sort(
3256  order_entries, get_limit_value(limit) + offset, co.device_type, executor_);
3257  }
3258  result_rows->dropFirstN(offset);
3259  if (limit) {
3260  result_rows->keepFirstN(get_limit_value(limit));
3261  }
3262  }
3263 
3264  if (render_info) {
3265  // We've hit a sort step that is the very last step
3266  // in a distributed render query. We'll fill in the render targets
3267  // since we have all that data needed to do so. This is normally
3268  // done in executeWorkUnit, but that is bypassed in this case.
3269  build_render_targets(*render_info,
3270  source_work_unit.exe_unit.target_exprs,
3271  aggregated_result.targets_meta);
3272  }
3273 
3274  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3275  sort->setOutputMetainfo(aggregated_result.targets_meta);
3276 
3277  return result;
3278  }
3279 
      // State shared with the lambda below by reference, so the retry in the
      // catch handler sees what the failed attempt recorded.
3280  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3281  bool is_desc{false};
3282  bool use_speculative_top_n_sort{false};
3283 
3284  auto execute_sort_query = [this,
3285  sort,
3286  &source,
3287  &is_aggregate,
3288  &eo,
3289  &co,
3290  render_info,
3291  queue_time_ms,
3292  &groupby_exprs,
3293  &is_desc,
3294  &order_entries,
3295  &use_speculative_top_n_sort]() -> ExecutionResult {
3296  std::optional<size_t> limit = sort->getLimit();
3297  const size_t offset = sort->getOffset();
3298  // check whether sort's input is cached
3299  auto source_node = sort->getInput(0);
3300  CHECK(source_node);
3301  ExecutionResult source_result{nullptr, {}};
3303  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
3304  auto source_query_plan_dag = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
3305  source_node->getQueryPlanDagHash(), sort_info);
3306  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(source_node) &&
3307  !sort->isEmptyResult()) {
3308  if (auto cached_resultset =
3309  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
3310  source_query_plan_dag)) {
      // A cached resultset must carry a speculative top-n decision; it is
      // dereferenced below.
3311  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3312  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3313  << " from resultset cache";
3314  source_result =
3315  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
      // Register the cached result as a temporary table under the negated source
      // node id unless one is already present.
3316  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3317  addTemporaryTable(-source_node->getId(), cached_resultset);
3318  }
3319  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3320  co.device_type == ExecutorDeviceType::GPU;
3321  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3322  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3323  }
3324  }
3325  if (!source_result.getDataPtr()) {
      // No cached result: execute the sort's input.
3326  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3327  is_desc = first_oe_is_desc(order_entries);
3328  ExecutionOptions eo_copy = eo;
3329  CompilationOptions co_copy = co;
      // An empty-result sort only needs validation of its input.
3330  eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
3331  if (hasStepForUnion() &&
3332  boost::algorithm::any_of(source->getOutputMetainfo(), is_none_encoded_text)) {
3334  VLOG(1) << "Punt sort's input query to CPU: detect union(-all) of none-encoded "
3335  "text column";
3336  }
3337 
3338  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3339  source_result = executeWorkUnit(source_work_unit,
3340  source->getOutputMetainfo(),
3341  is_aggregate,
3342  co_copy,
3343  eo_copy,
3344  render_info,
3345  queue_time_ms);
3346  use_speculative_top_n_sort =
3347  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3348  use_speculative_top_n(source_work_unit.exe_unit,
3349  source_result.getRows()->getQueryMemDesc());
3350  }
      // In-situ rendering and filter push-down return the input result as is.
3351  if (render_info && render_info->isInSitu()) {
3352  return source_result;
3353  }
3354  if (source_result.isFilterPushDownEnabled()) {
3355  return source_result;
3356  }
3357  auto rows_to_sort = source_result.getRows();
3358  if (eo.just_explain) {
3359  return {rows_to_sort, {}};
3360  }
3361  auto const limit_val = get_limit_value(limit);
      // The explicit sort is skipped when there is no collation, when the result
      // definitely has no rows, or when speculative top-n sort is in use.
3362  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3363  !use_speculative_top_n_sort) {
3364  const size_t top_n = limit_val + offset;
3365  rows_to_sort->sort(order_entries, top_n, co.device_type, executor_);
3366  }
3367  if (limit || offset) {
      // With g_cluster set and no collation, the first limit + offset rows are
      // kept instead of dropping the offset rows here, unless the offset reaches
      // past the row count. NOTE(review): presumably the offset is applied later
      // by the aggregator -- confirm.
3368  if (g_cluster && sort->collationCount() == 0) {
3369  if (offset >= rows_to_sort->rowCount()) {
3370  rows_to_sort->dropFirstN(offset);
3371  } else {
3372  rows_to_sort->keepFirstN(limit_val + offset);
3373  }
3374  } else {
3375  rows_to_sort->dropFirstN(offset);
3376  if (limit) {
3377  rows_to_sort->keepFirstN(limit_val);
3378  }
3379  }
3380  }
3381  return {rows_to_sort, source_result.getTargetsMeta()};
3382  };
3383 
3384  try {
3385  return execute_sort_query();
3386  } catch (const SpeculativeTopNFailed& e) {
      // Blacklist the single group-by expression recorded by the failed attempt
      // (together with its sort direction), then retry once.
3387  CHECK_EQ(size_t(1), groupby_exprs.size());
3388  CHECK(groupby_exprs.front());
3389  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3390  return execute_sort_query();
3391  }
3392 }
3393 
// Builds the work unit that produces the input rows for a sort node.
// NOTE(review): the line carrying the return type / function name and the line that
// declares `sort_algorithm` are absent from this listing; presumably this is
// RelAlgExecutor::createSortInputWorkUnit — confirm against the original file.
//
// Parameters:
//   sort          - the sort node; its first input is the node actually executed.
//   order_entries - ORDER BY entries (1-based target indices) forwarded via SortInfo.
//   eo            - execution options forwarded to createWorkUnit().
// Returns a WorkUnit whose body is the sort's *source* node (not the sort itself).
// Throws std::runtime_error if an ORDER BY target has a geometry or array type.
3395  const RelSort* sort,
3396  std::list<Analyzer::OrderEntry>& order_entries,
3397  const ExecutionOptions& eo) {
3398  const auto source = sort->getInput(0);
3399  auto limit = sort->getLimit();
3400  const size_t offset = sort->getOffset();
      // A scan limit is only computed when the sort has no collation; otherwise it is 0.
3401  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3402  const size_t scan_total_limit =
3403  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
      // Fall back to the global default guess when no total scan limit is available.
3404  size_t max_groups_buffer_entry_guess{
3405  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3407  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
3408  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3409  const auto& source_exe_unit = source_work_unit.exe_unit;
3410 
3411  // we do not allow sorting geometry or array types
3412  for (auto order_entry : order_entries) {
3413  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
3414  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3415  const auto& ti = get_target_info(te, false);
3416  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3417  throw std::runtime_error(
3418  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3419  }
3420  }
3421 
      // Adjust the sort algorithm: a single null group-by expression selects
      // StreamingTopN; a group-by expression found on the speculative top-n blacklist
      // (for the direction of the first order entry) selects the default algorithm.
3422  if (source_exe_unit.groupby_exprs.size() == 1) {
3423  if (!source_exe_unit.groupby_exprs.front()) {
3424  sort_algorithm = SortAlgorithm::StreamingTopN;
3425  } else {
3426  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3427  first_oe_is_desc(order_entries))) {
3428  sort_algorithm = SortAlgorithm::Default;
3429  }
3430  }
3431  }
3432 
      // The sort node exposes the same output metadata as its source.
3433  sort->setOutputMetainfo(source->getOutputMetainfo());
3434  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3435  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
      // NOTE(review): `source_exe_unit` is a const reference, so the std::move below
      // yields a const rvalue and most likely copies input_col_descs — confirm intent.
3436  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3437  std::move(source_exe_unit.input_col_descs),
3438  source_exe_unit.simple_quals,
3439  source_exe_unit.quals,
3440  source_exe_unit.join_quals,
3441  source_exe_unit.groupby_exprs,
3442  source_exe_unit.target_exprs,
3443  source_exe_unit.target_exprs_original_type_infos,
3444  nullptr,
3445  {sort_info.order_entries, sort_algorithm, limit, offset},
3446  scan_total_limit,
3447  source_exe_unit.query_hint,
3448  source_exe_unit.query_plan_dag_hash,
3449  source_exe_unit.hash_table_build_plan_dag,
3450  source_exe_unit.table_id_to_node_map,
3451  source_exe_unit.use_bump_allocator,
3452  source_exe_unit.union_all,
3453  source_exe_unit.query_state},
3454  source,
3455  max_groups_buffer_entry_guess,
3456  std::move(source_work_unit.query_rewriter),
3457  source_work_unit.input_permutation,
3458  source_work_unit.left_deep_join_input_sizes};
3459 }
3460 
3461 namespace {
3462 
3469 std::pair<size_t, shared::TableKey> groups_approx_upper_bound(
3470  const std::vector<InputTableInfo>& table_infos) {
3471  CHECK(!table_infos.empty());
3472  const auto& first_table = table_infos.front();
3473  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3474  auto table_key = first_table.table_key;
3475  for (const auto& table_info : table_infos) {
3476  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3477  max_num_groups = table_info.info.getNumTuplesUpperBound();
3478  table_key = table_info.table_key;
3479  }
3480  }
3481  return std::make_pair(std::max(max_num_groups, size_t(1)), table_key);
3482 }
3483 
3484 bool is_projection(const RelAlgExecutionUnit& ra_exe_unit) {
3485  return ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front();
3486 }
3487 
// Decides whether the result of an execution unit may use a columnar output layout.
// NOTE(review): the declaration line (return type, name, first parameter
// `ra_exe_unit`) is absent from this listing; the call site in this file is
// `can_output_columnar(ra_exe_unit, render_info, body)`.
//
// Returns false when the unit is not a projection, when rendering in-situ, when the
// unit has order entries, when a target is variable-length without using FlatBuffer,
// or when the top RelProject forces row-wise output. Returns true otherwise.
// Throws std::runtime_error if row-wise output is forced while a target uses FlatBuffer.
3489  const RenderInfo* render_info,
3490  const RelAlgNode* body) {
3491  if (!is_projection(ra_exe_unit)) {
3492  return false;
3493  }
3494  if (render_info && render_info->isInSitu()) {
3495  return false;
3496  }
3497  if (!ra_exe_unit.sort_info.order_entries.empty()) {
3498  // disable output columnar when we have top-sort node query
3499  return false;
3500  }
      // Remember whether any target uses FlatBuffer: it is needed below to reject a
      // forced row-wise layout.
3501  bool flatbuffer_is_used = false;
3502  for (const auto& target_expr : ra_exe_unit.target_exprs) {
3503  const auto ti = target_expr->get_type_info();
3504  // Usage of FlatBuffer memory layout implies columnar-only support.
3505  if (ti.usesFlatBuffer()) {
3506  flatbuffer_is_used = true;
3507  continue;
3508  }
3509  // We don't currently support varlen columnar projections, so
3510  // return false if we find one
3511  // TODO: notice that currently ti.supportsFlatBuffer() == ti.is_varlen()
3512  if (ti.is_varlen()) {
3513  return false;
3514  }
3515  }
      // Only a RelProject body can carry the "force row-wise output" request.
3516  if (auto top_project = dynamic_cast<const RelProject*>(body)) {
3517  if (top_project->isRowwiseOutputForced()) {
3518  if (flatbuffer_is_used) {
3519  throw std::runtime_error(
3520  "Cannot force rowwise output when FlatBuffer layout is used.");
3521  }
3522  return false;
3523  }
3524  }
3525  return true;
3526 }
3527 
// NOTE(review): the declaration line and the final return statement of this function
// are absent from this listing; presumably this is should_output_columnar(ra_exe_unit),
// which is called later in this file — confirm against the original file.
// Visible part: returns true as soon as any target expression uses the FlatBuffer
// memory layout.
3529  for (const auto& target_expr : ra_exe_unit.target_exprs) {
3530  if (target_expr->get_type_info().usesFlatBuffer()) {
3531  // using FlatBuffer memory layout implies columnar-only output
3532  return true;
3533  }
3534  }
3537 }
3538 
// NOTE(review): the declaration line, its leading documentation and the condition of
// the query-hint branch are absent from this listing; presumably this is
// compute_output_buffer_size(ra_exe_unit), which is called later in this file — confirm.
// Visible logic: returns false if any target is an aggregate expression; otherwise
// returns true only for projections that have no scan limit or whose scan limit exceeds
// the pre-flight count query threshold.
3546  for (const auto target_expr : ra_exe_unit.target_exprs) {
3547  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3548  return false;
3549  }
3550  }
      // Start from the global threshold; the (partially missing) block below overrides
      // it and logs that the value came from a query hint.
3551  size_t preflight_count_query_threshold = g_preflight_count_query_threshold;
3553  preflight_count_query_threshold =
3555  VLOG(1) << "Set the pre-flight count query's threshold as "
3556  << preflight_count_query_threshold << " by a query hint";
3557  }
3558  if (is_projection(ra_exe_unit) &&
3559  (!ra_exe_unit.scan_limit ||
3560  ra_exe_unit.scan_limit > preflight_count_query_threshold)) {
3561  return true;
3562  }
3563  return false;
3564 }
3565 
3566 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
3567  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
3568  ra_exe_unit.simple_quals.empty());
3569 }
3570 
// Returns a copy of the input execution unit in which APPROX_COUNT_DISTINCT targets are
// replaced by a kCOUNT aggregate built with its fourth argument set to true (presumably
// the "distinct" flag — confirm) whenever the approximate bitmap would be at least as
// large as the precise one.
// NOTE(review): the declaration line (return type and name) is absent from this
// listing; callers in this file invoke it as
// decide_approx_count_distinct_implementation(unit, table_infos, executor, device_type,
// target_exprs_owned).
//
// Parameters:
//   ra_exe_unit_in     - unit to inspect; it is copied, not modified.
//   table_infos        - table metadata used to compute the argument's value range.
//   executor           - executor passed to getExpressionRange().
//   device_type_in     - requested device type (overridden to GPU when g_cluster is set).
//   target_exprs_owned - receives every replacement expression that gets created, so
//                        the raw pointer stored in target_exprs stays valid.
3572  const RelAlgExecutionUnit& ra_exe_unit_in,
3573  const std::vector<InputTableInfo>& table_infos,
3574  const Executor* executor,
3575  const ExecutorDeviceType device_type_in,
3576  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3577  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
3578  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
3579  const auto target_expr = ra_exe_unit.target_exprs[i];
3580  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3581  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
3582  continue;
3583  }
3584  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3585  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
3586  CHECK(arg);
3587  const auto& arg_ti = arg->get_type_info();
3588  // Avoid calling getExpressionRange for variable length types (string and array),
3589  // it'd trigger an assertion since that API expects to be called only for types
3590  // for which the notion of range is well-defined. A bit of a kludge, but the
3591  // logic to reject these types anyway is at lower levels in the stack and not
3592  // really worth pulling into a separate function for now.
3593  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3594  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
3595  continue;
3596  }
      // Only integer ranges give a usable size for the precise bitmap.
3597  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
3598  if (arg_range.getType() != ExpressionRangeType::Integer) {
3599  continue;
3600  }
3601  // When running distributed, the threshold for using the precise implementation
3602  // must be consistent across all leaves, otherwise we could have a mix of precise
3603  // and approximate bitmaps and we cannot aggregate them.
3604  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
      // NOTE(review): max - min + 1 may overflow for very wide integer ranges — confirm
      // that getExpressionRange bounds prevent this.
3605  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3606  const auto sub_bitmap_count =
3607  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
3608  int64_t approx_bitmap_sz_bits{0};
      // An explicit error-rate argument (an integer constant >= 1) determines the
      // approximate size; otherwise the global g_hll_precision_bits is used.
3609  const auto error_rate_expr = static_cast<Analyzer::AggExpr*>(target_expr)->get_arg1();
3610  if (error_rate_expr) {
3611  CHECK(error_rate_expr->get_type_info().get_type() == kINT);
3612  auto const error_rate =
3613  dynamic_cast<Analyzer::Constant const*>(error_rate_expr.get());
3614  CHECK(error_rate);
3615  CHECK_GE(error_rate->get_constval().intval, 1);
3616  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
3617  } else {
3618  approx_bitmap_sz_bits = g_hll_precision_bits;
3619  }
      // Build both descriptors solely to compare their padded bitmap sizes.
3620  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
3621  arg_range.getIntMin(),
3622  0,
3623  approx_bitmap_sz_bits,
3624  true,
3625  device_type,
3626  sub_bitmap_count};
3627  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
3628  arg_range.getIntMin(),
3629  0,
3630  bitmap_sz_bits,
3631  false,
3632  device_type,
3633  sub_bitmap_count};
      // If the approximate bitmap is not smaller, swap in the replacement aggregate.
3634  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3635  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3636  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3637  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
3638  target_exprs_owned.push_back(precise_count_distinct);
3639  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
3640  }
3641  }
3642  return ra_exe_unit;
3643 }
3644 
// Tells whether the bump allocator may be used for this execution unit.
// NOTE(review): the first line of the return expression is absent from this listing,
// so the leading condition(s) — presumably the ones that read `co` — are not visible.
// Visible conditions: no columnar output hint and no order entries on the unit.
3645 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
3646  const CompilationOptions& co,
3647  const ExecutionOptions& eo) {
3649  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
3650 }
3651 
3652 } // namespace
3653 
// Executes a single work unit and returns its ExecutionResult.
// NOTE(review): the line carrying the return type / function name and several interior
// lines are absent from this listing; presumably this is
// RelAlgExecutor::executeWorkUnit — confirm against the original file.
//
// Visible flow:
//   1. Window-function units are pre-computed on CPU with lazy fetch disabled.
//   2. If requested, filter push-down candidates are searched and returned early.
//   3. If a result for the body node is already in leaf_results_, it is returned.
//   4. Query hints are registered; CUDA grid/block sizes may be overridden and are
//      restored by a scope guard when the function exits.
//   5. The scan limit and the columnar-output hint are decided.
//   6. The unit is executed, with cardinality estimation and out-of-memory retry.
//   7. The result may be stored in the result-set cache.
//
// Parameters:
//   work_unit      - execution unit plus its body node and groups-buffer guess.
//   targets_meta   - target metadata attached to the returned result.
//   is_agg         - forwarded to the executor.
//   co_in, eo_in   - compilation / execution options; local copies are adjusted.
//   render_info    - optional render context; in-situ renders get an empty result.
//   queue_time_ms  - queue time recorded on the result.
//   previous_count - optional row count reused as scan limit when the unit has no quals.
3655  const RelAlgExecutor::WorkUnit& work_unit,
3656  const std::vector<TargetMetaInfo>& targets_meta,
3657  const bool is_agg,
3658  const CompilationOptions& co_in,
3659  const ExecutionOptions& eo_in,
3660  RenderInfo* render_info,
3661  const int64_t queue_time_ms,
3662  const std::optional<size_t> previous_count) {
3664  auto timer = DEBUG_TIMER(__func__);
3665  auto query_exec_time_begin = timer_start();
3666 
3667  const auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
3668  check_none_encoded_string_cast_tuple_limit(query_infos, work_unit.exe_unit);
3669 
      // Options are copied because they are modified below.
3670  auto co = co_in;
3671  auto eo = eo_in;
3672  ColumnCacheMap column_cache;
3673  ScopeGuard clearWindowContextIfNecessary = [&]() {
3674  if (is_window_execution_unit(work_unit.exe_unit)) {
3676  }
3677  };
      // Window functions: forced onto CPU without lazy fetch, then pre-computed.
3678  if (is_window_execution_unit(work_unit.exe_unit)) {
3680  throw std::runtime_error("Window functions support is disabled");
3681  }
3682  co.device_type = ExecutorDeviceType::CPU;
3683  co.allow_lazy_fetch = false;
3684  computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3685  }
3686  if (!eo.just_explain && eo.find_push_down_candidates) {
3687  // find potential candidates:
3688  VLOG(1) << "Try to find filter predicate push-down candidate.";
3689  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3690  if (!selected_filters.empty() || eo.just_calcite_explain) {
3691  VLOG(1) << "Found " << selected_filters.size()
3692  << " filter(s) to be pushed down. Re-create a query plan based on pushed "
3693  "filter predicate(s).";
3694  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3695  }
3696  VLOG(1) << "Continue with the current query plan";
3697  }
3698  if (render_info && render_info->isInSitu()) {
3699  co.allow_lazy_fetch = false;
3700  }
3701  const auto body = work_unit.body;
3702  CHECK(body);
      // A result already registered for this node in leaf_results_ is returned as-is.
3703  auto it = leaf_results_.find(body->getId());
3704  VLOG(3) << "body->getId()=" << body->getId()
3705  << " body->toString()=" << body->toString(RelRexToStringConfig::defaults())
3706  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3707  if (it != leaf_results_.end()) {
3708  executor_->addTransientStringLiterals(work_unit.exe_unit,
3709  executor_->row_set_mem_owner_);
3710  auto& aggregated_result = it->second;
3711  auto& result_rows = aggregated_result.rs;
3712  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3713  body->setOutputMetainfo(aggregated_result.targets_meta);
3714  if (render_info) {
3715  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3716  }
3717  return result;
3718  }
3719  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3720 
3722  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3723 
3724  // register query hint if query_dag_ is valid
3725  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3726  if (query_dag_) {
3727  auto candidate = query_dag_->getQueryHint(body);
3728  if (candidate) {
3729  ra_exe_unit.query_hint = *candidate;
3730  }
3731  }
3732 
3733  const auto& query_hints = ra_exe_unit.query_hint;
      // Restores the executor's block/grid sizes captured here when the scope exits,
      // but only for the hints that were registered.
3734  ScopeGuard reset_cuda_block_grid_sizes = [&,
3735  orig_block_size = executor_->blockSize(),
3736  orig_grid_size = executor_->gridSize()]() {
3737  if (executor_->getDataMgr()->getCudaMgr()) {
3738  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3739  if (orig_block_size) {
3740  executor_->setBlockSize(orig_block_size);
3741  } else {
3742  executor_->resetBlockSize();
3743  }
3744  }
3745  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3746  if (orig_grid_size) {
3747  executor_->setGridSize(orig_grid_size);
3748  } else {
3749  executor_->resetGridSize();
3750  }
3751  }
3752  }
3753  };
3754 
3755  if (co.device_type == ExecutorDeviceType::GPU) {
3756  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3757  if (!executor_->getDataMgr()->getCudaMgr()) {
3758  VLOG(1) << "Skip CUDA grid size query hint: cannot detect CUDA device";
3759  } else {
3760  const auto num_sms = executor_->cudaMgr()->getMinNumMPsForAllDevices();
3761  const auto new_grid_size = static_cast<unsigned>(
3762  std::max(1.0, std::round(num_sms * query_hints.cuda_grid_size_multiplier)));
3763  const auto default_grid_size = executor_->gridSize();
3764  if (new_grid_size != default_grid_size) {
3765  VLOG(1) << "Change CUDA grid size: " << default_grid_size
3766  << " (default_grid_size) -> " << new_grid_size << " (# SMs * "
3767  << query_hints.cuda_grid_size_multiplier << ")";
3768  // todo (yoonmin): do we need to check a hard limit?
3769  executor_->setGridSize(new_grid_size);
3770  } else {
      // NOTE(review): this branch is reached when the computed size equals the current
      // grid size, so "invalid grid size" may be a misleading message — confirm.
3771  VLOG(1) << "Skip CUDA grid size query hint: invalid grid size";
3772  }
3773  }
3774  }
3775  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3776  if (!executor_->getDataMgr()->getCudaMgr()) {
3777  VLOG(1) << "Skip CUDA block size query hint: cannot detect CUDA device";
3778  } else {
3779  int cuda_block_size = query_hints.cuda_block_size;
3780  int warp_size = executor_->warpSize();
      // Block sizes of at least one warp are rounded up to a multiple of the warp size.
3781  if (cuda_block_size >= warp_size) {
3782  cuda_block_size = (cuda_block_size + warp_size - 1) / warp_size * warp_size;
3783  VLOG(1) << "Change CUDA block size w.r.t warp size (" << warp_size
3784  << "): " << executor_->blockSize() << " -> " << cuda_block_size;
3785  } else {
3786  VLOG(1) << "Change CUDA block size: " << executor_->blockSize() << " -> "
3787  << cuda_block_size;
3788  }
3789  executor_->setBlockSize(cuda_block_size);
3790  }
3791  }
3792  }
3793 
      // Decide the scan limit / groups-buffer guess for this unit.
3794  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3795  if (is_window_execution_unit(ra_exe_unit)) {
3796  CHECK_EQ(table_infos.size(), size_t(1));
3797  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3798  max_groups_buffer_entry_guess =
3799  table_infos.front().info.fragments.front().getNumTuples();
3800  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3801  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3802  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3803  ra_exe_unit.scan_limit = *previous_count;
3804  } else {
3805  // TODO(adb): enable bump allocator path for render queries
3806  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3807  ra_exe_unit.scan_limit = 0;
3808  ra_exe_unit.use_bump_allocator = true;
3809  } else if (eo.executor_type == ::ExecutorType::Extern) {
3810  ra_exe_unit.scan_limit = 0;
3811  } else if (!eo.just_explain) {
3812  const auto filter_count_all = getFilteredCountAll(ra_exe_unit, true, co, eo);
3813  if (filter_count_all) {
3814  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3815  VLOG(1) << "Set a new scan limit from filtered_count_all: "
3816  << ra_exe_unit.scan_limit;
3817  auto const has_limit_value = ra_exe_unit.sort_info.limit.has_value();
3818  auto const top_k_sort_query =
3819  has_limit_value && !ra_exe_unit.sort_info.order_entries.empty();
3820  // top-k sort query needs to get a global result before sorting, so we cannot
3821  // apply LIMIT value at this point
3822  if (has_limit_value && !top_k_sort_query &&
3823  ra_exe_unit.scan_limit > ra_exe_unit.sort_info.limit.value()) {
3824  ra_exe_unit.scan_limit = ra_exe_unit.sort_info.limit.value();
3825  VLOG(1) << "Override scan limit to LIMIT value: " << ra_exe_unit.scan_limit;
3826  }
3827  }
3828  }
3829  }
3830  }
3831 
3832  // when output_columnar_hint is true here, it means either 1) columnar output
3833  // configuration is on or 2) a user hint is given but we have to disable it if some
3834  // requirements are not satisfied
3835  if (can_output_columnar(ra_exe_unit, render_info, body)) {
3836  if (!eo.output_columnar_hint && should_output_columnar(ra_exe_unit)) {
3837  VLOG(1) << "Using columnar layout for projection as output size of "
3838  << ra_exe_unit.scan_limit << " rows exceeds threshold of "
3840  << " or some target uses FlatBuffer memory layout.";
3841  eo.output_columnar_hint = true;
3842  }
3843  } else {
3844  eo.output_columnar_hint = false;
3845  }
3846 
      // Placeholder result; it is overwritten by the execution attempts below.
3847  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3848  co.device_type,
3850  nullptr,
3851  executor_->blockSize(),
3852  executor_->gridSize()),
3853  {}};
3854 
      // Runs the unit once. A QueryExecutionError with a negative code and no NDV
      // estimation is turned into CardinalityEstimationRequired; the remaining visible
      // path falls through to the out-of-memory retry handler.
3855  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
3856  const bool has_cardinality_estimation,
3857  const bool has_ndv_estimation) -> ExecutionResult {
3858  // Note that the groups buffer entry guess may be modified during query execution.
3859  // Create a local copy so we can track those changes if we need to attempt a retry
3860  // due to OOM
3861  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3862  try {
3863  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3864  is_agg,
3865  table_infos,
3866  ra_exe_unit,
3867  co,
3868  eo,
3869  render_info,
3870  has_cardinality_estimation,
3871  column_cache),
3872  targets_meta};
3873  } catch (const QueryExecutionError& e) {
3874  if (!has_ndv_estimation && e.getErrorCode() < 0) {
3875  throw CardinalityEstimationRequired(/*range=*/0);
3876  }
3878  return handleOutOfMemoryRetry(
3879  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
3880  column_cache,
3881  targets_meta,
3882  is_agg,
3883  co,
3884  eo,
3885  render_info,
3886  e,
3887  queue_time_ms);
3888  }
3889  };
3890 
      // Result-set caching is turned off when any input is a temporary table or a view.
3891  auto use_resultset_cache = canUseResultsetCache(eo, render_info);
3892  for (const auto& table_info : table_infos) {
3893  const auto db_id = table_info.table_key.db_id;
3894  if (db_id > 0) {
3895  const auto td = Catalog_Namespace::get_metadata_for_table(table_info.table_key);
3896  if (td && (td->isTemporaryTable() || td->isView)) {
3897  use_resultset_cache = false;
3898  if (eo.keep_result) {
3899  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has either "
3900  "temporary table or view";
3901  }
3902  }
3903  }
3904  }
3905 
      // First attempt: use a cached cardinality if there is one, else the default guess.
3906  CardinalityCacheKey cache_key{ra_exe_unit};
3907  try {
3908  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3909  auto card = cached_cardinality.second;
3910  if (cached_cardinality.first && card >= 0) {
3911  VLOG(1) << "Use cached cardinality for max_groups_buffer_entry_guess: " << card;
3912  result = execute_and_handle_errors(
3913  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3914  } else {
3915  VLOG(1) << "Use default cardinality for max_groups_buffer_entry_guess: "
3916  << max_groups_buffer_entry_guess;
3917  result = execute_and_handle_errors(
3918  max_groups_buffer_entry_guess,
3919  groups_approx_upper_bound(table_infos).first <= g_big_group_threshold,
3920  /*has_ndv_estimation=*/false);
3921  }
3922  } catch (const CardinalityEstimationRequired& e) {
      // Second attempt: re-check the cache, otherwise run the NDV estimation and cache
      // the resulting guess (unless only validating or explaining).
3923  // check the cardinality cache
3924  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3925  auto card = cached_cardinality.second;
3926  if (cached_cardinality.first && card >= 0) {
3927  VLOG(1) << "CardinalityEstimationRequired, Use cached cardinality for "
3928  "max_groups_buffer_entry_guess: "
3929  << card;
3930  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3931  } else {
3932  const auto ndv_groups_estimation =
3933  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3934  auto ndv_groups_estimator_multiplier = g_ndv_groups_estimator_multiplier;
3935  if (query_hints.isHintRegistered(QueryHint::kNDVGroupsEstimatorMultiplier)) {
3936  ndv_groups_estimator_multiplier = query_hints.ndv_groups_estimator_multiplier;
3937  VLOG(1) << "Modify NDV groups estimator multiplier: "
3939  << ndv_groups_estimator_multiplier;
3940  }
3941  const auto estimated_groups_buffer_entry_guess =
3942  ndv_groups_estimation > 0
3943  ? static_cast<size_t>(static_cast<double>(ndv_groups_estimation) *
3944  ndv_groups_estimator_multiplier)
3945  : std::min(groups_approx_upper_bound(table_infos).first,
3947  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3948  VLOG(1) << "CardinalityEstimationRequired, Use ndv_estimation: "
3949  << ndv_groups_estimation
3950  << ", cardinality for estimated_groups_buffer_entry_guess: "
3951  << estimated_groups_buffer_entry_guess;
3952  result = execute_and_handle_errors(
3953  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3954  if (!(eo.just_validate || eo.just_explain)) {
3955  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3956  }
3957  }
3958  }
3959 
3960  result.setQueueTime(queue_time_ms);
3961  if (render_info) {
3962  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3963  if (render_info->isInSitu()) {
3964  // return an empty result (with the same queue time, and zero render time)
3965  return {std::make_shared<ResultSet>(
3966  queue_time_ms,
3967  0,
3968  executor_->row_set_mem_owner_
3969  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3970  : nullptr),
3971  {}};
3972  }
3973  }
3974 
3975  for (auto& target_info : result.getTargetsMeta()) {
3976  if (target_info.get_type_info().is_string() &&
3977  !target_info.get_type_info().is_dict_encoded_string()) {
3978  // currently, we do not support resultset caching if non-encoded string is projected
3979  use_resultset_cache = false;
3980  if (eo.keep_result) {
3981  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has non-encoded "
3982  "string column projection";
3983  }
3984  }
3985  }
3986 
3987  const auto res = result.getDataPtr();
3988  if (use_resultset_cache && has_valid_query_plan_dag(body)) {
3989  auto query_exec_time = timer_stop(query_exec_time_begin);
3990  res->setExecTime(query_exec_time);
3991  res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3992  res->setTargetMetaInfo(body->getOutputMetainfo());
3993  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
3994  res->setInputTableKeys(std::move(input_table_keys));
      // NOTE(review): `res` is dereferenced just above, before the `res &&` null test
      // in the expression below — confirm getDataPtr() cannot return null here.
3995  auto allow_auto_caching_resultset =
3996  res && res->hasValidBuffer() && g_allow_auto_resultset_caching &&
3997  res->getBufferSizeBytes(co.device_type) <= g_auto_resultset_caching_threshold;
3998  if (eo.keep_result || allow_auto_caching_resultset) {
3999  if (allow_auto_caching_resultset) {
4000  VLOG(1) << "Automatically keep query resultset to recycler";
4001  }
4002  res->setUseSpeculativeTopNSort(
4003  use_speculative_top_n(ra_exe_unit, res->getQueryMemDesc()));
4004  executor_->getResultSetRecyclerHolder().putQueryResultSetToCache(
4005  ra_exe_unit.query_plan_dag_hash,
4006  res->getInputTableKeys(),
4007  res,
4008  res->getBufferSizeBytes(co.device_type),
4010  }
4011  } else {
      // Caching is not possible: only log why a requested 'keep_result' hint is ignored.
4012  if (eo.keep_result) {
4013  if (g_cluster) {
4014  VLOG(1) << "Query hint \'keep_result\' is ignored since we do not support "
4015  "resultset recycling on distributed mode";
4016  } else if (hasStepForUnion()) {
4017  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has union-(all) "
4018  "operator";
4019  } else if (render_info && render_info->isInSitu()) {
4020  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is classified as "
4021  "a in-situ rendering query";
4022  } else if (is_validate_or_explain_query(eo)) {
4023  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is either "
4024  "validate or explain query";
4025  } else {
4026  VLOG(1) << "Query hint \'keep_result\' is ignored";
4027  }
4028  }
4029  }
4030 
4031  return result;
4032 }
4033 
// Returns true if any input table with a positive db_id has a deleted column reported
// by Catalog::getDeletedColumnIfRowsDeleted(); tables with db_id <= 0 are ignored.
// NOTE(review): the declaration line and the line that looks up `catalog` are absent
// from this listing; the call site in this file is
// hasDeletedRowInQuery(input_tables_info).
4035  std::vector<InputTableInfo> const& input_tables_info) const {
4036  return std::any_of(
4037  input_tables_info.begin(), input_tables_info.end(), [](InputTableInfo const& info) {
4038  auto const& table_key = info.table_key;
4039  if (table_key.db_id > 0) {
4040  auto catalog =
4042  CHECK(catalog);
4043  auto td = catalog->getMetadataForTable(table_key.table_id);
4044  CHECK(td);
4045  if (catalog->getDeletedColumnIfRowsDeleted(td)) {
4046  return true;
4047  }
4048  }
4049  return false;
4050  });
4051 }
4052 
4053 namespace {
4054 std::string get_table_name_from_table_key(shared::TableKey const& table_key) {
4055  std::string table_name{""};
4056  if (table_key.db_id > 0) {
4057  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(table_key.db_id);
4058  CHECK(catalog);
4059  auto td = catalog->getMetadataForTable(table_key.table_id);
4060  CHECK(td);
4061  table_name = td->tableName;
4062  }
4063  return table_name;
4064 }
4065 } // namespace
4066 
// Computes the number of rows the execution unit is expected to produce by running a
// pre-flight COUNT(*) query, returning an optional row count.
// NOTE(review): the declaration line (return type and name) is absent from this
// listing; the call site in this file is getFilteredCountAll(ra_exe_unit, true, co, eo).
//
// Behavior visible here:
//   - For a projection without any quals and without deleted rows, no query is run:
//     the largest input table's tuple upper bound is returned instead.
//   - ForeignStorageException and QueryMustRunOnCpu are re-thrown to the caller.
//   - Any other std::exception is logged as a warning and std::nullopt is returned.
//   - A successful count is clamped to a minimum of 1.
4068  const RelAlgExecutionUnit& ra_exe_unit,
4069  const bool is_agg,
4070  const CompilationOptions& co,
4071  const ExecutionOptions& eo) {
4072  auto const input_tables_info = get_table_infos(ra_exe_unit, executor_);
4073  if (is_projection(ra_exe_unit) && ra_exe_unit.simple_quals.empty() &&
4074  ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
4075  !hasDeletedRowInQuery(input_tables_info)) {
4076  auto const max_row_info = groups_approx_upper_bound(input_tables_info);
4077  auto const num_rows = max_row_info.first;
4078  auto const table_name = get_table_name_from_table_key(max_row_info.second);
4079  VLOG(1) << "Short-circuiting filtered count query for the projection query "
4080  "containing input table "
4081  << table_name << ": return its table cardinality " << num_rows << " instead";
4082  return std::make_optional<size_t>(num_rows);
4083  }
      // Build the COUNT aggregate target (BIGINT or INT depending on g_bigint_count).
4084  const auto count =
4085  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
4086  kCOUNT,
4087  nullptr,
4088  false,
4089  nullptr);
4090  const auto count_all_exe_unit = ra_exe_unit.createCountAllExecutionUnit(count.get());
4091  size_t one{1};
4092  ResultSetPtr count_all_result;
4093  try {
4094  VLOG(1) << "Try to execute pre-flight counts query";
4095  ColumnCacheMap column_cache;
4096  ExecutionOptions copied_eo = eo;
4097  copied_eo.estimate_output_cardinality = true;
4098  count_all_result = executor_->executeWorkUnit(one,
4099  is_agg,
4100  input_tables_info,
4101  count_all_exe_unit,
4102  co,
4103  copied_eo,
4104  nullptr,
4105  false,
4106  column_cache);
      // Copy the per-device cardinality of the count unit back onto the caller's unit.
      // NOTE(review): `ra_exe_unit` is a const reference, so this assignment presumably
      // relies on per_device_cardinality being declared mutable — confirm.
4107  ra_exe_unit.per_device_cardinality = count_all_exe_unit.per_device_cardinality;
4108  } catch (const foreign_storage::ForeignStorageException& error) {
4109  throw;
4110  } catch (const QueryMustRunOnCpu&) {
4111  // force a retry of the top level query on CPU
4112  throw;
4113  } catch (const std::exception& e) {
4114  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
4115  return std::nullopt;
4116  }
      // The count query yields a single row with a single non-negative int64 value.
4117  const auto count_row = count_all_result->getNextRow(false, false);
4118  CHECK_EQ(size_t(1), count_row.size());
4119  const auto& count_tv = count_row.front();
4120  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
4121  CHECK(count_scalar_tv);
4122  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
4123  CHECK(count_ptr);
4124  CHECK_GE(*count_ptr, 0);
4125  auto count_upper_bound = static_cast<size_t>(*count_ptr);
4126  return std::max(count_upper_bound, size_t(1));
4127 }
4128 
4129 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
4130  const auto& ra_exe_unit = work_unit.exe_unit;
4131  if (ra_exe_unit.input_descs.size() != 1) {
4132  return false;
4133  }
4134  const auto& table_desc = ra_exe_unit.input_descs.front();
4135  if (table_desc.getSourceType() != InputSourceType::TABLE) {
4136  return false;
4137  }
4138  const auto& table_key = table_desc.getTableKey();
4139  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
4140  const auto comp_expr =
4141  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4142  if (!comp_expr || comp_expr->get_optype() != kEQ) {
4143  return false;
4144  }
4145  const auto lhs = comp_expr->get_left_operand();
4146  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4147  if (!lhs_col || !lhs_col->getTableKey().table_id || lhs_col->get_rte_idx()) {
4148  return false;
4149  }
4150  const auto rhs = comp_expr->get_right_operand();
4151  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4152  if (!rhs_const) {
4153  return false;
4154  }
4155  const auto cd = get_column_descriptor(
4156  {table_key.db_id, table_key.table_id, lhs_col->getColumnKey().column_id});
4157  if (cd->isVirtualCol) {
4158  CHECK_EQ("rowid", cd->columnName);
4159  return true;
4160  }
4161  }
4162  return false;
4163 }
4164 
// Re-runs a work unit after the attempt that produced error `e`.
//  1. If `e` came from a multi-fragment kernel launch, the unit is executed once more
//     with multi-fragment kernels disabled, still using the caller's `co`.
//  2. The unit is then executed with CompilationOptions::makeCpuOnly(co) in a loop until
//     a run succeeds. A negative error code from a run means the output buffer ran out
//     of slots: max_groups_buffer_entry_guess is doubled and the run is repeated, or a
//     std::runtime_error is thrown when the watchdog is enabled or the retry budget is
//     used up.
// The bump allocator is switched off on a local copy of the execution unit, and
// `render_info`, when non-null, is forced to non-in-situ before the CPU attempt.
// Returns the ExecutionResult of the successful CPU run with its queue time set to
// `queue_time_ms`.
// NOTE(review): this listing jumps over source lines 4165, 4184, 4216, 4238 and 4269
// (the function header, one ResultSet constructor argument, the first statement of the
// first catch block, the start of the call whose arguments continue on line 4239, and
// the body of the final else). Restore them from the original file before compiling.
4166  const RelAlgExecutor::WorkUnit& work_unit,
4167  ColumnCacheMap& column_cache,
4168  const std::vector<TargetMetaInfo>& targets_meta,
4169  const bool is_agg,
4170  const CompilationOptions& co,
4171  const ExecutionOptions& eo,
4172  RenderInfo* render_info,
4173  const QueryExecutionError& e,
4174  const int64_t queue_time_ms) {
4175  // Disable the bump allocator
4176  // Note that this will have basically the same effect as using the bump allocator for
4177  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
4178  // path and the bump allocator path for kernel per fragment execution.
4179  auto ra_exe_unit_in = work_unit.exe_unit;
4180  ra_exe_unit_in.use_bump_allocator = false;
4181 
      // Placeholder result with no targets; it is overwritten by whichever retry succeeds.
4182  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4183  co.device_type,
4185  nullptr,
4186  executor_->blockSize(),
4187  executor_->gridSize()),
4188  {}};
4189 
4190  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
4191  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
      // Both retries share these options: multi-fragment kernels and push-down candidate
      // search are turned off.
4192  auto eo_no_multifrag = eo;
4193  eo_no_multifrag.setNoExplainExecutionOptions(true);
4194  eo_no_multifrag.allow_multifrag = false;
4195  eo_no_multifrag.find_push_down_candidates = false;
4196  if (e.wasMultifragKernelLaunch()) {
4197  try {
4198  // Attempt to retry using the kernel per fragment path. The smaller input size
4199  // required may allow the entire kernel to execute in GPU memory.
4200  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
4201  "kernels disabled.";
4202  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
4203  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
4204  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4205  is_agg,
4206  table_infos,
4207  ra_exe_unit,
4208  co,
4209  eo_no_multifrag,
4210  nullptr,
4211  true,
4212  column_cache),
4213  targets_meta};
4214  result.setQueueTime(queue_time_ms);
      // NOTE(review): a successful per-fragment retry does not return here; control
      // continues into the CPU path below, which overwrites `result`. Confirm intended.
4215  } catch (const QueryExecutionError& new_e) {
4217  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
4218  }
4219  }
4220 
4221  if (render_info) {
4222  render_info->forceNonInSitu();
4223  }
4224 
4225  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
4226  if (e.getErrorCode() < 0) {
4227  // Only reset the group buffer entry guess if we ran out of slots, which
4228  // suggests a highly pathological input which prevented a good estimation of distinct
4229  // tuple count. For projection queries, this will force a per-fragment scan limit,
4230  // which is compatible with the CPU path
4231  VLOG(1) << "Resetting max groups buffer entry guess.";
4232  max_groups_buffer_entry_guess = 0;
4233  }
4234 
      // Counts CPU attempts: 0 for the first run, incremented at the top of every pass.
4235  int iteration_ctr = -1;
4236  while (true) {
4237  iteration_ctr++;
4239  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
4240  try {
4241  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4242  is_agg,
4243  table_infos,
4244  ra_exe_unit,
4245  co_cpu,
4246  eo_no_multifrag,
4247  nullptr,
4248  true,
4249  column_cache),
4250  targets_meta};
4251  } catch (const QueryExecutionError& new_e) {
4252  // Ran out of slots
4253  if (new_e.getErrorCode() < 0) {
4254  // Even the conservative guess failed; it should only happen when we group
4255  // by a huge cardinality array. Maybe we should throw an exception instead?
4256  // Such a heavy query is entirely capable of exhausting all the host memory.
      // The CHECK fails when the guess is 0, i.e. after the reset above or when the work
      // unit arrived with a zero guess.
4257  CHECK(max_groups_buffer_entry_guess);
4258  // Only allow two iterations of increasingly large entry guesses up to a maximum
4259  // of 512MB per column per kernel
4260  if (g_enable_watchdog || iteration_ctr > 1) {
4261  throw std::runtime_error("Query ran out of output slots in the result");
4262  }
4263  max_groups_buffer_entry_guess *= 2;
4264  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
4265  "groups buffer entry "
4266  "guess equal to "
4267  << max_groups_buffer_entry_guess;
4268  } else {
4270  }
4271  continue;
4272  }
4273  result.setQueueTime(queue_time_ms);
4274  return result;
4275  }
      // Not reachable: the loop above only exits through return or throw.
4276  return result;
4277 }
4278 
// Decides whether an execution error code is fatal. The error is always logged at
// ERROR level first. An OUT_OF_GPU_MEM code is tolerated (the function returns
// normally) only when g_allow_cpu_retry is set; otherwise, and for every other code,
// a std::runtime_error is thrown.
// NOTE(review): this listing jumps over source line 4279, which presumably holds the
// function header declaring `error_code` — restore it from the original file.
4280  LOG(ERROR) << "Query execution failed with error "
4281  << getErrorMessageFromCode(error_code);
4282  if (error_code == int32_t(ErrorCode::OUT_OF_GPU_MEM)) {
4283  // We ran out of GPU memory, this doesn't count as an error if the query is
4284  // allowed to continue on CPU because retry on CPU is explicitly allowed through
4285  // --allow-cpu-retry.
4286  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
4287  if (!g_allow_cpu_retry) {
4288  throw std::runtime_error(
4289  "Query ran out of GPU memory, unable to automatically retry on CPU");
4290  }
4291  return;
4292  }
      // Any other code is reported to the caller with its formatted message.
4293  throw std::runtime_error(getErrorMessageFromCode(error_code));
4294 }
4295 
4296 namespace {
// Pair of C strings describing an error code; both members stay nullptr when the code
// is not recognised.
4297 struct ErrorInfo {
4298  const char* code{nullptr};
4299  const char* description{nullptr};
4300 };
// Looks up the name and description for `error_code`. Codes strictly between 0 and
// ErrorCode::N_ are converted to ErrorCode and described via to_string() and
// to_description(); anything else yields {nullptr, nullptr}.
// NOTE(review): this listing jumps over source line 4301, which presumably holds the
// function header — restore it from the original file.
4302  // 'designated initializers' don't compile on Windows for std 17
4303  // They require /std:c++20. They have been removed for the Windows port.
4304  if (0 < error_code && error_code < int32_t(ErrorCode::N_)) {
4305  auto const ec = static_cast<ErrorCode>(error_code);
4306  return {to_string(ec), to_description(ec)};
4307  }
4308  return {nullptr, nullptr};
4309 }
4310 } // namespace
4311 
// Formats an execution error code as text:
//  - negative codes      -> fixed "ran out of slots" message,
//  - recognised codes    -> "<code name>: <description>",
//  - everything else     -> "Other error: code <n>".
// NOTE(review): this listing jumps over source line 4312, which presumably holds the
// function header declaring `error_code` — restore it from the original file.
4313  if (error_code < 0) {
4314  return "Ran out of slots in the query output buffer";
4315  }
4316  const auto errorInfo = getErrorDescription(error_code);
4317 
      // errorInfo.code is nullptr when getErrorDescription() did not recognise the code.
4318  if (errorInfo.code) {
4319  return errorInfo.code + ": "s + errorInfo.description;
4320  } else {
4321  return "Other error: code "s + std::to_string(error_code);
4322  }
4323 }
4324 
// Invokes the stored post-execution callback, logging at VLOG(1) beforehand.
// NOTE(review): this listing jumps over source lines 4325-4326. The two closing braces
// below suggest a function header followed by a guard around the call (presumably a
// check that post_execution_callback_ is set) — restore them from the original file.
4327  VLOG(1) << "Running post execution callback.";
4328  (*post_execution_callback_)();
4329  }
4330 }
4331 
// Dispatches on the dynamic type of `node` and builds the matching work unit.
// RelCompound and RelProject receive the full ExecutionOptions; RelAggregate and
// RelFilter only receive eo.just_explain. Any other node type is reported with
// LOG(FATAL).
// NOTE(review): this listing jumps over source lines 4332 and 4352 (the function header
// declaring `node`, and the remainder of the LOG(FATAL) statement) — restore them from
// the original file.
4333  const SortInfo& sort_info,
4334  const ExecutionOptions& eo) {
4335  const auto compound = dynamic_cast<const RelCompound*>(node);
4336  if (compound) {
4337  return createCompoundWorkUnit(compound, sort_info, eo);
4338  }
4339  const auto project = dynamic_cast<const RelProject*>(node);
4340  if (project) {
4341  return createProjectWorkUnit(project, sort_info, eo);
4342  }
4343  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4344  if (aggregate) {
4345  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4346  }
4347  const auto filter = dynamic_cast<const RelFilter*>(node);
4348  if (filter) {
4349  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4350  }
4351  LOG(FATAL) << "Unhandled node type: "
      // Fallback return value placed after the fatal log statement.
4353  return {};
4354 }
4355 
4356 namespace {
4357 
// Reports the join type implied by the data sink of `ra`: the RelJoin's own type when
// the sink is a RelJoin, INNER when it is a RelLeftDeepInnerJoin, INVALID otherwise.
// NOTE(review): this listing jumps over source line 4358, which presumably holds the
// function header declaring `ra` — restore it from the original file.
4359  auto sink = get_data_sink(ra);
4360  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
4361  return join->getJoinType();
4362  }
4363  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
4364  return JoinType::INNER;
4365  }
4366 
      // The sink is not a join node.
4367  return JoinType::INVALID;
4368 }
4369 
4370 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
4371  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4372  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
4373  return nullptr;
4374  }
4375  const auto equi_join_condition =
4376  dynamic_cast<const RexOperator*>(condition->getOperand(0));
4377  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
4378  return nullptr;
4379  }
4380  const auto both_are_null_condition =
4381  dynamic_cast<const RexOperator*>(condition->getOperand(1));
4382  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
4383  both_are_null_condition->size() != 2) {
4384  return nullptr;
4385  }
4386  const auto lhs_is_null =
4387  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
4388  const auto rhs_is_null =
4389  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
4390  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
4391  rhs_is_null->getOperator() != kISNULL) {
4392  return nullptr;
4393  }
4394  CHECK_EQ(size_t(1), lhs_is_null->size());
4395  CHECK_EQ(size_t(1), rhs_is_null->size());
4396  CHECK_EQ(size_t(2), equi_join_condition->size());
4397  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
4398  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
4399  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
4400  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
4401  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4402  return nullptr;
4403  }
4404  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
4405  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4406  RexDeepCopyVisitor deep_copy_visitor;
4407  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
4408  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
4409  eq_operands.emplace_back(lhs_op_copy.release());
4410  eq_operands.emplace_back(rhs_op_copy.release());
4411  return boost::make_unique<const RexOperator>(
4412  kBW_EQ, eq_operands, equi_join_condition->getType());
4413  }
4414  return nullptr;
4415 }
4416 
4417 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
4418  const RexScalar* scalar) {
4419  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4420  if (condition && condition->getOperator() == kAND) {
4421  CHECK_GE(condition->size(), size_t(2));
4422  auto acc = get_bitwise_equals(condition->getOperand(0));
4423  if (!acc) {
4424  return nullptr;
4425  }
4426  for (size_t i = 1; i < condition->size(); ++i) {
4427  std::vector<std::unique_ptr<const RexScalar>> and_operands;
4428  and_operands.emplace_back(std::move(acc));
4429  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
4430  acc =
4431  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
4432  }
4433  return acc;
4434  }
4435  return get_bitwise_equals(scalar);
4436 }
4437 
4438 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
4439  CHECK_GE(left_deep_join->inputCount(), size_t(2));
4440  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
4441  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
4442  ++nesting_level) {
4443  if (left_deep_join->getOuterCondition(nesting_level)) {
4444  join_types[nesting_level - 1] = JoinType::LEFT;
4445  }
4446  auto cur_level_join_type = left_deep_join->getJoinType(nesting_level);
4447  if (cur_level_join_type == JoinType::SEMI || cur_level_join_type == JoinType::ANTI) {
4448  join_types[nesting_level - 1] = cur_level_join_type;
4449  }
4450  }
4451  return join_types;
4452 }
4453 
4454 template <class RA>
4455 std::vector<size_t> do_table_reordering(
4456  std::vector<InputDescriptor>& input_descs,
4457  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4458  const JoinQualsPerNestingLevel& left_deep_join_quals,
4459  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4460  const RA* node,
4461  const std::vector<InputTableInfo>& query_infos,
4462  const Executor* executor) {
4463  if (g_cluster) {
4464  // Disable table reordering in distributed mode. The aggregator does not have enough
4465  // information to break ties
4466  return {};
4467  }
4468  if (node->isUpdateViaSelect() || node->isDeleteViaSelect()) {
4469  // Do not reorder tables for UPDATE and DELETE queries, since the outer table always
4470  // has to be the physical table.
4471  return {};
4472  }
4473  for (const auto& table_info : query_infos) {
4474  if (table_info.table_key.table_id < 0) {
4475  continue;
4476  }
4477  const auto td = Catalog_Namespace::get_metadata_for_table(table_info.table_key);
4478  CHECK(td);
4479  if (table_is_replicated(td)) {
4480  return {};
4481  }
4482  }
4483  const auto input_permutation =
4484  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
4485  input_to_nest_level = get_input_nest_levels(node, input_permutation);
4486  std::tie(input_descs, input_col_descs, std::ignore) =
4487  get_input_desc(node, input_to_nest_level, input_permutation);
4488  return input_permutation;
4489 }
4490 
// Collects, for every input of a left-deep join and in input order, the number of
// entries returned by get_node_output() for that input.
// NOTE(review): this listing jumps over source line 4491, which presumably holds the
// return type and function name — restore it from the original file.
4492  const RelLeftDeepInnerJoin* left_deep_join) {
4493  std::vector<size_t> input_sizes;
4494  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
4495  const auto inputs = get_node_output(left_deep_join->getInput(i));
4496  input_sizes.push_back(inputs.size());
4497  }
4498  return input_sizes;
4499 }
4500 
4501 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
4502  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4503  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4504  for (const auto& qual : quals) {
4505  const auto rewritten_qual = rewrite_expr(qual.get());
4506  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4507  }
4508  return rewritten_quals;
4509 }
4510 
4511 } // namespace
4512 
// Builds the work unit for a RelCompound node.
// Steps, in order:
//   1. derive input descriptors and table infos for the node;
//   2. if the single input is a RelLeftDeepInnerJoin, translate its join filter and,
//      when eo.table_reordering is set and no level is a LEFT join, reorder the inputs
//      and translate the filter again for the new order;
//   3. apply convert_bbox_intersect_join() and adopt its join quals if it found a
//      bounding-box intersect join;
//   4. translate filter quals, scalar sources, group-by and target expressions;
//   5. assemble the RelAlgExecutionUnit, run it through QueryRewriter, and publish the
//      resulting target metadata on the node;
//   6. record the join quals for the left-deep tree and, when the node has a valid
//      query plan DAG, attach the extracted hash-table plan information.
// NOTE(review): this listing jumps over source lines 4513, 4606 and 4630 (the function
// header, the start of the call whose arguments continue on line 4607, and one element
// of the returned initializer list) — restore them from the original file.
4514  const RelCompound* compound,
4515  const SortInfo& sort_info,
4516  const ExecutionOptions& eo) {
4517  std::vector<InputDescriptor> input_descs;
4518  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4519  auto input_to_nest_level = get_input_nest_levels(compound, {});
4520  std::tie(input_descs, input_col_descs, std::ignore) =
4521  get_input_desc(compound, input_to_nest_level, {});
4522  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4523  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4524  auto query_infos = get_table_infos(input_descs, executor_);
4525  CHECK_EQ(size_t(1), compound->inputCount());
4526  const auto left_deep_join =
4527  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4528  JoinQualsPerNestingLevel left_deep_join_quals;
4529  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4530  : std::vector<JoinType>{get_join_type(compound)};
4531  std::vector<size_t> input_permutation;
4532  std::vector<size_t> left_deep_join_input_sizes;
4533  std::optional<unsigned> left_deep_tree_id;
4534  if (left_deep_join) {
4535  left_deep_tree_id = left_deep_join->getId();
4536  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4537  left_deep_join_quals = translateLeftDeepJoinFilter(
4538  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
      // Reordering is attempted only when no join level is a LEFT join.
4539  if (eo.table_reordering &&
4540  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4541  join_types.end()) {
4542  input_permutation = do_table_reordering(input_descs,
4543  input_col_descs,
4544  left_deep_join_quals,
4545  input_to_nest_level,
4546  compound,
4547  query_infos,
4548  executor_);
      // Nest levels and descriptors are recomputed from the returned permutation (which
      // is empty when do_table_reordering() declined to reorder), and the join filter is
      // translated again against them.
4549  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4550  std::tie(input_descs, input_col_descs, std::ignore) =
4551  get_input_desc(compound, input_to_nest_level, input_permutation);
4552  left_deep_join_quals = translateLeftDeepJoinFilter(
4553  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4554  }
4555  }
4556  auto const bbox_intersect_qual_info = convert_bbox_intersect_join(left_deep_join_quals,
4557  input_descs,
4558  input_to_nest_level,
4559  input_permutation,
4560  input_col_descs,
4561  executor_);
4562  if (bbox_intersect_qual_info.is_reordered) {
      // Table infos are refreshed because the conversion reported a reordering.
4563  query_infos = get_table_infos(input_descs, executor_);
4564  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4565  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4566  }
4567  if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4568  left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4569  }
4570  RelAlgTranslator translator(
4571  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4572  const auto quals_cf = translate_quals(compound, translator);
4573  const auto quals = rewrite_quals(quals_cf.quals);
4574  const auto scalar_sources =
4575  translate_scalar_sources(compound, translator, eo.executor_type);
4576  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4577  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4578  const auto target_exprs = translate_targets(target_exprs_owned_,
4579  target_exprs_type_infos,
4580  scalar_sources,
4581  groupby_exprs,
4582  compound,
4583  translator,
4584  eo.executor_type);
4585 
      // Start from the default hints and replace them with the node's registered hint,
      // if the DAG has one.
4586  auto query_hint = RegisteredQueryHint::defaults();
4587  if (query_dag_) {
4588  auto candidate = query_dag_->getQueryHint(compound);
4589  if (candidate) {
4590  query_hint = *candidate;
4591  }
4592  }
4593  CHECK_EQ(compound->size(), target_exprs.size());
4594  const RelAlgExecutionUnit exe_unit = {input_descs,
4595  input_col_descs,
4596  quals_cf.simple_quals,
4597  quals,
4598  left_deep_join_quals,
4599  groupby_exprs,
4600  target_exprs,
4601  target_exprs_type_infos,
4602  nullptr,
4603  sort_info,
4604  0,
4605  query_hint,
4607  compound->getQueryPlanDagHash(), sort_info),
4608  {},
4609  {},
4610  false,
4611  std::nullopt,
4612  query_state_};
4613  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4614  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4615  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4616  compound->setOutputMetainfo(targets_meta);
4617  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
      // Both operands of && test the same thing: whether the optional holds a value.
4618  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4619  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4620  rewritten_exe_unit.join_quals);
4621  }
4622  if (has_valid_query_plan_dag(compound)) {
4623  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4624  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4625  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4626  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4627  }
4628  return {rewritten_exe_unit,
4629  compound,
4631  std::move(query_rewriter),
4632  input_permutation,
4633  left_deep_join_input_sizes};
4634 }
4635 
4636 std::shared_ptr<RelAlgTranslator> RelAlgExecutor::getRelAlgTranslator(
4637  const RelAlgNode* node) {
4638  auto input_to_nest_level = get_input_nest_levels(node, {});
4639  const auto left_deep_join =
4640  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
4641  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4642  : std::vector<JoinType>{get_join_type(node)};
4643  return std::make_shared<RelAlgTranslator>(
4644  query_state_, executor_, input_to_nest_level, join_types, now_, false);
4645 }
4646 
4647 namespace {
4648 
4649 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
4650  CHECK(qual_expr);
4651  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
4652  if (!bin_oper || bin_oper->getOperator() != kAND) {
4653  return {qual_expr};
4654  }
4655  CHECK_GE(bin_oper->size(), size_t(2));
4656  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
4657  for (size_t i = 1; i < bin_oper->size(); ++i) {
4658  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
4659  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4660  }
4661  return lhs_cf;
4662 }
4663 
4664 std::shared_ptr<Analyzer::Expr> build_logical_expression(
4665  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4666  const SQLOps sql_op) {
4667  CHECK(!factors.empty());
4668  auto acc = factors.front();
4669  for (size_t i = 1; i < factors.size(); ++i) {
4670  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
4671  }
4672  return acc;
4673 }
4674 
4675 template <class QualsList>
4676 bool list_contains_expression(const QualsList& haystack,
4677  const std::shared_ptr<Analyzer::Expr>& needle) {
4678  for (const auto& qual : haystack) {
4679  if (*qual == *needle) {
4680  return true;
4681  }
4682  }
4683  return false;
4684 }
4685 
4686 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
4687 // evaluations of `p` and allows use of the original form in joins if `p`
4688 // can be used for hash joins.
4689 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
4690  const std::shared_ptr<Analyzer::Expr>& expr) {
4691  const auto expr_terms = qual_to_disjunctive_form(expr);
4692  CHECK_GE(expr_terms.size(), size_t(1));
4693  const auto& first_term = expr_terms.front();
4694  const auto first_term_factors = qual_to_conjunctive_form(first_term);
4695  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4696  // First, collect the conjunctive components common to all the disjunctive components.
4697  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
4698  for (const auto& first_term_factor : first_term_factors.quals) {
4699  bool is_common =
4700  expr_terms.size() > 1; // Only report common factors for disjunction.
4701  for (size_t i = 1; i < expr_terms.size(); ++i) {
4702  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
4703  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
4704  is_common = false;
4705  break;
4706  }
4707  }
4708  if (is_common) {
4709  common_factors.push_back(first_term_factor);
4710  }
4711  }
4712  if (common_factors.empty()) {
4713  return expr;
4714  }
4715  // Now that the common expressions are known, collect the remaining expressions.
4716  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4717  for (const auto& term : expr_terms) {
4718  const auto term_cf = qual_to_conjunctive_form(term);
4719  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4720  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4721  for (const auto& qual : term_cf.quals) {
4722  if (!list_contains_expression(common_factors, qual)) {
4723  remaining_quals.push_back(qual);
4724  }
4725  }
4726  if (!remaining_quals.empty()) {
4727  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
4728  }
4729  }
4730  // Reconstruct the expression with the transformation applied.
4731  const auto common_expr = build_logical_expression(common_factors, kAND);
4732  if (remaining_terms.empty()) {
4733  return common_expr;
4734  }
4735  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
4736  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
4737 }
4738 
4739 } // namespace
4740 
4741 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
4742  const RexScalar* join_condition,
4743  const std::vector<JoinType>& join_types,
4744  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4745  const bool just_explain) const {
4746  RelAlgTranslator translator(
4747  query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
4748  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
4749  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4750  for (const auto rex_condition_component : rex_condition_cf) {
4751  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
4752  const auto join_condition = reverse_logical_distribution(
4753  translator.translate(bw_equals ? bw_equals.get() : rex_condition_component));
4754  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
4755 
4756  auto append_folded_cf_quals = [&join_condition_quals](const auto& cf_quals) {
4757  for (const auto& cf_qual : cf_quals) {
4758  join_condition_quals.emplace_back(fold_expr(cf_qual.get()));
4759  }
4760  };
4761 
4762  append_folded_cf_quals(join_condition_cf.quals);
4763  append_folded_cf_quals(join_condition_cf.simple_quals);
4764  }
4765  return combine_equi_join_conditions(join_condition_quals);
4766 }
4767 
4768 // Translate left deep join filter and separate the conjunctive form qualifiers
4769 // per nesting level. The code generated for hash table lookups on each level
4770 // must dominate its uses in deeper nesting levels.
//
// The result has input_descs.size() - 1 entries; entry k describes the join between
// the inputs seen so far and input k + 1.
//   - A level with an outer condition gets the quals of that condition and type LEFT.
//   - Any other level gets every not-yet-assigned inner-condition qual whose maximum
//     range table index is <= that level, and the level's INNER/SEMI/ANTI type.
// NOTE(review): this listing jumps over source line 4771, which presumably holds the
// return type and function name — restore it from the original file.
// NOTE(review): input_descs.size() - 1 wraps around for an empty input_descs; callers
// are presumably expected to pass at least one descriptor — confirm.
4772  const RelLeftDeepInnerJoin* join,
4773  const std::vector<InputDescriptor>& input_descs,
4774  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4775  const bool just_explain) {
4776  const auto join_types = left_deep_join_types(join);
4777  const auto join_condition_quals = makeJoinQuals(
4778  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
4779  MaxRangeTableIndexVisitor rte_idx_visitor;
4780  JoinQualsPerNestingLevel result(input_descs.size() - 1);
      // Quals already assigned to a level; membership is by shared_ptr identity.
4781  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4782  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4783  const auto outer_condition = join->getOuterCondition(rte_idx);
4784  if (outer_condition) {
4785  result[rte_idx - 1].quals =
4786  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4787  CHECK_LE(rte_idx, join_types.size());
4788  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
4789  result[rte_idx - 1].type = JoinType::LEFT;
4790  continue;
4791  }
4792  for (const auto& qual : join_condition_quals) {
4793  if (visited_quals.count(qual)) {
4794  continue;
4795  }
4796  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
      // Assign the qual to the first level at which all inputs it references are
      // available.
4797  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4798  const auto it_ok = visited_quals.emplace(qual);
4799  CHECK(it_ok.second);
4800  result[rte_idx - 1].quals.push_back(qual);
4801  }
4802  }
4803  CHECK_LE(rte_idx, join_types.size());
4804  CHECK(join_types[rte_idx - 1] == JoinType::INNER ||
4805  join_types[rte_idx - 1] == JoinType::SEMI ||
4806  join_types[rte_idx - 1] == JoinType::ANTI);
4807  result[rte_idx - 1].type = join_types[rte_idx - 1];
4808  }
4809  return result;
4810 }
4811 
4812 namespace {
4813 
4814 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
4815  const RelAlgNode* ra_node,
4816  const size_t nest_level,
4817  const std::vector<TargetMetaInfo>& in_metainfo,
4818  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4819  CHECK_LE(size_t(1), ra_node->inputCount());
4820  CHECK_GE(size_t(2), ra_node->inputCount());
4821  const auto input = ra_node->getInput(nest_level);
4822  const auto it_rte_idx = input_to_nest_level.find(input);
4823  CHECK(it_rte_idx != input_to_nest_level.end());
4824  const int rte_idx = it_rte_idx->second;
4825  const auto table_key = table_key_from_ra(input);
4826  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4827  const auto scan_ra = dynamic_cast<const RelScan*>(input);
4828  int input_idx = 0;
4829  for (const auto& input_meta : in_metainfo) {
4830  inputs.push_back(std::make_shared<Analyzer::ColumnVar>(
4831  input_meta.get_type_info(),
4832  shared::ColumnKey{table_key, scan_ra ? input_idx + 1 : input_idx},
4833  rte_idx));
4834  ++input_idx;
4835  }
4836  return inputs;
4837 }
4838 
4839 std::vector<Analyzer::Expr*> get_raw_pointers(
4840  std::vector<std::shared_ptr<Analyzer::Expr>> const& input) {
4841  std::vector<Analyzer::Expr*> output(input.size());
4842  auto const raw_ptr = [](auto& shared_ptr) { return shared_ptr.get(); };
4843  std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4844  return output;
4845 }
4846 
4847 } // namespace
4848 
// Builds the work unit for a stand-alone RelAggregate node.
// The scalar sources are synthesized as column references over the output metadata of
// the node's single input; group-by and target expressions are derived from them. The
// resulting target metadata is published on the node. The execution unit is created
// with empty simple quals, quals and join quals, and the work unit carries no query
// rewriter (nullptr).
// NOTE(review): this listing jumps over source lines 4849, 4902 and 4910 (the function
// header, the start of the call whose arguments continue on line 4903, and one element
// of the returned initializer list) — restore them from the original file.
4850  const RelAggregate* aggregate,
4851  const SortInfo& sort_info,
4852  const bool just_explain) {
4853  std::vector<InputDescriptor> input_descs;
4854  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
      // Receives the third tuple element of get_input_desc(); not read again in this
      // function.
4855  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4856  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4857  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4858  get_input_desc(aggregate, input_to_nest_level, {});
4859  const auto join_type = get_join_type(aggregate);
4860 
4861  RelAlgTranslator translator(
4862  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
4863  CHECK_EQ(size_t(1), aggregate->inputCount());
4864  const auto source = aggregate->getInput(0);
4865  const auto& in_metainfo = source->getOutputMetainfo();
4866  const auto scalar_sources =
4867  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4868  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4869  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4870  const auto target_exprs = translate_targets(target_exprs_owned_,
4871  target_exprs_type_infos,
4872  scalar_sources,
4873  groupby_exprs,
4874  aggregate,
4875  translator);
4876 
4877  const auto query_infos = get_table_infos(input_descs, executor_);
4878 
4879  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4880  aggregate->setOutputMetainfo(targets_meta);
      // Start from the default hints and replace them with the node's registered hint,
      // if the DAG has one.
4881  auto query_hint = RegisteredQueryHint::defaults();
4882  if (query_dag_) {
4883  auto candidate = query_dag_->getQueryHint(aggregate);
4884  if (candidate) {
4885  query_hint = *candidate;
4886  }
4887  }
      // No left-deep tree id is passed (std::nullopt) for a stand-alone aggregate.
4888  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4889  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4890  return {RelAlgExecutionUnit{input_descs,
4891  input_col_descs,
4892  {},
4893  {},
4894  {},
4895  groupby_exprs,
4896  target_exprs,
4897  target_exprs_type_infos,
4898  nullptr,
4899  sort_info,
4900  0,
4901  query_hint,
4903  aggregate->getQueryPlanDagHash(), sort_info),
4904  join_info.hash_table_plan_dag,
4905  join_info.table_id_to_node_map,
4906  false,
4907  std::nullopt,
4908  query_state_},
4909  aggregate,
4911  nullptr};
4912 }
4913 
// Builds the work unit for a RelProject node.
// Steps, in order:
//   1. derive input descriptors and table infos for the node;
//   2. if the first input is a RelLeftDeepInnerJoin, translate its join filter and,
//      when eo.table_reordering is set, reorder the inputs and translate the filter
//      again for the new order;
//   3. apply convert_bbox_intersect_join() and adopt its join quals if it found a
//      bounding-box intersect join;
//   4. translate the projected scalar expressions, keep them alive in
//      target_exprs_owned_, and use their raw pointers as the target list;
//   5. assemble the RelAlgExecutionUnit (no quals, a single null group-by entry), run
//      it through QueryRewriter, and publish the resulting target metadata on the node;
//   6. record the join quals for the left-deep tree and, when the node has a valid
//      query plan DAG, attach the extracted hash-table plan information.
// NOTE(review): this listing jumps over source lines 4914, 4995 and 5019 (the function
// header, the start of the call whose arguments continue on line 4996, and one element
// of the returned initializer list) — restore them from the original file.
4915  const RelProject* project,
4916  const SortInfo& sort_info,
4917  const ExecutionOptions& eo) {
4918  std::vector<InputDescriptor> input_descs;
4919  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4920  auto input_to_nest_level = get_input_nest_levels(project, {});
4921  std::tie(input_descs, input_col_descs, std::ignore) =
4922  get_input_desc(project, input_to_nest_level, {});
4923  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4924  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4925  auto query_infos = get_table_infos(input_descs, executor_);
4926  const auto left_deep_join =
4927  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4928  JoinQualsPerNestingLevel left_deep_join_quals;
4929  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4930  : std::vector<JoinType>{get_join_type(project)};
4931  std::vector<size_t> input_permutation;
4932  std::vector<size_t> left_deep_join_input_sizes;
4933  std::optional<unsigned> left_deep_tree_id;
4934  if (left_deep_join) {
4935  left_deep_tree_id = left_deep_join->getId();
4936  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4937  left_deep_join_quals = translateLeftDeepJoinFilter(
4938  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
      // NOTE(review): unlike createCompoundWorkUnit, reordering here is not skipped when
      // a join level is a LEFT join — confirm this difference is intended.
4939  if (eo.table_reordering) {
4940  input_permutation = do_table_reordering(input_descs,
4941  input_col_descs,
4942  left_deep_join_quals,
4943  input_to_nest_level,
4944  project,
4945  query_infos,
4946  executor_);
      // Nest levels and descriptors are recomputed from the returned permutation (which
      // is empty when do_table_reordering() declined to reorder), and the join filter is
      // translated again against them.
4947  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4948  std::tie(input_descs, input_col_descs, std::ignore) =
4949  get_input_desc(project, input_to_nest_level, input_permutation);
4950  left_deep_join_quals = translateLeftDeepJoinFilter(
4951  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4952  }
4953  }
4954  auto const bbox_intersect_qual_info = convert_bbox_intersect_join(left_deep_join_quals,
4955  input_descs,
4956  input_to_nest_level,
4957  input_permutation,
4958  input_col_descs,
4959  executor_);
4960  if (bbox_intersect_qual_info.is_reordered) {
      // Table infos are refreshed because the conversion reported a reordering.
4961  query_infos = get_table_infos(input_descs, executor_);
4962  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4963  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4964  }
4965  if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4966  left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4967  }
4968  RelAlgTranslator translator(
4969  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4970  const auto target_exprs_owned =
4971  translate_scalar_sources(project, translator, eo.executor_type);
4972 
      // The member vector keeps the expressions alive; the execution unit itself only
      // stores raw pointers.
4973  target_exprs_owned_.insert(
4974  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4975  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4976  auto query_hint = RegisteredQueryHint::defaults();
4977  if (query_dag_) {
4978  auto candidate = query_dag_->getQueryHint(project);
4979  if (candidate) {
4980  query_hint = *candidate;
4981  }
4982  }
4983  const RelAlgExecutionUnit exe_unit = {input_descs,
4984  input_col_descs,
4985  {},
4986  {},
4987  left_deep_join_quals,
4988  {nullptr},
4989  target_exprs,
4990  {},
4991  nullptr,
4992  sort_info,
4993  0,
4994  query_hint,
4996  project->getQueryPlanDagHash(), sort_info),
4997  {},
4998  {},
4999  false,
5000  std::nullopt,
5001  query_state_};
5002  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5003  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5004  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
5005  project->setOutputMetainfo(targets_meta);
5006  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
      // Both operands of && test the same thing: whether the optional holds a value.
5007  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
5008  left_deep_trees_info.emplace(left_deep_tree_id.value(),
5009  rewritten_exe_unit.join_quals);
5010  }
5011  if (has_valid_query_plan_dag(project)) {
5012  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5013  project, left_deep_tree_id, left_deep_trees_info, executor_);
5014  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
5015  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
5016  }
5017  return {rewritten_exe_unit,
5018  project,
5020  std::move(query_rewriter),
5021  input_permutation,
5022  left_deep_join_input_sizes};
5023 }
5024 
5025 namespace {
5026 
5027 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
5028  RelAlgNode const* input_node) {
5029  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
5030  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
5031  const int negative_node_id = -input_node->getId();
5032  int32_t db_id{0};
5033  if (auto rel_scan = dynamic_cast<const RelScan*>(input_node)) {
5034  db_id = rel_scan->getCatalog().getDatabaseId();
5035  }
5036  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
5037  target_exprs.reserve(tmis.size());
5038  for (size_t i = 0; i < tmis.size(); ++i) {
5039  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
5040  tmis[i].get_type_info(),
5041  shared::ColumnKey{db_id, negative_node_id, int32_t(i)},
5042  0));
5043  }
5044  return target_exprs;
5045 }
5046 
5047 } // namespace
5048 
// Builds the work unit for a RelLogicalUnion node.
//
// NOTE(review): the line carrying this function's return type and name is not
// present in this listing (presumably RelAlgExecutor::createUnionWorkUnit --
// confirm against the real file). Several interior lines are also absent and are
// flagged below; nothing is assumed about their content.
//
// Visible steps:
//   1. Collect input descriptors / column descriptors and the table infos.
//   2. Compute the largest tuple count over all table infos.
//   3. Build target expressions for inputs 0 and 1 via target_exprs_for_union().
//   4. Assemble a RelAlgExecutionUnit and pass it through QueryRewriter::rewrite().
//   5. Set logical_union's output metainfo based on the concrete type of input 0.
//
// Parameters:
//   logical_union - the union node to build the unit for; its output metainfo is
//                   updated as a side effect.
//   sort_info     - forwarded into the execution unit.
//   eo            - not referenced in the visible lines (may be used on a line
//                   missing from this listing -- TODO confirm).
//
// Throws QueryNotSupported when input 0 is a RelSort or any node type not handled
// by the dispatch chain below.
5050  const RelLogicalUnion* logical_union,
5051  const SortInfo& sort_info,
5052  const ExecutionOptions& eo) {
5053  std::vector<InputDescriptor> input_descs;
5054  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5055  // Map ra input ptr to index (0, 1).
5056  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
  // The third element returned by get_input_desc is discarded.
5057  std::tie(input_descs, input_col_descs, std::ignore) =
5058  get_input_desc(logical_union, input_to_nest_level, {});
5059  const auto query_infos = get_table_infos(input_descs, executor_);
  // Maximum of getNumTuples() across every entry of query_infos (0 if empty).
5060  auto const max_num_tuples =
5061  std::accumulate(query_infos.cbegin(),
5062  query_infos.cend(),
5063  size_t(0),
5064  [](auto max, auto const& query_info) {
5065  return std::max(max, query_info.info.getNumTuples());
5066  });
5067 
5068  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
5069  for (auto& pair : input_to_nest_level) {
5070  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
5071  << pair.second << ')';
5072  }
5073 
5074  // For UNION queries, we need to keep the target_exprs from both subqueries since they
5075  // may differ on StringDictionaries.
5076  std::vector<Analyzer::Expr*> target_exprs_pair[2];
  // Only inputs 0 and 1 are read here.
5077  for (unsigned i = 0; i < 2; ++i) {
5078  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
  // An empty result means the input node had no output metainfo entries.
5079  CHECK(!input_exprs_owned.empty())
5080  << "No metainfo found for input node(" << i << ") "
5081  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
5082  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
5083  for (auto& input_expr : input_exprs_owned) {
5084  VLOG(3) << " " << input_expr->toString();
5085  }
  // Take the raw pointers first, then hand the owning pointers over to
  // target_exprs_owned_ (input_exprs_owned is moved-from afterwards).
5086  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
5087  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
5088  }
5089 
5090  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
5091  << " input_col_descs=" << shared::printContainer(input_col_descs)
5092  << " target_exprs.size()=" << target_exprs_pair[0].size()
5093  << " max_num_tuples=" << max_num_tuples;
5094 
  // Positional initialization: input 0's target expressions go in the regular
  // target slot, input 1's are passed as the final initializer.
5095  const RelAlgExecutionUnit exe_unit = {input_descs,
5096  input_col_descs,
5097  {}, // quals_cf.simple_quals,
5098  {}, // rewrite_quals(quals_cf.quals),
5099  {},
5100  {nullptr},
5101  target_exprs_pair[0],
5102  {},
5103  nullptr,
5104  sort_info,
5105  max_num_tuples,
  // NOTE(review): listing lines 5106-5107 are absent from this view, so one or
  // more initializers are not shown here -- consult the real file.
5108  {},
5109  {},
5110  false,
5111  logical_union->isAll(),
5112  query_state_,
5113  target_exprs_pair[1]};
5114  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5115  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5116 
  // Output metainfo is derived from input 0 only. Each branch downcasts to the
  // concrete node type before calling get_targets_meta (presumably so the
  // matching overload is selected -- TODO confirm).
5117  RelAlgNode const* input0 = logical_union->getInput(0);
5118  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
5119  logical_union->setOutputMetainfo(
5120  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5121  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
5122  logical_union->setOutputMetainfo(
5123  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5124  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
5125  logical_union->setOutputMetainfo(
5126  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5127  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
5128  logical_union->setOutputMetainfo(
5129  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5130  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
5131  logical_union->setOutputMetainfo(
5132  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5133  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
5134  logical_union->setOutputMetainfo(
5135  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5136  } else if (auto const* node = dynamic_cast<const RelLogicalValues*>(input0)) {
5137  logical_union->setOutputMetainfo(
5138  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5139  } else if (dynamic_cast<const RelSort*>(input0)) {
  // A sort node as the first input is rejected explicitly.
5140  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
5141  } else {
  // NOTE(review): listing line 5143, which completes this expression, is absent
  // from this view.
5142  throw QueryNotSupported("Unsupported input type: " +
5144  }
5145  VLOG(3) << "logical_union->getOutputMetainfo()="
5146  << shared::printContainer(logical_union->getOutputMetainfo())
5147  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey()="
5148  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey();
5149 
  // NOTE(review): listing line 5152 (an element between logical_union and the
  // rewriter) is absent from this view.
5150  return {rewritten_exe_unit,
5151  logical_union,
5153  std::move(query_rewriter)};
5154 }
5155 
5157  const RelTableFunction* rel_table_func,
5158  const bool just_explain,
5159  const bool is_gpu) {
5160  std::vector<InputDescriptor> input_descs;
5161  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5162  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
5163  std::tie(input_descs, input_col_descs, std::ignore) =
5164  get_input_desc(rel_table_func, input_to_nest_level, {});
5165  const auto query_infos = get_table_infos(input_descs, executor_);
5166  RelAlgTranslator translator(
5167  query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
5168  auto input_exprs_owned = translate_scalar_sources(
5169  rel_table_func, translator, ::ExecutorType::TableFunctions);
5170  target_exprs_owned_.insert(
5171  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
5172 
5173  const auto table_function_impl_and_type_infos = [=]() {
5174  if (is_gpu) {
5175  try {
5176  return bind_table_function(
5177  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5178  } catch (ExtensionFunctionBindingError& e) {
5179  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
5180  << " Redirecting " << rel_table_func->getFunctionName()
5181  << " step to run on CPU.";
5182  throw QueryMustRunOnCpu();
5183  }
5184  } else {
5185  try {
5186  return bind_table_function(
5187  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5188  } catch (ExtensionFunctionBindingError& e) {
5189  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
5190  throw;
5191  }
5192  }
5193  }();
5194  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
5195  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
5196  size_t output_row_sizing_param = 0;
5197  if (table_function_impl
5198  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
5199  const auto parameter_index =
5200  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
5201  CHECK_GT(parameter_index, size_t(0));
5202  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
5203  const auto parameter_expr =
5204  rel_table_func->getTableFuncInputAt(parameter_index - 1);
5205  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
5206  if (!parameter_expr_literal) {
5207  throw std::runtime_error(
5208  "Provided output buffer sizing parameter is not a literal. Only literal "
5209  "values are supported with output buffer sizing configured table "
5210  "functions.");
5211  }
5212  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
5213  if (literal_val < 0) {
5214  throw std::runtime_error("Provided output sizing parameter " +
5215  std::to_string(literal_val) +
5216  " must be positive integer.");
5217  }
5218  output_row_sizing_param = static_cast<size_t>(literal_val);
5219  } else {
5220  // RowMultiplier not specified in the SQL query. Set it to 1
5221  output_row_sizing_param = 1; // default value for RowMultiplier
5222  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
5223  static Analyzer::ExpressionPtr DEFAULT_ROW_MULTIPLIER_EXPR =
5224  makeExpr<Analyzer::Constant>(kINT, false, d);
5225  // Push the constant 1 to input_exprs
5226  input_exprs_owned.insert(input_exprs_owned.begin() + parameter_index - 1,
5227  DEFAULT_ROW_MULTIPLIER_EXPR);
5228  }
5229  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5230  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
5231  } else {
5232  UNREACHABLE();
5233  }
5234 
5235  std::vector<Analyzer::ColumnVar*> input_col_exprs;
5236  size_t input_index = 0;
5237  size_t arg_index = 0;
5238  const auto table_func_args = table_function_impl.getInputArgs();
5239  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5240  for (const auto& ti : table_function_type_infos) {
5241  if (ti.is_column_list()) {
5242  for (int i = 0; i < ti.get_dimension(); i++) {
5243  auto& input_expr = input_exprs_owned[input_index];
5244  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5245  CHECK(col_var);
5246 
5247  // avoid setting type info to ti here since ti doesn't have all the
5248  // properties correctly set
5249  auto type_info = input_expr->get_type_info();
5250  if (ti.is_column_array()) {