OmniSciDB ca0c39ec8f
RelAlgExecutor.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "RelAlgExecutor.h"
#include "Parser/ParserNode.h"
#include "QueryEngine/RelAlgDag.h"
#include "QueryEngine/RexVisitor.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/shard_key.h"

#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/reversed.hpp>

#include <algorithm>
#include <functional>
#include <numeric>
bool g_enable_interop{false};
bool g_enable_union{true};  // DEPRECATED

extern bool g_enable_watchdog;
extern bool g_enable_bump_allocator;
extern bool g_enable_system_tables;

namespace {

bool node_is_aggregate(const RelAlgNode* ra) {
  const auto compound = dynamic_cast<const RelCompound*>(ra);
  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
  return ((compound && compound->isAggregate()) || aggregate);
}

std::unordered_set<PhysicalInput> get_physical_inputs(
    const Catalog_Namespace::Catalog& cat,
    const RelAlgNode* ra) {
  auto phys_inputs = get_physical_inputs(ra);
  std::unordered_set<PhysicalInput> phys_inputs2;
  for (auto& phi : phys_inputs) {
    phys_inputs2.insert(
        PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
  }
  return phys_inputs2;
}

void set_parallelism_hints(const RelAlgNode& ra_node,
                           const Catalog_Namespace::Catalog& catalog) {
  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
      parallelism_hints_per_table;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    int table_id = physical_input.table_id;
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE &&
        !table->is_in_memory_system_table) {
      int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
      const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
      auto foreign_table = catalog.getForeignTable(table_id);
      for (const auto& fragment :
           foreign_table->fragmenter->getFragmentsForQuery().fragments) {
        Chunk_NS::Chunk chunk{col_desc};
        ChunkKey chunk_key = {
            catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};

        // Parallelism hints should not include fragments that are not mapped to the
        // current node, otherwise we will try to prefetch them and run into trouble.
        if (foreign_storage::key_does_not_shard_to_leaf(chunk_key)) {
          continue;
        }

        // do not include chunk hints that are in CPU memory
        if (!chunk.isChunkOnDevice(
                &catalog.getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
          parallelism_hints_per_table[{catalog.getDatabaseId(), table_id}].insert(
              foreign_storage::ForeignStorageMgr::ParallelismHint{col_id,
                                                                  fragment.fragmentId});
        }
      }
    }
  }
  if (!parallelism_hints_per_table.empty()) {
    auto foreign_storage_mgr =
        catalog.getDataMgr().getPersistentStorageMgr()->getForeignStorageMgr();
    CHECK(foreign_storage_mgr);
    foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
  }
}
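
// Illustrative sketch (not part of the original source): the map built above
// keys hints by a per-table chunk-key prefix and collects (column, fragment)
// pairs under it. For foreign table id 7 in database 1, two uncached
// fragments of column 3 would produce roughly
//
//   parallelism_hints_per_table[{1, 7}] ==
//       {ParallelismHint{3, 0}, ParallelismHint{3, 1}};
//
// which the foreign storage manager uses to prefetch those chunks in parallel.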

void prepare_string_dictionaries(const RelAlgNode& ra_node,
                                 const Catalog_Namespace::Catalog& catalog) {
  for (const auto [col_id, table_id] : get_physical_inputs(&ra_node)) {
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE) {
      auto spi_col_id = catalog.getColumnIdBySpi(table_id, col_id);
      foreign_storage::populate_string_dictionary(table_id, spi_col_id, catalog);
    }
  }
}

void prepare_foreign_table_for_execution(const RelAlgNode& ra_node,
                                         const Catalog_Namespace::Catalog& catalog) {
  // Iterate through ra_node inputs for types that need to be loaded pre-execution
  // If they do not have valid metadata, load them into CPU memory to generate
  // the metadata and leave them ready to be used by the query
  set_parallelism_hints(ra_node, catalog);
  prepare_string_dictionaries(ra_node, catalog);
}

void prepare_for_system_table_execution(const RelAlgNode& ra_node,
                                        const Catalog_Namespace::Catalog& catalog,
                                        const CompilationOptions& co) {
  CHECK(g_enable_system_tables);
  std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    int table_id = physical_input.table_id;
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->is_in_memory_system_table) {
      auto column_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
      system_table_columns_by_table_id[table_id].emplace_back(column_id);
    }
  }
  // Execute on CPU for queries involving system tables
  if (!system_table_columns_by_table_id.empty() &&
      co.device_type != ExecutorDeviceType::CPU) {
    throw QueryMustRunOnCpu();
  }

  for (const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
    // Clear any previously cached data, since system tables depend on point in
    // time data snapshots.
    catalog.getDataMgr().deleteChunksWithPrefix(
        ChunkKey{catalog.getDatabaseId(), table_id}, Data_Namespace::CPU_LEVEL);

    // TODO(Misiu): This prefetching can be removed if we can add support for
    // ExpressionRanges to reduce invalid with valid ranges (right now prefetching
    // causes us to fetch the chunks twice). Right now if we do not prefetch (i.e. if
    // we remove the code below) some nodes will return valid ranges and others will
    // return unknown because they only use placeholder metadata and the LeafAggregator
    // has no idea how to reduce the two.
    auto td = catalog.getMetadataForTable(table_id);
    CHECK(td);
    CHECK(td->fragmenter);
    auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
    CHECK_LE(fragment_count, static_cast<size_t>(1))
        << "In-memory system tables are expected to have a single fragment.";
    if (fragment_count > 0) {
      for (auto column_id : column_ids) {
        // Prefetch system table chunks in order to force chunk statistics metadata
        // computation.
        auto cd = catalog.getMetadataForColumn(table_id, column_id);
        ChunkKey chunk_key{catalog.getDatabaseId(), table_id, column_id, 0};
        Chunk_NS::Chunk::getChunk(
            cd, &(catalog.getDataMgr()), chunk_key, Data_Namespace::CPU_LEVEL, 0, 0, 0);
      }
    }
  }
}

bool has_valid_query_plan_dag(const RelAlgNode* node) {
  return node->getQueryPlanDagHash() != EMPTY_HASHED_PLAN_DAG_KEY;
}

// TODO(alex): Once we're fully migrated to the relational algebra model, change
// the executor interface to use the collation directly and remove this conversion.
std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
  std::list<Analyzer::OrderEntry> result;
  for (size_t i = 0; i < sort->collationCount(); ++i) {
    const auto sort_field = sort->getCollation(i);
    result.emplace_back(sort_field.getField() + 1,
                        sort_field.getSortDir() == SortDirection::Descending,
                        sort_field.getNullsPosition() == NullSortedPosition::First);
  }
  return result;
}
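
// Worked example (sketch, not in the original source): for
//   SELECT a, b FROM t ORDER BY a DESC NULLS FIRST, b;
// the collation fields are 0 and 1, so the loop above yields the 1-based
// entries
//   Analyzer::OrderEntry{1, /*is_desc=*/true,  /*nulls_first=*/true}
//   Analyzer::OrderEntry{2, /*is_desc=*/false, /*nulls_first=*/false}
// matching the executor's 1-based target-list references.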

void build_render_targets(RenderInfo& render_info,
                          const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
                          const std::vector<TargetMetaInfo>& targets_meta) {
  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
  render_info.targets.clear();
  for (size_t i = 0; i < targets_meta.size(); ++i) {
    render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
        targets_meta[i].get_resname(),
        work_unit_target_exprs[i]->get_shared_ptr(),
        false));
  }
}

bool is_validate_or_explain_query(const ExecutionOptions& eo) {
  return eo.just_validate || eo.just_explain || eo.just_calcite_explain;
}

class RelLeftDeepTreeIdsCollector : public RelAlgVisitor<std::vector<unsigned>> {
 public:
  std::vector<unsigned> visitLeftDeepInnerJoin(
      const RelLeftDeepInnerJoin* left_deep_join_tree) const override {
    return {left_deep_join_tree->getId()};
  }

 protected:
  std::vector<unsigned> aggregateResult(
      const std::vector<unsigned>& aggregate,
      const std::vector<unsigned>& next_result) const override {
    auto result = aggregate;
    std::copy(next_result.begin(), next_result.end(), std::back_inserter(result));
    return result;
  }
};
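
// Traversal sketch (not in the original source): for a plan such as
//   Sort <- Project <- LeftDeepInnerJoin#4 <- [Scan t1, Scan t2]
// the collector returns {4}; when several left-deep join subtrees exist,
// aggregateResult() concatenates their ids in visit order, e.g. {4, 9}.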

struct TextEncodingCastCounts {
  size_t text_decoding_casts;
  size_t text_encoding_casts;
  TextEncodingCastCounts() : text_decoding_casts(0UL), text_encoding_casts(0UL) {}
  TextEncodingCastCounts(const size_t text_decoding_casts,
                         const size_t text_encoding_casts)
      : text_decoding_casts(text_decoding_casts)
      , text_encoding_casts(text_encoding_casts) {}
};

class TextEncodingCastCountVisitor : public ScalarExprVisitor<TextEncodingCastCounts> {
 public:
  TextEncodingCastCountVisitor(const bool default_disregard_casts_to_none_encoding)
      : default_disregard_casts_to_none_encoding_(
            default_disregard_casts_to_none_encoding) {}

 protected:
  TextEncodingCastCounts visitUOper(const Analyzer::UOper* u_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    // Save state of input disregard_casts_to_none_encoding_ as child node traversal
    // will reset it
    const bool disregard_casts_to_none_encoding = disregard_casts_to_none_encoding_;
    result = aggregateResult(result, visit(u_oper->get_operand()));
    if (u_oper->get_optype() != kCAST) {
      return result;
    }
    const auto& operand_ti = u_oper->get_operand()->get_type_info();
    const auto& casted_ti = u_oper->get_type_info();
    if (!operand_ti.is_string() || !casted_ti.is_string()) {
      return result;
    }
    const bool literals_only = u_oper->get_operand()->get_num_column_vars(true) == 0UL;
    if (literals_only) {
      return result;
    }
    if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
      return aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
    }
    if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
      if (!disregard_casts_to_none_encoding) {
        return aggregateResult(result, TextEncodingCastCounts(1UL, 0UL));
      } else {
        return result;
      }
    }
    return result;
  }

  TextEncodingCastCounts visitBinOper(const Analyzer::BinOper* bin_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    // Currently the join framework handles casts between string types, and
    // casts to none-encoded strings should be considered spurious, except
    // when the join predicate is not a =/<> operator, in which case
    // for both join predicates and all other instances we have to decode
    // to a none-encoded string to do the comparison. The logic below essentially
    // overrides the logic such as to always count none-encoded casts on strings
    // that are children of binary operators other than =/<>
    if (bin_oper->get_optype() != kEQ && bin_oper->get_optype() != kNE) {
      // Override the global override so that join opers don't skip
      // the check when there is an actual cast to none-encoded string
      const auto prev_disregard_casts_to_none_encoding_state =
          disregard_casts_to_none_encoding_;
      const auto left_u_oper =
          dynamic_cast<const Analyzer::UOper*>(bin_oper->get_left_operand());
      if (left_u_oper && left_u_oper->get_optype() == kCAST) {
        disregard_casts_to_none_encoding_ = false;
        result = aggregateResult(result, visitUOper(left_u_oper));
      } else {
        disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
        result = aggregateResult(result, visit(bin_oper->get_left_operand()));
      }

      const auto right_u_oper =
          dynamic_cast<const Analyzer::UOper*>(bin_oper->get_right_operand());
      if (right_u_oper && right_u_oper->get_optype() == kCAST) {
        disregard_casts_to_none_encoding_ = false;
        result = aggregateResult(result, visitUOper(right_u_oper));
      } else {
        disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
        result = aggregateResult(result, visit(bin_oper->get_right_operand()));
      }
      disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
    } else {
      result = aggregateResult(result, visit(bin_oper->get_left_operand()));
      result = aggregateResult(result, visit(bin_oper->get_right_operand()));
    }
    return result;
  }
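
  // Examples of the rules above (sketch, not in the original source):
  //   dict_col = none_col  -> equality is handled by the join framework, so an
  //                           implicit cast to a none-encoded string may be
  //                           disregarded
  //   dict_col < none_col  -> not =/<>, the dictionary-encoded side must be
  //                           decoded for the comparison: one decoding cast
  //   CAST(none_col AS TEXT ENCODING DICT)  -> one encoding cast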

  TextEncodingCastCounts visitLikeExpr(const Analyzer::LikeExpr* like) const override {
    TextEncodingCastCounts result = defaultResult();
    const auto u_oper = dynamic_cast<const Analyzer::UOper*>(like->get_arg());
    const auto prev_disregard_casts_to_none_encoding_state =
        disregard_casts_to_none_encoding_;
    if (u_oper && u_oper->get_optype() == kCAST) {
      disregard_casts_to_none_encoding_ = true;
      result = aggregateResult(result, visitUOper(u_oper));
      disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
    } else {
      result = aggregateResult(result, visit(like->get_arg()));
    }
    result = aggregateResult(result, visit(like->get_like_expr()));
    if (like->get_escape_expr()) {
      result = aggregateResult(result, visit(like->get_escape_expr()));
    }
    return result;
  }

  TextEncodingCastCounts aggregateResult(
      const TextEncodingCastCounts& aggregate,
      const TextEncodingCastCounts& next_result) const override {
    auto result = aggregate;
    result.text_decoding_casts += next_result.text_decoding_casts;
    result.text_encoding_casts += next_result.text_encoding_casts;
    return result;
  }

  void visitBegin() const override {
    disregard_casts_to_none_encoding_ = default_disregard_casts_to_none_encoding_;
  }

  TextEncodingCastCounts defaultResult() const override {
    return TextEncodingCastCounts();
  }

 private:
  mutable bool disregard_casts_to_none_encoding_ = false;
  const bool default_disregard_casts_to_none_encoding_;
};

TextEncodingCastCounts get_text_cast_counts(const RelAlgExecutionUnit& ra_exe_unit) {
  TextEncodingCastCounts cast_counts;

  auto check_node_for_text_casts = [&cast_counts](
                                       const Analyzer::Expr* expr,
                                       const bool disregard_casts_to_none_encoding) {
    if (!expr) {
      return;
    }
    TextEncodingCastCountVisitor visitor(disregard_casts_to_none_encoding);
    const auto this_node_cast_counts = visitor.visit(expr);
    cast_counts.text_encoding_casts += this_node_cast_counts.text_encoding_casts;
    cast_counts.text_decoding_casts += this_node_cast_counts.text_decoding_casts;
  };

  for (const auto& qual : ra_exe_unit.quals) {
    check_node_for_text_casts(qual.get(), false);
  }
  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
    check_node_for_text_casts(simple_qual.get(), false);
  }
  for (const auto& groupby_expr : ra_exe_unit.groupby_exprs) {
    check_node_for_text_casts(groupby_expr.get(), false);
  }
  for (const auto& target_expr : ra_exe_unit.target_exprs) {
    check_node_for_text_casts(target_expr, false);
  }
  for (const auto& join_condition : ra_exe_unit.join_quals) {
    for (const auto& join_qual : join_condition.quals) {
      // We currently need to not count casts to none-encoded strings for join quals,
      // as analyzer will generate these but our join framework disregards them.
      // Some investigation was done on having analyzer issue the correct inter-string
      // dictionary casts, but this actually causes them to get executed and so the same
      // work gets done twice.
      check_node_for_text_casts(join_qual.get(),
                                true /* disregard_casts_to_none_encoding */);
    }
  }
  return cast_counts;
}
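
// Usage sketch (not in the original source): callers tally the casts of a
// whole execution unit, then gate expensive string translation on the result:
//
//   const auto counts = get_text_cast_counts(ra_exe_unit);
//   const bool has_casts =
//       counts.text_decoding_casts + counts.text_encoding_casts > 0UL;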

void check_none_encoded_string_cast_tuple_limit(
    const std::vector<InputTableInfo>& query_infos,
    const RelAlgExecutionUnit& ra_exe_unit) {
  if (!g_enable_watchdog) {
    return;
  }
  auto const tuples_upper_bound =
      std::accumulate(query_infos.cbegin(),
                      query_infos.cend(),
                      size_t(0),
                      [](auto max, auto const& query_info) {
                        return std::max(max, query_info.info.getNumTuples());
                      });
  if (tuples_upper_bound <= g_watchdog_none_encoded_string_translation_limit) {
    return;
  }

  const auto& text_cast_counts = get_text_cast_counts(ra_exe_unit);
  const bool has_text_casts =
      text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;

  if (!has_text_casts) {
    return;
  }
  std::ostringstream oss;
  oss << "Query requires one or more casts between none-encoded and dictionary-encoded "
      << "strings, and the estimated table size (" << tuples_upper_bound << " rows) "
      << "exceeds the configured watchdog none-encoded string translation limit of "
      << g_watchdog_none_encoded_string_translation_limit << " rows.";
  throw std::runtime_error(oss.str());
}
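
// Worked example (sketch, not in the original source): with a translation
// limit of 1,000,000 rows and inputs of 250,000 and 3,000,000 tuples, the
// accumulate above computes tuples_upper_bound = 3,000,000. The bound exceeds
// the limit, so a single counted text cast is enough to make the query throw
// instead of translating strings row by row.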

}  // namespace

bool RelAlgExecutor::canUseResultsetCache(const ExecutionOptions& eo,
                                          RenderInfo* render_info) const {
  auto validate_or_explain_query = is_validate_or_explain_query(eo);
  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
  return g_enable_data_recycler && g_use_query_resultset_cache && eo.keep_result &&
         !validate_or_explain_query && !hasStepForUnion() &&
         !query_for_partial_outer_frag &&
         (!render_info || (render_info && !render_info->isInSitu()));
}

size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
                                             const ExecutionOptions& eo) {
  if (eo.find_push_down_candidates) {
    return 0;
  }

  if (eo.just_explain) {
    return 0;
  }

  CHECK(query_dag_);

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  auto lock = executor_->acquireExecuteMutex();
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (!getSubqueries().empty()) {
    return 0;
  }

  CHECK(!ed_seq.empty());
  if (ed_seq.size() > 1) {
    return 0;
  }

  decltype(temporary_tables_)().swap(temporary_tables_);
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  auto exec_desc_ptr = ed_seq.getDescriptor(0);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    return 0;
  }

  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);

    return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                   executor_);
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      return 0;
    } else if (compound->isUpdateViaSelect()) {
      return 0;
    } else {
      if (compound->isAggregate()) {
        return 0;
      }

      const auto work_unit =
          createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);

      return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                     executor_);
    }
  }

  return 0;
}

ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
                                                   const ExecutionOptions& eo,
                                                   const bool just_explain_plan,
                                                   RenderInfo* render_info) {
  CHECK(query_dag_);
  CHECK_NE(query_dag_->getBuildState(), RelAlgDag::BuildState::kNotBuilt)
      << static_cast<int>(query_dag_->getBuildState());

  auto timer = DEBUG_TIMER(__func__);

  auto run_query = [&](const CompilationOptions& co_in) {
    auto execution_result =
        executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);

    constexpr bool vlog_result_set_summary{false};
    if constexpr (vlog_result_set_summary) {
      VLOG(1) << execution_result.getRows()->summaryToString();
    }

    if (post_execution_callback_) {
      VLOG(1) << "Running post execution callback.";
      (*post_execution_callback_)();
    }
    return execution_result;
  };

  try {
    return run_query(co);
  } catch (const QueryMustRunOnCpu&) {
    if (!g_allow_cpu_retry) {
      throw;
    }
  }
  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
  auto co_cpu = CompilationOptions::makeCpuOnly(co);

  if (render_info) {
    render_info->forceNonInSitu();
  }
  return run_query(co_cpu);
}
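
// The whole-query CPU retry above mirrors the per-step retry in
// executeRelAlgSeq() below; the shared idiom is (sketch, not in the original
// source):
//
//   try {
//     return run(co);                                   // GPU attempt
//   } catch (const QueryMustRunOnCpu&) {
//     if (!g_allow_cpu_retry) throw;                    // retry disabled
//     return run(CompilationOptions::makeCpuOnly(co));  // CPU fallback
//   }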

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
                                                          const ExecutionOptions& eo,
                                                          const bool just_explain_plan,
                                                          RenderInfo* render_info) {
  auto timer = DEBUG_TIMER(__func__);
  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  std::string query_session{""};
  std::string query_str{"N/A"};
  std::string query_submitted_time{""};
  // gather necessary query's info
  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
    query_session = query_state_->getConstSessionInfo()->get_session_id();
    query_str = query_state_->getQueryStr();
    query_submitted_time = query_state_->getQuerySubmittedTime();
  }

  auto validate_or_explain_query =
      just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
  auto interruptable = !render_info && !query_session.empty() &&
                       eo.allow_runtime_query_interrupt && !validate_or_explain_query;
  if (interruptable) {
    // if we reach here, the current query which was waiting an idle executor
    // within the dispatch queue is now scheduled to the specific executor
    // (not UNITARY_EXECUTOR)
    // so we update the query session's status with the executor that takes this query
    std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
        query_session, query_str, query_submitted_time);

    // now the query is going to be executed, so update the status as
    // "RUNNING_QUERY_KERNEL"
    executor_->updateQuerySessionStatus(
        query_session,
        query_submitted_time,
        QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
  }

  // the guard below cleans up the session info after the query finishes its execution
  ScopeGuard clearQuerySessionInfo =
      [this, &query_session, &interruptable, &query_submitted_time] {
        // reset the runtime query interrupt status after the end of query execution
        if (interruptable) {
          // cleanup running session's info
          executor_->clearQuerySessionStatus(query_session, query_submitted_time);
        }
      };

  auto acquire_execute_mutex = [](Executor * executor) -> auto {
    auto ret = executor->acquireExecuteMutex();
    return ret;
  };
  // now we acquire executor lock in here to make sure that this executor holds
  // all necessary resources and at the same time protect them against other executor
  auto lock = acquire_execute_mutex(executor_);

  if (interruptable) {
    // check whether this query session is "already" interrupted
    // this case occurs when there is very short gap between being interrupted and
    // taking the execute lock
    // if so we have to remove "all" queries initiated by this session and we do in here
    // without running the query
    try {
      executor_->checkPendingQueryStatus(query_session);
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw std::runtime_error("Query execution has been interrupted (pending query)");
      }
      throw e;
    } catch (...) {
      throw std::runtime_error("Checking pending query status failed: unknown error");
    }
  }
  int64_t queue_time_ms = timer_stop(clock_begin);

  prepare_for_system_table_execution(ra, cat_, co);

  // Notify foreign tables to load prior to caching
  prepare_foreign_table_for_execution(ra, cat_);

  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (just_explain_plan) {
    std::stringstream ss;
    std::vector<const RelAlgNode*> nodes;
    for (size_t i = 0; i < ed_seq.size(); i++) {
      nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
    }
    size_t ctr_node_id_in_plan = nodes.size();
    for (auto& body : boost::adaptors::reverse(nodes)) {
      // we set each node's id in the query plan in advance before calling toString
      // method to properly represent the query plan
      auto node_id_in_plan_tree = ctr_node_id_in_plan--;
      body->setIdInPlanTree(node_id_in_plan_tree);
    }
    size_t ctr = nodes.size();
    size_t tab_ctr = 0;
    RelRexToStringConfig config;
    config.skip_input_nodes = true;
    for (auto& body : boost::adaptors::reverse(nodes)) {
      const auto index = ctr--;
      const auto tabs = std::string(tab_ctr++, '\t');
      CHECK(body);
      ss << tabs << std::to_string(index) << " : " << body->toString(config) << "\n";
      if (auto sort = dynamic_cast<const RelSort*>(body)) {
        ss << tabs << " : " << sort->getInput(0)->toString(config) << "\n";
      }
      if (dynamic_cast<const RelProject*>(body) ||
          dynamic_cast<const RelCompound*>(body)) {
        if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
          ss << tabs << " : " << join->toString(config) << "\n";
        }
      }
    }
    const auto& subqueries = getSubqueries();
    if (!subqueries.empty()) {
      ss << "Subqueries: "
         << "\n";
      for (const auto& subquery : subqueries) {
        const auto ra = subquery->getRelAlg();
        ss << "\t" << ra->toString(config) << "\n";
      }
    }
    auto rs = std::make_shared<ResultSet>(ss.str());
    return {rs, {}};
  }

  if (eo.find_push_down_candidates) {
    // this extra logic is mainly due to current limitations on multi-step queries
    // and/or subqueries.
    return executeRelAlgQueryWithFilterPushDown(
        ed_seq, co, eo, render_info, queue_time_ms);
  }
  timer_setup.stop();

  // Dispatch the subqueries first
  for (auto subquery : getSubqueries()) {
    const auto subquery_ra = subquery->getRelAlg();
    CHECK(subquery_ra);
    if (subquery_ra->hasContextData()) {
      continue;
    }
    // Execute the subquery and cache the result.
    RelAlgExecutor ra_executor(executor_, cat_, query_state_);
    RaExecutionSequence subquery_seq(subquery_ra, executor_);
    auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
    subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
  }
  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
}
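
// Shape of the just_explain_plan output assembled above (hypothetical two-step
// query, sketch only): indices count down from the final step, each earlier
// step gains one tab of indentation, and a RelSort also prints its input node:
//
//   2 : RelSort(collation=[0 ASC])
//     : RelProject(...)
//       1 : RelCompound(...)
//   Subqueries:
//       RelAggregate(...)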

AggregatedColRange RelAlgExecutor::computeColRangesCache() {
  AggregatedColRange agg_col_range_cache;
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeColRangesCache(phys_inputs);
}

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeStringDictionaryGenerations(phys_inputs);
}

TableGenerations RelAlgExecutor::computeTableGenerations() {
  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
  return executor_->computeTableGenerations(phys_table_ids);
}

Executor* RelAlgExecutor::getExecutor() const {
  return executor_;
}

void RelAlgExecutor::cleanupPostExecution() {
  CHECK(executor_);
  executor_->row_set_mem_owner_ = nullptr;
}

std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
RelAlgExecutor::getJoinInfo(const RelAlgNode* root_node) {
  auto sort_node = dynamic_cast<const RelSort*>(root_node);
  if (sort_node) {
    // we assume that test query that needs join info does not contain any sort node
    return {};
  }
  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
  RelLeftDeepTreeIdsCollector visitor;
  auto left_deep_tree_ids = visitor.visit(root_node);
  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
}

namespace {

void check_sort_node_source_constraint(const RelSort* sort) {
  CHECK_EQ(size_t(1), sort->inputCount());
  const auto source = sort->getInput(0);
  if (dynamic_cast<const RelSort*>(source)) {
    throw std::runtime_error("Sort node not supported as input to another sort");
  }
}

}  // namespace

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
    const RaExecutionSequence& seq,
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryStep);

  auto exe_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exe_desc_ptr);
  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());

  size_t shard_count{0};
  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
    return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
  };

  if (sort) {
    check_sort_node_source_constraint(sort);
    auto order_entries = get_order_entries(sort);
    const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
    shard_count = GroupByAndAggregate::shard_count_for_top_groups(
        source_work_unit.exe_unit, *executor_->getCatalog());
    if (!shard_count) {
      // No point in sorting on the leaf, only execute the input to the sort node.
      CHECK_EQ(size_t(1), sort->inputCount());
      const auto source = sort->getInput(0);
      if (sort->collationCount() || node_is_aggregate(source)) {
        auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
        CHECK_EQ(temp_seq.size(), size_t(1));
        ExecutionOptions eo_copy = {
            eo.output_columnar_hint,
            eo.keep_result,
            eo.allow_multifrag,
            eo.just_explain,
            eo.allow_loop_joins,
            eo.with_watchdog,
            eo.jit_debug,
            eo.just_validate || sort->isEmptyResult(),
            eo.with_dynamic_watchdog,
            eo.dynamic_watchdog_time_limit,
            eo.find_push_down_candidates,
            eo.just_calcite_explain,
            eo.gpu_input_mem_limit_percent,
            eo.allow_runtime_query_interrupt,
            eo.running_query_interrupt_freq,
            eo.pending_query_interrupt_freq,
            eo.executor_type,
        };
        // Use subseq to avoid clearing existing temporary tables
        return {
            executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
            merge_type(source),
            source->getId(),
            false};
      }
    }
  }
  QueryStepExecutionResult result{
      executeRelAlgSubSeq(seq,
                          std::make_pair(step_idx, step_idx + 1),
                          co,
                          eo,
                          render_info,
                          queue_time_ms_),
      merge_type(exe_desc_ptr->getBody()),
      exe_desc_ptr->getBody()->getId(),
      false};
  if (post_execution_callback_) {
    VLOG(1) << "Running post execution callback.";
    (*post_execution_callback_)();
  }
  return result;
}

void RelAlgExecutor::prepareLeafExecution(
    const AggregatedColRange& agg_col_range,
    const StringDictionaryGenerations& string_dictionary_generations,
    const TableGenerations& table_generations) {
  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  queue_time_ms_ = timer_stop(clock_begin);
  executor_->row_set_mem_owner_ =
      std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
  executor_->table_generations_ = table_generations;
  executor_->agg_col_range_cache_ = agg_col_range;
}

ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
                                                 const CompilationOptions& co,
                                                 const ExecutionOptions& eo,
                                                 RenderInfo* render_info,
                                                 const int64_t queue_time_ms,
                                                 const bool with_existing_temp_tables) {
  auto timer = DEBUG_TIMER(__func__);
  if (!with_existing_temp_tables) {
    decltype(temporary_tables_)().swap(temporary_tables_);
  }
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!seq.empty());

  auto get_descriptor_count = [&seq, &eo]() -> size_t {
    if (eo.just_explain) {
      if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
        // run the logical values descriptor to generate the result set, then the next
        // descriptor to generate the explain
        CHECK_GE(seq.size(), size_t(2));
        return 2;
      } else {
        return 1;
      }
    } else {
      return seq.size();
    }
  };

  const auto exec_desc_count = get_descriptor_count();
  auto eo_copied = eo;
  if (seq.hasQueryStepForUnion()) {
    // we currently do not support resultset recycling when an input query
    // contains union (all) operation
    eo_copied.keep_result = false;
  }

  // we have to register resultset(s) of the skipped query step(s) as temporary table
  // before executing the remaining query steps
  // since they may be required during the query processing
  // i.e., get metadata of the target expression from the skipped query step
  if (g_allow_query_step_skipping) {
    for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
      const auto cached_res =
          executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
      CHECK(cached_res);
      addTemporaryTable(kv.first, cached_res);
    }
  }

  const auto num_steps = exec_desc_count - 1;
  for (size_t i = 0; i < exec_desc_count; i++) {
    VLOG(1) << "Executing query step " << i << " / " << num_steps;
    try {
      executeRelAlgStep(
          seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == num_steps) {
        // only render on the last step
        render_info->forceNonInSitu();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo_copied,
                        (i == num_steps) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const NativeExecutionError&) {
      if (!g_enable_interop) {
        throw;
      }
      auto eo_extern = eo_copied;
      eo_extern.executor_type = ::ExecutorType::Extern;
      auto exec_desc_ptr = seq.getDescriptor(i);
      const auto body = exec_desc_ptr->getBody();
      const auto compound = dynamic_cast<const RelCompound*>(body);
      if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
        LOG(INFO) << "Also failed to run the query using interoperability";
        throw;
      }
      executeRelAlgStep(
          seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    }
  }

  return seq.getDescriptor(num_steps)->getResult();
}
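
// The step loop above layers two fallbacks (sketch, not in the original
// source): a QueryMustRunOnCpu error re-runs the same step with
// CompilationOptions::makeCpuOnly(co) when g_allow_query_step_cpu_retry is on
// and we are not distributed, while a NativeExecutionError re-runs it through
// the external executor when interop is enabled and the step is a simple,
// non-aggregate projection:
//
//   auto eo_extern = eo_copied;
//   eo_extern.executor_type = ::ExecutorType::Extern;  // delegate the step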

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
    const RaExecutionSequence& seq,
    const std::pair<size_t, size_t> interval,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;
  time(&now_);
  for (size_t i = interval.first; i < interval.second; i++) {
    // only render on the last step
    try {
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == interval.second - 1) {
        render_info->forceNonInSitu();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    }
  }

  return seq.getDescriptor(interval.second - 1)->getResult();
}

void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
                                       const size_t step_idx,
                                       const CompilationOptions& co,
                                       const ExecutionOptions& eo,
                                       RenderInfo* render_info,
                                       const int64_t queue_time_ms) {
  auto timer = DEBUG_TIMER(__func__);
  auto exec_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    handleNop(exec_desc);
    return;
  }

  const ExecutionOptions eo_work_unit{
      eo.output_columnar_hint,
      eo.keep_result,
      eo.allow_multifrag,
      eo.just_explain,
      eo.allow_loop_joins,
      eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
      eo.jit_debug,
      eo.just_validate,
      eo.with_dynamic_watchdog,
      eo.dynamic_watchdog_time_limit,
      eo.find_push_down_candidates,
      eo.just_calcite_explain,
      eo.gpu_input_mem_limit_percent,
      eo.allow_runtime_query_interrupt,
      eo.running_query_interrupt_freq,
      eo.pending_query_interrupt_freq,
      eo.executor_type,
      step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};

  auto handle_hint = [co,
                      eo_work_unit,
                      body,
                      this]() -> std::pair<CompilationOptions, ExecutionOptions> {
    ExecutionOptions eo_hint_applied = eo_work_unit;
    CompilationOptions co_hint_applied = co;
    auto target_node = body;
    if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
      target_node = sort_body->getInput(0);
    }
    auto query_hints = getParsedQueryHint(target_node);
    auto columnar_output_hint_enabled = false;
    auto rowwise_output_hint_enabled = false;
    if (query_hints) {
      if (query_hints->isHintRegistered(QueryHint::kCpuMode)) {
        VLOG(1) << "A user forces to run the query on the CPU execution mode";
        co_hint_applied.device_type = ExecutorDeviceType::CPU;
      }
      if (query_hints->isHintRegistered(QueryHint::kKeepResult)) {
        if (!g_enable_data_recycler) {
          VLOG(1) << "A user enables keeping query resultset but is skipped since data "
                     "recycler is disabled";
        } else if (!g_use_query_resultset_cache) {
          VLOG(1) << "A user enables keeping query resultset but is skipped since query "
                     "resultset recycler is disabled";
        } else {
          VLOG(1) << "A user enables keeping query resultset";
          eo_hint_applied.keep_result = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kKeepTableFuncResult)) {
        // we use this hint within the function 'executeTableFunction'
        if (!g_enable_data_recycler) {
          VLOG(1) << "A user enables keeping table function's resultset but is skipped "
                     "since data recycler is disabled";
        } else if (!g_use_query_resultset_cache) {
          VLOG(1) << "A user enables keeping table function's resultset but is skipped "
                     "since query resultset recycler is disabled";
        } else {
          VLOG(1) << "A user enables keeping table function's resultset";
          eo_hint_applied.keep_result = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kColumnarOutput)) {
        VLOG(1) << "A user forces the query to run with columnar output";
        columnar_output_hint_enabled = true;
      } else if (query_hints->isHintRegistered(QueryHint::kRowwiseOutput)) {
        VLOG(1) << "A user forces the query to run with rowwise output";
        rowwise_output_hint_enabled = true;
      }
    }
    auto columnar_output_enabled = eo_work_unit.output_columnar_hint
                                       ? !rowwise_output_hint_enabled
                                       : columnar_output_hint_enabled;
    if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
      LOG(INFO) << "Currently, we do not support applying query hint to change query "
                   "output layout in distributed mode.";
    }
    eo_hint_applied.output_columnar_hint = columnar_output_enabled;
    return std::make_pair(co_hint_applied, eo_hint_applied);
  };

  auto hint_applied = handle_hint();

  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
    if (auto cached_resultset =
            executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
                body->getQueryPlanDagHash())) {
      VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
              << " from resultset cache";
      body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
      if (render_info) {
        std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
            executor_->getRecultSetRecyclerHolder().getTargetExprs(
                body->getQueryPlanDagHash());
        std::vector<Analyzer::Expr*> copied_target_exprs;
        for (const auto& expr : cached_target_exprs) {
          copied_target_exprs.push_back(expr.get());
        }
        build_render_targets(
            *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
      }
      exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
      addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
      return;
    }
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (compound->isUpdateViaSelect()) {
      executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      exec_desc.setResult(executeCompound(
          compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
      VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
              << static_cast<int>(-compound->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    if (project->isDeleteViaSelect()) {
      executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (project->isUpdateViaSelect()) {
      executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      std::optional<size_t> prev_count;
      // Disabling the intermediate count optimization in distributed, as the previous
      // execution descriptor will likely not hold the aggregated result.
      if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
        // If the previous node produced a reliable count, skip the pre-flight count.
        RelAlgNode const* const prev_body = project->getInput(0);
        if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
          if (RaExecutionDesc const* const prev_exec_desc =
                  prev_body->hasContextData()
                      ? prev_body->getContextData()
                      : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
            const auto& prev_exe_result = prev_exec_desc->getResult();
            const auto prev_result = prev_exe_result.getRows();
            if (prev_result) {
              prev_count = prev_result->rowCount();
              VLOG(3) << "Setting output row count for projection node to previous node ("
                      << prev_exec_desc->getBody()->toString(
                             RelRexToStringConfig::defaults())
                      << ") to " << *prev_count;
            }
          }
        }
      }
      exec_desc.setResult(executeProject(project,
                                         hint_applied.first,
                                         hint_applied.second,
                                         render_info,
                                         queue_time_ms,
                                         prev_count));
      VLOG(3) << "Returned from executeProject(), addTemporaryTable("
              << static_cast<int>(-project->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
  if (aggregate) {
    exec_desc.setResult(executeAggregate(
        aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto filter = dynamic_cast<const RelFilter*>(body);
  if (filter) {
    exec_desc.setResult(executeFilter(
        filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto sort = dynamic_cast<const RelSort*>(body);
  if (sort) {
    exec_desc.setResult(executeSort(
        sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    if (exec_desc.getResult().isFilterPushDownEnabled()) {
      return;
    }
    addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
  if (logical_values) {
    exec_desc.setResult(executeLogicalValues(logical_values, hint_applied.second));
    addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto modify = dynamic_cast<const RelModify*>(body);
  if (modify) {
    exec_desc.setResult(executeModify(modify, hint_applied.second));
    return;
  }
  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
  if (logical_union) {
    exec_desc.setResult(executeUnion(logical_union,
                                     seq,
                                     hint_applied.first,
                                     hint_applied.second,
                                     render_info,
                                     queue_time_ms));
    addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
  if (table_func) {
    exec_desc.setResult(executeTableFunction(
        table_func, hint_applied.first, hint_applied.second, queue_time_ms));
    addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  LOG(FATAL) << "Unhandled body type: "
             << body->toString(RelRexToStringConfig::defaults());
}

void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
  // just set the result of the previous node as the result of no op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up temp table as it could be used by the outer query or next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}
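
// Temporary-table id convention used throughout this file (note, not in the
// original source): a node's result set is registered under the *negative*
// node id, so negative table ids always denote intermediate results while
// positive ids keep denoting physical tables:
//
//   addTemporaryTable(-body->getId(), result);      // producer
//   temporary_tables_.find(-input->getId());        // consumer, as above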

namespace {

class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:
  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : RexVisitor(), cat_(cat) {}

  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
  const Catalog_Namespace::Catalog& cat_;
};

const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
    return table_func;
  }
  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
    CHECK_EQ(size_t(2), join->inputCount());
    return join;
  }
  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_EQ(size_t(1), ra_node->inputCount());
  }
  auto only_src = ra_node->getInput(0);
  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
                       dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
  return is_join ? only_src : ra_node;
}
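
// Example (sketch, not in the original source): for the tree
//   Compound <- LeftDeepInnerJoin <- [Scan a, Scan b, Scan c]
// get_data_sink(compound) returns the join node, whose inputs define the
// query's nest levels; for a plain  Project <- Scan t  it returns the
// project node itself.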

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelTableFunction* table_func,
                const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelLogicalUnion* logical_union,
                const Catalog_Namespace::Catalog&) {
  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  used_inputs_owned.reserve(logical_union->inputCount());
  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
  auto const n_inputs = logical_union->inputCount();
  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
    auto input = logical_union->getInput(nest_level);
    for (size_t i = 0; i < input->size(); ++i) {
      used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
      used_inputs.insert(used_inputs_owned.back().get());
    }
  }
  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
}

int table_id_from_ra(const RelAlgNode* ra_node) {
  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
  if (scan_ra) {
    const auto td = scan_ra->getTableDescriptor();
    CHECK(td);
    return td->tableId;
  }
  return -ra_node->getId();
}

std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
    const RelAlgNode* ra_node,
    const std::vector<size_t>& input_permutation) {
  const auto data_sink_node = get_data_sink(ra_node);
  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    const auto input_ra = data_sink_node->getInput(input_node_idx);
    // Having a non-zero mapped value (input_idx) results in the query being interpreted
    // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
    // mapped value (input_idx) which originates here. This would be incorrect for UNION.
    size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
    const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
    CHECK(it_ok.second);
    LOG_IF(INFO, !input_permutation.empty())
        << "Assigned input " << input_ra->toString(RelRexToStringConfig::defaults())
        << " to nest level " << input_idx;
  }
  return input_to_nest_level;
}
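
// Example (sketch, not in the original source): for a two-way join
//   Join <- [Scan t0, Scan t1]   with input_permutation = {}
// the result is { Scan t0 -> 0, Scan t1 -> 1 }, and nest level 1 marks the
// inner side during code generation. For RelLogicalUnion every input is
// pinned to nest level 0, since a union must not be interpreted as a join.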

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_join_source_used_inputs(const RelAlgNode* ra_node,
                            const Catalog_Namespace::Catalog& cat) {
  const auto data_sink_node = get_data_sink(ra_node);
  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
    CHECK_EQ(join->inputCount(), 2u);
    const auto condition = join->getCondition();
    RexUsedInputsVisitor visitor(cat);
    auto condition_inputs = visitor.visit(condition);
    std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
        visitor.get_inputs_owned());
    return std::make_pair(condition_inputs, condition_inputs_owned);
  }

  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
    CHECK_GE(left_deep_join->inputCount(), 2u);
    const auto condition = left_deep_join->getInnerCondition();
    RexUsedInputsVisitor visitor(cat);
    auto result = visitor.visit(condition);
    for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
         ++nesting_level) {
      const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
      if (outer_condition) {
        const auto outer_result = visitor.visit(outer_condition);
        result.insert(outer_result.begin(), outer_result.end());
      }
    }
    std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
    return std::make_pair(result, used_inputs_owned);
  }

  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_GT(ra_node->inputCount(), 1u)
        << ra_node->toString(RelRexToStringConfig::defaults());
  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
    // no-op
    CHECK_GE(ra_node->inputCount(), 0u)
        << ra_node->toString(RelRexToStringConfig::defaults());
  } else {
    CHECK_EQ(ra_node->inputCount(), 1u)
        << ra_node->toString(RelRexToStringConfig::defaults());
  }
  return std::make_pair(std::unordered_set<const RexInput*>{},
                        std::vector<std::shared_ptr<RexInput>>{});
}

void collect_used_input_desc(
    std::vector<InputDescriptor>& input_descs,
    const Catalog_Namespace::Catalog& cat,
    std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
    const RelAlgNode* ra_node,
    const std::unordered_set<const RexInput*>& source_used_inputs,
    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
  VLOG(3) << "ra_node=" << ra_node->toString(RelRexToStringConfig::defaults())
          << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
          << " source_used_inputs.size()=" << source_used_inputs.size();
  for (const auto used_input : source_used_inputs) {
    const auto input_ra = used_input->getSourceNode();
    const int table_id = table_id_from_ra(input_ra);
    const auto col_id = used_input->getIndex();
    auto it = input_to_nest_level.find(input_ra);
    if (it != input_to_nest_level.end()) {
      const int input_desc = it->second;
      input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
          dynamic_cast<const RelScan*>(input_ra)
              ? cat.getColumnIdBySpi(table_id, col_id + 1)
              : col_id,
          table_id,
          input_desc));
    } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
      throw std::runtime_error("Bushy joins not supported");
    }
  }
}
1586 
1587 template <class RA>
1588 std::pair<std::vector<InputDescriptor>,
1589  std::list<std::shared_ptr<const InputColDescriptor>>>
1590 get_input_desc_impl(const RA* ra_node,
1591  const std::unordered_set<const RexInput*>& used_inputs,
1592  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1593  const std::vector<size_t>& input_permutation,
1594  const Catalog_Namespace::Catalog& cat) {
1595  std::vector<InputDescriptor> input_descs;
1596  const auto data_sink_node = get_data_sink(ra_node);
1597  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1598  const auto input_node_idx =
1599  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1600  auto input_ra = data_sink_node->getInput(input_node_idx);
1601  const int table_id = table_id_from_ra(input_ra);
1602  input_descs.emplace_back(table_id, input_idx);
1603  }
1604  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1605  collect_used_input_desc(input_descs,
1606  cat,
1607  input_col_descs_unique, // modified
1608  ra_node,
1609  used_inputs,
1610  input_to_nest_level);
1611  std::unordered_set<const RexInput*> join_source_used_inputs;
1612  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1613  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1614  get_join_source_used_inputs(ra_node, cat);
1615  collect_used_input_desc(input_descs,
1616  cat,
1617  input_col_descs_unique, // modified
1618  ra_node,
1619  join_source_used_inputs,
1620  input_to_nest_level);
1621  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1622  input_col_descs_unique.begin(), input_col_descs_unique.end());
1623 
1624  std::sort(input_col_descs.begin(),
1625  input_col_descs.end(),
1626  [](std::shared_ptr<const InputColDescriptor> const& lhs,
1627  std::shared_ptr<const InputColDescriptor> const& rhs) {
1628  return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1629  lhs->getColId(),
1630  lhs->getScanDesc().getTableId()) <
1631  std::make_tuple(rhs->getScanDesc().getNestLevel(),
1632  rhs->getColId(),
1633  rhs->getScanDesc().getTableId());
1634  });
1635  return {input_descs,
1636  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1637  input_col_descs.end())};
1638 }
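
// The comparator above sorts column descriptors lexicographically by
// (nest_level, col_id, table_id) using tuple comparison. The same ordering on a
// plain struct, as a minimal sketch:
//
//   #include <tuple>
//
//   struct ColKey {
//     int nest_level;
//     int col_id;
//     int table_id;
//   };
//
//   inline bool operator<(const ColKey& a, const ColKey& b) {
//     return std::tie(a.nest_level, a.col_id, a.table_id) <
//            std::tie(b.nest_level, b.col_id, b.table_id);
//   }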
1639 
1640 template <class RA>
1641 std::tuple<std::vector<InputDescriptor>,
1642  std::list<std::shared_ptr<const InputColDescriptor>>,
1643  std::vector<std::shared_ptr<RexInput>>>
1644 get_input_desc(const RA* ra_node,
1645  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1646  const std::vector<size_t>& input_permutation,
1647  const Catalog_Namespace::Catalog& cat) {
1648  std::unordered_set<const RexInput*> used_inputs;
1649  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1650  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
1651  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1652  auto input_desc_pair = get_input_desc_impl(
1653  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1654  return std::make_tuple(
1655  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1656 }
1657 
1658 size_t get_scalar_sources_size(const RelCompound* compound) {
1659  return compound->getScalarSourcesSize();
1660 }
1661 
1662 size_t get_scalar_sources_size(const RelProject* project) {
1663  return project->size();
1664 }
1665 
1666 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1667  return table_func->getTableFuncInputsSize();
1668 }
1669 
1670 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1671  return compound->getScalarSource(i);
1672 }
1673 
1674 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1675  return project->getProjectAt(i);
1676 }
1677 
1678 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1679  return table_func->getTableFuncInputAt(i);
1680 }
1681 
1682 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1683  const std::shared_ptr<Analyzer::Expr> expr) {
1684  const auto& ti = expr->get_type_info();
1685  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1686  return expr;
1687  }
1688  auto transient_dict_ti = ti;
1689  transient_dict_ti.set_compression(kENCODING_DICT);
1690  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1691  transient_dict_ti.set_fixed_size();
1692  return expr->add_cast(transient_dict_ti);
1693 }
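
// Only none-encoded strings are rewritten here: the added cast targets a
// dictionary whose comp_param is the reserved TRANSIENT_DICT_ID, i.e. a
// query-lifetime dictionary owned by the executor rather than the catalog. A
// minimal sketch of the decision with a simplified type (illustrative struct,
// not the real SQLTypeInfo API):
//
//   struct TypeSketch {
//     bool is_string;
//     bool dict_encoded;
//     int dict_id;  // comp_param; a reserved id marks a transient dictionary
//   };
//
//   TypeSketch to_transient_dict(TypeSketch t, int transient_dict_id) {
//     if (!t.is_string || t.dict_encoded) {
//       return t;  // leave non-strings and already-encoded strings alone
//     }
//     return TypeSketch{/*is_string=*/true, /*dict_encoded=*/true, transient_dict_id};
//   }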
1694 
1695 void set_transient_dict_maybe(
1696  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1697  const std::shared_ptr<Analyzer::Expr>& expr) {
1698  try {
1699  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1700  } catch (...) {
1701  scalar_sources.push_back(fold_expr(expr.get()));
1702  }
1703 }
1704 
1705 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1706  const std::shared_ptr<Analyzer::Expr>& input) {
1707  const auto& input_ti = input->get_type_info();
1708  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1709  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1710  }
1711  return input;
1712 }
1713 
1714 template <class RA>
1715 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1716  const RA* ra_node,
1717  const RelAlgTranslator& translator,
1718  const ::ExecutorType executor_type) {
1719  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1720  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1721  VLOG(3) << "get_scalar_sources_size("
1722  << ra_node->toString(RelRexToStringConfig::defaults())
1723  << ") = " << scalar_sources_size;
1724  for (size_t i = 0; i < scalar_sources_size; ++i) {
1725  const auto scalar_rex = scalar_at(i, ra_node);
1726  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1727  // RexRefs are synthetic scalars appended after the real ones so that we take
1728  // memory ownership of them; no real work is needed here.
1729  continue;
1730  }
1731 
1732  const auto scalar_expr =
1733  rewrite_array_elements(translator.translate(scalar_rex).get());
1734  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1735  if (executor_type == ExecutorType::Native) {
1736  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1737  } else if (executor_type == ExecutorType::TableFunctions) {
1738  scalar_sources.push_back(fold_expr(rewritten_expr.get()));
1739  } else {
1740  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1741  }
1742  }
1743 
1744  return scalar_sources;
1745 }
1746 
1747 template <class RA>
1748 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1749  const RA* ra_node,
1750  const RelAlgTranslator& translator,
1751  int32_t tableId,
1752  const Catalog_Namespace::Catalog& cat,
1753  const ColumnNameList& colNames,
1754  size_t starting_projection_column_idx) {
1755  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1756  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1757  const auto scalar_rex = scalar_at(i, ra_node);
1758  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1759  // RexRefs are synthetic scalars appended after the real ones so that we take
1760  // memory ownership of them; no real work is needed here.
1761  continue;
1762  }
1763 
1764  std::shared_ptr<Analyzer::Expr> translated_expr;
1765  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1766  translated_expr = cast_to_column_type(translator.translate(scalar_rex),
1767  tableId,
1768  cat,
1769  colNames[i - starting_projection_column_idx]);
1770  } else {
1771  translated_expr = translator.translate(scalar_rex);
1772  }
1773  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1774  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1775  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1776  }
1777 
1778  return scalar_sources;
1779 }
1780 
1781 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1782  const RelCompound* compound,
1783  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1784  if (!compound->isAggregate()) {
1785  return {nullptr};
1786  }
1787  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1788  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1789  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1790  }
1791  return groupby_exprs;
1792 }
1793 
1794 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1795  const RelAggregate* aggregate,
1796  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1797  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1798  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1799  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1800  }
1801  return groupby_exprs;
1802 }
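
// In both overloads the group-by expressions are just the first
// getGroupByCount() entries of the already-translated scalar sources, each
// wrapped by set_transient_dict. A minimal sketch of the prefix extraction:
//
//   #include <list>
//   #include <vector>
//
//   template <class Expr>
//   std::list<Expr> groupby_prefix(const std::vector<Expr>& scalar_sources,
//                                  size_t group_by_count) {
//     return std::list<Expr>(scalar_sources.begin(),
//                            scalar_sources.begin() + group_by_count);
//   }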
1803 
1804 std::list<std::shared_ptr<Analyzer::Expr>> translate_quals(const RelCompound* compound,
1805  const RelAlgTranslator& translator) {
1806  const auto filter_rex = compound->getFilterExpr();
1807  const auto filter_expr = filter_rex ? translator.translate(filter_rex) : nullptr;
1808  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1809  : std::list<std::shared_ptr<Analyzer::Expr>>{};
1810 }
1811 
1812 namespace {
1813 // If an encoded type is used in the context of COUNT(DISTINCT ...) then don't
1814 // bother decoding it. This is done by changing the sql type to an integer.
1815 void conditionally_change_arg_to_int_type(
1816  size_t target_expr_idx,
1817  std::shared_ptr<Analyzer::Expr>& target_expr,
1818  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos) {
1819  auto* agg_expr = dynamic_cast<Analyzer::AggExpr*>(target_expr.get());
1820  CHECK(agg_expr);
1821  if (agg_expr->get_is_distinct()) {
1822  SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1823  if (ti.get_type() != kARRAY && ti.get_compression() == kENCODING_DATE_IN_DAYS) {
1824  target_exprs_type_infos.emplace(target_expr_idx, ti);
1825  target_expr = target_expr->deep_copy();
1826  auto* arg = dynamic_cast<Analyzer::AggExpr*>(target_expr.get())->get_arg();
1827  arg->set_type_info(SQLTypeInfo(get_int_type_by_size(ti.get_size()), ti.get_notnull()));
1828  return;
1829  }
1830  }
1831  target_exprs_type_infos.emplace(target_expr_idx, target_expr->get_type_info());
1832 }
1833 } // namespace
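
// The effect: COUNT(DISTINCT d) on a DATE ENCODED AS DAYS column can count the
// raw day numbers directly instead of decoding each value to a 64-bit epoch,
// because day-number encoding maps dates to integers 1:1; the original type is
// remembered in target_exprs_type_infos so result metadata still reports a date.
// A minimal sketch of why decoding is unnecessary for distinct counts:
//
//   #include <cstdint>
//   #include <unordered_set>
//   #include <vector>
//
//   size_t count_distinct_days(const std::vector<int32_t>& encoded_days) {
//     // distinct encoded values == distinct dates, since the encoding is injective
//     return std::unordered_set<int32_t>(encoded_days.begin(), encoded_days.end())
//         .size();
//   }
//
//   // count_distinct_days({0, 1, 1, 365}) == 3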
1834 
1835 std::vector<Analyzer::Expr*> translate_targets(
1836  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1837  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1838  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1839  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1840  const RelCompound* compound,
1841  const RelAlgTranslator& translator,
1842  const ExecutorType executor_type) {
1843  std::vector<Analyzer::Expr*> target_exprs;
1844  for (size_t i = 0; i < compound->size(); ++i) {
1845  const auto target_rex = compound->getTargetExpr(i);
1846  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1847  std::shared_ptr<Analyzer::Expr> target_expr;
1848  if (target_rex_agg) {
1849  target_expr =
1850  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1851  conditionally_change_arg_to_int_type(i, target_expr, target_exprs_type_infos);
1852  } else {
1853  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1854  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1855  if (target_rex_ref) {
1856  const auto ref_idx = target_rex_ref->getIndex();
1857  CHECK_GE(ref_idx, size_t(1));
1858  CHECK_LE(ref_idx, groupby_exprs.size());
1859  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1860  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1861  } else {
1862  target_expr = translator.translate(target_rex_scalar);
1863  auto rewritten_expr = rewrite_expr(target_expr.get());
1864  target_expr = fold_expr(rewritten_expr.get());
1865  if (executor_type == ExecutorType::Native) {
1866  try {
1867  target_expr = set_transient_dict(target_expr);
1868  } catch (...) {
1869  // noop
1870  }
1871  } else {
1872  target_expr = cast_dict_to_none(target_expr);
1873  }
1874  }
1875  target_exprs_type_infos.emplace(i, target_expr->get_type_info());
1876  }
1877  CHECK(target_expr);
1878  target_exprs_owned.push_back(target_expr);
1879  target_exprs.push_back(target_expr.get());
1880  }
1881  return target_exprs;
1882 }
1883 
1884 std::vector<Analyzer::Expr*> translate_targets(
1885  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1886  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1887  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1888  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1889  const RelAggregate* aggregate,
1890  const RelAlgTranslator& translator) {
1891  std::vector<Analyzer::Expr*> target_exprs;
1892  size_t group_key_idx = 1;
1893  for (const auto& groupby_expr : groupby_exprs) {
1894  auto target_expr =
1895  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1896  target_exprs_owned.push_back(target_expr);
1897  target_exprs.push_back(target_expr.get());
1898  }
1899 
1900  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1901  auto target_expr =
1902  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1903  CHECK(target_expr);
1904  target_expr = fold_expr(target_expr.get());
1905  target_exprs_owned.push_back(target_expr);
1906  target_exprs.push_back(target_expr.get());
1907  }
1908  return target_exprs;
1909 }
1910 
1911 bool is_count_distinct(const Analyzer::Expr* expr) {
1912  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1913  return agg_expr && agg_expr->get_is_distinct();
1914 }
1915 
1916 bool is_agg(const Analyzer::Expr* expr) {
1917  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1918  if (agg_expr && agg_expr->get_contains_agg()) {
1919  auto agg_type = agg_expr->get_aggtype();
1920  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1921  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1922  return true;
1923  }
1924  }
1925  return false;
1926 }
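
// Note the deliberate asymmetry: only MIN/MAX/SUM/AVG return true here, because
// those are the aggregates whose logical result type gets the nullable treatment
// in get_logical_type_for_expr below, while COUNT(DISTINCT ...) is forced to
// BIGINT separately. A minimal sketch of the rule (assumed rationale: these
// aggregates yield NULL over empty groups):
//
//   enum class Agg { kMin, kMax, kSum, kAvg, kCount, kCountDistinct };
//
//   bool gets_nullable_logical_type(Agg agg) {
//     switch (agg) {
//       case Agg::kMin:
//       case Agg::kMax:
//       case Agg::kSum:
//       case Agg::kAvg:
//         return true;
//       default:
//         return false;
//     }
//   }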
1927 
1928 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
1929  if (is_count_distinct(&expr)) {
1930  return SQLTypeInfo(kBIGINT, false);
1931  } else if (is_agg(&expr)) {
1932  return get_nullable_logical_type_info(expr.get_type_info());
1933  }
1934  return get_logical_type_info(expr.get_type_info());
1935 }
1936 
1937 template <class RA>
1938 std::vector<TargetMetaInfo> get_targets_meta(
1939  const RA* ra_node,
1940  const std::vector<Analyzer::Expr*>& target_exprs) {
1941  std::vector<TargetMetaInfo> targets_meta;
1942  CHECK_EQ(ra_node->size(), target_exprs.size());
1943  for (size_t i = 0; i < ra_node->size(); ++i) {
1944  CHECK(target_exprs[i]);
1945  // TODO(alex): remove the count distinct type fixup.
1946  targets_meta.emplace_back(ra_node->getFieldName(i),
1947  get_logical_type_for_expr(*target_exprs[i]),
1948  target_exprs[i]->get_type_info());
1949  }
1950  return targets_meta;
1951 }
1952 
1953 template <>
1954 std::vector<TargetMetaInfo> get_targets_meta(
1955  const RelFilter* filter,
1956  const std::vector<Analyzer::Expr*>& target_exprs) {
1957  RelAlgNode const* input0 = filter->getInput(0);
1958  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1959  return get_targets_meta(input, target_exprs);
1960  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1961  return get_targets_meta(input, target_exprs);
1962  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1963  return get_targets_meta(input, target_exprs);
1964  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1965  return get_targets_meta(input, target_exprs);
1966  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1967  return get_targets_meta(input, target_exprs);
1968  }
1969  UNREACHABLE() << "Unhandled node type: "
1970  << input0->toString(RelRexToStringConfig::defaults());
1971  return {};
1972 }
1973 
1974 } // namespace
1975 
1976 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
1977  const CompilationOptions& co_in,
1978  const ExecutionOptions& eo_in,
1979  const int64_t queue_time_ms) {
1980  CHECK(node);
1981  auto timer = DEBUG_TIMER(__func__);
1982 
1983  auto co = co_in;
1984  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1985  // encoded string updates
1986 
1987  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
1988  auto& work_unit,
1989  const bool is_aggregate) {
1990  auto table_descriptor = node->getModifiedTableDescriptor();
1991  CHECK(table_descriptor);
1992  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
1993  throw std::runtime_error(
1994  "UPDATE queries involving variable length columns are only supported on tables "
1995  "with the vacuum attribute set to 'delayed'");
1996  }
1997 
1998  Executor::clearExternalCaches(true, table_descriptor, cat_.getDatabaseId());
1999 
2000  dml_transaction_parameters_ =
2001  std::make_unique<UpdateTransactionParameters>(table_descriptor,
2002  node->getTargetColumns(),
2003  node->getOutputMetainfo(),
2004  node->isVarlenUpdateRequired());
2005 
2006  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2007 
2008  auto execute_update_ra_exe_unit =
2009  [this, &co, &eo_in, &table_infos, &table_descriptor, &node](
2010  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
2011  CompilationOptions co_project = CompilationOptions::makeCpuOnly(co);
2012 
2013  auto eo = eo_in;
2014  if (dml_transaction_parameters_->tableIsTemporary()) {
2015  eo.output_columnar_hint = true;
2016  co_project.allow_lazy_fetch = false;
2017  co_project.filter_on_deleted_column =
2018  false; // project the entire delete column for columnar update
2019  }
2020 
2021  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
2022  dml_transaction_parameters_.get());
2023  CHECK(update_transaction_parameters);
2024  update_transaction_parameters->setInputSourceNode(node);
2025  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
2026  try {
2027  auto table_update_metadata =
2028  executor_->executeUpdate(ra_exe_unit,
2029  table_infos,
2030  table_descriptor,
2031  co_project,
2032  eo,
2033  cat_,
2034  executor_->row_set_mem_owner_,
2035  update_callback,
2036  is_aggregate);
2037  post_execution_callback_ = [table_update_metadata, this]() {
2038  dml_transaction_parameters_->finalizeTransaction(cat_);
2039  TableOptimizer table_optimizer{
2040  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
2041  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2042  };
2043  } catch (const QueryExecutionError& e) {
2044  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2045  }
2046  };
2047 
2048  if (dml_transaction_parameters_->tableIsTemporary()) {
2049  // hold owned target exprs during execution if rewriting
2050  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2051  // rewrite temp table updates to generate the full column by moving the where
2052  // clause into a case statement; if such a rewrite is not possible, bail on the
2053  // update operation. then build an expr for the update target
2054  auto update_transaction_params =
2055  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
2056  CHECK(update_transaction_params);
2057  const auto td = update_transaction_params->getTableDescriptor();
2058  CHECK(td);
2059  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2060  if (update_column_names.size() > 1) {
2061  throw std::runtime_error(
2062  "Multi-column update is not yet supported for temporary tables.");
2063  }
2064 
2065  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
2066  CHECK(cd);
2067  auto projected_column_to_update =
2068  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
2069  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2070  work_unit.exe_unit, projected_column_to_update);
2071  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2072  throw std::runtime_error(
2073  "Variable length updates not yet supported on temporary tables.");
2074  }
2075  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2076  } else {
2077  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2078  }
2079  };
2080 
2081  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2082  auto work_unit =
2083  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2084 
2085  execute_update_for_node(compound, work_unit, compound->isAggregate());
2086  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2087  auto work_unit =
2088  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2089 
2090  if (project->isSimple()) {
2091  CHECK_EQ(size_t(1), project->inputCount());
2092  const auto input_ra = project->getInput(0);
2093  if (dynamic_cast<const RelSort*>(input_ra)) {
2094  const auto& input_table =
2095  get_temporary_table(&temporary_tables_, -input_ra->getId());
2096  CHECK(input_table);
2097  work_unit.exe_unit.scan_limit = input_table->rowCount();
2098  }
2099  }
2100  if (project->hasWindowFunctionExpr() || project->hasPushedDownWindowExpr()) {
2101  // the first condition means this project node has at least one window function
2102  // and the second condition indicates that this project node falls into
2103  // one of the following cases:
2104  // 1) window function expression on a multi-fragmented table
2105  // 2) window function expression is too complex to evaluate without codegen:
2106  // i.e., sum(x+y+z) instead of sum(x) -> we currently do not support codegen to
2107  // evaluate such a complex window function expression
2108  // 3) nested window function expression
2109  // but currently we do not support update on a multi-fragmented table having
2110  // window function, so the second condition only refers to non-fragmented table with
2111  // cases 2) or 3)
2112  // if at least one of the two conditions is satisfied, we must compute the corresponding
2113  // window context before entering `execute_update_for_node` to properly update the table
2114  if (!leaf_results_.empty()) {
2115  throw std::runtime_error(
2116  "Update query having window function is not yet supported in distributed "
2117  "mode.");
2118  }
2119  ColumnCacheMap column_cache;
2120  co.device_type = ExecutorDeviceType::CPU;
2121  computeWindow(work_unit, co, eo_in, column_cache, queue_time_ms);
2122  }
2123  execute_update_for_node(project, work_unit, false);
2124  } else {
2125  throw std::runtime_error("Unsupported parent node for update: " +
2126  node->toString(RelRexToStringConfig::defaults()));
2127  }
2128 }
2129 
2130 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
2131  const CompilationOptions& co,
2132  const ExecutionOptions& eo_in,
2133  const int64_t queue_time_ms) {
2134  CHECK(node);
2135  auto timer = DEBUG_TIMER(__func__);
2136 
2137  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2138  auto& work_unit,
2139  const bool is_aggregate) {
2140  auto* table_descriptor = node->getModifiedTableDescriptor();
2141  CHECK(table_descriptor);
2142  if (!table_descriptor->hasDeletedCol) {
2143  throw std::runtime_error(
2144  "DELETE queries are only supported on tables with the vacuum attribute set to "
2145  "'delayed'");
2146  }
2147 
2148  Executor::clearExternalCaches(false, table_descriptor, cat_.getDatabaseId());
2149 
2150  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2151 
2152  auto execute_delete_ra_exe_unit =
2153  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
2154  const bool is_aggregate) {
2155  dml_transaction_parameters_ =
2156  std::make_unique<DeleteTransactionParameters>(table_descriptor);
2157  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2158  dml_transaction_parameters_.get());
2159  CHECK(delete_params);
2160  auto delete_callback = yieldDeleteCallback(*delete_params);
2161  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
2162 
2163  auto eo = eo_in;
2164  if (dml_transaction_parameters_->tableIsTemporary()) {
2165  eo.output_columnar_hint = true;
2166  co_delete.filter_on_deleted_column =
2167  false; // project the entire delete column for columnar update
2168  } else {
2169  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2170  }
2171 
2172  try {
2173  auto table_update_metadata =
2174  executor_->executeUpdate(exe_unit,
2175  table_infos,
2176  table_descriptor,
2177  co_delete,
2178  eo,
2179  cat_,
2180  executor_->row_set_mem_owner_,
2181  delete_callback,
2182  is_aggregate);
2183  post_execution_callback_ = [table_update_metadata, this]() {
2184  dml_transaction_parameters_->finalizeTransaction(cat_);
2185  TableOptimizer table_optimizer{
2186  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
2187  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2188  };
2189  } catch (const QueryExecutionError& e) {
2190  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2191  }
2192  };
2193 
2194  if (table_is_temporary(table_descriptor)) {
2195  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2196  auto cd = cat_.getDeletedColumn(table_descriptor);
2197  CHECK(cd);
2198  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2199  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
2200  const auto rewritten_exe_unit =
2201  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2202  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2203  } else {
2204  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2205  }
2206  };
2207 
2208  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2209  const auto work_unit =
2210  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2211  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2212  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2213  auto work_unit =
2214  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2215  if (project->isSimple()) {
2216  CHECK_EQ(size_t(1), project->inputCount());
2217  const auto input_ra = project->getInput(0);
2218  if (dynamic_cast<const RelSort*>(input_ra)) {
2219  const auto& input_table =
2220  get_temporary_table(&temporary_tables_, -input_ra->getId());
2221  CHECK(input_table);
2222  work_unit.exe_unit.scan_limit = input_table->rowCount();
2223  }
2224  }
2225  execute_delete_for_node(project, work_unit, false);
2226  } else {
2227  throw std::runtime_error("Unsupported parent node for delete: " +
2228  node->toString(RelRexToStringConfig::defaults()));
2229  }
2230 }
2231 
2232 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
2233  const CompilationOptions& co,
2234  const ExecutionOptions& eo,
2235  RenderInfo* render_info,
2236  const int64_t queue_time_ms) {
2237  auto timer = DEBUG_TIMER(__func__);
2238  const auto work_unit =
2239  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
2240  CompilationOptions co_compound = co;
2241  return executeWorkUnit(work_unit,
2242  compound->getOutputMetainfo(),
2243  compound->isAggregate(),
2244  co_compound,
2245  eo,
2246  render_info,
2247  queue_time_ms);
2248 }
2249 
2250 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
2251  const CompilationOptions& co,
2252  const ExecutionOptions& eo,
2253  RenderInfo* render_info,
2254  const int64_t queue_time_ms) {
2255  auto timer = DEBUG_TIMER(__func__);
2256  const auto work_unit = createAggregateWorkUnit(
2257  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2258  return executeWorkUnit(work_unit,
2259  aggregate->getOutputMetainfo(),
2260  true,
2261  co,
2262  eo,
2263  render_info,
2264  queue_time_ms);
2265 }
2266 
2267 namespace {
2268 
2269 // Returns true iff the execution unit contains window functions.
2270 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
2271  return std::any_of(ra_exe_unit.target_exprs.begin(),
2272  ra_exe_unit.target_exprs.end(),
2273  [](const Analyzer::Expr* expr) {
2274  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
2275  });
2276 }
2277 
2278 } // namespace
2279 
2280 ExecutionResult RelAlgExecutor::executeProject(
2281  const RelProject* project,
2282  const CompilationOptions& co,
2283  const ExecutionOptions& eo,
2284  RenderInfo* render_info,
2285  const int64_t queue_time_ms,
2286  const std::optional<size_t> previous_count) {
2287  auto timer = DEBUG_TIMER(__func__);
2288  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
2289  CompilationOptions co_project = co;
2290  if (project->isSimple()) {
2291  CHECK_EQ(size_t(1), project->inputCount());
2292  const auto input_ra = project->getInput(0);
2293  if (dynamic_cast<const RelSort*>(input_ra)) {
2294  co_project.device_type = ExecutorDeviceType::CPU;
2295  const auto& input_table =
2296  get_temporary_table(&temporary_tables_, -input_ra->getId());
2297  CHECK(input_table);
2298  work_unit.exe_unit.scan_limit =
2299  std::min(input_table->getLimit(), input_table->rowCount());
2300  }
2301  }
2302  return executeWorkUnit(work_unit,
2303  project->getOutputMetainfo(),
2304  false,
2305  co_project,
2306  eo,
2307  render_info,
2308  queue_time_ms,
2309  previous_count);
2310 }
2311 
2312 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
2313  const CompilationOptions& co_in,
2314  const ExecutionOptions& eo,
2315  const int64_t queue_time_ms) {
2317  auto timer = DEBUG_TIMER(__func__);
2318 
2319  auto co = co_in;
2320 
2321  if (g_cluster) {
2322  throw std::runtime_error("Table functions not supported in distributed mode yet");
2323  }
2324  if (!g_enable_table_functions) {
2325  throw std::runtime_error("Table function support is disabled");
2326  }
2327  auto table_func_work_unit = createTableFunctionWorkUnit(
2328  table_func,
2329  eo.just_explain,
2330  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2331  const auto body = table_func_work_unit.body;
2332  CHECK(body);
2333 
2334  const auto table_infos =
2335  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2336 
2337  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2338  co.device_type,
2339  QueryMemoryDescriptor(),
2340  nullptr,
2341  executor_->getCatalog(),
2342  executor_->blockSize(),
2343  executor_->gridSize()),
2344  {}};
2345 
2346  auto global_hint = getGlobalQueryHint();
2347  auto use_resultset_recycler = canUseResultsetCache(eo, nullptr);
2348  if (use_resultset_recycler && has_valid_query_plan_dag(table_func)) {
2349  auto cached_resultset =
2350  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
2351  table_func->getQueryPlanDagHash());
2352  if (cached_resultset) {
2353  VLOG(1) << "recycle table function's resultset of the root node "
2354  << table_func->getRelNodeDagId() << " from resultset cache";
2355  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2356  addTemporaryTable(-body->getId(), result.getDataPtr());
2357  return result;
2358  }
2359  }
2360 
2361  auto query_exec_time_begin = timer_start();
2362  try {
2363  result = {executor_->executeTableFunction(
2364  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
2365  body->getOutputMetainfo()};
2366  } catch (const QueryExecutionError& e) {
2367  handlePersistentError(e.getErrorCode());
2368  CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
2369  throw std::runtime_error("Table function ran out of memory during execution");
2370  }
2371  auto query_exec_time = timer_stop(query_exec_time_begin);
2372  result.setQueueTime(queue_time_ms);
2373  auto resultset_ptr = result.getDataPtr();
2374  auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2375  g_allow_auto_resultset_caching &&
2376  resultset_ptr->getBufferSizeBytes(co.device_type) <=
2377  g_auto_resultset_caching_threshold;
2378  bool keep_result = global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult);
2379  if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2380  !hasStepForUnion()) {
2381  resultset_ptr->setExecTime(query_exec_time);
2382  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2383  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2384  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2385  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2386  if (allow_auto_caching_resultset) {
2387  VLOG(1) << "Automatically keep table function's query resultset to recycler";
2388  }
2389  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
2390  table_func_work_unit.exe_unit.query_plan_dag_hash,
2391  resultset_ptr->getInputTableKeys(),
2392  resultset_ptr,
2393  resultset_ptr->getBufferSizeBytes(co.device_type),
2394  executor_->getRowSetMemoryOwner());
2395  } else {
2396  if (eo.keep_result) {
2397  if (g_cluster) {
2398  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2399  "support resultset recycling on distributed mode";
2400  } else if (hasStepForUnion()) {
2401  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2402  "has union-(all) operator";
2403  } else if (is_validate_or_explain_query(eo)) {
2404  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2405  "is either validate or explain query";
2406  } else {
2407  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2408  }
2409  }
2410  }
2411 
2412  return result;
2413 }
2414 
2415 namespace {
2416 
2417 // Creates a new expression which has the range table index set to 1. This is needed to
2418 // reuse the hash join construction helpers to generate a hash table for the window
2419 // function partition: create an equals expression with left and right sides identical
2420 // except for the range table index.
2421 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
2422  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
2423  if (tuple) {
2424  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2425  for (const auto& element : tuple->getTuple()) {
2426  transformed_tuple.push_back(transform_to_inner(element.get()));
2427  }
2428  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2429  }
2430  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
2431  if (!col) {
2432  throw std::runtime_error("Only columns supported in the window partition for now");
2433  }
2434  return makeExpr<Analyzer::ColumnVar>(
2435  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
2436 }
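
// Combined with the kBW_EQ BinOper built in computeWindow below, this produces a
// self-join predicate whose two sides are the same column(s) at range table
// indices 0 and 1, which is exactly the shape the hash join builders expect. A
// minimal sketch with an illustrative struct (not the real Analyzer types):
//
//   struct ColRef {
//     int table_id;
//     int column_id;
//     int rte_idx;  // range table index: 0 = outer side, 1 = inner side
//   };
//
//   ColRef to_inner(ColRef col) {
//     col.rte_idx = 1;  // same table/column, re-labeled as the inner join side
//     return col;
//   }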
2437 
2438 } // namespace
2439 
2440 void RelAlgExecutor::computeWindow(const WorkUnit& work_unit,
2441  const CompilationOptions& co,
2442  const ExecutionOptions& eo,
2443  ColumnCacheMap& column_cache_map,
2444  const int64_t queue_time_ms) {
2445  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2446  CHECK_EQ(query_infos.size(), size_t(1));
2447  if (query_infos.front().info.fragments.size() != 1) {
2448  throw std::runtime_error(
2449  "Only single fragment tables supported for window functions for now");
2450  }
2451  if (eo.executor_type == ::ExecutorType::Extern) {
2452  return;
2453  }
2454  query_infos.push_back(query_infos.front());
2455  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2456  // a query may hold multiple window functions having the same partition-by condition;
2457  // after building the hash partition for the first one we can reuse it for the rest
2458  // of the window functions.
2459  // a cached partition can be shared across multiple window function contexts as-is,
2460  // but a sorted partition must be copied before reuse since we use it as an
2461  // (intermediate) output buffer
2462  // todo (yoonmin) : support recycler for window function computation?
2463  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2464  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2465  sorted_partition_cache;
2466  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2467  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2468  window_function_context_map;
2469  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2470  ++target_index) {
2471  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2472  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2473  if (!window_func) {
2474  continue;
2475  }
2476  // Always use baseline layout hash tables for now, make the expression a tuple.
2477  const auto& partition_keys = window_func->getPartitionKeys();
2478  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2479  if (partition_keys.size() >= 1) {
2480  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2481  if (partition_keys.size() > 1) {
2482  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2483  } else {
2484  CHECK_EQ(partition_keys.size(), size_t(1));
2485  partition_key_tuple = partition_keys.front();
2486  }
2487  // Creates a tautology equality with the partition expression on both sides.
2488  partition_key_cond =
2489  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2490  kBW_EQ,
2491  kONE,
2492  partition_key_tuple,
2493  transform_to_inner(partition_key_tuple.get()));
2494  }
2495  auto context =
2496  createWindowFunctionContext(window_func,
2497  partition_key_cond /*nullptr if no partition key*/,
2498  partition_cache,
2499  sorted_partition_key_ref_count_map,
2500  work_unit,
2501  query_infos,
2502  co,
2503  column_cache_map,
2504  executor_->getRowSetMemoryOwner());
2505  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2506  }
2507 
2508  for (auto& kv : window_function_context_map) {
2509  kv.second->compute(sorted_partition_key_ref_count_map, sorted_partition_cache);
2510  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2511  }
2512 }
2513 
2514 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
2515  const Analyzer::WindowFunction* window_func,
2516  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2517  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2518  std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2519  const WorkUnit& work_unit,
2520  const std::vector<InputTableInfo>& query_infos,
2521  const CompilationOptions& co,
2522  ColumnCacheMap& column_cache_map,
2523  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2524  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2525  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2526  ? MemoryLevel::GPU_LEVEL
2527  : MemoryLevel::CPU_LEVEL;
2528  std::unique_ptr<WindowFunctionContext> context;
2529  auto partition_cache_key = work_unit.body->getQueryPlanDagHash();
2530  if (partition_key_cond) {
2531  auto partition_cond_str = partition_key_cond->toString();
2532  auto partition_key_hash = boost::hash_value(partition_cond_str);
2533  boost::hash_combine(partition_cache_key, partition_key_hash);
2534  std::shared_ptr<HashJoin> partition_ptr;
2535  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2536  if (cached_hash_table_it != partition_cache.end()) {
2537  partition_ptr = cached_hash_table_it->second;
2538  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2539  << partition_cache_key << ", partition condition: " << partition_cond_str
2540  << ")";
2541  } else {
2542  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2543  partition_key_cond,
2544  query_infos,
2545  memory_level,
2546  JoinType::INVALID, // for window function
2547  HashType::OneToMany,
2548  column_cache_map,
2549  work_unit.exe_unit.hash_table_build_plan_dag,
2550  work_unit.exe_unit.query_hint,
2551  work_unit.exe_unit.table_id_to_node_map);
2552  if (!hash_table_or_err.fail_reason.empty()) {
2553  throw std::runtime_error(hash_table_or_err.fail_reason);
2554  }
2555  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2556  partition_ptr = hash_table_or_err.hash_table;
2557  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2558  .second);
2559  VLOG(1) << "Put a generated hash table for computing window function context to "
2560  "cache (key: "
2561  << partition_cache_key << ", partition condition: " << partition_cond_str
2562  << ")";
2563  }
2564  CHECK(partition_ptr);
2565  auto aggregate_tree_fanout = g_window_function_aggregation_tree_fanout;
2566  if (work_unit.exe_unit.query_hint.aggregate_tree_fanout != aggregate_tree_fanout) {
2567  aggregate_tree_fanout = work_unit.exe_unit.query_hint.aggregate_tree_fanout;
2568  VLOG(1) << "Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2569  }
2570  context = std::make_unique<WindowFunctionContext>(window_func,
2571  partition_cache_key,
2572  partition_ptr,
2573  elem_count,
2574  co.device_type,
2575  row_set_mem_owner,
2576  aggregate_tree_fanout);
2577  } else {
2578  context = std::make_unique<WindowFunctionContext>(
2579  window_func, elem_count, co.device_type, row_set_mem_owner);
2580  }
2581  const auto& order_keys = window_func->getOrderKeys();
2582  if (!order_keys.empty()) {
2583  auto sorted_partition_cache_key = partition_cache_key;
2584  for (auto& order_key : order_keys) {
2585  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2586  }
2587  for (auto& collation : window_func->getCollation()) {
2588  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2589  }
2590  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2591  auto cache_key_cnt_it =
2592  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2593  if (!cache_key_cnt_it.second) {
2594  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2595  cache_key_cnt_it.first->second + 1;
2596  }
2597 
2598  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2599  for (const auto& order_key : order_keys) {
2600  const auto order_col =
2601  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2602  if (!order_col) {
2603  throw std::runtime_error("Only order by columns supported for now");
2604  }
2605  const int8_t* column;
2606  size_t join_col_elem_count;
2607  std::tie(column, join_col_elem_count) =
2608  ColumnFetcher::getOneColumnFragment(executor_,
2609  *order_col,
2610  query_infos.front().info.fragments.front(),
2611  memory_level,
2612  0,
2613  nullptr,
2614  /*thread_idx=*/0,
2615  chunks_owner,
2616  column_cache_map);
2617 
2618  CHECK_EQ(join_col_elem_count, elem_count);
2619  context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2620  }
2621  }
2622  if (context->getWindowFunction()->hasFraming()) {
2623  // todo (yoonmin) : if we try to support generic window function expression without
2624  // extra project node, we need to revisit here b/c the current logic assumes that
2625  // window function expression has a single input source
2626  auto& window_function_expression_args = window_func->getArgs();
2627  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2628  for (auto& expr : window_function_expression_args) {
2629  if (const auto arg_col_var =
2630  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2631  auto const [column, join_col_elem_count] = ColumnFetcher::getOneColumnFragment(
2632  executor_,
2633  *arg_col_var,
2634  query_infos.front().info.fragments.front(),
2635  memory_level,
2636  0,
2637  nullptr,
2638  /*thread_idx=*/0,
2639  chunks_owner,
2640  column_cache_map);
2641 
2642  CHECK_EQ(join_col_elem_count, elem_count);
2643  context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2644  }
2645  }
2646  }
2647  return context;
2648 }
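
// The partition and sorted-partition cache keys above are built by folding the
// query plan DAG hash together with the stringified partition condition, order
// keys, and collations. A minimal standalone sketch of that key construction:
//
//   #include <boost/functional/hash.hpp>
//   #include <cstddef>
//   #include <string>
//   #include <vector>
//
//   size_t sorted_partition_key(size_t plan_dag_hash,
//                               const std::string& partition_cond,
//                               const std::vector<std::string>& order_keys) {
//     size_t key = plan_dag_hash;
//     boost::hash_combine(key, boost::hash_value(partition_cond));
//     for (const auto& order_key : order_keys) {
//       boost::hash_combine(key, order_key);
//     }
//     return key;
//   }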
2649 
2650 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
2651  const CompilationOptions& co,
2652  const ExecutionOptions& eo,
2653  RenderInfo* render_info,
2654  const int64_t queue_time_ms) {
2655  auto timer = DEBUG_TIMER(__func__);
2656  const auto work_unit =
2657  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2658  return executeWorkUnit(
2659  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2660 }
2661 
2662 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
2663  std::vector<TargetMetaInfo> const& rhs) {
2664  if (lhs.size() == rhs.size()) {
2665  for (size_t i = 0; i < lhs.size(); ++i) {
2666  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2667  return false;
2668  }
2669  }
2670  return true;
2671  }
2672  return false;
2673 }
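
// An equivalent formulation with std::equal, shown as a sketch (assuming the
// same get_type_info() accessor and SQLTypeInfo inequality used above):
//
//   #include <algorithm>
//   #include <vector>
//
//   template <class T>
//   bool same_type_info(const std::vector<T>& lhs, const std::vector<T>& rhs) {
//     return lhs.size() == rhs.size() &&
//            std::equal(lhs.begin(), lhs.end(), rhs.begin(),
//                       [](const T& a, const T& b) {
//                         return !(a.get_type_info() != b.get_type_info());
//                       });
//   }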
2674 
2675 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2676  return target_meta_info.get_type_info().is_geometry();
2677 }
2678 
2679 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
2680  const RaExecutionSequence& seq,
2681  const CompilationOptions& co,
2682  const ExecutionOptions& eo,
2683  RenderInfo* render_info,
2684  const int64_t queue_time_ms) {
2685  auto timer = DEBUG_TIMER(__func__);
2686  if (!logical_union->isAll()) {
2687  throw std::runtime_error("UNION without ALL is not supported yet.");
2688  }
2689  // Will throw a std::runtime_error if types don't match.
2690  logical_union->checkForMatchingMetaInfoTypes();
2691  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2692  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2693  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2694  }
2695  auto work_unit =
2696  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2697  return executeWorkUnit(work_unit,
2698  logical_union->getOutputMetainfo(),
2699  false,
2700  CompilationOptions::makeCpuOnly(co),
2701  eo,
2702  render_info,
2703  queue_time_ms);
2704 }
2705 
2706 ExecutionResult RelAlgExecutor::executeLogicalValues(
2707  const RelLogicalValues* logical_values,
2708  const ExecutionOptions& eo) {
2709  auto timer = DEBUG_TIMER(__func__);
2710  QueryMemoryDescriptor query_mem_desc(executor_,
2711  logical_values->getNumRows(),
2712  QueryDescriptionType::Projection,
2713  /*is_table_function=*/false);
2714 
2715  auto tuple_type = logical_values->getTupleType();
2716  for (size_t i = 0; i < tuple_type.size(); ++i) {
2717  auto& target_meta_info = tuple_type[i];
2718  if (target_meta_info.get_type_info().is_varlen()) {
2719  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2720  }
2721  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2722  // replace w/ bigint
2723  tuple_type[i] =
2724  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2725  }
2726  query_mem_desc.addColSlotInfo(
2727  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2728  }
2729  logical_values->setOutputMetainfo(tuple_type);
2730 
2731  std::vector<TargetInfo> target_infos;
2732  for (const auto& tuple_type_component : tuple_type) {
2733  target_infos.emplace_back(TargetInfo{false,
2734  kCOUNT,
2735  tuple_type_component.get_type_info(),
2736  SQLTypeInfo(kNULLT, false),
2737  false,
2738  false,
2739  /*is_varlen_projection=*/false});
2740  }
2741 
2742  std::shared_ptr<ResultSet> rs{
2743  ResultSetLogicalValuesBuilder{logical_values,
2744  target_infos,
2745  ExecutorDeviceType::CPU,
2746  query_mem_desc,
2747  executor_->getRowSetMemoryOwner(),
2748  executor_}
2749  .build()};
2750 
2751  return {rs, tuple_type};
2752 }
2753 
2754 namespace {
2755 
2756 template <class T>
2757 int64_t insert_one_dict_str(T* col_data,
2758  const std::string& columnName,
2759  const SQLTypeInfo& columnType,
2760  const Analyzer::Constant* col_cv,
2761  const Catalog_Namespace::Catalog& catalog) {
2762  if (col_cv->get_is_null()) {
2763  *col_data = inline_fixed_encoding_null_val(columnType);
2764  } else {
2765  const int dict_id = columnType.get_comp_param();
2766  const auto col_datum = col_cv->get_constval();
2767  const auto& str = *col_datum.stringval;
2768  const auto dd = catalog.getMetadataForDict(dict_id);
2769  CHECK(dd && dd->stringDict);
2770  int32_t str_id = dd->stringDict->getOrAdd(str);
2771  if (!dd->dictIsTemp) {
2772  const auto checkpoint_ok = dd->stringDict->checkpoint();
2773  if (!checkpoint_ok) {
2774  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2775  columnName);
2776  }
2777  }
2778  const bool invalid = str_id > max_valid_int_value<T>();
2779  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2780  if (invalid) {
2781  LOG(ERROR) << "Could not encode string: " << str
2782  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2783  << " bits. Will store NULL instead.";
2784  }
2785  str_id = inline_fixed_encoding_null_val(columnType);
2786  }
2787  *col_data = str_id;
2788  }
2789  return *col_data;
2790 }
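
// For narrow dictionary columns (e.g. TEXT ENCODING DICT(8) or DICT(16)) a
// dictionary id that does not fit in T is stored as NULL rather than truncated,
// as the LOG(ERROR) branch above shows. A minimal sketch of the width check
// (max_valid stands in for max_valid_int_value<T>(), the largest id that is
// representable in T and not reserved for NULL):
//
//   #include <cstdint>
//
//   bool fits_in_encoding(int32_t str_id, int32_t max_valid) {
//     return str_id <= max_valid;
//   }
//
//   // e.g. an id of 300 does not fit an 8-bit dictionary slot: the value is
//   // replaced by the column's NULL sentinel and an error is logged.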
2791 
2792 template <class T>
2793 int64_t insert_one_dict_str(T* col_data,
2794  const ColumnDescriptor* cd,
2795  const Analyzer::Constant* col_cv,
2796  const Catalog_Namespace::Catalog& catalog) {
2797  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2798 }
2799 
2800 } // namespace
2801 
2802 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2803  const ExecutionOptions& eo) {
2804  auto timer = DEBUG_TIMER(__func__);
2805  if (eo.just_explain) {
2806  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2807  }
2808 
2809  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2810  ExecutorDeviceType::CPU,
2811  QueryMemoryDescriptor(),
2812  executor_->getRowSetMemoryOwner(),
2813  executor_->getCatalog(),
2814  executor_->blockSize(),
2815  executor_->gridSize());
2816 
2817  std::vector<TargetMetaInfo> empty_targets;
2818  return {rs, empty_targets};
2819 }
2820 
2821 void RelAlgExecutor::executeSimpleInsert(
2822  const Analyzer::Query& query,
2823  Fragmenter_Namespace::InsertDataLoader& inserter,
2824  const Catalog_Namespace::SessionInfo& session) {
2825  // Note: We currently obtain an executor for this method, but we do not need it.
2826  // Therefore, we skip the executor state setup in the regular execution path. In the
2827  // future, we will likely want to use the executor to evaluate expressions in the insert
2828  // statement.
2829 
2830  const auto& values_lists = query.get_values_lists();
2831  const int table_id = query.get_result_table_id();
2832  const auto& col_id_list = query.get_result_col_list();
2833  size_t rows_number = values_lists.size();
2834  size_t leaf_count = inserter.getLeafCount();
2835  const auto td = cat_.getMetadataForTable(table_id);
2836  CHECK(td);
2837  size_t rows_per_leaf = rows_number;
2838  if (td->nShards == 0) {
2839  rows_per_leaf =
2840  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2841  }
2842  auto max_number_of_rows_per_package =
2843  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
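
 // Rows are pushed to the inserter in packages of at most 64K rows: for sharded
 // tables every leaf receives the full row count, otherwise rows are split
 // evenly across leaves. A minimal sketch of the sizing arithmetic above:
 //
 //   #include <algorithm>
 //   #include <cstddef>
 //
 //   size_t rows_per_package(size_t rows, size_t leaves, bool sharded) {
 //     const size_t per_leaf =
 //         sharded ? rows : (rows + leaves - 1) / leaves;  // ceil division
 //     return std::max(size_t(1), std::min(per_leaf, size_t(64 * 1024)));
 //   }
 //
 //   // rows_per_package(100000, 3, false) == 33334
 //   // rows_per_package(1000000, 1, false) == 65536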
2844 
2845  std::vector<const ColumnDescriptor*> col_descriptors;
2846  std::vector<int> col_ids;
2847  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2848  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2849  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2850  std::unordered_map<int, int> sequential_ids;
2851 
2852  for (const int col_id : col_id_list) {
2853  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2854  const auto col_enc = cd->columnType.get_compression();
2855  if (cd->columnType.is_string()) {
2856  switch (col_enc) {
2857  case kENCODING_NONE: {
2858  auto it_ok =
2859  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2860  CHECK(it_ok.second);
2861  break;
2862  }
2863  case kENCODING_DICT: {
2864  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2865  CHECK(dd);
2866  const auto it_ok = col_buffers.emplace(
2867  col_id,
2868  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2869  max_number_of_rows_per_package));
2870  CHECK(it_ok.second);
2871  break;
2872  }
2873  default:
2874  CHECK(false);
2875  }
2876  } else if (cd->columnType.is_geometry()) {
2877  auto it_ok =
2878  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2879  CHECK(it_ok.second);
2880  } else if (cd->columnType.is_array()) {
2881  auto it_ok =
2882  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2883  CHECK(it_ok.second);
2884  } else {
2885  const auto it_ok = col_buffers.emplace(
2886  col_id,
2887  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2888  max_number_of_rows_per_package]()));
2889  CHECK(it_ok.second);
2890  }
2891  col_descriptors.push_back(cd);
2892  sequential_ids[col_id] = col_ids.size();
2893  col_ids.push_back(col_id);
2894  }
2895 
2896  // mark the target table's cached item as dirty
2897  std::vector<int> table_chunk_key_prefix{cat_.getCurrentDB().dbId, table_id};
2898  auto table_key = boost::hash_value(table_chunk_key_prefix);
2899  ResultSetCacheInvalidator::invalidateCachesByTable(table_key);
2900  UpdateTriggeredCacheInvalidator::invalidateCachesByTable(table_key);
2901 
2902  size_t start_row = 0;
2903  size_t rows_left = rows_number;
2904  while (rows_left != 0) {
2905  // clear the buffers
2906  for (const auto& kv : col_buffers) {
2907  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2908  }
2909  for (auto& kv : str_col_buffers) {
2910  kv.second.clear();
2911  }
2912  for (auto& kv : arr_col_buffers) {
2913  kv.second.clear();
2914  }
2915 
2916  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2917  // Note: if there will be use cases with batch inserts with lots of rows, it might be
2918  // more efficient to do the loops below column by column instead of row by row.
2919  // But for now I consider such a refactoring not worth investigating, as we have more
2920  // efficient ways to insert many rows anyway.
2921  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2922  const auto& values_list = values_lists[row_idx + start_row];
2923  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
2924  CHECK(values_list.size() == col_descriptors.size());
2925  auto col_cv =
2926  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
2927  if (!col_cv) {
2928  auto col_cast =
2929  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
2930  CHECK(col_cast);
2931  CHECK_EQ(kCAST, col_cast->get_optype());
2932  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2933  }
2934  CHECK(col_cv);
2935  const auto cd = col_descriptors[col_idx];
2936  auto col_datum = col_cv->get_constval();
2937  auto col_type = cd->columnType.get_type();
2938  uint8_t* col_data_bytes{nullptr};
2939  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2940  (!cd->columnType.is_string() ||
2941  cd->columnType.get_compression() == kENCODING_DICT)) {
2942  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2943  CHECK(col_data_bytes_it != col_buffers.end());
2944  col_data_bytes = col_data_bytes_it->second.get();
2945  }
2946  switch (col_type) {
2947  case kBOOLEAN: {
2948  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2949  auto null_bool_val =
2950  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2951  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2952  ? inline_fixed_encoding_null_val(cd->columnType)
2953  : (col_datum.boolval ? 1 : 0);
2954  break;
2955  }
2956  case kTINYINT: {
2957  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2958  col_data[row_idx] = col_cv->get_is_null()
2959  ? inline_fixed_encoding_null_val(cd->columnType)
2960  : col_datum.tinyintval;
2961  break;
2962  }
2963  case kSMALLINT: {
2964  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2965  col_data[row_idx] = col_cv->get_is_null()
2966  ? inline_fixed_encoding_null_val(cd->columnType)
2967  : col_datum.smallintval;
2968  break;
2969  }
2970  case kINT: {
2971  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2972  col_data[row_idx] = col_cv->get_is_null()
2973  ? inline_fixed_encoding_null_val(cd->columnType)
2974  : col_datum.intval;
2975  break;
2976  }
2977  case kBIGINT:
2978  case kDECIMAL:
2979  case kNUMERIC: {
2980  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2981  col_data[row_idx] = col_cv->get_is_null()
2982  ? inline_fixed_encoding_null_val(cd->columnType)
2983  : col_datum.bigintval;
2984  break;
2985  }
2986  case kFLOAT: {
2987  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2988  col_data[row_idx] = col_datum.floatval;
2989  break;
2990  }
2991  case kDOUBLE: {
2992  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2993  col_data[row_idx] = col_datum.doubleval;
2994  break;
2995  }
2996  case kTEXT:
2997  case kVARCHAR:
2998  case kCHAR: {
2999  switch (cd->columnType.get_compression()) {
3000  case kENCODING_NONE:
3001  str_col_buffers[col_ids[col_idx]].push_back(
3002  col_datum.stringval ? *col_datum.stringval : "");
3003  break;
3004  case kENCODING_DICT: {
3005  switch (cd->columnType.get_size()) {
3006  case 1:
3007  insert_one_dict_str(
3008  &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3009  cd,
3010  col_cv,
3011  cat_);
3012  break;
3013  case 2:
3014  insert_one_dict_str(
3015  &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3016  cd,
3017  col_cv,
3018  cat_);
3019  break;
3020  case 4:
3021  insert_one_dict_str(
3022  &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3023  cd,
3024  col_cv,
3025  cat_);
3026  break;
3027  default:
3028  CHECK(false);
3029  }
3030  break;
3031  }
3032  default:
3033  CHECK(false);
3034  }
3035  break;
3036  }
3037  case kTIME:
3038  case kTIMESTAMP:
3039  case kDATE: {
3040  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3041  col_data[row_idx] = col_cv->get_is_null()
3042  ? inline_fixed_encoding_null_val(cd->columnType)
3043  : col_datum.bigintval;
3044  break;
3045  }
3046  case kARRAY: {
3047  const auto is_null = col_cv->get_is_null();
3048  const auto size = cd->columnType.get_size();
3049  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
3050  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
3051  const auto is_point_coords =
3052  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
3053  if (is_null && !is_point_coords) {
3054  if (size > 0) {
3055  int8_t* buf = (int8_t*)checked_malloc(size);
3056  put_null_array(static_cast<void*>(buf), elem_ti, "");
3057  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
3058  p += elem_ti.get_size()) {
3059  put_null(static_cast<void*>(p), elem_ti, "");
3060  }
3061  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
3062  } else {
3063  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
3064  }
3065  break;
3066  }
3067  const auto l = col_cv->get_value_list();
3068  size_t len = l.size() * elem_ti.get_size();
3069  if (size > 0 && static_cast<size_t>(size) != len) {
3070  throw std::runtime_error("Array column " + cd->columnName + " expects " +
3071  std::to_string(size / elem_ti.get_size()) +
3072  " values, " + "received " +
3073  std::to_string(l.size()));
3074  }
3075  if (elem_ti.is_string()) {
3076  CHECK(kENCODING_DICT == elem_ti.get_compression());
3077  CHECK(4 == elem_ti.get_size());
3078 
3079  int8_t* buf = (int8_t*)checked_malloc(len);
3080  int32_t* p = reinterpret_cast<int32_t*>(buf);
3081 
3082  int elemIndex = 0;
3083  for (auto& e : l) {
3084  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3085  CHECK(c);
3086  insert_one_dict_str(
3087  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
3088  elemIndex++;
3089  }
3090  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3091 
3092  } else {
3093  int8_t* buf = (int8_t*)checked_malloc(len);
3094  int8_t* p = buf;
3095  for (auto& e : l) {
3096  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3097  CHECK(c);
3098  p = append_datum(p, c->get_constval(), elem_ti);
3099  CHECK(p);
3100  }
3101  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3102  }
3103  break;
3104  }
3105  case kPOINT:
3106  case kMULTIPOINT:
3107  case kLINESTRING:
3108  case kMULTILINESTRING:
3109  case kPOLYGON:
3110  case kMULTIPOLYGON:
3111  str_col_buffers[col_ids[col_idx]].push_back(
3112  col_datum.stringval ? *col_datum.stringval : "");
3113  break;
3114  default:
3115  CHECK(false);
3116  }
3117  }
3118  }
3119  start_row += package_size;
3120  rows_left -= package_size;
3121 
3122  Fragmenter_Namespace::InsertData insert_data;
3123  insert_data.databaseId = cat_.getCurrentDB().dbId;
3124  insert_data.tableId = table_id;
3125  insert_data.data.resize(col_ids.size());
3126  insert_data.columnIds = col_ids;
3127  for (const auto& kv : col_buffers) {
3128  DataBlockPtr p;
3129  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
3130  insert_data.data[sequential_ids[kv.first]] = p;
3131  }
3132  for (auto& kv : str_col_buffers) {
3133  DataBlockPtr p;
3134  p.stringsPtr = &kv.second;
3135  insert_data.data[sequential_ids[kv.first]] = p;
3136  }
3137  for (auto& kv : arr_col_buffers) {
3138  DataBlockPtr p;
3139  p.arraysPtr = &kv.second;
3140  insert_data.data[sequential_ids[kv.first]] = p;
3141  }
3142  insert_data.numRows = package_size;
3143  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
3144  inserter.insertData(session, insert_data);
3145  }
3146 
3147  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3148  ExecutorDeviceType::CPU,
3149  QueryMemoryDescriptor(),
3150  executor_->getRowSetMemoryOwner(),
3151  nullptr,
3152  0,
3153  0);
3154  std::vector<TargetMetaInfo> empty_targets;
3155  return {rs, empty_targets};
3156 }
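
// The loop above stages each package into three buffer families before the
// hand-off to the fragmenter: fixed-width scalars land in col_buffers (raw
// bytes), none-encoded strings and geo WKT land in str_col_buffers, and arrays
// land in arr_col_buffers; each family is published through the matching
// DataBlockPtr field. A minimal sketch of that hand-off for a single
// fixed-width column, assuming hypothetical locals (db_id, tbl_id, col_id,
// raw_bytes, num_rows):
//
//   Fragmenter_Namespace::InsertData insert_data;
//   insert_data.databaseId = db_id;
//   insert_data.tableId = tbl_id;
//   insert_data.columnIds = {col_id};
//   DataBlockPtr block;
//   block.numbersPtr = raw_bytes;  // int8_t* holding num_rows fixed-width values
//   insert_data.data = {block};
//   insert_data.numRows = num_rows;
//   inserter.insertData(session, insert_data);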
3157 
3158 namespace {
3159 
3160 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
3161  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
3162  if (aggregate) {
3163  return 0;
3164  }
3165  const auto compound = dynamic_cast<const RelCompound*>(ra);
3166  return (compound && compound->isAggregate()) ? 0 : limit;
3167 }
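
// get_scan_limit() zeroes out the LIMIT for aggregating sources because a LIMIT
// bounds the number of output groups, not the number of input rows scanned. For
// example, over a hypothetical table t,
//
//   SELECT x, COUNT(*) FROM t GROUP BY x LIMIT 10;
//
// must still scan every row of t to produce correct counts, so no scan limit can
// be pushed down; only non-aggregating projections can bound the scan.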
3168 
3169 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
3170  return !order_entries.empty() && order_entries.front().is_desc;
3171 }
3172 
3173 } // namespace
3174 
3175 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
3176  const CompilationOptions& co,
3177  const ExecutionOptions& eo,
3178  RenderInfo* render_info,
3179  const int64_t queue_time_ms) {
3180  auto timer = DEBUG_TIMER(__func__);
3181  check_sort_node_source_constraint(sort);
3182  const auto source = sort->getInput(0);
3183  const bool is_aggregate = node_is_aggregate(source);
3184  auto it = leaf_results_.find(sort->getId());
3185  auto order_entries = get_order_entries(sort);
3186  if (it != leaf_results_.end()) {
3187  // Add any transient string literals to the sdp on the agg
3188  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3189  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3190  executor_->row_set_mem_owner_);
3191  // Handle push-down for LIMIT for multi-node
3192  auto& aggregated_result = it->second;
3193  auto& result_rows = aggregated_result.rs;
3194  const size_t limit = sort->getLimit();
3195  const size_t offset = sort->getOffset();
3196  if (limit || offset) {
3197  if (!order_entries.empty()) {
3198  result_rows->sort(order_entries, limit + offset, executor_);
3199  }
3200  result_rows->dropFirstN(offset);
3201  if (limit) {
3202  result_rows->keepFirstN(limit);
3203  }
3204  }
3205 
3206  if (render_info) {
3207  // We've hit a sort step that is the very last step
3208  // in a distributed render query. We'll fill in the render targets
3209  // since we have all that data needed to do so. This is normally
3210  // done in executeWorkUnit, but that is bypassed in this case.
3211  build_render_targets(*render_info,
3212  source_work_unit.exe_unit.target_exprs,
3213  aggregated_result.targets_meta);
3214  }
3215 
3216  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3217  sort->setOutputMetainfo(aggregated_result.targets_meta);
3218 
3219  return result;
3220  }
3221 
3222  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3223  bool is_desc{false};
3224  bool use_speculative_top_n_sort{false};
3225 
3226  auto execute_sort_query = [this,
3227  sort,
3228  &source,
3229  &is_aggregate,
3230  &eo,
3231  &co,
3232  render_info,
3233  queue_time_ms,
3234  &groupby_exprs,
3235  &is_desc,
3236  &order_entries,
3237  &use_speculative_top_n_sort]() -> ExecutionResult {
3238  const size_t limit = sort->getLimit();
3239  const size_t offset = sort->getOffset();
3240  // check whether sort's input is cached
3241  auto source_node = sort->getInput(0);
3242  CHECK(source_node);
3243  ExecutionResult source_result{nullptr, {}};
3244  auto source_query_plan_dag = source_node->getQueryPlanDagHash();
3245  bool enable_resultset_recycler = canUseResultsetCache(eo, render_info);
3246  if (enable_resultset_recycler && has_valid_query_plan_dag(source_node) &&
3247  !sort->isEmptyResult()) {
3248  if (auto cached_resultset =
3249  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
3250  source_query_plan_dag)) {
3251  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3252  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3253  << " from resultset cache";
3254  source_result =
3255  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3256  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3257  addTemporaryTable(-source_node->getId(), cached_resultset);
3258  }
3259  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3260  co.device_type == ExecutorDeviceType::GPU;
3261  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3262  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3263  }
3264  }
3265  if (!source_result.getDataPtr()) {
3266  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3267  is_desc = first_oe_is_desc(order_entries);
3268  ExecutionOptions eo_copy = {
3269  eo.output_columnar_hint,
3270  eo.keep_result,
3271  eo.allow_multifrag,
3272  eo.just_explain,
3273  eo.allow_loop_joins,
3274  eo.with_watchdog,
3275  eo.jit_debug,
3276  eo.just_validate || sort->isEmptyResult(),
3277  eo.with_dynamic_watchdog,
3278  eo.dynamic_watchdog_time_limit,
3279  eo.find_push_down_candidates,
3280  eo.just_calcite_explain,
3281  eo.gpu_input_mem_limit_percent,
3282  eo.allow_runtime_query_interrupt,
3283  eo.running_query_interrupt_freq,
3284  eo.pending_query_interrupt_freq,
3285  eo.executor_type,
3286  };
3287 
3288  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3289  source_result = executeWorkUnit(source_work_unit,
3290  source->getOutputMetainfo(),
3291  is_aggregate,
3292  co,
3293  eo_copy,
3294  render_info,
3295  queue_time_ms);
3296  use_speculative_top_n_sort =
3297  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3298  use_speculative_top_n(source_work_unit.exe_unit,
3299  source_result.getRows()->getQueryMemDesc());
3300  }
3301  if (render_info && render_info->isInSitu()) {
3302  return source_result;
3303  }
3304  if (source_result.isFilterPushDownEnabled()) {
3305  return source_result;
3306  }
3307  auto rows_to_sort = source_result.getRows();
3308  if (eo.just_explain) {
3309  return {rows_to_sort, {}};
3310  }
3311  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3312  !use_speculative_top_n_sort) {
3313  const size_t top_n = limit == 0 ? 0 : limit + offset;
3314  rows_to_sort->sort(order_entries, top_n, executor_);
3315  }
3316  if (limit || offset) {
3317  if (g_cluster && sort->collationCount() == 0) {
3318  if (offset >= rows_to_sort->rowCount()) {
3319  rows_to_sort->dropFirstN(offset);
3320  } else {
3321  rows_to_sort->keepFirstN(limit + offset);
3322  }
3323  } else {
3324  rows_to_sort->dropFirstN(offset);
3325  if (limit) {
3326  rows_to_sort->keepFirstN(limit);
3327  }
3328  }
3329  }
3330  return {rows_to_sort, source_result.getTargetsMeta()};
3331  };
3332 
3333  try {
3334  return execute_sort_query();
3335  } catch (const SpeculativeTopNFailed& e) {
3336  CHECK_EQ(size_t(1), groupby_exprs.size());
3337  CHECK(groupby_exprs.front());
3338  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3339  return execute_sort_query();
3340  }
3341 }
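
// Retry protocol for speculative top-n: the sort is first planned with
// SortAlgorithm::SpeculativeTopN (see createSortInputWorkUnit below), which keeps
// only on the order of limit + offset candidate groups. If the speculation turns
// out to be wrong, SpeculativeTopNFailed is thrown, the single group-by
// expression and its sort direction are blacklisted, and the same lambda runs
// once more; on the second pass the blacklist check downgrades the plan to
// SortAlgorithm::Default.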
3342 
3343 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
3344  const RelSort* sort,
3345  std::list<Analyzer::OrderEntry>& order_entries,
3346  const ExecutionOptions& eo) {
3347  const auto source = sort->getInput(0);
3348  const size_t limit = sort->getLimit();
3349  const size_t offset = sort->getOffset();
3350  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3351  const size_t scan_total_limit =
3352  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
3353  size_t max_groups_buffer_entry_guess{
3354  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3355  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3356  SortInfo sort_info{
3357  order_entries, sort_algorithm, limit, offset, sort->isLimitDelivered()};
3358  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3359  const auto& source_exe_unit = source_work_unit.exe_unit;
3360 
3361  // we do not allow sorting geometry or array types
3362  for (auto order_entry : order_entries) {
3363  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-based index
3364  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3365  const auto& ti = get_target_info(te, false);
3366  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3367  throw std::runtime_error(
3368  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3369  }
3370  }
3371 
3372  if (source_exe_unit.groupby_exprs.size() == 1) {
3373  if (!source_exe_unit.groupby_exprs.front()) {
3374  sort_algorithm = SortAlgorithm::StreamingTopN;
3375  } else {
3376  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3377  first_oe_is_desc(order_entries))) {
3378  sort_algorithm = SortAlgorithm::Default;
3379  }
3380  }
3381  }
3382 
3383  sort->setOutputMetainfo(source->getOutputMetainfo());
3384  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3385  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
3386  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3387  std::move(source_exe_unit.input_col_descs),
3388  source_exe_unit.simple_quals,
3389  source_exe_unit.quals,
3390  source_exe_unit.join_quals,
3391  source_exe_unit.groupby_exprs,
3392  source_exe_unit.target_exprs,
3393  source_exe_unit.target_exprs_original_type_infos,
3394  nullptr,
3395  {sort_info.order_entries,
3396  sort_algorithm,
3397  limit,
3398  offset,
3399  sort_info.limit_delivered},
3400  scan_total_limit,
3401  source_exe_unit.query_hint,
3402  source_exe_unit.query_plan_dag_hash,
3403  source_exe_unit.hash_table_build_plan_dag,
3404  source_exe_unit.table_id_to_node_map,
3405  source_exe_unit.use_bump_allocator,
3406  source_exe_unit.union_all,
3407  source_exe_unit.query_state},
3408  source,
3409  max_groups_buffer_entry_guess,
3410  std::move(source_work_unit.query_rewriter),
3411  source_work_unit.input_permutation,
3412  source_work_unit.left_deep_join_input_sizes};
3413 }
3414 
3415 namespace {
3416 
3417 /**
3418  * Upper bound estimation for the number of groups. Not strictly correct and not
3419  * tight, but if the tables involved are really small we shouldn't waste time doing
3420  * the NDV estimation. We don't account for cross-joins and / or group by unnested
3421  * array, which is the reason it's not tight.
3422  */
3423 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
3424  CHECK(!table_infos.empty());
3425  const auto& first_table = table_infos.front();
3426  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3427  for (const auto& table_info : table_infos) {
3428  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3429  max_num_groups = table_info.info.getNumTuplesUpperBound();
3430  }
3431  }
3432  return std::max(max_num_groups, size_t(1));
3433 }
3434 
3435 bool is_projection(const RelAlgExecutionUnit& ra_exe_unit) {
3436  return ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front();
3437 }
3438 
3439 bool can_output_columnar(const RelAlgExecutionUnit& ra_exe_unit,
3440  const RenderInfo* render_info,
3441  const RelAlgNode* body) {
3442  if (!is_projection(ra_exe_unit)) {
3443  return false;
3444  }
3445  if (render_info && render_info->isInSitu()) {
3446  return false;
3447  }
3448  if (!ra_exe_unit.sort_info.order_entries.empty()) {
3449  // disable output columnar when we have top-sort node query
3450  return false;
3451  }
3452  for (const auto& target_expr : ra_exe_unit.target_exprs) {
3453  // We don't currently support varlen columnar projections, so
3454  // return false if we find one
3455  if (target_expr->get_type_info().is_varlen()) {
3456  return false;
3457  }
3458  }
3459  if (auto top_project = dynamic_cast<const RelProject*>(body)) {
3460  if (top_project->isRowwiseOutputForced()) {
3461  return false;
3462  }
3463  }
3464  return true;
3465 }
3466 
3467 bool should_output_columnar(const RelAlgExecutionUnit& ra_exe_unit) {
3468  return ra_exe_unit.scan_limit > Executor::high_scan_limit &&
3469  ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN;
3470 }
3471 
3472 /**
3473  * Determines whether a query needs to compute the size of its output buffer. Returns
3474  * true for projection queries with no LIMIT or a LIMIT that exceeds the high scan
3475  * limit threshold (meaning it would be cheaper to compute the number of rows passing
3476  * or use the bump allocator than allocate the current scan limit per GPU)
3477  */
3478 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
3479  for (const auto target_expr : ra_exe_unit.target_exprs) {
3480  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3481  return false;
3482  }
3483  }
3484  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
3485  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
3486  return true;
3487  }
3488  return false;
3489 }
3490 
3491 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
3492  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
3493  ra_exe_unit.simple_quals.empty());
3494 }
3495 
3497  const RelAlgExecutionUnit& ra_exe_unit_in,
3498  const std::vector<InputTableInfo>& table_infos,
3499  const Executor* executor,
3500  const ExecutorDeviceType device_type_in,
3501  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3502  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
3503  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
3504  const auto target_expr = ra_exe_unit.target_exprs[i];
3505  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3506  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
3507  continue;
3508  }
3509  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3510  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
3511  CHECK(arg);
3512  const auto& arg_ti = arg->get_type_info();
3513  // Avoid calling getExpressionRange for variable length types (string and array),
3514  // it'd trigger an assertion since that API expects to be called only for types
3515  // for which the notion of range is well-defined. A bit of a kludge, but the
3516  // logic to reject these types anyway is at lower levels in the stack and not
3517  // really worth pulling into a separate function for now.
3518  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3519  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
3520  continue;
3521  }
3522  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
3523  if (arg_range.getType() != ExpressionRangeType::Integer) {
3524  continue;
3525  }
3526  // When running distributed, the threshold for using the precise implementation
3527  // must be consistent across all leaves, otherwise we could have a mix of precise
3528  // and approximate bitmaps and we cannot aggregate them.
3529  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
3530  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3531  const auto sub_bitmap_count =
3532  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
3533  int64_t approx_bitmap_sz_bits{0};
3534  const auto error_rate = static_cast<Analyzer::AggExpr*>(target_expr)->get_arg1();
3535  if (error_rate) {
3536  CHECK(error_rate->get_type_info().get_type() == kINT);
3537  CHECK_GE(error_rate->get_constval().intval, 1);
3538  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
3539  } else {
3540  approx_bitmap_sz_bits = g_hll_precision_bits;
3541  }
3542  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
3543  arg_range.getIntMin(),
3544  approx_bitmap_sz_bits,
3545  true,
3546  device_type,
3547  sub_bitmap_count};
3548  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
3549  arg_range.getIntMin(),
3550  bitmap_sz_bits,
3551  false,
3552  device_type,
3553  sub_bitmap_count};
3554  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3555  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3556  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3557  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
3558  target_exprs_owned.push_back(precise_count_distinct);
3559  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
3560  }
3561  }
3562  return ra_exe_unit;
3563 }
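
// Worked example of the size comparison above, assuming g_hll_precision_bits = 11
// and an integer column with range [0, 10'000): the precise bitmap needs one bit
// per possible value, i.e. 10'000 bits (~1.25 KB) per group, while the HLL buffer
// is on the order of 2^11 registers (~2 KB). The approximate buffer would be no
// smaller, so the rewrite substitutes an exact COUNT(DISTINCT ...):
//
//   SELECT APPROX_COUNT_DISTINCT(x) FROM t;   -- executed as COUNT(DISTINCT x)
//
// making the result exact at no extra memory cost.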
3564 
3565 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
3566  const CompilationOptions& co,
3567  const ExecutionOptions& eo) {
3568  return g_enable_bump_allocator && co.device_type == ExecutorDeviceType::GPU &&
3569  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
3570 }
3571 
3572 } // namespace
3573 
3574 ExecutionResult RelAlgExecutor::executeWorkUnit(
3575  const RelAlgExecutor::WorkUnit& work_unit,
3576  const std::vector<TargetMetaInfo>& targets_meta,
3577  const bool is_agg,
3578  const CompilationOptions& co_in,
3579  const ExecutionOptions& eo_in,
3580  RenderInfo* render_info,
3581  const int64_t queue_time_ms,
3582  const std::optional<size_t> previous_count) {
3583  INJECT_TIMER(executeWorkUnit);
3584  auto timer = DEBUG_TIMER(__func__);
3585  auto query_exec_time_begin = timer_start();
3586 
3587  const auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
3588  check_none_encoded_string_cast_tuple_limit(query_infos, work_unit.exe_unit);
3589 
3590  auto co = co_in;
3591  auto eo = eo_in;
3592  ColumnCacheMap column_cache;
3593  ScopeGuard clearWindowContextIfNecessary = [&]() {
3594  if (is_window_execution_unit(work_unit.exe_unit)) {
3595  WindowProjectNodeContext::reset(executor_);
3596  }
3597  };
3598  if (is_window_execution_unit(work_unit.exe_unit)) {
3599  if (!g_enable_window_functions) {
3600  throw std::runtime_error("Window functions support is disabled");
3601  }
3602  co.device_type = ExecutorDeviceType::CPU;
3603  co.allow_lazy_fetch = false;
3604  computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3605  }
3606  if (!eo.just_explain && eo.find_push_down_candidates) {
3607  // find potential candidates:
3608  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3609  if (!selected_filters.empty() || eo.just_calcite_explain) {
3610  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3611  }
3612  }
3613  if (render_info && render_info->isInSitu()) {
3614  co.allow_lazy_fetch = false;
3615  }
3616  const auto body = work_unit.body;
3617  CHECK(body);
3618  auto it = leaf_results_.find(body->getId());
3619  VLOG(3) << "body->getId()=" << body->getId()
3620  << " body->toString()=" << body->toString(RelRexToStringConfig::defaults())
3621  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3622  if (it != leaf_results_.end()) {
3623  executor_->addTransientStringLiterals(work_unit.exe_unit,
3624  executor_->row_set_mem_owner_);
3625  auto& aggregated_result = it->second;
3626  auto& result_rows = aggregated_result.rs;
3627  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3628  body->setOutputMetainfo(aggregated_result.targets_meta);
3629  if (render_info) {
3630  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3631  }
3632  return result;
3633  }
3634  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3635 
3636  auto ra_exe_unit = decide_approx_count_distinct_implementation(
3637  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3638 
3639  // register query hint if query_dag_ is valid
3640  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3641  if (query_dag_) {
3642  auto candidate = query_dag_->getQueryHint(body);
3643  if (candidate) {
3644  ra_exe_unit.query_hint = *candidate;
3645  }
3646  }
3647 
3648  const auto& query_hints = ra_exe_unit.query_hint;
3649  ScopeGuard reset_cuda_block_grid_sizes = [&,
3650  orig_block_size = executor_->blockSize(),
3651  orig_grid_size = executor_->gridSize()]() {
3652  if (cat_.getDataMgr().getCudaMgr()) {
3653  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3654  if (orig_block_size) {
3655  executor_->setBlockSize(orig_block_size);
3656  } else {
3657  executor_->resetBlockSize();
3658  }
3659  }
3660  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3661  if (orig_grid_size) {
3662  executor_->setGridSize(orig_grid_size);
3663  } else {
3664  executor_->resetGridSize();
3665  }
3666  }
3667  }
3668  };
3669 
3670  if (co.device_type == ExecutorDeviceType::GPU) {
3671  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3672  if (!cat_.getDataMgr().getCudaMgr()) {
3673  VLOG(1) << "Skip CUDA grid size query hint: cannot detect CUDA device";
3674  } else {
3675  const auto default_grid_size = executor_->gridSize();
3676  const auto new_grid_size =
3677  std::round(default_grid_size * query_hints.cuda_grid_size_multiplier);
3678  if (new_grid_size >= 1) {
3679  VLOG(1) << "Change CUDA grid size: " << default_grid_size
3680  << " (default_grid_size) -> " << new_grid_size
3681  << " (default_grid_size * " << query_hints.cuda_grid_size_multiplier
3682  << ")";
3683  // todo (yoonmin): do we need to check a hard limit?
3684  executor_->setGridSize(new_grid_size);
3685  } else {
3686  VLOG(1) << "Skip CUDA grid size query hint: invalid grid size";
3687  }
3688  }
3689  }
3690  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3691  if (!cat_.getDataMgr().getCudaMgr()) {
3692  VLOG(1) << "Skip CUDA block size query hint: cannot detect CUDA device";
3693  } else {
3694  int cuda_block_size = query_hints.cuda_block_size;
3695  int warp_size = executor_->warpSize();
3696  if (cuda_block_size >= warp_size) {
3697  cuda_block_size = (cuda_block_size + warp_size - 1) / warp_size * warp_size;
3698  VLOG(1) << "Change CUDA block size w.r.t warp size (" << warp_size
3699  << "): " << executor_->blockSize() << " -> " << cuda_block_size;
3700  } else {
3701  VLOG(1) << "Change CUDA block size: " << executor_->blockSize() << " -> "
3702  << cuda_block_size;
3703  }
3704  executor_->setBlockSize(cuda_block_size);
3705  }
3706  }
3707  }
3708 
3709  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3710  if (is_window_execution_unit(ra_exe_unit)) {
3711  CHECK_EQ(table_infos.size(), size_t(1));
3712  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3713  max_groups_buffer_entry_guess =
3714  table_infos.front().info.fragments.front().getNumTuples();
3715  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3716  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3717  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3718  ra_exe_unit.scan_limit = *previous_count;
3719  } else {
3720  // TODO(adb): enable bump allocator path for render queries
3721  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3722  ra_exe_unit.scan_limit = 0;
3723  ra_exe_unit.use_bump_allocator = true;
3724  } else if (eo.executor_type == ::ExecutorType::Extern) {
3725  ra_exe_unit.scan_limit = 0;
3726  } else if (!eo.just_explain) {
3727  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
3728  if (filter_count_all) {
3729  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3730  }
3731  }
3732  }
3733  }
3734 
3735  // When output_columnar_hint is true here, either 1) the columnar output
3736  // configuration is on or 2) a user hint requested it; in both cases we have to
3737  // disable it if the requirements in can_output_columnar are not satisfied.
3738  if (can_output_columnar(ra_exe_unit, render_info, body)) {
3739  if (!eo.output_columnar_hint && should_output_columnar(ra_exe_unit)) {
3740  VLOG(1) << "Using columnar layout for projection as output size of "
3741  << ra_exe_unit.scan_limit << " rows exceeds threshold of "
3742  << Executor::high_scan_limit;
3743  eo.output_columnar_hint = true;
3744  }
3745  } else {
3746  eo.output_columnar_hint = false;
3747  }
3748 
3749  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3750  co.device_type,
3751  QueryMemoryDescriptor(),
3752  nullptr,
3753  executor_->getCatalog(),
3754  executor_->blockSize(),
3755  executor_->gridSize()),
3756  {}};
3757 
3758  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
3759  const bool has_cardinality_estimation,
3760  const bool has_ndv_estimation) -> ExecutionResult {
3761  // Note that the groups buffer entry guess may be modified during query execution.
3762  // Create a local copy so we can track those changes if we need to attempt a retry
3763  // due to OOM
3764  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3765  try {
3766  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3767  is_agg,
3768  table_infos,
3769  ra_exe_unit,
3770  co,
3771  eo,
3772  cat_,
3773  render_info,
3774  has_cardinality_estimation,
3775  column_cache),
3776  targets_meta};
3777  } catch (const QueryExecutionError& e) {
3778  if (!has_ndv_estimation && e.getErrorCode() < 0) {
3779  throw CardinalityEstimationRequired(/*range=*/0);
3780  }
3781  handlePersistentError(e.getErrorCode());
3782  return handleOutOfMemoryRetry(
3783  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
3784  targets_meta,
3785  is_agg,
3786  co,
3787  eo,
3788  render_info,
3789  /*was_multifrag_kernel_launch=*/eo.allow_multifrag,
3790  queue_time_ms);
3791  }
3792  };
3793 
3794  auto use_resultset_cache = canUseResultsetCache(eo, render_info);
3795  for (const auto& table_info : table_infos) {
3796  const auto td = cat_.getMetadataForTable(table_info.table_id);
3797  if (td && (td->isTemporaryTable() || td->isView)) {
3798  use_resultset_cache = false;
3799  if (eo.keep_result) {
3800  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has either "
3801  "temporary table or view";
3802  }
3803  }
3804  }
3805 
3806  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
3807  try {
3808  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3809  auto card = cached_cardinality.second;
3810  if (cached_cardinality.first && card >= 0) {
3811  result = execute_and_handle_errors(
3812  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3813  } else {
3814  result = execute_and_handle_errors(
3815  max_groups_buffer_entry_guess,
3816  groups_approx_upper_bound(table_infos) <= g_big_group_threshold,
3817  /*has_ndv_estimation=*/false);
3818  }
3819  } catch (const CardinalityEstimationRequired& e) {
3820  // check the cardinality cache
3821  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3822  auto card = cached_cardinality.second;
3823  if (cached_cardinality.first && card >= 0) {
3824  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3825  } else {
3826  const auto ndv_groups_estimation =
3827  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3828  const auto estimated_groups_buffer_entry_guess =
3829  ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3830  : std::min(groups_approx_upper_bound(table_infos),
3831  g_estimator_failure_max_groupby_size);
3832  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3833  result = execute_and_handle_errors(
3834  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3835  if (!(eo.just_validate || eo.just_explain)) {
3836  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3837  }
3838  }
3839  }
3840 
3841  result.setQueueTime(queue_time_ms);
3842  if (render_info) {
3843  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3844  if (render_info->isInSitu()) {
3845  // return an empty result (with the same queue time, and zero render time)
3846  return {std::make_shared<ResultSet>(
3847  queue_time_ms,
3848  0,
3849  executor_->row_set_mem_owner_
3850  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3851  : nullptr),
3852  {}};
3853  }
3854  }
3855 
3856  for (auto& target_info : result.getTargetsMeta()) {
3857  if (target_info.get_type_info().is_string() &&
3858  !target_info.get_type_info().is_dict_encoded_string()) {
3859  // currently, we do not support resultset caching if a non-encoded string is projected
3860  use_resultset_cache = false;
3861  if (eo.keep_result) {
3862  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has non-encoded "
3863  "string column projection";
3864  }
3865  }
3866  }
3867 
3868  const auto res = result.getDataPtr();
3869  auto allow_auto_caching_resultset =
3870  res && res->hasValidBuffer() && g_allow_auto_resultset_caching &&
3871  res->getBufferSizeBytes(co.device_type) <= g_auto_resultset_caching_threshold;
3872  if (use_resultset_cache && (eo.keep_result || allow_auto_caching_resultset) &&
3873  !work_unit.exe_unit.sort_info.limit_delivered) {
3874  auto query_exec_time = timer_stop(query_exec_time_begin);
3875  res->setExecTime(query_exec_time);
3876  res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3877  res->setTargetMetaInfo(body->getOutputMetainfo());
3878  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
3879  res->setInputTableKeys(std::move(input_table_keys));
3880  if (allow_auto_caching_resultset) {
3881  VLOG(1) << "Automatically keep query resultset to recycler";
3882  }
3883  res->setUseSpeculativeTopNSort(
3884  use_speculative_top_n(ra_exe_unit, res->getQueryMemDesc()));
3885  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
3886  ra_exe_unit.query_plan_dag_hash,
3887  res->getInputTableKeys(),
3888  res,
3889  res->getBufferSizeBytes(co.device_type),
3891  } else {
3892  if (eo.keep_result) {
3893  if (g_cluster) {
3894  VLOG(1) << "Query hint \'keep_result\' is ignored since we do not support "
3895  "resultset recycling on distributed mode";
3896  } else if (hasStepForUnion()) {
3897  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has union-(all) "
3898  "operator";
3899  } else if (render_info && render_info->isInSitu()) {
3900  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is classified as "
3901  "a in-situ rendering query";
3902  } else if (is_validate_or_explain_query(eo)) {
3903  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is either "
3904  "validate or explain query";
3905  } else {
3906  VLOG(1) << "Query hint \'keep_result\' is ignored";
3907  }
3908  }
3909  }
3910 
3911  return result;
3912 }
3913 
3914 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
3915  const bool is_agg,
3916  const CompilationOptions& co,
3917  const ExecutionOptions& eo) {
3918  const auto count =
3919  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
3920  kCOUNT,
3921  nullptr,
3922  false,
3923  nullptr);
3924  const auto count_all_exe_unit =
3925  work_unit.exe_unit.createCountAllExecutionUnit(count.get());
3926  size_t one{1};
3927  ResultSetPtr count_all_result;
3928  try {
3929  ColumnCacheMap column_cache;
3930  count_all_result =
3931  executor_->executeWorkUnit(one,
3932  is_agg,
3933  get_table_infos(work_unit.exe_unit, executor_),
3934  count_all_exe_unit,
3935  co,
3936  eo,
3937  cat_,
3938  nullptr,
3939  false,
3940  column_cache);
3941  } catch (const foreign_storage::ForeignStorageException& error) {
3942  throw error;
3943  } catch (const QueryMustRunOnCpu&) {
3944  // force a retry of the top level query on CPU
3945  throw;
3946  } catch (const std::exception& e) {
3947  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
3948  return std::nullopt;
3949  }
3950  const auto count_row = count_all_result->getNextRow(false, false);
3951  CHECK_EQ(size_t(1), count_row.size());
3952  const auto& count_tv = count_row.front();
3953  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
3954  CHECK(count_scalar_tv);
3955  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
3956  CHECK(count_ptr);
3957  CHECK_GE(*count_ptr, 0);
3958  auto count_upper_bound = static_cast<size_t>(*count_ptr);
3959  return std::max(count_upper_bound, size_t(1));
3960 }
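
// The pre-flight COUNT(*) above lets projection kernels size their output buffer
// exactly: executeWorkUnit() feeds the returned upper bound into
// ra_exe_unit.scan_limit instead of using a worst-case per-fragment guess. One
// cheap filtered count is generally much cheaper than over-allocating output
// buffers, and any failure here degrades gracefully to std::nullopt (no limit
// hint).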
3961 
3962 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
3963  const auto& ra_exe_unit = work_unit.exe_unit;
3964  if (ra_exe_unit.input_descs.size() != 1) {
3965  return false;
3966  }
3967  const auto& table_desc = ra_exe_unit.input_descs.front();
3968  if (table_desc.getSourceType() != InputSourceType::TABLE) {
3969  return false;
3970  }
3971  const int table_id = table_desc.getTableId();
3972  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
3973  const auto comp_expr =
3974  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3975  if (!comp_expr || comp_expr->get_optype() != kEQ) {
3976  return false;
3977  }
3978  const auto lhs = comp_expr->get_left_operand();
3979  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3980  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3981  return false;
3982  }
3983  const auto rhs = comp_expr->get_right_operand();
3984  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3985  if (!rhs_const) {
3986  return false;
3987  }
3988  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
3989  if (cd->isVirtualCol) {
3990  CHECK_EQ("rowid", cd->columnName);
3991  return true;
3992  }
3993  }
3994  return false;
3995 }
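
// Shape of a query that isRowidLookup() accepts, over a hypothetical table t:
//
//   SELECT * FROM t WHERE rowid = 42;
//
// i.e. a single physical input whose simple quals are all equality comparisons,
// at least one of which binds the virtual rowid column to a constant. Such a
// lookup matches at most one row per rowid value, so executeWorkUnit() skips the
// output-buffer sizing pass for it.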
3996 
3997 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
3998  const RelAlgExecutor::WorkUnit& work_unit,
3999  const std::vector<TargetMetaInfo>& targets_meta,
4000  const bool is_agg,
4001  const CompilationOptions& co,
4002  const ExecutionOptions& eo,
4003  RenderInfo* render_info,
4004  const bool was_multifrag_kernel_launch,
4005  const int64_t queue_time_ms) {
4006  // Disable the bump allocator
4007  // Note that this will have basically the same effect as using the bump allocator for
4008  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
4009  // path and the bump allocator path for kernel per fragment execution.
4010  auto ra_exe_unit_in = work_unit.exe_unit;
4011  ra_exe_unit_in.use_bump_allocator = false;
4012 
4013  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4014  co.device_type,
4015  QueryMemoryDescriptor(),
4016  nullptr,
4017  executor_->getCatalog(),
4018  executor_->blockSize(),
4019  executor_->gridSize()),
4020  {}};
4021 
4022  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
4023  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
4024  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
4025  eo.keep_result,
4026  false,
4027  false,
4028  eo.allow_loop_joins,
4029  eo.with_watchdog,
4030  eo.jit_debug,
4031  false,
4032  eo.with_dynamic_watchdog,
4033  eo.dynamic_watchdog_time_limit,
4034  false,
4035  false,
4036  eo.gpu_input_mem_limit_percent,
4037  eo.allow_runtime_query_interrupt,
4038  eo.running_query_interrupt_freq,
4039  eo.pending_query_interrupt_freq,
4040  eo.executor_type,
4041  eo.outer_fragment_indices};
4042 
4043  if (was_multifrag_kernel_launch) {
4044  try {
4045  // Attempt to retry using the kernel per fragment path. The smaller input size
4046  // required may allow the entire kernel to execute in GPU memory.
4047  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
4048  "kernels disabled.";
4049  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
4050  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
4051  ColumnCacheMap column_cache;
4052  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4053  is_agg,
4054  table_infos,
4055  ra_exe_unit,
4056  co,
4057  eo_no_multifrag,
4058  cat_,
4059  nullptr,
4060  true,
4061  column_cache),
4062  targets_meta};
4063  result.setQueueTime(queue_time_ms);
4064  } catch (const QueryExecutionError& e) {
4065  handlePersistentError(e.getErrorCode());
4066  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
4067  }
4068  }
4069 
4070  if (render_info) {
4071  render_info->forceNonInSitu();
4072  }
4073 
4074  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
4075  // Only reset the group buffer entry guess if we ran out of slots, which
4076  // suggests a highly pathological input that prevented a good estimation of
4077  // the distinct tuple count. For projection queries, this will force a
4078  // per-fragment scan limit, which is compatible with the CPU path.
4079 
4080  VLOG(1) << "Resetting max groups buffer entry guess.";
4081  max_groups_buffer_entry_guess = 0;
4082 
4083  int iteration_ctr = -1;
4084  while (true) {
4085  iteration_ctr++;
4086  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
4087  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
4088  ColumnCacheMap column_cache;
4089  try {
4090  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4091  is_agg,
4092  table_infos,
4093  ra_exe_unit,
4094  co_cpu,
4095  eo_no_multifrag,
4096  cat_,
4097  nullptr,
4098  true,
4099  column_cache),
4100  targets_meta};
4101  } catch (const QueryExecutionError& e) {
4102  // Ran out of slots
4103  if (e.getErrorCode() < 0) {
4104  // Even the conservative guess failed; it should only happen when we group
4105  // by a huge cardinality array. Maybe we should throw an exception instead?
4106  // Such a heavy query is entirely capable of exhausting all the host memory.
4107  CHECK(max_groups_buffer_entry_guess);
4108  // Only allow two iterations of increasingly large entry guesses up to a maximum
4109  // of 512MB per column per kernel
4110  if (g_enable_watchdog || iteration_ctr > 1) {
4111  throw std::runtime_error("Query ran out of output slots in the result");
4112  }
4113  max_groups_buffer_entry_guess *= 2;
4114  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
4115  "groups buffer entry "
4116  "guess equal to "
4117  << max_groups_buffer_entry_guess;
4118  } else {
4119  handlePersistentError(e.getErrorCode());
4120  }
4121  continue;
4122  }
4123  result.setQueueTime(queue_time_ms);
4124  return result;
4125  }
4126  return result;
4127 }
4128 
4129 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
4130  LOG(ERROR) << "Query execution failed with error "
4131  << getErrorMessageFromCode(error_code);
4132  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
4133  // We ran out of GPU memory, this doesn't count as an error if the query is
4134  // allowed to continue on CPU because retry on CPU is explicitly allowed through
4135  // --allow-cpu-retry.
4136  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
4137  if (!g_allow_cpu_retry) {
4138  throw std::runtime_error(
4139  "Query ran out of GPU memory, unable to automatically retry on CPU");
4140  }
4141  return;
4142  }
4143  throw std::runtime_error(getErrorMessageFromCode(error_code));
4144 }
4145 
4146 namespace {
4147 struct ErrorInfo {
4148  const char* code{nullptr};
4149  const char* description{nullptr};
4150 };
4151 ErrorInfo getErrorDescription(const int32_t error_code) {
4152  // 'designated initializers' don't compile on Windows for std 17.
4153  // They require /std:c++20. They have been removed for the Windows port.
4154  switch (error_code) {
4155  case Executor::ERR_DIV_BY_ZERO:
4156  return {"ERR_DIV_BY_ZERO", "Division by zero"};
4157  case Executor::ERR_OUT_OF_GPU_MEM:
4158  return {"ERR_OUT_OF_GPU_MEM",
4159 
4160  "Query couldn't keep the entire working set of columns in GPU memory"};
4161  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
4162  return {"ERR_UNSUPPORTED_SELF_JOIN", "Self joins not supported yet"};
4163  case Executor::ERR_OUT_OF_CPU_MEM:
4164  return {"ERR_OUT_OF_CPU_MEM", "Not enough host memory to execute the query"};
4165  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
4166  return {"ERR_OVERFLOW_OR_UNDERFLOW", "Overflow or underflow"};
4167  case Executor::ERR_OUT_OF_TIME:
4168  return {"ERR_OUT_OF_TIME", "Query execution has exceeded the time limit"};
4169  case Executor::ERR_INTERRUPTED:
4170  return {"ERR_INTERRUPTED", "Query execution has been interrupted"};
4171  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
4172  return {"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
4173  "Columnar conversion not supported for variable length types"};
4174  case Executor::ERR_TOO_MANY_LITERALS:
4175  return {"ERR_TOO_MANY_LITERALS", "Too many literals in the query"};
4176  case Executor::ERR_STRING_CONST_IN_RESULTSET:
4177  return {"ERR_STRING_CONST_IN_RESULTSET",
4178 
4179  "NONE ENCODED String types are not supported as input result set."};
4180  case Executor::ERR_OUT_OF_RENDER_MEM:
4181  return {"ERR_OUT_OF_RENDER_MEM",
4182 
4183  "Insufficient GPU memory for query results in render output buffer "
4184  "sized by render-mem-bytes"};
4185  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
4186  return {"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
4187  "Streaming-Top-N not supported in Render Query"};
4188  case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
4189  return {"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
4190  "Multiple distinct values encountered"};
4191  case Executor::ERR_GEOS:
4192  return {"ERR_GEOS", "ERR_GEOS"};
4193  case Executor::ERR_WIDTH_BUCKET_INVALID_ARGUMENT:
4194  return {"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
4195 
4196  "Arguments of WIDTH_BUCKET function do not satisfy the condition"};
4197  default:
4198  return {nullptr, nullptr};
4199  }
4200 }
4201 
4202 } // namespace
4203 
4204 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
4205  if (error_code < 0) {
4206  return "Ran out of slots in the query output buffer";
4207  }
4208  const auto errorInfo = getErrorDescription(error_code);
4209 
4210  if (errorInfo.code) {
4211  return errorInfo.code + ": "s + errorInfo.description;
4212  } else {
4213  return "Other error: code "s + std::to_string(error_code);
4214  }
4215 }
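
// Example of the mapping above: getErrorMessageFromCode(Executor::ERR_DIV_BY_ZERO)
// yields "ERR_DIV_BY_ZERO: Division by zero"; any negative code collapses to the
// out-of-slots message, since negative codes signal that a kernel ran out of
// output slots and normally drive the retry paths above rather than surfacing to
// the user; unknown positive codes fall back to "Other error: code N".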
4216 
4217 void RelAlgExecutor::executePostExecutionCallback() {
4218  if (post_execution_callback_) {
4219  VLOG(1) << "Running post execution callback.";
4220  (*post_execution_callback_)();
4221  }
4222 }
4223 
4224 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
4225  const SortInfo& sort_info,
4226  const ExecutionOptions& eo) {
4227  const auto compound = dynamic_cast<const RelCompound*>(node);
4228  if (compound) {
4229  return createCompoundWorkUnit(compound, sort_info, eo);
4230  }
4231  const auto project = dynamic_cast<const RelProject*>(node);
4232  if (project) {
4233  return createProjectWorkUnit(project, sort_info, eo);
4234  }
4235  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4236  if (aggregate) {
4237  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4238  }
4239  const auto filter = dynamic_cast<const RelFilter*>(node);
4240  if (filter) {
4241  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4242  }
4243  LOG(FATAL) << "Unhandled node type: "
4245  return {};
4246 }
4247 
4248 namespace {
4249 
4251  auto sink = get_data_sink(ra);
4252  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
4253  return join->getJoinType();
4254  }
4255  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
4256  return JoinType::INNER;
4257  }
4258 
4259  return JoinType::INVALID;
4260 }
4261 
4262 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
4263  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4264  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
4265  return nullptr;
4266  }
4267  const auto equi_join_condition =
4268  dynamic_cast<const RexOperator*>(condition->getOperand(0));
4269  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
4270  return nullptr;
4271  }
4272  const auto both_are_null_condition =
4273  dynamic_cast<const RexOperator*>(condition->getOperand(1));
4274  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
4275  both_are_null_condition->size() != 2) {
4276  return nullptr;
4277  }
4278  const auto lhs_is_null =
4279  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
4280  const auto rhs_is_null =
4281  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
4282  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
4283  rhs_is_null->getOperator() != kISNULL) {
4284  return nullptr;
4285  }
4286  CHECK_EQ(size_t(1), lhs_is_null->size());
4287  CHECK_EQ(size_t(1), rhs_is_null->size());
4288  CHECK_EQ(size_t(2), equi_join_condition->size());
4289  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
4290  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
4291  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
4292  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
4293  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4294  return nullptr;
4295  }
4296  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
4297  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4298  RexDeepCopyVisitor deep_copy_visitor;
4299  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
4300  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
4301  eq_operands.emplace_back(lhs_op_copy.release());
4302  eq_operands.emplace_back(rhs_op_copy.release());
4303  return boost::make_unique<const RexOperator>(
4304  kBW_EQ, eq_operands, equi_join_condition->getType());
4305  }
4306  return nullptr;
4307 }
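
// The pattern recognized above is the null-safe equality idiom that Calcite emits
// for IS NOT DISTINCT FROM. A join condition of the form
//
//   a.x = b.x OR (a.x IS NULL AND b.x IS NULL)
//
// is collapsed into a single kBW_EQ (bitwise-equals) operator over the same
// operand pair, so the downstream machinery can treat it like an ordinary
// equi-join qual.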
4308 
4309 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
4310  const RexScalar* scalar) {
4311  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4312  if (condition && condition->getOperator() == kAND) {
4313  CHECK_GE(condition->size(), size_t(2));
4314  auto acc = get_bitwise_equals(condition->getOperand(0));
4315  if (!acc) {
4316  return nullptr;
4317  }
4318  for (size_t i = 1; i < condition->size(); ++i) {
4319  std::vector<std::unique_ptr<const RexScalar>> and_operands;
4320  and_operands.emplace_back(std::move(acc));
4321  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
4322  acc =
4323  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
4324  }
4325  return acc;
4326  }
4327  return get_bitwise_equals(scalar);
4328 }
4329 
4330 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
4331  CHECK_GE(left_deep_join->inputCount(), size_t(2));
4332  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
4333  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
4334  ++nesting_level) {
4335  if (left_deep_join->getOuterCondition(nesting_level)) {
4336  join_types[nesting_level - 1] = JoinType::LEFT;
4337  }
4338  auto cur_level_join_type = left_deep_join->getJoinType(nesting_level);
4339  if (cur_level_join_type == JoinType::SEMI || cur_level_join_type == JoinType::ANTI) {
4340  join_types[nesting_level - 1] = cur_level_join_type;
4341  }
4342  }
4343  return join_types;
4344 }
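
// Example of the mapping above for a four-input left-deep tree: with an outer
// condition registered at nesting level 2 and a SEMI join at level 3, the result
// is {JoinType::INNER, JoinType::LEFT, JoinType::SEMI}, one entry per nesting
// level 1..inputCount()-1, defaulting to INNER.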
4345 
4346 template <class RA>
4347 std::vector<size_t> do_table_reordering(
4348  std::vector<InputDescriptor>& input_descs,
4349  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4350  const JoinQualsPerNestingLevel& left_deep_join_quals,
4351  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4352  const RA* node,
4353  const std::vector<InputTableInfo>& query_infos,
4354  const Executor* executor) {
4355  if (g_cluster) {
4356  // Disable table reordering in distributed mode. The aggregator does not have enough
4357  // information to break ties
4358  return {};
4359  }
4360  const auto& cat = *executor->getCatalog();
4361  for (const auto& table_info : query_infos) {
4362  if (table_info.table_id < 0) {
4363  continue;
4364  }
4365  const auto td = cat.getMetadataForTable(table_info.table_id);
4366  CHECK(td);
4367  if (table_is_replicated(td)) {
4368  return {};
4369  }
4370  }
4371  const auto input_permutation =
4372  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
4373  input_to_nest_level = get_input_nest_levels(node, input_permutation);
4374  std::tie(input_descs, input_col_descs, std::ignore) =
4375  get_input_desc(node, input_to_nest_level, input_permutation, cat);
4376  return input_permutation;
4377 }
4378 
4379 std::vector<size_t> get_left_deep_join_input_sizes(
4380  const RelLeftDeepInnerJoin* left_deep_join) {
4381  std::vector<size_t> input_sizes;
4382  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
4383  const auto inputs = get_node_output(left_deep_join->getInput(i));
4384  input_sizes.push_back(inputs.size());
4385  }
4386  return input_sizes;
4387 }
4388 
4389 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
4390  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4391  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4392  for (const auto& qual : quals) {
4393  const auto rewritten_qual = rewrite_expr(qual.get());
4394  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4395  }
4396  return rewritten_quals;
4397 }
4398 
4399 } // namespace
4400 
4401 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
4402  const RelCompound* compound,
4403  const SortInfo& sort_info,
4404  const ExecutionOptions& eo) {
4405  std::vector<InputDescriptor> input_descs;
4406  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4407  auto input_to_nest_level = get_input_nest_levels(compound, {});
4408  std::tie(input_descs, input_col_descs, std::ignore) =
4409  get_input_desc(compound, input_to_nest_level, {}, cat_);
4410  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4411  const auto query_infos = get_table_infos(input_descs, executor_);
4412  CHECK_EQ(size_t(1), compound->inputCount());
4413  const auto left_deep_join =
4414  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4415  JoinQualsPerNestingLevel left_deep_join_quals;
4416  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4417  : std::vector<JoinType>{get_join_type(compound)};
4418  std::vector<size_t> input_permutation;
4419  std::vector<size_t> left_deep_join_input_sizes;
4420  std::optional<unsigned> left_deep_tree_id;
4421  if (left_deep_join) {
4422  left_deep_tree_id = left_deep_join->getId();
4423  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4424  left_deep_join_quals = translateLeftDeepJoinFilter(
4425  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4426  if (g_from_table_reordering &&
4427  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4428  join_types.end()) {
4429  input_permutation = do_table_reordering(input_descs,
4430  input_col_descs,
4431  left_deep_join_quals,
4432  input_to_nest_level,
4433  compound,
4434  query_infos,
4435  executor_);
4436  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4437  std::tie(input_descs, input_col_descs, std::ignore) =
4438  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
4439  left_deep_join_quals = translateLeftDeepJoinFilter(
4440  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4441  }
4442  }
4443  RelAlgTranslator translator(cat_,
4444  query_state_,
4445  executor_,
4446  input_to_nest_level,
4447  join_types,
4448  now_,
4449  eo.just_explain);
4450  const auto scalar_sources =
4451  translate_scalar_sources(compound, translator, eo.executor_type);
4452  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4453  const auto quals_cf = translate_quals(compound, translator);
4454  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4455  const auto target_exprs = translate_targets(target_exprs_owned_,
4456  target_exprs_type_infos,
4457  scalar_sources,
4458  groupby_exprs,
4459  compound,
4460  translator,
4461  eo.executor_type);
4462 
4463  auto query_hint = RegisteredQueryHint::defaults();
4464  if (query_dag_) {
4465  auto candidate = query_dag_->getQueryHint(compound);
4466  if (candidate) {
4467  query_hint = *candidate;
4468  }
4469  }
4470  CHECK_EQ(compound->size(), target_exprs.size());
4471  const RelAlgExecutionUnit exe_unit = {input_descs,
4472  input_col_descs,
4473  quals_cf.simple_quals,
4474  rewrite_quals(quals_cf.quals),
4475  left_deep_join_quals,
4476  groupby_exprs,
4477  target_exprs,
4478  target_exprs_type_infos,
4479  nullptr,
4480  sort_info,
4481  0,
4482  query_hint,
4483  compound->getQueryPlanDagHash(),
4484  {},
4485  {},
4486  false,
4487  std::nullopt,
4488  query_state_};
4489  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4490  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4491  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4492  compound->setOutputMetainfo(targets_meta);
4493  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4494  if (left_deep_tree_id.has_value()) {
4495  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4496  rewritten_exe_unit.join_quals);
4497  }
4498  if (has_valid_query_plan_dag(compound)) {
4499  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4500  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4501  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4502  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4503  }
4504  return {rewritten_exe_unit,
4505  compound,
4506  g_default_max_groups_buffer_entry_guess,
4507  std::move(query_rewriter),
4508  input_permutation,
4509  left_deep_join_input_sizes};
4510 }
4511 
4512 std::shared_ptr<RelAlgTranslator> RelAlgExecutor::getRelAlgTranslator(
4513  const RelAlgNode* node) {
4514  auto input_to_nest_level = get_input_nest_levels(node, {});
4515  const auto left_deep_join =
4516  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
4517  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4518  : std::vector<JoinType>{get_join_type(node)};
4519  return std::make_shared<RelAlgTranslator>(
4520  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, false);
4521 }
4522 
4523 namespace {
4524 
4525 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
4526  CHECK(qual_expr);
4527  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
4528  if (!bin_oper || bin_oper->getOperator() != kAND) {
4529  return {qual_expr};
4530  }
4531  CHECK_GE(bin_oper->size(), size_t(2));
4532  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
4533  for (size_t i = 1; i < bin_oper->size(); ++i) {
4534  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
4535  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4536  }
4537  return lhs_cf;
4538 }
4539 
4540 std::shared_ptr<Analyzer::Expr> build_logical_expression(
4541  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4542  const SQLOps sql_op) {
4543  CHECK(!factors.empty());
4544  auto acc = factors.front();
4545  for (size_t i = 1; i < factors.size(); ++i) {
4546  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
4547  }
4548  return acc;
4549 }
4550 
4551 template <class QualsList>
4552 bool list_contains_expression(const QualsList& haystack,
4553  const std::shared_ptr<Analyzer::Expr>& needle) {
4554  for (const auto& qual : haystack) {
4555  if (*qual == *needle) {
4556  return true;
4557  }
4558  }
4559  return false;
4560 }
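// Note that the comparison above is by value (Analyzer::Expr::operator==), not
// by pointer, so structurally identical qualifiers coming from different
// subtrees still match.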
4561 
4562 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. This avoids
4563 // redundant evaluation of `p` and exposes `p` as a top-level conjunct,
4564 // which can then be used for hash joins when it is a join qualifier.
4565 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
4566  const std::shared_ptr<Analyzer::Expr>& expr) {
4567  const auto expr_terms = qual_to_disjunctive_form(expr);
4568  CHECK_GE(expr_terms.size(), size_t(1));
4569  const auto& first_term = expr_terms.front();
4570  const auto first_term_factors = qual_to_conjunctive_form(first_term);
4571  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4572  // First, collect the conjunctive components common to all the disjunctive components.
4573 // Don't do this for simple qualifiers; we only care about expensive or join qualifiers.
4574  for (const auto& first_term_factor : first_term_factors.quals) {
4575  bool is_common =
4576  expr_terms.size() > 1; // Only report common factors for disjunction.
4577  for (size_t i = 1; i < expr_terms.size(); ++i) {
4578  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
4579  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
4580  is_common = false;
4581  break;
4582  }
4583  }
4584  if (is_common) {
4585  common_factors.push_back(first_term_factor);
4586  }
4587  }
4588  if (common_factors.empty()) {
4589  return expr;
4590  }
4591  // Now that the common expressions are known, collect the remaining expressions.
4592  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4593  for (const auto& term : expr_terms) {
4594  const auto term_cf = qual_to_conjunctive_form(term);
4595  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4596  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4597  for (const auto& qual : term_cf.quals) {
4598  if (!list_contains_expression(common_factors, qual)) {
4599  remaining_quals.push_back(qual);
4600  }
4601  }
4602  if (!remaining_quals.empty()) {
4603  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
4604  }
4605  }
4606  // Reconstruct the expression with the transformation applied.
4607  const auto common_expr = build_logical_expression(common_factors, kAND);
4608  if (remaining_terms.empty()) {
4609  return common_expr;
4610  }
4611  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
4612  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
4613 }
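// Worked example for the transformation above, with hypothetical qualifiers:
//
//   input:  (A.x = B.x AND A.y > 0) OR (A.x = B.x AND A.z < 5)
//   terms:  {A.x = B.x AND A.y > 0, A.x = B.x AND A.z < 5}
//   common: {A.x = B.x}         // conjunct present in every disjunctive term
//   rest:   {A.y > 0, A.z < 5}  // per-term remainders
//   output: A.x = B.x AND (A.y > 0 OR A.z < 5)
//
// If the terms share no factor the expression is returned unchanged; if the
// remainders are empty only the common factors survive.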
4614 
4615 } // namespace
4616 
4617 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
4618  const RexScalar* join_condition,
4619  const std::vector<JoinType>& join_types,
4620  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4621  const bool just_explain) const {
4622  RelAlgTranslator translator(
4623  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
4624  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
4625  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4626  for (const auto rex_condition_component : rex_condition_cf) {
4627  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
4628  const auto join_condition = reverse_logical_distribution(
4629  translator.translate(bw_equals ? bw_equals.get() : rex_condition_component));
4630  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
4631 
4632  auto append_folded_cf_quals = [&join_condition_quals](const auto& cf_quals) {
4633  for (const auto& cf_qual : cf_quals) {
4634  join_condition_quals.emplace_back(fold_expr(cf_qual.get()));
4635  }
4636  };
4637 
4638  append_folded_cf_quals(join_condition_cf.quals);
4639  append_folded_cf_quals(join_condition_cf.simple_quals);
4640  }
4641  return combine_equi_join_conditions(join_condition_quals);
4642 }
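// For reference, get_bitwise_equals_conjunction is what lets NULL-safe
// equality patterns participate here: a component of the form
//
//   (A.x = B.x) OR (A.x IS NULL AND B.x IS NULL)
//
// is collapsed into a single bitwise-equals qualifier, which downstream code
// can treat like an ordinary equi-join key.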
4643 
4644 // Translate left deep join filter and separate the conjunctive form qualifiers
4645 // per nesting level. The code generated for hash table lookups on each level
4646 // must dominate its uses in deeper nesting levels.
4647 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
4648  const RelLeftDeepInnerJoin* join,
4649  const std::vector<InputDescriptor>& input_descs,
4650  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4651  const bool just_explain) {
4652  const auto join_types = left_deep_join_types(join);
4653  const auto join_condition_quals = makeJoinQuals(
4654  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
4655  MaxRangeTableIndexVisitor rte_idx_visitor;
4656  JoinQualsPerNestingLevel result(input_descs.size() - 1);
4657  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4658  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4659  const auto outer_condition = join->getOuterCondition(rte_idx);
4660  if (outer_condition) {
4661  result[rte_idx - 1].quals =
4662  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4663  CHECK_LE(rte_idx, join_types.size());
4664  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
4665  result[rte_idx - 1].type = JoinType::LEFT;
4666  continue;
4667  }
4668  for (const auto& qual : join_condition_quals) {
4669  if (visited_quals.count(qual)) {
4670  continue;
4671  }
4672  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4673  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4674  const auto it_ok = visited_quals.emplace(qual);
4675  CHECK(it_ok.second);
4676  result[rte_idx - 1].quals.push_back(qual);
4677  }
4678  }
4679  CHECK_LE(rte_idx, join_types.size());
4680  CHECK(join_types[rte_idx - 1] == JoinType::INNER ||
4681  join_types[rte_idx - 1] == JoinType::SEMI ||
4682  join_types[rte_idx - 1] == JoinType::ANTI);
4683  result[rte_idx - 1].type = join_types[rte_idx - 1];
4684  }
4685  return result;
4686 }
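// A small illustration of the level assignment, assuming a three-way inner
// join over t0, t1, t2 at nest levels 0, 1, 2 with these translated quals:
//
//   t0.a = t1.a  -> max rte_idx 1 -> result[0].quals
//   t0.b = t2.b  -> max rte_idx 2 -> result[1].quals
//   t1.c = t2.c  -> max rte_idx 2 -> result[1].quals
//
// Each qualifier lands on the shallowest level that has all of its inputs in
// scope and, via visited_quals, is emitted exactly once.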
4687 
4688 namespace {
4689 
4690 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
4691  const RelAlgNode* ra_node,
4692  const size_t nest_level,
4693  const std::vector<TargetMetaInfo>& in_metainfo,
4694  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4695  CHECK_LE(size_t(1), ra_node->inputCount());
4696  CHECK_GE(size_t(2), ra_node->inputCount());
4697  const auto input = ra_node->getInput(nest_level);
4698  const auto it_rte_idx = input_to_nest_level.find(input);
4699  CHECK(it_rte_idx != input_to_nest_level.end());
4700  const int rte_idx = it_rte_idx->second;
4701  const int table_id = table_id_from_ra(input);
4702  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4703  const auto scan_ra = dynamic_cast<const RelScan*>(input);
4704  int input_idx = 0;
4705  for (const auto& input_meta : in_metainfo) {
4706  inputs.push_back(
4707  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
4708  table_id,
4709  scan_ra ? input_idx + 1 : input_idx,
4710  rte_idx));
4711  ++input_idx;
4712  }
4713  return inputs;
4714 }
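// The `scan_ra ? input_idx + 1 : input_idx` above accounts for physical
// tables using 1-based catalog column ids, whereas intermediate query steps
// index their output columns from 0.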
4715 
4716 std::vector<Analyzer::Expr*> get_raw_pointers(
4717  std::vector<std::shared_ptr<Analyzer::Expr>> const& input) {
4718  std::vector<Analyzer::Expr*> output(input.size());
4719  auto const raw_ptr = [](auto& shared_ptr) { return shared_ptr.get(); };
4720  std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4721  return output;
4722 }
4723 
4724 } // namespace
4725 
4726 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
4727  const RelAggregate* aggregate,
4728  const SortInfo& sort_info,
4729  const bool just_explain) {
4730  std::vector<InputDescriptor> input_descs;
4731  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4732  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4733  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4734  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4735  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
4736  const auto join_type = get_join_type(aggregate);
4737 
4738  RelAlgTranslator translator(cat_,
4739  query_state_,
4740  executor_,
4741  input_to_nest_level,
4742  {join_type},
4743  now_,
4744  just_explain);
4745  CHECK_EQ(size_t(1), aggregate->inputCount());
4746  const auto source = aggregate->getInput(0);
4747  const auto& in_metainfo = source->getOutputMetainfo();
4748  const auto scalar_sources =
4749  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4750  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4751  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4752  const auto target_exprs = translate_targets(target_exprs_owned_,
4753  target_exprs_type_infos,
4754  scalar_sources,
4755  groupby_exprs,
4756  aggregate,
4757  translator);
4758 
4759  const auto query_infos = get_table_infos(input_descs, executor_);
4760 
4761  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4762  aggregate->setOutputMetainfo(targets_meta);
4763  auto query_hint = RegisteredQueryHint::defaults();
4764  if (query_dag_) {
4765  auto candidate = query_dag_->getQueryHint(aggregate);
4766  if (candidate) {
4767  query_hint = *candidate;
4768  }
4769  }
4770  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4771  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4772  return {RelAlgExecutionUnit{input_descs,
4773  input_col_descs,
4774  {},
4775  {},
4776  {},
4777  groupby_exprs,
4778  target_exprs,
4779  target_exprs_type_infos,
4780  nullptr,
4781  sort_info,
4782  0,
4783  query_hint,
4784  aggregate->getQueryPlanDagHash(),
4785  join_info.hash_table_plan_dag,
4786  join_info.table_id_to_node_map,
4787  false,
4788  std::nullopt,
4789  query_state_},
4790  aggregate,
4791  max_groups_buffer_entry_default_guess,
4792  nullptr};
4793 }
4794 
4795 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
4796  const RelProject* project,
4797  const SortInfo& sort_info,
4798  const ExecutionOptions& eo) {
4799  std::vector<InputDescriptor> input_descs;
4800  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4801  auto input_to_nest_level = get_input_nest_levels(project, {});
4802  std::tie(input_descs, input_col_descs, std::ignore) =
4803  get_input_desc(project, input_to_nest_level, {}, cat_);
4804  const auto query_infos = get_table_infos(input_descs, executor_);
4805 
4806  const auto left_deep_join =
4807  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4808  JoinQualsPerNestingLevel left_deep_join_quals;
4809  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4810  : std::vector<JoinType>{get_join_type(project)};
4811  std::vector<size_t> input_permutation;
4812  std::vector<size_t> left_deep_join_input_sizes;
4813  std::optional<unsigned> left_deep_tree_id;
4814  if (left_deep_join) {
4815  left_deep_tree_id = left_deep_join->getId();
4816  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4817  const auto query_infos = get_table_infos(input_descs, executor_);
4818  left_deep_join_quals = translateLeftDeepJoinFilter(
4819  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4820  if (g_from_table_reordering) {
4821  input_permutation = do_table_reordering(input_descs,
4822  input_col_descs,
4823  left_deep_join_quals,
4824  input_to_nest_level,
4825  project,
4826  query_infos,
4827  executor_);
4828  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4829  std::tie(input_descs, input_col_descs, std::ignore) =
4830  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
4831  left_deep_join_quals = translateLeftDeepJoinFilter(
4832  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4833  }
4834  }
4835 
4836  RelAlgTranslator translator(cat_,
4837  query_state_,
4838  executor_,
4839  input_to_nest_level,
4840  join_types,
4841  now_,
4842  eo.just_explain);
4843  const auto target_exprs_owned =
4844  translate_scalar_sources(project, translator, eo.executor_type);
4845 
4846  target_exprs_owned_.insert(
4847  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4848  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4849  auto query_hint = RegisteredQueryHint::defaults();
4850  if (query_dag_) {
4851  auto candidate = query_dag_->getQueryHint(project);
4852  if (candidate) {
4853  query_hint = *candidate;
4854  }
4855  }
4856  const RelAlgExecutionUnit exe_unit = {input_descs,
4857  input_col_descs,
4858  {},
4859  {},
4860  left_deep_join_quals,
4861  {nullptr},
4862  target_exprs,
4863  {},
4864  nullptr,
4865  sort_info,
4866  0,
4867  query_hint,
4868  project->getQueryPlanDagHash(),
4869  {},
4870  {},
4871  false,
4872  std::nullopt,
4873  query_state_};
4874  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4875  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4876  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
4877  project->setOutputMetainfo(targets_meta);
4878  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4879  if (left_deep_tree_id.has_value()) {
4880  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4881  rewritten_exe_unit.join_quals);
4882  }
4883  if (has_valid_query_plan_dag(project)) {
4884  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4885  project, left_deep_tree_id, left_deep_trees_info, executor_);
4886  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4887  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4888  }
4889  return {rewritten_exe_unit,
4890  project,
4891  max_groups_buffer_entry_default_guess,
4892  std::move(query_rewriter),
4893  input_permutation,
4894  left_deep_join_input_sizes};
4895 }
4896 
4897 namespace {
4898 
4899 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
4900  RelAlgNode const* input_node) {
4901  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
4902  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
4903  const int negative_node_id = -input_node->getId();
4904  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4905  target_exprs.reserve(tmis.size());
4906  for (size_t i = 0; i < tmis.size(); ++i) {
4907  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4908  tmis[i].get_type_info(), negative_node_id, i, 0));
4909  }
4910  return target_exprs;
4911 }
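// The negated node id is a convention rather than a catalog table reference:
// it keys each union input by its RelAlgNode id so the two sides can be told
// apart downstream. A hypothetical two-column input with node id 7 yields:
//
//   ColumnVar(ti0, /*table_id=*/-7, /*column_id=*/0, /*rte_idx=*/0)
//   ColumnVar(ti1, /*table_id=*/-7, /*column_id=*/1, /*rte_idx=*/0)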
4912 
4913 } // namespace
4914 
4915 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
4916  const RelLogicalUnion* logical_union,
4917  const SortInfo& sort_info,
4918  const ExecutionOptions& eo) {
4919  std::vector<InputDescriptor> input_descs;
4920  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4921  // Map ra input ptr to index (0, 1).
4922  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
4923  std::tie(input_descs, input_col_descs, std::ignore) =
4924  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
4925  const auto query_infos = get_table_infos(input_descs, executor_);
4926  auto const max_num_tuples =
4927  std::accumulate(query_infos.cbegin(),
4928  query_infos.cend(),
4929  size_t(0),
4930  [](auto max, auto const& query_info) {
4931  return std::max(max, query_info.info.getNumTuples());
4932  });
4933 
4934  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
4935  for (auto& pair : input_to_nest_level) {
4936  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
4937  << pair.second << ')';
4938  }
4939 
4940 // For UNION queries we need to keep the target_exprs from both subqueries, since
4941 // they may reference different StringDictionaries.
4942  std::vector<Analyzer::Expr*> target_exprs_pair[2];
4943  for (unsigned i = 0; i < 2; ++i) {
4944  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
4945  CHECK(!input_exprs_owned.empty())
4946  << "No metainfo found for input node(" << i << ") "
4947  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
4948  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
4949  for (auto& input_expr : input_exprs_owned) {
4950  VLOG(3) << " " << input_expr->toString();
4951  }
4952  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
4953  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
4954  }
4955 
4956  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
4957  << " input_col_descs=" << shared::printContainer(input_col_descs)
4958  << " target_exprs.size()=" << target_exprs_pair[0].size()
4959  << " max_num_tuples=" << max_num_tuples;
4960 
4961  const RelAlgExecutionUnit exe_unit = {input_descs,
4962  input_col_descs,
4963  {}, // quals_cf.simple_quals,
4964  {}, // rewrite_quals(quals_cf.quals),
4965  {},
4966  {nullptr},
4967  target_exprs_pair[0],
4968  {},
4969  nullptr,
4970  sort_info,
4971  max_num_tuples,
4972  RegisteredQueryHint::defaults(),
4973  logical_union->getQueryPlanDagHash(),
4974  {},
4975  {},
4976  false,
4977  logical_union->isAll(),
4978  query_state_,
4979  target_exprs_pair[1]};
4980  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4981  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4982 
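  // get_targets_meta is templated on the concrete RelAlgNode subtype, so the
  // source node must be downcast explicitly; the branches below are otherwise
  // identical.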
4983  RelAlgNode const* input0 = logical_union->getInput(0);
4984  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
4985  logical_union->setOutputMetainfo(
4986  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4987  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
4988  logical_union->setOutputMetainfo(
4989  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4990  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
4991  logical_union->setOutputMetainfo(
4992  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4993  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4994  logical_union->setOutputMetainfo(
4995  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4996  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
4997  logical_union->setOutputMetainfo(
4998  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4999  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
5000  logical_union->setOutputMetainfo(
5001  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5002  } else if (dynamic_cast<const RelSort*>(input0)) {
5003  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
5004  } else {
5005  throw QueryNotSupported("Unsupported input type: " +
5006  input0->toString(RelRexToStringConfig::defaults()));
5007  }
5008  VLOG(3) << "logical_union->getOutputMetainfo()="
5009  << shared::printContainer(logical_union->getOutputMetainfo())
5010  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
5011  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
5012 
5013  return {rewritten_exe_unit,
5014  logical_union,
5015  max_groups_buffer_entry_default_guess,
5016  std::move(query_rewriter)};
5017 }
5018 
5019 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
5020  const RelTableFunction* rel_table_func,
5021  const bool just_explain,
5022  const bool is_gpu) {
5023  std::vector<InputDescriptor> input_descs;
5024  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5025  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
5026  std::tie(input_descs, input_col_descs, std::ignore) =
5027  get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
5028  const auto query_infos = get_table_infos(input_descs, executor_);
5029  RelAlgTranslator translator(
5030  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
5031  const auto input_exprs_owned = translate_scalar_sources(
5032  rel_table_func, translator, ::ExecutorType::TableFunctions);
5033  target_exprs_owned_.insert(
5034  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
5035  auto input_exprs = get_raw_pointers(input_exprs_owned);
5036 
5037  const auto table_function_impl_and_type_infos = [=]() {
5038  if (is_gpu) {
5039  try {
5040  return bind_table_function(
5041  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5042  } catch (ExtensionFunctionBindingError& e) {
5043  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
5044  << " Redirecting " << rel_table_func->getFunctionName()
5045  << " step to run on CPU.";
5046  throw QueryMustRunOnCpu();
5047  }
5048  } else {
5049  try {
5050  return bind_table_function(
5051  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5052  } catch (ExtensionFunctionBindingError& e) {
5053  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
5054  throw;
5055  }
5056  }
5057  }();
5058  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
5059  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
5060 
5061  size_t output_row_sizing_param = 0;
5062  if (table_function_impl
5063  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
5064  const auto parameter_index =
5065  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
5066  CHECK_GT(parameter_index, size_t(0));
5067  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
5068  const auto parameter_expr =
5069  rel_table_func->getTableFuncInputAt(parameter_index - 1);
5070  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
5071  if (!parameter_expr_literal) {
5072  throw std::runtime_error(
5073  "Provided output buffer sizing parameter is not a literal. Only literal "
5074  "values are supported with output buffer sizing configured table "
5075  "functions.");
5076  }
5077  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
5078  if (literal_val < 0) {
5079  throw std::runtime_error("Provided output sizing parameter " +
5080  std::to_string(literal_val) +
5081  " must be positive integer.");
5082  }
5083  output_row_sizing_param = static_cast<size_t>(literal_val);
5084  } else {
5085  // RowMultiplier not specified in the SQL query. Set it to 1
5086  output_row_sizing_param = 1; // default value for RowMultiplier
5087  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
5088  static auto DEFAULT_ROW_MULTIPLIER_EXPR =
5089  makeExpr<Analyzer::Constant>(kINT, false, d);
5090  // Push the constant 1 to input_exprs
5091  input_exprs.insert(input_exprs.begin() + parameter_index - 1,
5092  DEFAULT_ROW_MULTIPLIER_EXPR.get());
5093  }
5094  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5095  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
5096  } else {
5097  UNREACHABLE();
5098  }
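  // Sketch of the two user-facing cases handled above, for a hypothetical
  // table function my_tf(Column<int>, RowMultiplier):
  //
  //   SELECT * FROM TABLE(my_tf(CURSOR(SELECT c FROM t), 4));
  //     -> the literal 4 becomes output_row_sizing_param
  //   SELECT * FROM TABLE(my_tf(CURSOR(SELECT c FROM t)));
  //     -> output_row_sizing_param defaults to 1 and a constant 1 is spliced
  //        into input_exprs at the RowMultiplier position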
5099 
5100  std::vector<Analyzer::ColumnVar*> input_col_exprs;
5101  size_t input_index = 0;
5102  size_t arg_index = 0;
5103  const auto table_func_args = table_function_impl.getInputArgs();
5104  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5105  for (const auto& ti : table_function_type_infos) {
5106  if (ti.is_column_list()) {
5107  for (int i = 0; i < ti.get_dimension(); i++) {
5108  auto& input_expr = input_exprs[input_index];
5109  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
5110  CHECK(col_var);
5111 
5112  // avoid setting type info to ti here since ti doesn't have all the
5113  // properties correctly set
5114  auto type_info = input_expr->get_type_info();
5115  if (ti.is_column_array()) {
5116  type_info.set_compression(kENCODING_ARRAY);
5117  type_info.set_subtype(type_info.get_subtype()); // keep the array element type as subtype (no-op)
5118  } else {
5119  type_info.set_subtype(type_info.get_type()); // move the scalar type into subtype
5120  }
5121  type_info.set_type(ti.get_type()); // set the outer type to column list
5122  type_info.set_dimension(ti.get_dimension());
5123  input_expr->set_type_info(type_info);
5124 
5125  input_col_exprs.push_back(col_var);
5126  input_index++;
5127  }
5128  } else if (ti.is_column()) {
5129  auto& input_expr = input_exprs[input_index];
5130  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
5131  CHECK(col_var);
5132  // same here! avoid setting type info to ti since it doesn't have all the
5133  // properties correctly set
5134  auto type_info = input_expr->get_type_info();
5135  if (ti.is_column_array()) {
5136  type_info.set_compression(kENCODING_ARRAY);
5137  type_info.set_subtype(type_info.get_subtype()); // keep the array element type as subtype (no-op)
5138  } else {
5139  type_info.set_subtype(type_info.get_type()); // move the scalar type into subtype
5140  }
5141  type_info.set_type(ti.get_type()); // set the outer type to column
5142  input_expr->set_type_info(type_info);
5143  input_col_exprs.push_back(col_var);
5144  input_index++;
5145  } else {
5146  auto input_expr = input_exprs[input_index];
5147  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
5148  if (ext_func_arg_ti != input_expr->get_type_info()) {
5149  input_exprs[input_index] = input_expr->add_cast(ext_func_arg_ti).get();
5150  }
5151  input_index++;
5152  }
5153  arg_index++;
5154  }
5155  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
5156  std::vector<Analyzer::Expr*> table_func_outputs;
5157  constexpr int32_t transient_pos{-1};
5158  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5159  auto ti = table_function_impl.getOutputSQLType(i);
5160  if (ti.is_dict_encoded_string() || ti.is_text_encoding_dict_array()) {
5161  auto p = table_function_impl.getInputID(i);
5162 
5163  int32_t input_pos = p.first;
5164  if (input_pos == transient_pos) {
5165  ti.set_comp_param(TRANSIENT_DICT_ID);
5166  } else {
5167  // Iterate over the list of arguments to compute the offset. Use this offset to
5168  // get the corresponding input
5169  int32_t offset = 0;
5170  for (int j = 0; j < input_pos; j++) {
5171  const auto ti = table_function_type_infos[j];
5172  offset += ti.is_column_list() ? ti.get_dimension() : 1;
5173  }
5174  input_pos = offset + p.second;
5175 
5176  CHECK_LT(input_pos, input_exprs.size());
5177  int32_t comp_param =
5178  input_exprs_owned[input_pos]->get_type_info().get_comp_param();
5179  ti.set_comp_param(comp_param);
5180  }
5181  }
5182  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
5183  table_func_outputs.push_back(target_exprs_owned_.back().get());
5184  }
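  // Offset arithmetic example for the dictionary propagation above, assuming
  // a signature my_tf(ColumnList<TextEncodingDict> a, Column<TextEncodingDict> b)
  // where `a` binds two columns: an input id pointing at argument 1 resolves
  // to flat position 2 (offset 2 for the two-column list plus p.second == 0),
  // so the output string column reuses the dictionary (comp_param) of `b`.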
5185  const TableFunctionExecutionUnit exe_unit = {
5186  input_descs,
5187  input_col_descs,
5188  input_exprs, // table function inputs
5189  input_col_exprs, // table function column inputs (duplicates w/ above)
5190  table_func_outputs, // table function projected exprs
5191  output_row_sizing_param, // output buffer sizing param
5192  table_function_impl};
5193  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
5194  rel_table_func->setOutputMetainfo(targets_meta);
5195  return {exe_unit, rel_table_func};
5196 }
5197 
5198 namespace {
5199 
5200 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
5201 get_inputs_meta(const RelFilter* filter,
5202  const RelAlgTranslator& translator,
5203  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
5204  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
5205  std::vector<TargetMetaInfo> in_metainfo;
5206  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
5207  const auto data_sink_node = get_data_sink(filter);
5208  auto input_it = inputs_owned.begin();
5209  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
5210  const auto source = data_sink_node->getInput(nest_level);
5211  const auto scan_source = dynamic_cast<const RelScan*>(source);
5212  if (scan_source) {
5213  CHECK(source->getOutputMetainfo().empty());
5214  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
5215  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
5216  scalar_sources_owned.push_back(translator.translate(input_it->get()));
5217  }
5218  const auto source_metadata =
5219  get_targets_meta(scan_source, get_raw_pointers(scalar_sources_owned));
5220  in_metainfo.insert(
5221  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5222  exprs_owned.insert(
5223  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5224  } else {
5225  const auto& source_metadata = source->getOutputMetainfo();
5226  input_it += source_metadata.size();
5227  in_metainfo.insert(
5228  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5229  const auto scalar_sources_owned = synthesize_inputs(
5230  data_sink_node, nest_level, source_metadata, input_to_nest_level);
5231  exprs_owned.insert(
5232  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5233  }
5234  }
5235  return std::make_pair(in_metainfo, exprs_owned);
5236 }
5237 
5238 } // namespace
5239 
5240 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
5241  const SortInfo& sort_info,
5242  const bool just_explain) {
5243  CHECK_EQ(size_t(1), filter->inputCount());
5244  std::vector<InputDescriptor> input_descs;
5245  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5246  std::vector<TargetMetaInfo> in_metainfo;
5247  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5248  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5249 
5250  const auto input_to_nest_level = get_input_nest_levels(filter, {});
5251  std::tie(input_descs, input_col_descs, used_inputs_owned) =
5252  get_input_desc(filter, input_to_nest_level, {}, cat_);
5253  const auto join_type = get_join_type(filter);
5254  RelAlgTranslator translator(cat_,
5255  query_state_,
5256  executor_,
5257  input_to_nest_level,
5258  {join_type},
5259  now_,
5260  just_explain);