RelAlgExecutor.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "RelAlgExecutor.h"
#include "Parser/ParserNode.h"
#include "QueryEngine/RelAlgDag.h"
#include "QueryEngine/RexVisitor.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/shard_key.h"

#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/reversed.hpp>

#include <algorithm>
#include <functional>
#include <numeric>

bool g_enable_interop{false};
bool g_enable_union{true};  // DEPRECATED

extern bool g_enable_watchdog;
extern bool g_enable_bump_allocator;
extern bool g_enable_system_tables;

namespace {

bool node_is_aggregate(const RelAlgNode* ra) {
  const auto compound = dynamic_cast<const RelCompound*>(ra);
  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
  return ((compound && compound->isAggregate()) || aggregate);
}

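// Overload of get_physical_inputs() that additionally translates each column id
// through the catalog's SPI (sequential positional index) mapping, so callers get
// physical column ids rather than logical ones.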
std::unordered_set<PhysicalInput> get_physical_inputs(
    const Catalog_Namespace::Catalog& cat,
    const RelAlgNode* ra) {
  auto phys_inputs = get_physical_inputs(ra);
  std::unordered_set<PhysicalInput> phys_inputs2;
  for (auto& phi : phys_inputs) {
    phys_inputs2.insert(
        PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
  }
  return phys_inputs2;
}

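// Collects parallelism hints for every foreign-table chunk the query will touch and
// hands them to the ForeignStorageMgr so fragments can be prefetched in parallel.
// Chunk keys follow the usual {db_id, table_id, column_id, fragment_id} layout.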
void set_parallelism_hints(const RelAlgNode& ra_node,
                           const Catalog_Namespace::Catalog& catalog) {
  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
      parallelism_hints_per_table;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    int table_id = physical_input.table_id;
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE &&
        !table->is_in_memory_system_table) {
      int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
      const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
      auto foreign_table = catalog.getForeignTable(table_id);
      for (const auto& fragment :
           foreign_table->fragmenter->getFragmentsForQuery().fragments) {
        Chunk_NS::Chunk chunk{col_desc};
        ChunkKey chunk_key = {
            catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};

        // Parallelism hints should not include fragments that are not mapped to the
        // current node, otherwise we will try to prefetch them and run into trouble.
        if (foreign_storage::is_shardable_key(chunk_key) &&
            !foreign_storage::fragment_maps_to_leaf(chunk_key)) {
          continue;
        }

        // do not include chunk hints that are in CPU memory
        if (!chunk.isChunkOnDevice(
                &catalog.getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
          parallelism_hints_per_table[{catalog.getDatabaseId(), table_id}].insert(
              foreign_storage::ForeignStorageMgr::ParallelismHint{col_id,
                                                                  fragment.fragmentId});
        }
      }
    }
  }
  if (!parallelism_hints_per_table.empty()) {
    auto foreign_storage_mgr =
        catalog.getDataMgr().getPersistentStorageMgr()->getForeignStorageMgr();
    CHECK(foreign_storage_mgr);
    foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
  }
}

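// Eagerly materializes the string dictionaries of any foreign-table inputs so they
// are available before execution begins.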
void prepare_string_dictionaries(const RelAlgNode& ra_node,
                                 const Catalog_Namespace::Catalog& catalog) {
  for (const auto [col_id, table_id] : get_physical_inputs(&ra_node)) {
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE) {
      auto spi_col_id = catalog.getColumnIdBySpi(table_id, col_id);
      foreign_storage::populate_string_dictionary(table_id, spi_col_id, catalog);
    }
  }
}

void prepare_foreign_table_for_execution(const RelAlgNode& ra_node,
                                         const Catalog_Namespace::Catalog& catalog) {
  // Iterate through ra_node inputs for types that need to be loaded pre-execution
  // If they do not have valid metadata, load them into CPU memory to generate
  // the metadata and leave them ready to be used by the query
  set_parallelism_hints(ra_node, catalog);
  prepare_string_dictionaries(ra_node, catalog);
}

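// System tables are materialized in CPU memory at query time, so queries that touch
// them must run on CPU; their chunks are prefetched here to force chunk-statistics
// metadata computation before execution.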
void prepare_for_system_table_execution(const RelAlgNode& ra_node,
                                        const Catalog_Namespace::Catalog& catalog,
                                        const CompilationOptions& co) {
  if (g_enable_system_tables) {
    std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
    for (const auto& physical_input : get_physical_inputs(&ra_node)) {
      int table_id = physical_input.table_id;
      auto table = catalog.getMetadataForTable(table_id, false);
      if (table && table->is_in_memory_system_table) {
        auto column_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
        system_table_columns_by_table_id[table_id].emplace_back(column_id);
      }
    }
    // Execute on CPU for queries involving system tables
    if (!system_table_columns_by_table_id.empty() &&
        co.device_type != ExecutorDeviceType::CPU) {
      throw QueryMustRunOnCpu();
    }

    for (const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
      // Clear any previously cached data, since system tables depend on point in
      // time data snapshots.
      catalog.getDataMgr().deleteChunksWithPrefix(
          ChunkKey{catalog.getDatabaseId(), table_id}, Data_Namespace::CPU_LEVEL);

      // TODO(Misiu): This prefetching can be removed if we can add support for
      // ExpressionRanges to reduce invalid with valid ranges (right now prefetching
      // causes us to fetch the chunks twice). Right now if we do not prefetch (i.e. if
      // we remove the code below) some nodes will return valid ranges and others will
      // return unknown because they only use placeholder metadata and the LeafAggregator
      // has no idea how to reduce the two.
      auto td = catalog.getMetadataForTable(table_id);
      CHECK(td);
      CHECK(td->fragmenter);
      auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
      CHECK_LE(fragment_count, static_cast<size_t>(1))
          << "In-memory system tables are expected to have a single fragment.";
      if (fragment_count > 0) {
        for (auto column_id : column_ids) {
          // Prefetch system table chunks in order to force chunk statistics metadata
          // computation.
          auto cd = catalog.getMetadataForColumn(table_id, column_id);
          ChunkKey chunk_key{catalog.getDatabaseId(), table_id, column_id, 0};
          Chunk_NS::Chunk::getChunk(
              cd, &(catalog.getDataMgr()), chunk_key, Data_Namespace::CPU_LEVEL, 0, 0, 0);
        }
      }
    }
  }
}

bool has_valid_query_plan_dag(const RelAlgNode* node) {
  return node->getQueryPlanDagHash() != EMPTY_HASHED_PLAN_DAG_KEY;
}

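// Converts a RelSort's collation into the executor's Analyzer::OrderEntry form.
// Field indices become one-based, e.g. a descending, nulls-first sort on output
// column 0 becomes OrderEntry{1, /*is_desc=*/true, /*nulls_first=*/true}.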
// TODO(alex): Once we're fully migrated to the relational algebra model, change
// the executor interface to use the collation directly and remove this conversion.
std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
  std::list<Analyzer::OrderEntry> result;
  for (size_t i = 0; i < sort->collationCount(); ++i) {
    const auto sort_field = sort->getCollation(i);
    result.emplace_back(sort_field.getField() + 1,
                        sort_field.getSortDir() == SortDirection::Descending,
                        sort_field.getNullsPosition() == NullSortedPosition::First);
  }
  return result;
}

void build_render_targets(RenderInfo& render_info,
                          const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
                          const std::vector<TargetMetaInfo>& targets_meta) {
  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
  render_info.targets.clear();
  for (size_t i = 0; i < targets_meta.size(); ++i) {
    render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
        targets_meta[i].get_resname(),
        work_unit_target_exprs[i]->get_shared_ptr(),
        false));
  }
}

bool is_validate_or_explain_query(const ExecutionOptions& eo) {
  return eo.just_validate || eo.just_explain || eo.just_calcite_explain;
}

class RelLeftDeepTreeIdsCollector : public RelAlgVisitor<std::vector<unsigned>> {
 public:
  std::vector<unsigned> visitLeftDeepInnerJoin(
      const RelLeftDeepInnerJoin* left_deep_join_tree) const override {
    return {left_deep_join_tree->getId()};
  }

 protected:
  std::vector<unsigned> aggregateResult(
      const std::vector<unsigned>& aggregate,
      const std::vector<unsigned>& next_result) const override {
    auto result = aggregate;
    std::copy(next_result.begin(), next_result.end(), std::back_inserter(result));
    return result;
  }
};

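// Tallies how many casts decode dictionary-encoded strings to none-encoded strings
// (text_decoding_casts) and how many encode in the other direction
// (text_encoding_casts); the watchdog uses these counts to reject expensive
// string-translation queries on large inputs.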
struct TextEncodingCastCounts {
  size_t text_decoding_casts;
  size_t text_encoding_casts;
  TextEncodingCastCounts() : text_decoding_casts(0UL), text_encoding_casts(0UL) {}
  TextEncodingCastCounts(const size_t text_decoding_casts,
                         const size_t text_encoding_casts)
      : text_decoding_casts(text_decoding_casts)
      , text_encoding_casts(text_encoding_casts) {}
};

class TextEncodingCastCountVisitor : public ScalarExprVisitor<TextEncodingCastCounts> {
 public:
  TextEncodingCastCountVisitor(const bool default_disregard_casts_to_none_encoding)
      : default_disregard_casts_to_none_encoding_(
            default_disregard_casts_to_none_encoding) {}

 protected:
  TextEncodingCastCounts visitUOper(const Analyzer::UOper* u_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    // Save state of input disregard_casts_to_none_encoding_ as child node traversal
    // will reset it
    const bool disregard_casts_to_none_encoding = disregard_casts_to_none_encoding_;
    result = aggregateResult(result, visit(u_oper->get_operand()));
    if (u_oper->get_optype() != kCAST) {
      return result;
    }
    const auto& operand_ti = u_oper->get_operand()->get_type_info();
    const auto& casted_ti = u_oper->get_type_info();
    if (!operand_ti.is_string() || !casted_ti.is_string()) {
      return result;
    }
    const bool literals_only = u_oper->get_operand()->get_num_column_vars(true) == 0UL;
    if (literals_only) {
      return result;
    }
    if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
      return aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
    }
    if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
      if (!disregard_casts_to_none_encoding) {
        return aggregateResult(result, TextEncodingCastCounts(1UL, 0UL));
      } else {
        return result;
      }
    }
    return result;
  }

  TextEncodingCastCounts visitBinOper(const Analyzer::BinOper* bin_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    // Currently the join framework handles casts between string types, and
    // casts to none-encoded strings should be considered spurious, except
    // when the join predicate is not a =/<> operator, in which case
    // for both join predicates and all other instances we have to decode
    // to a none-encoded string to do the comparison. The logic below essentially
    // overrides the logic such as to always count none-encoded casts on strings
    // that are children of binary operators other than =/<>
    if (bin_oper->get_optype() != kEQ && bin_oper->get_optype() != kNE) {
      // Override the global override so that join opers don't skip
      // the check when there is an actual cast to none-encoded string
      const auto prev_disregard_casts_to_none_encoding_state =
          disregard_casts_to_none_encoding_;
      const auto left_u_oper =
          dynamic_cast<const Analyzer::UOper*>(bin_oper->get_left_operand());
      if (left_u_oper && left_u_oper->get_optype() == kCAST) {
        disregard_casts_to_none_encoding_ = false;
        result = aggregateResult(result, visitUOper(left_u_oper));
      } else {
        disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
        result = aggregateResult(result, visit(bin_oper->get_left_operand()));
      }

      const auto right_u_oper =
          dynamic_cast<const Analyzer::UOper*>(bin_oper->get_right_operand());
      if (right_u_oper && right_u_oper->get_optype() == kCAST) {
        disregard_casts_to_none_encoding_ = false;
        result = aggregateResult(result, visitUOper(right_u_oper));
      } else {
        disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
        result = aggregateResult(result, visit(bin_oper->get_right_operand()));
      }
      disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
    } else {
      result = aggregateResult(result, visit(bin_oper->get_left_operand()));
      result = aggregateResult(result, visit(bin_oper->get_right_operand()));
    }
    return result;
  }

  TextEncodingCastCounts visitLikeExpr(const Analyzer::LikeExpr* like) const override {
    TextEncodingCastCounts result = defaultResult();
    const auto u_oper = dynamic_cast<const Analyzer::UOper*>(like->get_arg());
    const auto prev_disregard_casts_to_none_encoding_state =
        disregard_casts_to_none_encoding_;
    if (u_oper && u_oper->get_optype() == kCAST) {
      disregard_casts_to_none_encoding_ = true;
      result = aggregateResult(result, visitUOper(u_oper));
      disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
    } else {
      result = aggregateResult(result, visit(like->get_arg()));
    }
    result = aggregateResult(result, visit(like->get_like_expr()));
    if (like->get_escape_expr()) {
      result = aggregateResult(result, visit(like->get_escape_expr()));
    }
    return result;
  }

  TextEncodingCastCounts aggregateResult(
      const TextEncodingCastCounts& aggregate,
      const TextEncodingCastCounts& next_result) const override {
    auto result = aggregate;
    result.text_decoding_casts += next_result.text_decoding_casts;
    result.text_encoding_casts += next_result.text_encoding_casts;
    return result;
  }

  void visitBegin() const override {
    disregard_casts_to_none_encoding_ = default_disregard_casts_to_none_encoding_;
  }

  TextEncodingCastCounts defaultResult() const override {
    return TextEncodingCastCounts();
  }

 private:
  mutable bool disregard_casts_to_none_encoding_ = false;
  const bool default_disregard_casts_to_none_encoding_;
};

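// Walks every expression in the execution unit (quals, group-by keys, targets and
// join quals) and accumulates the text cast counts. Join quals disregard casts to
// none-encoded strings, since the join framework handles those internally.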
TextEncodingCastCounts get_text_cast_counts(const RelAlgExecutionUnit& ra_exe_unit) {
  TextEncodingCastCounts cast_counts;

  auto check_node_for_text_casts = [&cast_counts](
                                       const Analyzer::Expr* expr,
                                       const bool disregard_casts_to_none_encoding) {
    if (!expr) {
      return;
    }
    TextEncodingCastCountVisitor visitor(disregard_casts_to_none_encoding);
    const auto this_node_cast_counts = visitor.visit(expr);
    cast_counts.text_encoding_casts += this_node_cast_counts.text_encoding_casts;
    cast_counts.text_decoding_casts += this_node_cast_counts.text_decoding_casts;
  };

  for (const auto& qual : ra_exe_unit.quals) {
    check_node_for_text_casts(qual.get(), false);
  }
  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
    check_node_for_text_casts(simple_qual.get(), false);
  }
  for (const auto& groupby_expr : ra_exe_unit.groupby_exprs) {
    check_node_for_text_casts(groupby_expr.get(), false);
  }
  for (const auto& target_expr : ra_exe_unit.target_exprs) {
    check_node_for_text_casts(target_expr, false);
  }
  for (const auto& join_condition : ra_exe_unit.join_quals) {
    for (const auto& join_qual : join_condition.quals) {
      // We currently need to not count casts to none-encoded strings for join quals,
      // as analyzer will generate these but our join framework disregards them.
      // Some investigation was done on having analyzer issue the correct inter-string
      // dictionary casts, but this actually causes them to get executed and so the same
      // work gets done twice.
      check_node_for_text_casts(join_qual.get(),
                                true /* disregard_casts_to_none_encoding */);
    }
  }
  return cast_counts;
}

void check_none_encoded_string_cast_tuple_limit(
    const std::vector<InputTableInfo>& query_infos,
    const RelAlgExecutionUnit& ra_exe_unit) {
  if (!g_enable_watchdog) {
    return;
  }
  auto const tuples_upper_bound =
      std::accumulate(query_infos.cbegin(),
                      query_infos.cend(),
                      size_t(0),
                      [](auto max, auto const& query_info) {
                        return std::max(max, query_info.info.getNumTuples());
                      });
  if (tuples_upper_bound <= g_watchdog_none_encoded_string_translation_limit) {
    return;
  }

  const auto& text_cast_counts = get_text_cast_counts(ra_exe_unit);
  const bool has_text_casts =
      text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;

  if (!has_text_casts) {
    return;
  }
  std::ostringstream oss;
  oss << "Query requires one or more casts between none-encoded and dictionary-encoded "
      << "strings, and the estimated table size (" << tuples_upper_bound << " rows) "
      << "exceeds the configured watchdog none-encoded string translation limit of "
      << g_watchdog_none_encoded_string_translation_limit << " rows.";
  throw std::runtime_error(oss.str());
}

} // namespace

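// The resultset cache can only be used for plain, fully-executed queries: anything
// that validates, explains, unions, renders in-situ, or executes partial outer
// fragments is excluded below.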
bool RelAlgExecutor::canUseResultsetCache(const ExecutionOptions& eo,
                                          RenderInfo* render_info) const {
  auto validate_or_explain_query = is_validate_or_explain_query(eo);
  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
  return g_enable_data_recycler && g_use_query_resultset_cache && eo.keep_result &&
         !validate_or_explain_query && !hasStepForUnion() &&
         !query_for_partial_outer_frag &&
         (!render_info || (render_info && !render_info->isPotentialInSituRender()));
}

size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
                                             const ExecutionOptions& eo) {
  if (eo.find_push_down_candidates) {
    return 0;
  }

  if (eo.just_explain) {
    return 0;
  }

  CHECK(query_dag_);

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  auto lock = executor_->acquireExecuteMutex();
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (!getSubqueries().empty()) {
    return 0;
  }

  CHECK(!ed_seq.empty());
  if (ed_seq.size() > 1) {
    return 0;
  }

  decltype(temporary_tables_)().swap(temporary_tables_);
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  auto exec_desc_ptr = ed_seq.getDescriptor(0);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    return 0;
  }

  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);

    return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                   executor_);
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      return 0;
    } else if (compound->isUpdateViaSelect()) {
      return 0;
    } else {
      if (compound->isAggregate()) {
        return 0;
      }

      const auto work_unit =
          createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);

      return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                     executor_);
    }
  }

  return 0;
}

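// Top-level entry point: runs the query with the requested device type and, if the
// query cannot run on GPU and g_allow_cpu_retry is set, transparently retries the
// whole query on CPU.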
ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
                                                   const ExecutionOptions& eo,
                                                   const bool just_explain_plan,
                                                   RenderInfo* render_info) {
  CHECK(query_dag_);
  CHECK_EQ(query_dag_->getBuildState(), RelAlgDag::BuildState::kBuiltOptimized)
      << static_cast<int>(query_dag_->getBuildState());

  auto timer = DEBUG_TIMER(__func__);
  INJECT_TIMER(executeRelAlgQuery);

  auto run_query = [&](const CompilationOptions& co_in) {
    auto execution_result =
        executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);

    constexpr bool vlog_result_set_summary{false};
    if constexpr (vlog_result_set_summary) {
      VLOG(1) << execution_result.getRows()->summaryToString();
    }

    if (post_execution_callback_) {
      VLOG(1) << "Running post execution callback.";
      (*post_execution_callback_)();
    }
    return execution_result;
  };

  try {
    return run_query(co);
  } catch (const QueryMustRunOnCpu&) {
    if (!g_allow_cpu_retry) {
      throw;
    }
  }
  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
  auto co_cpu = CompilationOptions::makeCpuOnly(co);

  if (render_info) {
    render_info->setForceNonInSituData();
  }
  return run_query(co_cpu);
}

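// Single attempt at query execution: registers the query session for interrupt
// support, acquires the executor lock, optionally emits the query plan, executes
// any subqueries, then runs the main execution sequence.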
ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
                                                          const ExecutionOptions& eo,
                                                          const bool just_explain_plan,
                                                          RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryNoRetry);
  auto timer = DEBUG_TIMER(__func__);
  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  std::string query_session{""};
  std::string query_str{"N/A"};
  std::string query_submitted_time{""};
  // gather necessary query's info
  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
    query_session = query_state_->getConstSessionInfo()->get_session_id();
    query_str = query_state_->getQueryStr();
    query_submitted_time = query_state_->getQuerySubmittedTime();
  }

  auto validate_or_explain_query =
      just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
  auto interruptable = !render_info && !query_session.empty() &&
                       eo.allow_runtime_query_interrupt && !validate_or_explain_query;
  if (interruptable) {
    // if we reach here, the current query which was waiting for an idle executor
    // within the dispatch queue is now scheduled to the specific executor
    // (not UNITARY_EXECUTOR)
    // so we update the query session's status with the executor that takes this query
    std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
        query_session, query_str, query_submitted_time);

    // now the query is going to be executed, so update the status as
    // "RUNNING_QUERY_KERNEL"
    executor_->updateQuerySessionStatus(
        query_session,
        query_submitted_time,
        QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
  }

  // clean up the session info after this query finishes its execution
  ScopeGuard clearQuerySessionInfo =
      [this, &query_session, &interruptable, &query_submitted_time] {
        // reset the runtime query interrupt status after the end of query execution
        if (interruptable) {
          // cleanup running session's info
          executor_->clearQuerySessionStatus(query_session, query_submitted_time);
        }
      };

  auto acquire_execute_mutex = [](Executor * executor) -> auto {
    auto ret = executor->acquireExecuteMutex();
    return ret;
  };
  // now we acquire executor lock in here to make sure that this executor holds
  // all necessary resources and at the same time protect them against other executors
  auto lock = acquire_execute_mutex(executor_);

  if (interruptable) {
    // check whether this query session is "already" interrupted
    // this case occurs when there is very short gap between being interrupted and
    // taking the execute lock
    // if so we have to remove "all" queries initiated by this session and we do in here
    // without running the query
    try {
      executor_->checkPendingQueryStatus(query_session);
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw std::runtime_error("Query execution has been interrupted (pending query)");
      }
      throw e;
    } catch (...) {
      throw std::runtime_error("Checking pending query status failed: unknown error");
    }
  }
  int64_t queue_time_ms = timer_stop(clock_begin);

  // Notify foreign tables to load prior to caching
  prepare_foreign_table_for_execution(ra, cat_);

  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (just_explain_plan) {
    std::stringstream ss;
    std::vector<const RelAlgNode*> nodes;
    for (size_t i = 0; i < ed_seq.size(); i++) {
      nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
    }
    size_t ctr_node_id_in_plan = nodes.size();
    for (auto& body : boost::adaptors::reverse(nodes)) {
      // we set each node's id in the query plan in advance before calling toString
      // method to properly represent the query plan
      auto node_id_in_plan_tree = ctr_node_id_in_plan--;
      body->setIdInPlanTree(node_id_in_plan_tree);
    }
    size_t ctr = nodes.size();
    size_t tab_ctr = 0;
    RelRexToStringConfig config;
    config.skip_input_nodes = true;
    for (auto& body : boost::adaptors::reverse(nodes)) {
      const auto index = ctr--;
      const auto tabs = std::string(tab_ctr++, '\t');
      CHECK(body);
      ss << tabs << std::to_string(index) << " : " << body->toString(config) << "\n";
      if (auto sort = dynamic_cast<const RelSort*>(body)) {
        ss << tabs << " : " << sort->getInput(0)->toString(config) << "\n";
      }
      if (dynamic_cast<const RelProject*>(body) ||
          dynamic_cast<const RelCompound*>(body)) {
        if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
          ss << tabs << " : " << join->toString(config) << "\n";
        }
      }
    }
    const auto& subqueries = getSubqueries();
    if (!subqueries.empty()) {
      ss << "Subqueries: "
         << "\n";
      for (const auto& subquery : subqueries) {
        const auto ra = subquery->getRelAlg();
        ss << "\t" << ra->toString(config) << "\n";
      }
    }
    auto rs = std::make_shared<ResultSet>(ss.str());
    return {rs, {}};
  }

  if (eo.find_push_down_candidates) {
    // this extra logic is mainly due to current limitations on multi-step queries
    // and/or subqueries.
    return executeRelAlgQueryWithFilterPushDown(
        ed_seq, co, eo, render_info, queue_time_ms);
  }
  timer_setup.stop();

  // Dispatch the subqueries first
  for (auto subquery : getSubqueries()) {
    const auto subquery_ra = subquery->getRelAlg();
    CHECK(subquery_ra);
    if (subquery_ra->hasContextData()) {
      continue;
    }
    // Execute the subquery and cache the result.
    RelAlgExecutor ra_executor(executor_, cat_, query_state_);
    RaExecutionSequence subquery_seq(subquery_ra, executor_);
    auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
    subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
  }
  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
}

AggregatedColRange RelAlgExecutor::computeColRangesCache() {
  AggregatedColRange agg_col_range_cache;
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeColRangesCache(phys_inputs);
}

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeStringDictionaryGenerations(phys_inputs);
}

TableGenerations RelAlgExecutor::computeTableGenerations() {
  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
  return executor_->computeTableGenerations(phys_table_ids);
}

Executor* RelAlgExecutor::getExecutor() const {
  return executor_;
}

void RelAlgExecutor::cleanupPostExecution() {
  CHECK(executor_);
  executor_->row_set_mem_owner_ = nullptr;
}

std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
RelAlgExecutor::getJoinInfo(const RelAlgNode* root_node) {
  auto sort_node = dynamic_cast<const RelSort*>(root_node);
  if (sort_node) {
    // we assume that a test query needing join info does not contain a sort node
    return {};
  }
  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
  RelLeftDeepTreeIdsCollector visitor;
  auto left_deep_tree_ids = visitor.visit(root_node);
  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
}

namespace {

void check_sort_node_source_constraint(const RelSort* sort) {
  CHECK_EQ(size_t(1), sort->inputCount());
  const auto source = sort->getInput(0);
  if (dynamic_cast<const RelSort*>(source)) {
    throw std::runtime_error("Sort node not supported as input to another sort");
  }
}

} // namespace

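// Executes a single step of a distributed (leaf) query. When the step is a sort
// over unsharded input, the sort itself is skipped on the leaf and only its input
// is executed; the aggregator node performs the final sort.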
QueryStepExecutionResult RelAlgExecutor::executeRelAlgQueryStep(
    const RaExecutionSequence& seq,
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryStep);

  auto exe_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exe_desc_ptr);
  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());

  size_t shard_count{0};
  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
    return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
  };

  if (sort) {
    check_sort_node_source_constraint(sort);
    auto order_entries = get_order_entries(sort);
    const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
    shard_count = GroupByAndAggregate::shard_count_for_top_groups(
        source_work_unit.exe_unit, *executor_->getCatalog());
    if (!shard_count) {
      // No point in sorting on the leaf, only execute the input to the sort node.
      CHECK_EQ(size_t(1), sort->inputCount());
      const auto source = sort->getInput(0);
      if (sort->collationCount() || node_is_aggregate(source)) {
        auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
        CHECK_EQ(temp_seq.size(), size_t(1));
        ExecutionOptions eo_copy = {
            eo.output_columnar_hint,
            eo.keep_result,
            eo.allow_multifrag,
            eo.just_explain,
            eo.allow_loop_joins,
            eo.with_watchdog,
            eo.jit_debug,
            eo.just_validate || sort->isEmptyResult(),
            eo.with_dynamic_watchdog,
            eo.dynamic_watchdog_time_limit,
            eo.find_push_down_candidates,
            eo.just_calcite_explain,
            eo.gpu_input_mem_limit_percent,
            eo.allow_runtime_query_interrupt,
            eo.running_query_interrupt_freq,
            eo.pending_query_interrupt_freq,
            eo.executor_type,
        };
        // Use subseq to avoid clearing existing temporary tables
        return {
            executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
            merge_type(source),
            source->getId(),
            false};
      }
    }
  }
  QueryStepExecutionResult result{
      executeRelAlgSubSeq(seq,
                          std::make_pair(step_idx, step_idx + 1),
                          co,
                          eo,
                          render_info,
                          queue_time_ms_),
      merge_type(exe_desc_ptr->getBody()),
      exe_desc_ptr->getBody()->getId(),
      false};
  if (post_execution_callback_) {
    VLOG(1) << "Running post execution callback.";
    (*post_execution_callback_)();
  }
  return result;
}

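// Called on leaf nodes before executing a query step: installs the aggregated
// column ranges, string dictionary generations and table generations shipped from
// the aggregator so all leaves execute with a consistent snapshot.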
void RelAlgExecutor::prepareLeafExecution(
    const AggregatedColRange& agg_col_range,
    const StringDictionaryGenerations& string_dictionary_generations,
    const TableGenerations& table_generations) {
  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  queue_time_ms_ = timer_stop(clock_begin);
  executor_->row_set_mem_owner_ =
      std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
  executor_->table_generations_ = table_generations;
  executor_->agg_col_range_cache_ = agg_col_range;
}

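// Drives the full execution sequence step by step. Each step may fall back from
// GPU to CPU (when per-step CPU retry is allowed and not in distributed mode) or
// to the external executor when interop is enabled and native execution fails.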
ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
                                                 const CompilationOptions& co,
                                                 const ExecutionOptions& eo,
                                                 RenderInfo* render_info,
                                                 const int64_t queue_time_ms,
                                                 const bool with_existing_temp_tables) {
  INJECT_TIMER(executeRelAlgSeq);
  auto timer = DEBUG_TIMER(__func__);
  if (!with_existing_temp_tables) {
    decltype(temporary_tables_)().swap(temporary_tables_);
  }
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  decltype(left_deep_join_info_)().swap(left_deep_join_info_);
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!seq.empty());

  auto get_descriptor_count = [&seq, &eo]() -> size_t {
    if (eo.just_explain) {
      if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
        // run the logical values descriptor to generate the result set, then the next
        // descriptor to generate the explain
        CHECK_GE(seq.size(), size_t(2));
        return 2;
      } else {
        return 1;
      }
    } else {
      return seq.size();
    }
  };

  const auto exec_desc_count = get_descriptor_count();
  auto eo_copied = eo;
  if (seq.hasQueryStepForUnion()) {
    // we currently do not support resultset recycling when an input query
    // contains a union (all) operation
    eo_copied.keep_result = false;
  }

  // We have to register the resultset(s) of any skipped query step(s) as temporary
  // tables before executing the remaining query steps, since they may be needed
  // during query processing, e.g., to get metadata of a target expression from a
  // skipped query step
  if (g_allow_query_step_skipping) {
    for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
      const auto cached_res =
          executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
      CHECK(cached_res);
      addTemporaryTable(kv.first, cached_res);
    }
  }

  const auto num_steps = exec_desc_count - 1;
  for (size_t i = 0; i < exec_desc_count; i++) {
    VLOG(1) << "Executing query step " << i << " / " << num_steps;
    try {
      executeRelAlgStep(
          seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == num_steps) {
        // only render on the last step
        render_info->setForceNonInSituData();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo_copied,
                        (i == num_steps) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const NativeExecutionError&) {
      if (!g_enable_interop) {
        throw;
      }
      auto eo_extern = eo_copied;
      eo_extern.executor_type = ::ExecutorType::Extern;
      auto exec_desc_ptr = seq.getDescriptor(i);
      const auto body = exec_desc_ptr->getBody();
      const auto compound = dynamic_cast<const RelCompound*>(body);
      if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
        LOG(INFO) << "Also failed to run the query using interoperability";
        throw;
      }
      executeRelAlgStep(
          seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    }
  }

  return seq.getDescriptor(num_steps)->getResult();
}

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
    const RaExecutionSequence& seq,
    const std::pair<size_t, size_t> interval,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  INJECT_TIMER(executeRelAlgSubSeq);
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  for (size_t i = interval.first; i < interval.second; i++) {
    // only render on the last step
    try {
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == interval.second - 1) {
        render_info->setForceNonInSituData();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    }
  }

  return seq.getDescriptor(interval.second - 1)->getResult();
}

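// Executes one node of the execution sequence: applies any per-query hints
// (cpu_mode, columnar/rowwise output, keep_result), consults the resultset
// recycler, then dispatches on the node type (compound, project, aggregate,
// filter, sort, logical values, modify, union, table function).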
void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
                                       const size_t step_idx,
                                       const CompilationOptions& co,
                                       const ExecutionOptions& eo,
                                       RenderInfo* render_info,
                                       const int64_t queue_time_ms) {
  INJECT_TIMER(executeRelAlgStep);
  auto timer = DEBUG_TIMER(__func__);
  auto exec_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    handleNop(exec_desc);
    return;
  }

  const ExecutionOptions eo_work_unit{
      eo.output_columnar_hint,
      eo.keep_result,
      eo.allow_multifrag,
      eo.just_explain,
      eo.allow_loop_joins,
      eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
      eo.jit_debug,
      eo.just_validate,
      eo.with_dynamic_watchdog,
      eo.dynamic_watchdog_time_limit,
      eo.find_push_down_candidates,
      eo.just_calcite_explain,
      eo.gpu_input_mem_limit_percent,
      eo.allow_runtime_query_interrupt,
      eo.running_query_interrupt_freq,
      eo.pending_query_interrupt_freq,
      eo.executor_type,
      step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};

  auto handle_hint = [co,
                      eo_work_unit,
                      body,
                      this]() -> std::pair<CompilationOptions, ExecutionOptions> {
    ExecutionOptions eo_hint_applied = eo_work_unit;
    CompilationOptions co_hint_applied = co;
    auto target_node = body;
    if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
      target_node = sort_body->getInput(0);
    }
    auto query_hints = getParsedQueryHint(target_node);
    auto columnar_output_hint_enabled = false;
    auto rowwise_output_hint_enabled = false;
    if (query_hints) {
      if (query_hints->isHintRegistered(QueryHint::kCpuMode)) {
        VLOG(1) << "A user forces to run the query on the CPU execution mode";
        co_hint_applied.device_type = ExecutorDeviceType::CPU;
      }
      if (query_hints->isHintRegistered(QueryHint::kKeepResult)) {
        if (!g_enable_data_recycler) {
          VLOG(1) << "A user enables keeping query resultset but is skipped since data "
                     "recycler is disabled";
        }
        if (!g_use_query_resultset_cache) {
          VLOG(1) << "A user enables keeping query resultset but is skipped since query "
                     "resultset recycler is disabled";
        } else {
          VLOG(1) << "A user enables keeping query resultset";
          eo_hint_applied.keep_result = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kKeepTableFuncResult)) {
        // we use this hint within the function 'executeTableFunction`
        if (!g_enable_data_recycler) {
          VLOG(1) << "A user enables keeping table function's resultset but is skipped "
                     "since data recycler is disabled";
        }
        if (!g_use_query_resultset_cache) {
          VLOG(1) << "A user enables keeping table function's resultset but is skipped "
                     "since query resultset recycler is disabled";
        } else {
          VLOG(1) << "A user enables keeping table function's resultset";
          eo_hint_applied.keep_result = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kColumnarOutput)) {
        VLOG(1) << "A user forces the query to run with columnar output";
        columnar_output_hint_enabled = true;
      } else if (query_hints->isHintRegistered(QueryHint::kRowwiseOutput)) {
        VLOG(1) << "A user forces the query to run with rowwise output";
        rowwise_output_hint_enabled = true;
      }
    }
    auto columnar_output_enabled = eo_work_unit.output_columnar_hint
                                       ? !rowwise_output_hint_enabled
                                       : columnar_output_hint_enabled;
    if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
      LOG(INFO) << "Currently, we do not support applying query hint to change query "
                   "output layout in distributed mode.";
    }
    eo_hint_applied.output_columnar_hint = columnar_output_enabled;
    return std::make_pair(co_hint_applied, eo_hint_applied);
  };

  auto hint_applied = handle_hint();

  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
    if (auto cached_resultset =
            executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
                body->getQueryPlanDagHash())) {
      VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
              << " from resultset cache";
      body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
      if (render_info) {
        std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
            executor_->getRecultSetRecyclerHolder().getTargetExprs(
                body->getQueryPlanDagHash());
        std::vector<Analyzer::Expr*> copied_target_exprs;
        for (const auto& expr : cached_target_exprs) {
          copied_target_exprs.push_back(expr.get());
        }
        build_render_targets(
            *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
      }
      exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
      addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
      return;
    }
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (compound->isUpdateViaSelect()) {
      executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      exec_desc.setResult(executeCompound(
          compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
      VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
              << static_cast<int>(-compound->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    if (project->isDeleteViaSelect()) {
      executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (project->isUpdateViaSelect()) {
      executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      std::optional<size_t> prev_count;
      // Disabling the intermediate count optimization in distributed, as the previous
      // execution descriptor will likely not hold the aggregated result.
      if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
        // If the previous node produced a reliable count, skip the pre-flight count.
        RelAlgNode const* const prev_body = project->getInput(0);
        if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
          if (RaExecutionDesc const* const prev_exec_desc =
                  prev_body->hasContextData()
                      ? prev_body->getContextData()
                      : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
            const auto& prev_exe_result = prev_exec_desc->getResult();
            const auto prev_result = prev_exe_result.getRows();
            if (prev_result) {
              prev_count = prev_result->rowCount();
              VLOG(3) << "Setting output row count for projection node to previous node ("
                      << prev_exec_desc->getBody()->toString(
                             RelRexToStringConfig::defaults())
                      << ") to " << *prev_count;
            }
          }
        }
      }
      exec_desc.setResult(executeProject(project,
                                         hint_applied.first,
                                         hint_applied.second,
                                         render_info,
                                         queue_time_ms,
                                         prev_count));
      VLOG(3) << "Returned from executeProject(), addTemporaryTable("
              << static_cast<int>(-project->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
  if (aggregate) {
    exec_desc.setResult(executeAggregate(
        aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto filter = dynamic_cast<const RelFilter*>(body);
  if (filter) {
    exec_desc.setResult(executeFilter(
        filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto sort = dynamic_cast<const RelSort*>(body);
  if (sort) {
    exec_desc.setResult(executeSort(
        sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    if (exec_desc.getResult().isFilterPushDownEnabled()) {
      return;
    }
    addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
  if (logical_values) {
    exec_desc.setResult(executeLogicalValues(logical_values, hint_applied.second));
    addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto modify = dynamic_cast<const RelModify*>(body);
  if (modify) {
    exec_desc.setResult(executeModify(modify, hint_applied.second));
    return;
  }
  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
  if (logical_union) {
    exec_desc.setResult(executeUnion(logical_union,
                                     seq,
                                     hint_applied.first,
                                     hint_applied.second,
                                     render_info,
                                     queue_time_ms));
    addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
  if (table_func) {
    exec_desc.setResult(executeTableFunction(
        table_func, hint_applied.first, hint_applied.second, queue_time_ms));
    addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  LOG(FATAL) << "Unhandled body type: "
             << body->toString(RelRexToStringConfig::defaults());
}

void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
  // just set the result of the previous node as the result of no op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up temp table as it could be used by the outer query or next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}

namespace {

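// Collects the RexInput nodes an expression tree references. For geo columns the
// visitor also synthesizes inputs for the hidden physical columns (via
// SPIMAP_GEO_PHYSICAL_INPUT), since those are what actually get scanned.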
class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:
  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : cat_(cat) {}

  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
  const Catalog_Namespace::Catalog& cat_;
};

const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
    return table_func;
  }
  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
    CHECK_EQ(size_t(2), join->inputCount());
    return join;
  }
  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_EQ(size_t(1), ra_node->inputCount());
  }
  auto only_src = ra_node->getInput(0);
  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
                       dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
  return is_join ? only_src : ra_node;
}

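// get_used_inputs() is overloaded per relational-algebra node type; each overload
// returns the set of RexInputs the node consumes plus an owning vector that keeps
// any synthesized inputs alive.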
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelTableFunction* table_func,
                const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelLogicalUnion* logical_union,
                const Catalog_Namespace::Catalog&) {
  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  used_inputs_owned.reserve(logical_union->inputCount());
  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
  auto const n_inputs = logical_union->inputCount();
  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
    auto input = logical_union->getInput(nest_level);
    for (size_t i = 0; i < input->size(); ++i) {
      used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
      used_inputs.insert(used_inputs_owned.back().get());
    }
  }
  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
}

int table_id_from_ra(const RelAlgNode* ra_node) {
  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
  if (scan_ra) {
    const auto td = scan_ra->getTableDescriptor();
    CHECK(td);
    return td->tableId;
  }
  return -ra_node->getId();
}

std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
    const RelAlgNode* ra_node,
    const std::vector<size_t>& input_permutation) {
  const auto data_sink_node = get_data_sink(ra_node);
  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    const auto input_ra = data_sink_node->getInput(input_node_idx);
    // Having a non-zero mapped value (input_idx) results in the query being interpreted
    // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
    // mapped value (input_idx) which originates here. This would be incorrect for UNION.
    size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
    const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
    CHECK(it_ok.second);
    LOG_IF(INFO, !input_permutation.empty())
        << "Assigned input " << input_ra->toString(RelRexToStringConfig::defaults())
        << " to nest level " << input_idx;
  }
  return input_to_nest_level;
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_join_source_used_inputs(const RelAlgNode* ra_node,
                            const Catalog_Namespace::Catalog& cat) {
  const auto data_sink_node = get_data_sink(ra_node);
  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
    CHECK_EQ(join->inputCount(), 2u);
    const auto condition = join->getCondition();
    RexUsedInputsVisitor visitor(cat);
    auto condition_inputs = visitor.visit(condition);
    std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
        visitor.get_inputs_owned());
    return std::make_pair(condition_inputs, condition_inputs_owned);
  }

  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
    CHECK_GE(left_deep_join->inputCount(), 2u);
    const auto condition = left_deep_join->getInnerCondition();
    RexUsedInputsVisitor visitor(cat);
    auto result = visitor.visit(condition);
    for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
         ++nesting_level) {
      const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
      if (outer_condition) {
        const auto outer_result = visitor.visit(outer_condition);
        result.insert(outer_result.begin(), outer_result.end());
      }
    }
    std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
    return std::make_pair(result, used_inputs_owned);
  }

  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_GT(ra_node->inputCount(), 1u)
        << ra_node->toString(RelRexToStringConfig::defaults());
  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
    // no-op
    CHECK_GE(ra_node->inputCount(), 0u)
        << ra_node->toString(RelRexToStringConfig::defaults());
  } else {
    CHECK_EQ(ra_node->inputCount(), 1u)
        << ra_node->toString(RelRexToStringConfig::defaults());
  }
  return std::make_pair(std::unordered_set<const RexInput*>{},
                        std::vector<std::shared_ptr<RexInput>>{});
}

1558 void collect_used_input_desc(
1559  std::vector<InputDescriptor>& input_descs,
1560  const Catalog_Namespace::Catalog& cat,
1561  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1562  const RelAlgNode* ra_node,
1563  const std::unordered_set<const RexInput*>& source_used_inputs,
1564  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1565  VLOG(3) << "ra_node=" << ra_node->toString(RelRexToStringConfig::defaults())
1566  << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
1567  << " source_used_inputs.size()=" << source_used_inputs.size();
1568  for (const auto used_input : source_used_inputs) {
1569  const auto input_ra = used_input->getSourceNode();
1570  const int table_id = table_id_from_ra(input_ra);
1571  const auto col_id = used_input->getIndex();
1572  auto it = input_to_nest_level.find(input_ra);
1573  if (it != input_to_nest_level.end()) {
1574  const int input_desc = it->second;
1575  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1576  dynamic_cast<const RelScan*>(input_ra)
1577  ? cat.getColumnIdBySpi(table_id, col_id + 1)
1578  : col_id,
1579  table_id,
1580  input_desc));
1581  } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1582  throw std::runtime_error("Bushy joins not supported");
1583  }
1584  }
1585 }
1586 
1587 template <class RA>
1588 std::pair<std::vector<InputDescriptor>,
1589  std::list<std::shared_ptr<const InputColDescriptor>>>
1590 get_input_desc_impl(const RA* ra_node,
1591  const std::unordered_set<const RexInput*>& used_inputs,
1592  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1593  const std::vector<size_t>& input_permutation,
1594  const Catalog_Namespace::Catalog& cat) {
1595  std::vector<InputDescriptor> input_descs;
1596  const auto data_sink_node = get_data_sink(ra_node);
1597  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1598  const auto input_node_idx =
1599  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1600  auto input_ra = data_sink_node->getInput(input_node_idx);
1601  const int table_id = table_id_from_ra(input_ra);
1602  input_descs.emplace_back(table_id, input_idx);
1603  }
1604  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1605  collect_used_input_desc(input_descs,
1606  cat,
1607  input_col_descs_unique, // modified
1608  ra_node,
1609  used_inputs,
1610  input_to_nest_level);
1611  std::unordered_set<const RexInput*> join_source_used_inputs;
1612  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1613  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1614  get_join_source_used_inputs(ra_node, cat);
1615  collect_used_input_desc(input_descs,
1616  cat,
1617  input_col_descs_unique, // modified
1618  ra_node,
1619  join_source_used_inputs,
1620  input_to_nest_level);
1621  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1622  input_col_descs_unique.begin(), input_col_descs_unique.end());
1623 
1624  std::sort(input_col_descs.begin(),
1625  input_col_descs.end(),
1626  [](std::shared_ptr<const InputColDescriptor> const& lhs,
1627  std::shared_ptr<const InputColDescriptor> const& rhs) {
1628  return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1629  lhs->getColId(),
1630  lhs->getScanDesc().getTableId()) <
1631  std::make_tuple(rhs->getScanDesc().getNestLevel(),
1632  rhs->getColId(),
1633  rhs->getScanDesc().getTableId());
1634  });
1635  return {input_descs,
1636  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1637  input_col_descs.end())};
1638 }
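
 // Minimal std-only sketch of the comparator above: std::make_tuple yields
 // lexicographic ordering over (nest_level, col_id, table_id) with no
 // hand-written tie-breaking logic.
 #include <algorithm>
 #include <cassert>
 #include <tuple>
 #include <vector>

 struct ColDesc {
   int nest_level;
   int col_id;
   int table_id;
 };

 int main() {
   std::vector<ColDesc> cols{{1, 0, 7}, {0, 2, 7}, {0, 1, 7}};
   std::sort(cols.begin(), cols.end(), [](const ColDesc& lhs, const ColDesc& rhs) {
     return std::make_tuple(lhs.nest_level, lhs.col_id, lhs.table_id) <
            std::make_tuple(rhs.nest_level, rhs.col_id, rhs.table_id);
   });
   assert(cols.front().col_id == 1);     // (0, 1, 7) sorts first
   assert(cols.back().nest_level == 1);  // (1, 0, 7) sorts last
 }
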
1639 
1640 template <class RA>
1641 std::tuple<std::vector<InputDescriptor>,
1642  std::list<std::shared_ptr<const InputColDescriptor>>,
1643  std::vector<std::shared_ptr<RexInput>>>
1644 get_input_desc(const RA* ra_node,
1645  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1646  const std::vector<size_t>& input_permutation,
1647  const Catalog_Namespace::Catalog& cat) {
1648  std::unordered_set<const RexInput*> used_inputs;
1649  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1650  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
1651  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1652  auto input_desc_pair = get_input_desc_impl(
1653  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1654  return std::make_tuple(
1655  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1656 }
1657 
1658 size_t get_scalar_sources_size(const RelCompound* compound) {
1659  return compound->getScalarSourcesSize();
1660 }
1661 
1662 size_t get_scalar_sources_size(const RelProject* project) {
1663  return project->size();
1664 }
1665 
1666 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1667  return table_func->getTableFuncInputsSize();
1668 }
1669 
1670 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1671  return compound->getScalarSource(i);
1672 }
1673 
1674 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1675  return project->getProjectAt(i);
1676 }
1677 
1678 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1679  return table_func->getTableFuncInputAt(i);
1680 }
1681 
1682 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1683  const std::shared_ptr<Analyzer::Expr> expr) {
1684  const auto& ti = expr->get_type_info();
1685  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1686  return expr;
1687  }
1688  auto transient_dict_ti = ti;
1689  transient_dict_ti.set_compression(kENCODING_DICT);
1690  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1691  transient_dict_ti.set_fixed_size();
1692  return expr->add_cast(transient_dict_ti);
1693 }
1694 
1695 void set_transient_dict_maybe(
1696  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1697  const std::shared_ptr<Analyzer::Expr>& expr) {
1698  try {
1699  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1700  } catch (...) {
1701  scalar_sources.push_back(fold_expr(expr.get()));
1702  }
1703 }
1704 
1705 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1706  const std::shared_ptr<Analyzer::Expr>& input) {
1707  const auto& input_ti = input->get_type_info();
1708  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1709  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1710  }
1711  return input;
1712 }
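
 // Std-only illustration of why set_transient_dict() and cast_dict_to_none()
 // exist (a miniature interning map, not the StringDictionary API): dictionary-
 // encoded strings compare and hash as fixed-width int32_t ids, and a transient
 // dictionary gives not-yet-encoded strings the same property for grouping.
 #include <cassert>
 #include <string>
 #include <unordered_map>
 #include <vector>

 int32_t get_or_add(std::unordered_map<std::string, int32_t>& dict,
                    const std::string& str) {
   return dict.emplace(str, static_cast<int32_t>(dict.size())).first->second;
 }

 int main() {
   std::unordered_map<std::string, int32_t> transient_dict;
   std::vector<int32_t> encoded;
   for (const std::string& s : {"red", "blue", "red"}) {
     encoded.push_back(get_or_add(transient_dict, s));
   }
   assert(encoded[0] == encoded[2]);  // equal strings share one id
 }
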
1713 
1714 template <class RA>
1715 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1716  const RA* ra_node,
1717  const RelAlgTranslator& translator,
1718  const ::ExecutorType executor_type) {
1719  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1720  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1721  VLOG(3) << "get_scalar_sources_size("
1722  << ra_node->toString(RelRexToStringConfig::defaults())
1723  << ") = " << scalar_sources_size;
1724  for (size_t i = 0; i < scalar_sources_size; ++i) {
1725  const auto scalar_rex = scalar_at(i, ra_node);
1726  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1727  // RexRefs are synthetic scalars appended after the real ones solely to take
1728  // memory ownership; no real work is needed here.
1729  continue;
1730  }
1731 
1732  const auto scalar_expr =
1733  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
1734  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1735  if (executor_type == ExecutorType::Native) {
1736  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1737  } else if (executor_type == ExecutorType::TableFunctions) {
1738  scalar_sources.push_back(fold_expr(rewritten_expr.get()));
1739  } else {
1740  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1741  }
1742  }
1743 
1744  return scalar_sources;
1745 }
1746 
1747 template <class RA>
1748 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1749  const RA* ra_node,
1750  const RelAlgTranslator& translator,
1751  int32_t tableId,
1752  const Catalog_Namespace::Catalog& cat,
1753  const ColumnNameList& colNames,
1754  size_t starting_projection_column_idx) {
1755  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1756  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1757  const auto scalar_rex = scalar_at(i, ra_node);
1758  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1759  // RexRefs are synthetic scalars appended after the real ones solely to take
1760  // memory ownership; no real work is needed here.
1761  continue;
1762  }
1763 
1764  std::shared_ptr<Analyzer::Expr> translated_expr;
1765  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1766  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
1767  tableId,
1768  cat,
1769  colNames[i - starting_projection_column_idx]);
1770  } else {
1771  translated_expr = translator.translateScalarRex(scalar_rex);
1772  }
1773  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1774  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1775  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1776  }
1777 
1778  return scalar_sources;
1779 }
1780 
1781 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1782  const RelCompound* compound,
1783  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1784  if (!compound->isAggregate()) {
1785  return {nullptr};
1786  }
1787  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1788  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1789  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1790  }
1791  return groupby_exprs;
1792 }
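
 // Std-only sketch of the convention both translate_groupby_exprs overloads rely
 // on: the first getGroupByCount() scalar sources are, by construction, the
 // GROUP BY keys. For example, SELECT dept, COUNT(*) FROM emp GROUP BY dept.
 #include <cassert>
 #include <list>
 #include <string>
 #include <vector>

 int main() {
   const std::vector<std::string> scalar_sources{"dept", "arg of COUNT(*)"};
   const size_t group_by_count = 1;  // hypothetical getGroupByCount() value
   const std::list<std::string> groupby_exprs(
       scalar_sources.begin(), scalar_sources.begin() + group_by_count);
   assert(groupby_exprs.front() == "dept");
 }
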
1793 
1794 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1795  const RelAggregate* aggregate,
1796  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1797  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1798  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1799  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1800  }
1801  return groupby_exprs;
1802 }
1803 
1804 QualsConjunctiveForm translate_quals(const RelCompound* compound,
1805  const RelAlgTranslator& translator) {
1806  const auto filter_rex = compound->getFilterExpr();
1807  const auto filter_expr =
1808  filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
1809  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1810  : QualsConjunctiveForm{};
1811 }
1812 
1813 namespace {
1814 // If an encoded type is used in the context of COUNT(DISTINCT ...) then don't
1815 // bother decoding it. This is done by changing the sql type to an integer.
1816 void conditionally_change_arg_to_int_type(std::shared_ptr<Analyzer::Expr>& target_expr) {
1817  auto* agg_expr = dynamic_cast<Analyzer::AggExpr*>(target_expr.get());
1818  CHECK(agg_expr);
1819  if (agg_expr->get_is_distinct()) {
1820  SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1821  if (ti.get_type() != kARRAY && ti.get_compression() == kENCODING_DATE_IN_DAYS) {
1822  target_expr = target_expr->deep_copy();
1823  auto* arg = dynamic_cast<Analyzer::AggExpr*>(target_expr.get())->get_arg();
1824  arg->set_type_info(SQLTypeInfo(get_int_type_by_size(ti.get_size()), ti.get_notnull()));
1825  }
1826  }
1827 }
1828 } // namespace
1829 
1830 std::vector<Analyzer::Expr*> translate_targets(
1831  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1832  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1833  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1834  const RelCompound* compound,
1835  const RelAlgTranslator& translator,
1836  const ExecutorType executor_type) {
1837  std::vector<Analyzer::Expr*> target_exprs;
1838  for (size_t i = 0; i < compound->size(); ++i) {
1839  const auto target_rex = compound->getTargetExpr(i);
1840  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1841  std::shared_ptr<Analyzer::Expr> target_expr;
1842  if (target_rex_agg) {
1843  target_expr =
1844  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1845  conditionally_change_arg_to_int_type(target_expr);
1846  } else {
1847  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1848  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1849  if (target_rex_ref) {
1850  const auto ref_idx = target_rex_ref->getIndex();
1851  CHECK_GE(ref_idx, size_t(1));
1852  CHECK_LE(ref_idx, groupby_exprs.size());
1853  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1854  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1855  } else {
1856  target_expr = translator.translateScalarRex(target_rex_scalar);
1857  auto rewritten_expr = rewrite_expr(target_expr.get());
1858  target_expr = fold_expr(rewritten_expr.get());
1859  if (executor_type == ExecutorType::Native) {
1860  try {
1861  target_expr = set_transient_dict(target_expr);
1862  } catch (...) {
1863  // noop
1864  }
1865  } else {
1866  target_expr = cast_dict_to_none(target_expr);
1867  }
1868  }
1869  }
1870  CHECK(target_expr);
1871  target_exprs_owned.push_back(target_expr);
1872  target_exprs.push_back(target_expr.get());
1873  }
1874  return target_exprs;
1875 }
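
 // Std-only sketch of the 1-based RexRef lookup in translate_targets above:
 // group-by keys live in a std::list, so ref_idx = 2 resolves to the second
 // key via std::next(begin, ref_idx - 1).
 #include <cassert>
 #include <iterator>
 #include <list>
 #include <string>

 int main() {
   const std::list<std::string> groupby_exprs{"dept", "region"};
   const size_t ref_idx = 2;  // RexRef::getIndex() is 1-based
   const auto& groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
   assert(groupby_expr == "region");
 }
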
1876 
1877 std::vector<Analyzer::Expr*> translate_targets(
1878  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1879  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1880  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1881  const RelAggregate* aggregate,
1882  const RelAlgTranslator& translator) {
1883  std::vector<Analyzer::Expr*> target_exprs;
1884  size_t group_key_idx = 1;
1885  for (const auto& groupby_expr : groupby_exprs) {
1886  auto target_expr =
1887  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1888  target_exprs_owned.push_back(target_expr);
1889  target_exprs.push_back(target_expr.get());
1890  }
1891 
1892  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1893  auto target_expr =
1894  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1895  CHECK(target_expr);
1896  target_expr = fold_expr(target_expr.get());
1897  target_exprs_owned.push_back(target_expr);
1898  target_exprs.push_back(target_expr.get());
1899  }
1900  return target_exprs;
1901 }
1902 
1903 bool is_count_distinct(const Analyzer::Expr* expr) {
1904  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1905  return agg_expr && agg_expr->get_is_distinct();
1906 }
1907 
1908 bool is_agg(const Analyzer::Expr* expr) {
1909  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1910  if (agg_expr && agg_expr->get_contains_agg()) {
1911  auto agg_type = agg_expr->get_aggtype();
1912  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1913  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1914  return true;
1915  }
1916  }
1917  return false;
1918 }
1919 
1920 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
1921  if (is_count_distinct(&expr)) {
1922  return SQLTypeInfo(kBIGINT, false);
1923  } else if (is_agg(&expr)) {
1924  return get_nullable_logical_type_info(expr.get_type_info());
1925  }
1926  return get_logical_type_info(expr.get_type_info());
1927 }
1928 
1929 template <class RA>
1930 std::vector<TargetMetaInfo> get_targets_meta(
1931  const RA* ra_node,
1932  const std::vector<Analyzer::Expr*>& target_exprs) {
1933  std::vector<TargetMetaInfo> targets_meta;
1934  CHECK_EQ(ra_node->size(), target_exprs.size());
1935  for (size_t i = 0; i < ra_node->size(); ++i) {
1936  CHECK(target_exprs[i]);
1937  // TODO(alex): remove the count distinct type fixup.
1938  targets_meta.emplace_back(ra_node->getFieldName(i),
1939  get_logical_type_for_expr(*target_exprs[i]),
1940  target_exprs[i]->get_type_info());
1941  }
1942  return targets_meta;
1943 }
1944 
1945 template <>
1946 std::vector<TargetMetaInfo> get_targets_meta(
1947  const RelFilter* filter,
1948  const std::vector<Analyzer::Expr*>& target_exprs) {
1949  RelAlgNode const* input0 = filter->getInput(0);
1950  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1951  return get_targets_meta(input, target_exprs);
1952  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1953  return get_targets_meta(input, target_exprs);
1954  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1955  return get_targets_meta(input, target_exprs);
1956  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1957  return get_targets_meta(input, target_exprs);
1958  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1959  return get_targets_meta(input, target_exprs);
1960  }
1961  UNREACHABLE() << "Unhandled node type: "
1962  << input0->toString(RelRexToStringConfig::defaults());
1963  return {};
1964 }
1965 
1966 } // namespace
1967 
1968 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
1969  const CompilationOptions& co_in,
1970  const ExecutionOptions& eo_in,
1971  const int64_t queue_time_ms) {
1972  CHECK(node);
1973  auto timer = DEBUG_TIMER(__func__);
1974 
1975  auto co = co_in;
1976  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1977  // encoded string updates
1978 
1979  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
1980  auto& work_unit,
1981  const bool is_aggregate) {
1982  auto table_descriptor = node->getModifiedTableDescriptor();
1983  CHECK(table_descriptor);
1984  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
1985  throw std::runtime_error(
1986  "UPDATE queries involving variable length columns are only supported on tables "
1987  "with the vacuum attribute set to 'delayed'");
1988  }
1989 
1990  Executor::clearExternalCaches(true, table_descriptor, cat_.getDatabaseId());
1991 
1992  auto updated_table_desc = node->getModifiedTableDescriptor();
1993  dml_transaction_parameters_ =
1994  std::make_unique<UpdateTransactionParameters>(updated_table_desc,
1995  node->getTargetColumns(),
1996  node->getOutputMetainfo(),
1997  node->isVarlenUpdateRequired());
1998 
1999  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2000 
2001  auto execute_update_ra_exe_unit =
2002  [this, &co, &eo_in, &table_infos, &updated_table_desc](
2003  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
2004  CompilationOptions co_project = CompilationOptions::makeCpuOnly(co);
2005 
2006  auto eo = eo_in;
2007  if (dml_transaction_parameters_->tableIsTemporary()) {
2008  eo.output_columnar_hint = true;
2009  co_project.allow_lazy_fetch = false;
2010  co_project.filter_on_deleted_column =
2011  false; // project the entire delete column for columnar update
2012  }
2013 
2014  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
2015  dml_transaction_parameters_.get());
2016  CHECK(update_transaction_parameters);
2017  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
2018  try {
2019  auto table_update_metadata =
2020  executor_->executeUpdate(ra_exe_unit,
2021  table_infos,
2022  updated_table_desc,
2023  co_project,
2024  eo,
2025  cat_,
2026  executor_->row_set_mem_owner_,
2027  update_callback,
2028  is_aggregate);
2029  post_execution_callback_ = [table_update_metadata, this]() {
2030  dml_transaction_parameters_->finalizeTransaction(cat_);
2031  TableOptimizer table_optimizer{
2032  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
2033  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2034  };
2035  } catch (const QueryExecutionError& e) {
2036  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2037  }
2038  };
2039 
2040  if (dml_transaction_parameters_->tableIsTemporary()) {
2041  // hold owned target exprs during execution if rewriting
2042  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2043  // rewrite temp table updates to generate the full column by moving the where
2044  // clause into a case; if such a rewrite is not possible, bail on the update
2045  // operation; then build an expr for the update target
2046  auto update_transaction_params =
2047  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
2048  CHECK(update_transaction_params);
2049  const auto td = update_transaction_params->getTableDescriptor();
2050  CHECK(td);
2051  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2052  if (update_column_names.size() > 1) {
2053  throw std::runtime_error(
2054  "Multi-column update is not yet supported for temporary tables.");
2055  }
2056 
2057  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
2058  CHECK(cd);
2059  auto projected_column_to_update =
2060  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
2061  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2062  work_unit.exe_unit, projected_column_to_update);
2063  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2064  throw std::runtime_error(
2065  "Variable length updates not yet supported on temporary tables.");
2066  }
2067  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2068  } else {
2069  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2070  }
2071  };
2072 
2073  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2074  auto work_unit =
2075  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2076 
2077  execute_update_for_node(compound, work_unit, compound->isAggregate());
2078  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2079  auto work_unit =
2080  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2081 
2082  if (project->isSimple()) {
2083  CHECK_EQ(size_t(1), project->inputCount());
2084  const auto input_ra = project->getInput(0);
2085  if (dynamic_cast<const RelSort*>(input_ra)) {
2086  const auto& input_table =
2087  get_temporary_table(&temporary_tables_, -input_ra->getId());
2088  CHECK(input_table);
2089  work_unit.exe_unit.scan_limit = input_table->rowCount();
2090  }
2091  }
2092 
2093  execute_update_for_node(project, work_unit, false);
2094  } else {
2095  throw std::runtime_error("Unsupported parent node for update: " +
2096  node->toString(RelRexToStringConfig::defaults()));
2097  }
2098 }
2099 
2100 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
2101  const CompilationOptions& co,
2102  const ExecutionOptions& eo_in,
2103  const int64_t queue_time_ms) {
2104  CHECK(node);
2105  auto timer = DEBUG_TIMER(__func__);
2106 
2107  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2108  auto& work_unit,
2109  const bool is_aggregate) {
2110  auto* table_descriptor = node->getModifiedTableDescriptor();
2111  CHECK(table_descriptor);
2112  if (!table_descriptor->hasDeletedCol) {
2113  throw std::runtime_error(
2114  "DELETE queries are only supported on tables with the vacuum attribute set to "
2115  "'delayed'");
2116  }
2117 
2118  Executor::clearExternalCaches(false, table_descriptor, cat_.getDatabaseId());
2119 
2120  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2121 
2122  auto execute_delete_ra_exe_unit =
2123  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
2124  const bool is_aggregate) {
2125  dml_transaction_parameters_ =
2126  std::make_unique<DeleteTransactionParameters>(table_descriptor);
2127  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2128  dml_transaction_parameters_.get());
2129  CHECK(delete_params);
2130  auto delete_callback = yieldDeleteCallback(*delete_params);
2131  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
2132 
2133  auto eo = eo_in;
2134  if (dml_transaction_parameters_->tableIsTemporary()) {
2135  eo.output_columnar_hint = true;
2136  co_delete.filter_on_deleted_column =
2137  false; // project the entire delete column for columnar update
2138  } else {
2139  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2140  }
2141 
2142  try {
2143  auto table_update_metadata =
2144  executor_->executeUpdate(exe_unit,
2145  table_infos,
2146  table_descriptor,
2147  co_delete,
2148  eo,
2149  cat_,
2150  executor_->row_set_mem_owner_,
2151  delete_callback,
2152  is_aggregate);
2153  post_execution_callback_ = [table_update_metadata, this]() {
2154  dml_transaction_parameters_->finalizeTransaction(cat_);
2155  TableOptimizer table_optimizer{
2156  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
2157  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2158  };
2159  } catch (const QueryExecutionError& e) {
2160  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2161  }
2162  };
2163 
2164  if (table_is_temporary(table_descriptor)) {
2165  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2166  auto cd = cat_.getDeletedColumn(table_descriptor);
2167  CHECK(cd);
2168  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2169  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
2170  const auto rewritten_exe_unit =
2171  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2172  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2173  } else {
2174  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2175  }
2176  };
2177 
2178  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2179  const auto work_unit =
2180  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2181  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2182  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2183  auto work_unit =
2184  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2185  if (project->isSimple()) {
2186  CHECK_EQ(size_t(1), project->inputCount());
2187  const auto input_ra = project->getInput(0);
2188  if (dynamic_cast<const RelSort*>(input_ra)) {
2189  const auto& input_table =
2190  get_temporary_table(&temporary_tables_, -input_ra->getId());
2191  CHECK(input_table);
2192  work_unit.exe_unit.scan_limit = input_table->rowCount();
2193  }
2194  }
2195  execute_delete_for_node(project, work_unit, false);
2196  } else {
2197  throw std::runtime_error("Unsupported parent node for delete: " +
2198  node->toString(RelRexToStringConfig::defaults()));
2199  }
2200 }
2201 
2202 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
2203  const CompilationOptions& co,
2204  const ExecutionOptions& eo,
2205  RenderInfo* render_info,
2206  const int64_t queue_time_ms) {
2207  auto timer = DEBUG_TIMER(__func__);
2208  const auto work_unit =
2209  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
2210  CompilationOptions co_compound = co;
2211  return executeWorkUnit(work_unit,
2212  compound->getOutputMetainfo(),
2213  compound->isAggregate(),
2214  co_compound,
2215  eo,
2216  render_info,
2217  queue_time_ms);
2218 }
2219 
2220 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
2221  const CompilationOptions& co,
2222  const ExecutionOptions& eo,
2223  RenderInfo* render_info,
2224  const int64_t queue_time_ms) {
2225  auto timer = DEBUG_TIMER(__func__);
2226  const auto work_unit = createAggregateWorkUnit(
2227  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2228  return executeWorkUnit(work_unit,
2229  aggregate->getOutputMetainfo(),
2230  true,
2231  co,
2232  eo,
2233  render_info,
2234  queue_time_ms);
2235 }
2236 
2237 namespace {
2238 
2239 // Returns true iff the execution unit contains window functions.
2240 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
2241  return std::any_of(ra_exe_unit.target_exprs.begin(),
2242  ra_exe_unit.target_exprs.end(),
2243  [](const Analyzer::Expr* expr) {
2244  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
2245  });
2246 }
2247 
2248 } // namespace
2249 
2250 ExecutionResult RelAlgExecutor::executeProject(
2251  const RelProject* project,
2252  const CompilationOptions& co,
2253  const ExecutionOptions& eo,
2254  RenderInfo* render_info,
2255  const int64_t queue_time_ms,
2256  const std::optional<size_t> previous_count) {
2257  auto timer = DEBUG_TIMER(__func__);
2258  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
2259  CompilationOptions co_project = co;
2260  if (project->isSimple()) {
2261  CHECK_EQ(size_t(1), project->inputCount());
2262  const auto input_ra = project->getInput(0);
2263  if (dynamic_cast<const RelSort*>(input_ra)) {
2264  co_project.device_type = ExecutorDeviceType::CPU;
2265  const auto& input_table =
2266  get_temporary_table(&temporary_tables_, -input_ra->getId());
2267  CHECK(input_table);
2268  work_unit.exe_unit.scan_limit =
2269  std::min(input_table->getLimit(), input_table->rowCount());
2270  }
2271  }
2272  return executeWorkUnit(work_unit,
2273  project->getOutputMetainfo(),
2274  false,
2275  co_project,
2276  eo,
2277  render_info,
2278  queue_time_ms,
2279  previous_count);
2280 }
2281 
2282 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
2283  const CompilationOptions& co_in,
2284  const ExecutionOptions& eo,
2285  const int64_t queue_time_ms) {
2286  INJECT_TIMER(executeTableFunction);
2287  auto timer = DEBUG_TIMER(__func__);
2288 
2289  auto co = co_in;
2290 
2291  if (g_cluster) {
2292  throw std::runtime_error("Table functions not supported in distributed mode yet");
2293  }
2294  if (!g_enable_table_functions) {
2295  throw std::runtime_error("Table function support is disabled");
2296  }
2297  auto table_func_work_unit = createTableFunctionWorkUnit(
2298  table_func,
2299  eo.just_explain,
2300  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2301  const auto body = table_func_work_unit.body;
2302  CHECK(body);
2303 
2304  const auto table_infos =
2305  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2306 
2307  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2308  co.device_type,
2309  QueryMemoryDescriptor(),
2310  nullptr,
2311  executor_->getCatalog(),
2312  executor_->blockSize(),
2313  executor_->gridSize()),
2314  {}};
2315 
2316  auto global_hint = getGlobalQueryHint();
2317  auto use_resultset_recycler = canUseResultsetCache(eo, nullptr);
2318  if (use_resultset_recycler && has_valid_query_plan_dag(table_func)) {
2319  auto cached_resultset =
2320  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
2321  table_func->getQueryPlanDagHash());
2322  if (cached_resultset) {
2323  VLOG(1) << "recycle table function's resultset of the root node "
2324  << table_func->getRelNodeDagId() << " from resultset cache";
2325  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2326  addTemporaryTable(-body->getId(), result.getDataPtr());
2327  return result;
2328  }
2329  }
2330 
2331  auto query_exec_time_begin = timer_start();
2332  try {
2333  result = {executor_->executeTableFunction(
2334  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
2335  body->getOutputMetainfo()};
2336  } catch (const QueryExecutionError& e) {
2337  handlePersistentError(e.getErrorCode());
2338  CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
2339  throw std::runtime_error("Table function ran out of memory during execution");
2340  }
2341  auto query_exec_time = timer_stop(query_exec_time_begin);
2342  result.setQueueTime(queue_time_ms);
2343  auto resultset_ptr = result.getDataPtr();
2344  auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2345  g_allow_auto_resultset_caching &&
2346  resultset_ptr->getBufferSizeBytes(co.device_type) <=
2347  g_auto_resultset_caching_threshold;
2348  bool keep_result = global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult);
2349  if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2350  !hasStepForUnion()) {
2351  resultset_ptr->setExecTime(query_exec_time);
2352  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2353  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2354  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2355  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2356  if (allow_auto_caching_resultset) {
2357  VLOG(1) << "Automatically keep table function's query resultset to recycler";
2358  }
2359  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
2360  table_func_work_unit.exe_unit.query_plan_dag_hash,
2361  resultset_ptr->getInputTableKeys(),
2362  resultset_ptr,
2363  resultset_ptr->getBufferSizeBytes(co.device_type),
2364  executor_->getRowSetMemoryOwner());
2365  } else {
2366  if (eo.keep_result) {
2367  if (g_cluster) {
2368  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2369  "support resultset recycling on distributed mode";
2370  } else if (hasStepForUnion()) {
2371  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2372  "has union-(all) operator";
2373  } else if (is_validate_or_explain_query(eo)) {
2374  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2375  "is either validate or explain query";
2376  } else {
2377  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2378  }
2379  }
2380  }
2381 
2382  return result;
2383 }
2384 
2385 namespace {
2386 
2387 // Creates a new expression which has the range table index set to 1. This is needed to
2388 // reuse the hash join construction helpers to generate a hash table for the window
2389 // function partition: create an equals expression with left and right sides identical
2390 // except for the range table index.
2391 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
2392  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
2393  if (tuple) {
2394  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2395  for (const auto& element : tuple->getTuple()) {
2396  transformed_tuple.push_back(transform_to_inner(element.get()));
2397  }
2398  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2399  }
2400  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
2401  if (!col) {
2402  throw std::runtime_error("Only columns supported in the window partition for now");
2403  }
2404  return makeExpr<Analyzer::ColumnVar>(
2405  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
2406 }
2407 
2408 } // namespace
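
 // Std-only analogue of what the tautological hash join computes (illustrative,
 // not the HashJoin API): a one-to-many mapping from each partition key to the
 // row ids that share it, which is exactly the window partition.
 #include <cassert>
 #include <string>
 #include <unordered_map>
 #include <vector>

 int main() {
   const std::vector<std::string> partition_key{"a", "b", "a", "a"};
   std::unordered_map<std::string, std::vector<size_t>> partitions;
   for (size_t row = 0; row < partition_key.size(); ++row) {
     partitions[partition_key[row]].push_back(row);
   }
   assert((partitions["a"] == std::vector<size_t>{0, 2, 3}));
 }
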
2409 
2410 void RelAlgExecutor::computeWindow(const WorkUnit& work_unit,
2411  const CompilationOptions& co,
2412  const ExecutionOptions& eo,
2413  ColumnCacheMap& column_cache_map,
2414  const int64_t queue_time_ms) {
2415  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2416  CHECK_EQ(query_infos.size(), size_t(1));
2417  if (query_infos.front().info.fragments.size() != 1) {
2418  throw std::runtime_error(
2419  "Only single fragment tables supported for window functions for now");
2420  }
2421  if (eo.executor_type == ::ExecutorType::Extern) {
2422  return;
2423  }
2424  query_infos.push_back(query_infos.front());
2425  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2426  // a query may hold multiple window functions with the same partition-by condition,
2427  // so after building the first hash partition we can reuse it for the rest of
2428  // the window functions.
2429  // here, a cached partition can be shared across multiple window function contexts
2430  // as is, but a sorted partition must be copied before reuse since we use it as the
2431  // (intermediate) output buffer.
2432  // todo (yoonmin) : support recycler for window function computation?
2433  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2434  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2435  sorted_partition_cache;
2436  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2437  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2438  window_function_context_map;
2439  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2440  ++target_index) {
2441  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2442  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2443  if (!window_func) {
2444  continue;
2445  }
2446  // Always use baseline layout hash tables for now, make the expression a tuple.
2447  const auto& partition_keys = window_func->getPartitionKeys();
2448  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2449  if (partition_keys.size() >= 1) {
2450  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2451  if (partition_keys.size() > 1) {
2452  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2453  } else {
2454  CHECK_EQ(partition_keys.size(), size_t(1));
2455  partition_key_tuple = partition_keys.front();
2456  }
2457  // Creates a tautological equality with the partition expression on both sides.
2458  partition_key_cond =
2459  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2460  kBW_EQ,
2461  kONE,
2462  partition_key_tuple,
2463  transform_to_inner(partition_key_tuple.get()));
2464  }
2465  auto context =
2466  createWindowFunctionContext(window_func,
2467  partition_key_cond /*nullptr if no partition key*/,
2468  partition_cache,
2469  sorted_partition_key_ref_count_map,
2470  work_unit,
2471  query_infos,
2472  co,
2473  column_cache_map,
2474  executor_->getRowSetMemoryOwner());
2475  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2476  }
2477 
2478  for (auto& kv : window_function_context_map) {
2479  kv.second->compute(sorted_partition_key_ref_count_map, sorted_partition_cache);
2480  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2481  }
2482 }
2483 
2484 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
2485  const Analyzer::WindowFunction* window_func,
2486  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2487  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2488  std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2489  const WorkUnit& work_unit,
2490  const std::vector<InputTableInfo>& query_infos,
2491  const CompilationOptions& co,
2492  ColumnCacheMap& column_cache_map,
2493  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2494  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2495  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2496  ? Data_Namespace::GPU_LEVEL
2497  : Data_Namespace::CPU_LEVEL;
2498  std::unique_ptr<WindowFunctionContext> context;
2499  auto partition_cache_key = work_unit.body->getQueryPlanDagHash();
2500  if (partition_key_cond) {
2501  auto partition_cond_str = partition_key_cond->toString();
2502  auto partition_key_hash = boost::hash_value(partition_cond_str);
2503  boost::hash_combine(partition_cache_key, partition_key_hash);
2504  std::shared_ptr<HashJoin> partition_ptr;
2505  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2506  if (cached_hash_table_it != partition_cache.end()) {
2507  partition_ptr = cached_hash_table_it->second;
2508  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2509  << partition_cache_key << ", partition condition: " << partition_cond_str
2510  << ")";
2511  } else {
2512  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2513  partition_key_cond,
2514  query_infos,
2515  memory_level,
2516  JoinType::INVALID, // for window function
2517  HashType::OneToMany,
2518  column_cache_map,
2519  work_unit.exe_unit.hash_table_build_plan_dag,
2520  work_unit.exe_unit.query_hint,
2521  work_unit.exe_unit.table_id_to_node_map);
2522  if (!hash_table_or_err.fail_reason.empty()) {
2523  throw std::runtime_error(hash_table_or_err.fail_reason);
2524  }
2525  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2526  partition_ptr = hash_table_or_err.hash_table;
2527  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2528  .second);
2529  VLOG(1) << "Put a generated hash table for computing window function context to "
2530  "cache (key: "
2531  << partition_cache_key << ", partition condition: " << partition_cond_str
2532  << ")";
2533  }
2534  CHECK(partition_ptr);
2535  auto aggregate_tree_fanout = g_window_function_aggregation_tree_fanout;
2536  if (work_unit.exe_unit.query_hint.aggregate_tree_fanout != aggregate_tree_fanout) {
2537  aggregate_tree_fanout = work_unit.exe_unit.query_hint.aggregate_tree_fanout;
2538  VLOG(1) << "Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2539  }
2540  context = std::make_unique<WindowFunctionContext>(window_func,
2541  partition_cache_key,
2542  partition_ptr,
2543  elem_count,
2544  co.device_type,
2545  row_set_mem_owner,
2546  aggregate_tree_fanout);
2547  } else {
2548  context = std::make_unique<WindowFunctionContext>(
2549  window_func, elem_count, co.device_type, row_set_mem_owner);
2550  }
2551  const auto& order_keys = window_func->getOrderKeys();
2552  if (!order_keys.empty()) {
2553  auto sorted_partition_cache_key = partition_cache_key;
2554  for (auto& order_key : order_keys) {
2555  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2556  }
2557  for (auto& collation : window_func->getCollation()) {
2558  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2559  }
2560  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2561  auto cache_key_cnt_it =
2562  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2563  if (!cache_key_cnt_it.second) {
2564  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2565  cache_key_cnt_it.first->second + 1;
2566  }
2567 
2568  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2569  for (const auto& order_key : order_keys) {
2570  const auto order_col =
2571  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2572  if (!order_col) {
2573  throw std::runtime_error("Only order by columns supported for now");
2574  }
2575  const int8_t* column;
2576  size_t join_col_elem_count;
2577  std::tie(column, join_col_elem_count) =
2578  ColumnFetcher::getOneColumnFragment(executor_,
2579  *order_col,
2580  query_infos.front().info.fragments.front(),
2581  memory_level,
2582  0,
2583  nullptr,
2584  /*thread_idx=*/0,
2585  chunks_owner,
2586  column_cache_map);
2587 
2588  CHECK_EQ(join_col_elem_count, elem_count);
2589  context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2590  }
2591  }
2592  if (context->needsToBuildAggregateTree()) {
2593  // todo (yoonmin) : if we try to support generic window function expressions without
2594  // an extra project node, we need to revisit this b/c the current logic assumes that
2595  // a window function expression has a single input source
2596  auto& window_function_expression_args = window_func->getArgs();
2597  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2598  for (auto& expr : window_function_expression_args) {
2599  const auto arg_col_var = std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr);
2600  CHECK(arg_col_var);
2601  const int8_t* column;
2602  size_t join_col_elem_count;
2603  std::tie(column, join_col_elem_count) =
2604  ColumnFetcher::getOneColumnFragment(executor_,
2605  *arg_col_var,
2606  query_infos.front().info.fragments.front(),
2607  memory_level,
2608  0,
2609  nullptr,
2610  /*thread_idx=*/0,
2611  chunks_owner,
2612  column_cache_map);
2613 
2614  CHECK_EQ(join_col_elem_count, elem_count);
2615  context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2616  }
2617  }
2618  return context;
2619 }
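
 // Hedged sketch of the cache-key scheme above, using the same boost helpers:
 // the sorted-partition key extends the partition key with every order key and
 // collation string, and try_emplace doubles as the reference counter that
 // decides how long a sorted partition must stay alive.
 #include <boost/functional/hash.hpp>
 #include <cassert>
 #include <string>
 #include <unordered_map>

 int main() {
   std::size_t partition_cache_key = boost::hash_value(std::string("plan dag"));
   boost::hash_combine(partition_cache_key, std::string("BW_EQ(key, key)"));

   std::size_t sorted_partition_cache_key = partition_cache_key;
   boost::hash_combine(sorted_partition_cache_key, std::string("ORDER BY ts ASC"));

   std::unordered_map<std::size_t, std::size_t> ref_count;
   for (int window_func = 0; window_func < 2; ++window_func) {
     auto it_ok = ref_count.try_emplace(sorted_partition_cache_key, 1);
     if (!it_ok.second) {
       ++it_ok.first->second;  // a second window function reuses the sorted partition
     }
   }
   assert(ref_count[sorted_partition_cache_key] == 2);
 }
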
2620 
2621 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
2622  const CompilationOptions& co,
2623  const ExecutionOptions& eo,
2624  RenderInfo* render_info,
2625  const int64_t queue_time_ms) {
2626  auto timer = DEBUG_TIMER(__func__);
2627  const auto work_unit =
2628  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2629  return executeWorkUnit(
2630  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2631 }
2632 
2633 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
2634  std::vector<TargetMetaInfo> const& rhs) {
2635  if (lhs.size() == rhs.size()) {
2636  for (size_t i = 0; i < lhs.size(); ++i) {
2637  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2638  return false;
2639  }
2640  }
2641  return true;
2642  }
2643  return false;
2644 }
2645 
2646 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2647  return target_meta_info.get_type_info().is_geometry();
2648 }
2649 
2650 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
2651  const RaExecutionSequence& seq,
2652  const CompilationOptions& co,
2653  const ExecutionOptions& eo,
2654  RenderInfo* render_info,
2655  const int64_t queue_time_ms) {
2656  auto timer = DEBUG_TIMER(__func__);
2657  if (!logical_union->isAll()) {
2658  throw std::runtime_error("UNION without ALL is not supported yet.");
2659  }
2660  // Will throw a std::runtime_error if types don't match.
2661  logical_union->checkForMatchingMetaInfoTypes();
2662  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2663  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2664  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2665  }
2666  auto work_unit =
2667  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2668  return executeWorkUnit(work_unit,
2669  logical_union->getOutputMetainfo(),
2670  false,
2671  CompilationOptions::makeCpuOnly(co),
2672  eo,
2673  render_info,
2674  queue_time_ms);
2675 }
2676 
2677 ExecutionResult RelAlgExecutor::executeLogicalValues(
2678  const RelLogicalValues* logical_values,
2679  const ExecutionOptions& eo) {
2680  auto timer = DEBUG_TIMER(__func__);
2681  QueryMemoryDescriptor query_mem_desc(executor_,
2682  logical_values->getNumRows(),
2683  QueryDescriptionType::Projection,
2684  /*is_table_function=*/false);
2685 
2686  auto tuple_type = logical_values->getTupleType();
2687  for (size_t i = 0; i < tuple_type.size(); ++i) {
2688  auto& target_meta_info = tuple_type[i];
2689  if (target_meta_info.get_type_info().is_varlen()) {
2690  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2691  }
2692  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2693  // replace w/ bigint
2694  tuple_type[i] =
2695  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2696  }
2697  query_mem_desc.addColSlotInfo(
2698  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2699  }
2700  logical_values->setOutputMetainfo(tuple_type);
2701 
2702  std::vector<TargetInfo> target_infos;
2703  for (const auto& tuple_type_component : tuple_type) {
2704  target_infos.emplace_back(TargetInfo{false,
2705  kCOUNT,
2706  tuple_type_component.get_type_info(),
2707  SQLTypeInfo(kNULLT, false),
2708  false,
2709  false,
2710  /*is_varlen_projection=*/false});
2711  }
2712 
2713  std::shared_ptr<ResultSet> rs{
2714  ResultSetLogicalValuesBuilder{logical_values,
2715  target_infos,
2716  ExecutorDeviceType::CPU,
2717  query_mem_desc,
2718  executor_->getRowSetMemoryOwner(),
2719  executor_}
2720  .build()};
2721 
2722  return {rs, tuple_type};
2723 }
2724 
2725 namespace {
2726 
2727 template <class T>
2728 int64_t insert_one_dict_str(T* col_data,
2729  const std::string& columnName,
2730  const SQLTypeInfo& columnType,
2731  const Analyzer::Constant* col_cv,
2732  const Catalog_Namespace::Catalog& catalog) {
2733  if (col_cv->get_is_null()) {
2734  *col_data = inline_fixed_encoding_null_val(columnType);
2735  } else {
2736  const int dict_id = columnType.get_comp_param();
2737  const auto col_datum = col_cv->get_constval();
2738  const auto& str = *col_datum.stringval;
2739  const auto dd = catalog.getMetadataForDict(dict_id);
2740  CHECK(dd && dd->stringDict);
2741  int32_t str_id = dd->stringDict->getOrAdd(str);
2742  if (!dd->dictIsTemp) {
2743  const auto checkpoint_ok = dd->stringDict->checkpoint();
2744  if (!checkpoint_ok) {
2745  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2746  columnName);
2747  }
2748  }
2749  const bool invalid = str_id > max_valid_int_value<T>();
2750  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2751  if (invalid) {
2752  LOG(ERROR) << "Could not encode string: " << str
2753  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2754  << " bits. Will store NULL instead.";
2755  }
2756  str_id = inline_fixed_encoding_null_val(columnType);
2757  }
2758  *col_data = str_id;
2759  }
2760  return *col_data;
2761 }
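
 // Std-only illustration of the overflow rule above, with std::numeric_limits
 // standing in for the internal max_valid_int_value<T>() helper: a 1-byte
 // dictionary column can only store ids up to 127, so a larger id is replaced
 // by the column's NULL sentinel (and an error is logged).
 #include <cassert>
 #include <cstdint>
 #include <limits>

 int main() {
   const int32_t str_id = 300;         // id returned by a hypothetical dictionary
   const int8_t null_sentinel = -128;  // stand-in for the encoded NULL value
   const bool invalid = str_id > std::numeric_limits<int8_t>::max();
   const int8_t stored = invalid ? null_sentinel : static_cast<int8_t>(str_id);
   assert(invalid && stored == null_sentinel);
 }
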
2762 
2763 template <class T>
2764 int64_t insert_one_dict_str(T* col_data,
2765  const ColumnDescriptor* cd,
2766  const Analyzer::Constant* col_cv,
2767  const Catalog_Namespace::Catalog& catalog) {
2768  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2769 }
2770 
2771 } // namespace
2772 
2773 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2774  const ExecutionOptions& eo) {
2775  auto timer = DEBUG_TIMER(__func__);
2776  if (eo.just_explain) {
2777  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2778  }
2779 
2780  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2781  ExecutorDeviceType::CPU,
2782  QueryMemoryDescriptor(),
2783  executor_->getRowSetMemoryOwner(),
2784  executor_->getCatalog(),
2785  executor_->blockSize(),
2786  executor_->gridSize());
2787 
2788  std::vector<TargetMetaInfo> empty_targets;
2789  return {rs, empty_targets};
2790 }
2791 
2792 ExecutionResult RelAlgExecutor::executeSimpleInsert(
2793  const Analyzer::Query& query,
2794  Fragmenter_Namespace::InsertDataLoader& inserter,
2795  const Catalog_Namespace::SessionInfo& session) {
2796  // Note: We currently obtain an executor for this method, but we do not need it.
2797  // Therefore, we skip the executor state setup in the regular execution path. In the
2798  // future, we will likely want to use the executor to evaluate expressions in the insert
2799  // statement.
2800 
2801  const auto& values_lists = query.get_values_lists();
2802  const int table_id = query.get_result_table_id();
2803  const auto& col_id_list = query.get_result_col_list();
2804  size_t rows_number = values_lists.size();
2805  size_t leaf_count = inserter.getLeafCount();
2806  const auto td = cat_.getMetadataForTable(table_id);
2807  CHECK(td);
2808  size_t rows_per_leaf = rows_number;
2809  if (td->nShards == 0) {
2810  rows_per_leaf =
2811  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2812  }
2813  auto max_number_of_rows_per_package =
2814  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
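 // Worked example with illustrative numbers: inserting 1,000,000 rows through
 // 4 leaves into an unsharded table gives rows_per_leaf = ceil(1000000 / 4) =
 // 250000, which the line above clamps to min(250000, 64 * 1024) = 65536 rows
 // per insert package.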
2815 
2816  std::vector<const ColumnDescriptor*> col_descriptors;
2817  std::vector<int> col_ids;
2818  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2819  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2820  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2821  std::unordered_map<int, int> sequential_ids;
2822 
2823  for (const int col_id : col_id_list) {
2824  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2825  const auto col_enc = cd->columnType.get_compression();
2826  if (cd->columnType.is_string()) {
2827  switch (col_enc) {
2828  case kENCODING_NONE: {
2829  auto it_ok =
2830  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2831  CHECK(it_ok.second);
2832  break;
2833  }
2834  case kENCODING_DICT: {
2835  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2836  CHECK(dd);
2837  const auto it_ok = col_buffers.emplace(
2838  col_id,
2839  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2840  max_number_of_rows_per_package));
2841  CHECK(it_ok.second);
2842  break;
2843  }
2844  default:
2845  CHECK(false);
2846  }
2847  } else if (cd->columnType.is_geometry()) {
2848  auto it_ok =
2849  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2850  CHECK(it_ok.second);
2851  } else if (cd->columnType.is_array()) {
2852  auto it_ok =
2853  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2854  CHECK(it_ok.second);
2855  } else {
2856  const auto it_ok = col_buffers.emplace(
2857  col_id,
2858  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2859  max_number_of_rows_per_package]()));
2860  CHECK(it_ok.second);
2861  }
2862  col_descriptors.push_back(cd);
2863  sequential_ids[col_id] = col_ids.size();
2864  col_ids.push_back(col_id);
2865  }
2866 
2867  // mark the target table's cached item as dirty
2868  std::vector<int> table_chunk_key_prefix{cat_.getCurrentDB().dbId, table_id};
2869  auto table_key = boost::hash_value(table_chunk_key_prefix);
2870  ResultSetCacheInvalidator::invalidateCachesByTable(table_key);
2871  UpdateTriggeredCacheInvalidator::invalidateCachesByTable(table_key);
2872 
2873  size_t start_row = 0;
2874  size_t rows_left = rows_number;
2875  while (rows_left != 0) {
2876  // clear the buffers
2877  for (const auto& kv : col_buffers) {
2878  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2879  }
2880  for (auto& kv : str_col_buffers) {
2881  kv.second.clear();
2882  }
2883  for (auto& kv : arr_col_buffers) {
2884  kv.second.clear();
2885  }
2886 
2887  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2888  // Note: if use cases arise with batch inserts of many rows, it might be
2889  // more efficient to run the loops below column by column instead of row by row.
2890  // But for now such a refactoring does not seem worth the effort, as we have more
2891  // efficient ways to insert many rows anyway.
2892  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2893  const auto& values_list = values_lists[row_idx + start_row];
2894  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
2895  CHECK(values_list.size() == col_descriptors.size());
2896  auto col_cv =
2897  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
2898  if (!col_cv) {
2899  auto col_cast =
2900  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
2901  CHECK(col_cast);
2902  CHECK_EQ(kCAST, col_cast->get_optype());
2903  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2904  }
2905  CHECK(col_cv);
2906  const auto cd = col_descriptors[col_idx];
2907  auto col_datum = col_cv->get_constval();
2908  auto col_type = cd->columnType.get_type();
2909  uint8_t* col_data_bytes{nullptr};
2910  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2911  (!cd->columnType.is_string() ||
2912  cd->columnType.get_compression() == kENCODING_DICT)) {
2913  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2914  CHECK(col_data_bytes_it != col_buffers.end());
2915  col_data_bytes = col_data_bytes_it->second.get();
2916  }
2917  switch (col_type) {
2918  case kBOOLEAN: {
2919  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2920  auto null_bool_val =
2921  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2922  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2923  ? inline_fixed_encoding_null_val(cd->columnType)
2924  : (col_datum.boolval ? 1 : 0);
2925  break;
2926  }
2927  case kTINYINT: {
2928  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2929  col_data[row_idx] = col_cv->get_is_null()
2930  ? inline_fixed_encoding_null_val(cd->columnType)
2931  : col_datum.tinyintval;
2932  break;
2933  }
2934  case kSMALLINT: {
2935  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2936  col_data[row_idx] = col_cv->get_is_null()
2937  ? inline_fixed_encoding_null_val(cd->columnType)
2938  : col_datum.smallintval;
2939  break;
2940  }
2941  case kINT: {
2942  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2943  col_data[row_idx] = col_cv->get_is_null()
2944  ? inline_fixed_encoding_null_val(cd->columnType)
2945  : col_datum.intval;
2946  break;
2947  }
2948  case kBIGINT:
2949  case kDECIMAL:
2950  case kNUMERIC: {
2951  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2952  col_data[row_idx] = col_cv->get_is_null()
2953  ? inline_fixed_encoding_null_val(cd->columnType)
2954  : col_datum.bigintval;
2955  break;
2956  }
2957  case kFLOAT: {
2958  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2959  col_data[row_idx] = col_datum.floatval;
2960  break;
2961  }
2962  case kDOUBLE: {
2963  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2964  col_data[row_idx] = col_datum.doubleval;
2965  break;
2966  }
2967  case kTEXT:
2968  case kVARCHAR:
2969  case kCHAR: {
2970  switch (cd->columnType.get_compression()) {
2971  case kENCODING_NONE:
2972  str_col_buffers[col_ids[col_idx]].push_back(
2973  col_datum.stringval ? *col_datum.stringval : "");
2974  break;
2975  case kENCODING_DICT: {
2976  switch (cd->columnType.get_size()) {
2977  case 1:
2978  insert_one_dict_str(
2979  &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
2980  cd,
2981  col_cv,
2982  cat_);
2983  break;
2984  case 2:
2985  insert_one_dict_str(
2986  &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
2987  cd,
2988  col_cv,
2989  cat_);
2990  break;
2991  case 4:
2992  insert_one_dict_str(
2993  &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
2994  cd,
2995  col_cv,
2996  cat_);
2997  break;
2998  default:
2999  CHECK(false);
3000  }
3001  break;
3002  }
3003  default:
3004  CHECK(false);
3005  }
3006  break;
3007  }
3008  case kTIME:
3009  case kTIMESTAMP:
3010  case kDATE: {
3011  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3012  col_data[row_idx] = col_cv->get_is_null()
3013  ? inline_fixed_encoding_null_val(cd->columnType)
3014  : col_datum.bigintval;
3015  break;
3016  }
3017  case kARRAY: {
3018  const auto is_null = col_cv->get_is_null();
3019  const auto size = cd->columnType.get_size();
3020  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
3021  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
3022  const auto is_point_coords =
3023  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
3024  if (is_null && !is_point_coords) {
3025  if (size > 0) {
3026  int8_t* buf = (int8_t*)checked_malloc(size);
3027  put_null_array(static_cast<void*>(buf), elem_ti, "");
3028  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
3029  p += elem_ti.get_size()) {
3030  put_null(static_cast<void*>(p), elem_ti, "");
3031  }
3032  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
3033  } else {
3034  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
3035  }
3036  break;
3037  }
3038  const auto l = col_cv->get_value_list();
3039  size_t len = l.size() * elem_ti.get_size();
3040  if (size > 0 && static_cast<size_t>(size) != len) {
3041  throw std::runtime_error("Array column " + cd->columnName + " expects " +
3042  std::to_string(size / elem_ti.get_size()) +
3043  " values, " + "received " +
3044  std::to_string(l.size()));
3045  }
3046  if (elem_ti.is_string()) {
3047  CHECK(kENCODING_DICT == elem_ti.get_compression());
3048  CHECK(4 == elem_ti.get_size());
3049 
3050  int8_t* buf = (int8_t*)checked_malloc(len);
3051  int32_t* p = reinterpret_cast<int32_t*>(buf);
3052 
3053  int elemIndex = 0;
3054  for (auto& e : l) {
3055  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3056  CHECK(c);
3057  insert_one_dict_str(
3058  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
3059  elemIndex++;
3060  }
3061  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3062 
3063  } else {
3064  int8_t* buf = (int8_t*)checked_malloc(len);
3065  int8_t* p = buf;
3066  for (auto& e : l) {
3067  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3068  CHECK(c);
3069  p = append_datum(p, c->get_constval(), elem_ti);
3070  CHECK(p);
3071  }
3072  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3073  }
3074  break;
3075  }
3076  case kPOINT:
3077  case kLINESTRING:
3078  case kPOLYGON:
3079  case kMULTIPOLYGON:
3080  str_col_buffers[col_ids[col_idx]].push_back(
3081  col_datum.stringval ? *col_datum.stringval : "");
3082  break;
3083  default:
3084  CHECK(false);
3085  }
3086  }
3087  }
3088  start_row += package_size;
3089  rows_left -= package_size;
3090 
3091  Fragmenter_Namespace::InsertData insert_data;
3092  insert_data.databaseId = cat_.getCurrentDB().dbId;
3093  insert_data.tableId = table_id;
3094  insert_data.data.resize(col_ids.size());
3095  insert_data.columnIds = col_ids;
3096  for (const auto& kv : col_buffers) {
3097  DataBlockPtr p;
3098  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
3099  insert_data.data[sequential_ids[kv.first]] = p;
3100  }
3101  for (auto& kv : str_col_buffers) {
3102  DataBlockPtr p;
3103  p.stringsPtr = &kv.second;
3104  insert_data.data[sequential_ids[kv.first]] = p;
3105  }
3106  for (auto& kv : arr_col_buffers) {
3107  DataBlockPtr p;
3108  p.arraysPtr = &kv.second;
3109  insert_data.data[sequential_ids[kv.first]] = p;
3110  }
3111  insert_data.numRows = package_size;
3112  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
3113  inserter.insertData(session, insert_data);
3114  }
3115 
3116  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3117  ExecutorDeviceType::CPU,
3118  QueryMemoryDescriptor(),
3119  executor_->getRowSetMemoryOwner(),
3120  nullptr,
3121  0,
3122  0);
3123  std::vector<TargetMetaInfo> empty_targets;
3124  return {rs, empty_targets};
3125 }
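// Illustrative summary (not verbatim from the source): the loop above converts each
// package of rows into one Fragmenter_Namespace::InsertData -- scalar columns travel
// via numbersPtr, strings via stringsPtr, arrays via arraysPtr -- and ships it with a
// separate insertData() call, so a large INSERT becomes a sequence of bounded-size
// batches rather than one monolithic buffer.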
3126 
3127 namespace {
3128 
3129 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
3130  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
3131  if (aggregate) {
3132  return 0;
3133  }
3134  const auto compound = dynamic_cast<const RelCompound*>(ra);
3135  return (compound && compound->isAggregate()) ? 0 : limit;
3136 }
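// Example (illustrative): for `SELECT x FROM t LIMIT 10` the scan limit stays 10, but
// for an aggregate such as `SELECT COUNT(*) FROM t GROUP BY x LIMIT 10` it becomes 0,
// since the LIMIT applies to the aggregated result rather than to the scanned rows.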
3137 
3138 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
3139  return !order_entries.empty() && order_entries.front().is_desc;
3140 }
3141 
3142 } // namespace
3143 
3144 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
3145  const CompilationOptions& co,
3146  const ExecutionOptions& eo,
3147  RenderInfo* render_info,
3148  const int64_t queue_time_ms) {
3149  auto timer = DEBUG_TIMER(__func__);
3150  check_sort_node_source_constraint(sort);
3151  const auto source = sort->getInput(0);
3152  const bool is_aggregate = node_is_aggregate(source);
3153  auto it = leaf_results_.find(sort->getId());
3154  auto order_entries = get_order_entries(sort);
3155  if (it != leaf_results_.end()) {
3156  // Add any transient string literals to the sdp on the agg
3157  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3158  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3159  executor_->row_set_mem_owner_);
3160  // Handle push-down of LIMIT for multi-node execution
3161  auto& aggregated_result = it->second;
3162  auto& result_rows = aggregated_result.rs;
3163  const size_t limit = sort->getLimit();
3164  const size_t offset = sort->getOffset();
3165  if (limit || offset) {
3166  if (!order_entries.empty()) {
3167  result_rows->sort(order_entries, limit + offset, executor_);
3168  }
3169  result_rows->dropFirstN(offset);
3170  if (limit) {
3171  result_rows->keepFirstN(limit);
3172  }
3173  }
3174 
3175  if (render_info) {
3176  // We've hit a sort step that is the very last step
3177  // in a distributed render query. We'll fill in the render targets
3178  // since we have all that data needed to do so. This is normally
3179  // done in executeWorkUnit, but that is bypassed in this case.
3180  build_render_targets(*render_info,
3181  source_work_unit.exe_unit.target_exprs,
3182  aggregated_result.targets_meta);
3183  }
3184 
3185  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3186  sort->setOutputMetainfo(aggregated_result.targets_meta);
3187 
3188  return result;
3189  }
3190 
3191  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3192  bool is_desc{false};
3193  bool use_speculative_top_n_sort{false};
3194 
3195  auto execute_sort_query = [this,
3196  sort,
3197  &source,
3198  &is_aggregate,
3199  &eo,
3200  &co,
3201  render_info,
3202  queue_time_ms,
3203  &groupby_exprs,
3204  &is_desc,
3205  &order_entries,
3206  &use_speculative_top_n_sort]() -> ExecutionResult {
3207  const size_t limit = sort->getLimit();
3208  const size_t offset = sort->getOffset();
3209  // check whether sort's input is cached
3210  auto source_node = sort->getInput(0);
3211  CHECK(source_node);
3212  ExecutionResult source_result{nullptr, {}};
3213  auto source_query_plan_dag = source_node->getQueryPlanDagHash();
3214  bool enable_resultset_recycler = canUseResultsetCache(eo, render_info);
3215  if (enable_resultset_recycler && has_valid_query_plan_dag(source_node) &&
3216  !sort->isEmptyResult()) {
3217  if (auto cached_resultset =
3218  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
3219  source_query_plan_dag)) {
3220  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3221  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3222  << " from resultset cache";
3223  source_result =
3224  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3225  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3226  addTemporaryTable(-source_node->getId(), cached_resultset);
3227  }
3228  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3229  co.device_type == ExecutorDeviceType::GPU;
3230  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3231  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3232  }
3233  }
3234  if (!source_result.getDataPtr()) {
3235  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3236  is_desc = first_oe_is_desc(order_entries);
3237  ExecutionOptions eo_copy = {
3238  eo.output_columnar_hint,
3239  eo.keep_result,
3240  eo.allow_multifrag,
3241  eo.just_explain,
3242  eo.allow_loop_joins,
3243  eo.with_watchdog,
3244  eo.jit_debug,
3245  eo.just_validate || sort->isEmptyResult(),
3246  eo.with_dynamic_watchdog,
3247  eo.dynamic_watchdog_time_limit,
3248  eo.find_push_down_candidates,
3249  eo.just_calcite_explain,
3250  eo.gpu_input_mem_limit_percent,
3251  eo.allow_runtime_query_interrupt,
3252  eo.running_query_interrupt_freq,
3253  eo.pending_query_interrupt_freq,
3254  eo.executor_type,
3255  };
3256 
3257  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3258  source_result = executeWorkUnit(source_work_unit,
3259  source->getOutputMetainfo(),
3260  is_aggregate,
3261  co,
3262  eo_copy,
3263  render_info,
3264  queue_time_ms);
3265  use_speculative_top_n_sort =
3266  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3267  use_speculative_top_n(source_work_unit.exe_unit,
3268  source_result.getRows()->getQueryMemDesc());
3269  }
3270  if (render_info && render_info->isPotentialInSituRender()) {
3271  return source_result;
3272  }
3273  if (source_result.isFilterPushDownEnabled()) {
3274  return source_result;
3275  }
3276  auto rows_to_sort = source_result.getRows();
3277  if (eo.just_explain) {
3278  return {rows_to_sort, {}};
3279  }
3280  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3281  !use_speculative_top_n_sort) {
3282  const size_t top_n = limit == 0 ? 0 : limit + offset;
3283  rows_to_sort->sort(order_entries, top_n, executor_);
3284  }
3285  if (limit || offset) {
3286  if (g_cluster && sort->collationCount() == 0) {
3287  if (offset >= rows_to_sort->rowCount()) {
3288  rows_to_sort->dropFirstN(offset);
3289  } else {
3290  rows_to_sort->keepFirstN(limit + offset);
3291  }
3292  } else {
3293  rows_to_sort->dropFirstN(offset);
3294  if (limit) {
3295  rows_to_sort->keepFirstN(limit);
3296  }
3297  }
3298  }
3299  return {rows_to_sort, source_result.getTargetsMeta()};
3300  };
3301 
3302  try {
3303  return execute_sort_query();
3304  } catch (const SpeculativeTopNFailed& e) {
3305  CHECK_EQ(size_t(1), groupby_exprs.size());
3306  CHECK(groupby_exprs.front());
3307  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3308  return execute_sort_query();
3309  }
3310 }
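// Illustrative walk-through of the LIMIT/OFFSET handling above: for
// `ORDER BY c LIMIT 10 OFFSET 20` the rows are (top-n) sorted with top_n = 30,
// dropFirstN(20) then discards the offset rows and keepFirstN(10) retains the
// requested page.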
3311 
3312 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
3313  const RelSort* sort,
3314  std::list<Analyzer::OrderEntry>& order_entries,
3315  const ExecutionOptions& eo) {
3316  const auto source = sort->getInput(0);
3317  const size_t limit = sort->getLimit();
3318  const size_t offset = sort->getOffset();
3319  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3320  const size_t scan_total_limit =
3321  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
3322  size_t max_groups_buffer_entry_guess{
3323  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3324  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3325  SortInfo sort_info{
3326  order_entries, sort_algorithm, limit, offset, sort->isLimitDelivered()};
3327  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3328  const auto& source_exe_unit = source_work_unit.exe_unit;
3329 
3330  // we do not allow sorting geometry or array types
3331  for (auto order_entry : order_entries) {
3332  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-based index
3333  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3334  const auto& ti = get_target_info(te, false);
3335  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3336  throw std::runtime_error(
3337  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3338  }
3339  }
3340 
3341  if (source_exe_unit.groupby_exprs.size() == 1) {
3342  if (!source_exe_unit.groupby_exprs.front()) {
3343  sort_algorithm = SortAlgorithm::StreamingTopN;
3344  } else {
3345  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3346  first_oe_is_desc(order_entries))) {
3347  sort_algorithm = SortAlgorithm::Default;
3348  }
3349  }
3350  }
3351 
3352  sort->setOutputMetainfo(source->getOutputMetainfo());
3353  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3354  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
3355  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3356  std::move(source_exe_unit.input_col_descs),
3357  source_exe_unit.simple_quals,
3358  source_exe_unit.quals,
3359  source_exe_unit.join_quals,
3360  source_exe_unit.groupby_exprs,
3361  source_exe_unit.target_exprs,
3362  nullptr,
3363  {sort_info.order_entries,
3364  sort_algorithm,
3365  limit,
3366  offset,
3367  sort_info.limit_delivered},
3368  scan_total_limit,
3369  source_exe_unit.query_hint,
3370  source_exe_unit.query_plan_dag_hash,
3371  source_exe_unit.hash_table_build_plan_dag,
3372  source_exe_unit.table_id_to_node_map,
3373  source_exe_unit.use_bump_allocator,
3374  source_exe_unit.union_all,
3375  source_exe_unit.query_state},
3376  source,
3377  max_groups_buffer_entry_guess,
3378  std::move(source_work_unit.query_rewriter),
3379  source_work_unit.input_permutation,
3380  source_work_unit.left_deep_join_input_sizes};
3381 }
3382 
3383 namespace {
3384 
3391 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
3392  CHECK(!table_infos.empty());
3393  const auto& first_table = table_infos.front();
3394  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3395  for (const auto& table_info : table_infos) {
3396  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3397  max_num_groups = table_info.info.getNumTuplesUpperBound();
3398  }
3399  }
3400  return std::max(max_num_groups, size_t(1));
3401 }
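// Example (illustrative): for input tables with tuple-count upper bounds
// {1'000'000, 50} this returns 1'000'000 -- a deliberately generous group-count
// guess, clamped below by 1 so downstream buffer sizing never sees zero.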
3402 
3403 bool is_projection(const RelAlgExecutionUnit& ra_exe_unit) {
3404  return ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front();
3405 }
3406 
3407 bool can_output_columnar(const RelAlgExecutionUnit& ra_exe_unit,
3408  const RenderInfo* render_info,
3409  const RelAlgNode* body) {
3410  if (!is_projection(ra_exe_unit)) {
3411  return false;
3412  }
3413  if (render_info && render_info->isPotentialInSituRender()) {
3414  return false;
3415  }
3416  if (!ra_exe_unit.sort_info.order_entries.empty()) {
3417  // disable output columnar when we have top-sort node query
3418  return false;
3419  }
3420  for (const auto& target_expr : ra_exe_unit.target_exprs) {
3421  // We don't currently support varlen columnar projections, so
3422  // return false if we find one
3423  if (target_expr->get_type_info().is_varlen()) {
3424  return false;
3425  }
3426  }
3427  if (auto top_project = dynamic_cast<const RelProject*>(body)) {
3428  if (top_project->isRowwiseOutputForced()) {
3429  return false;
3430  }
3431  }
3432  return true;
3433 }
3434 
3435 bool should_output_columnar(const RelAlgExecutionUnit& ra_exe_unit) {
3436  return g_columnar_large_projections &&
3437  ra_exe_unit.scan_limit >= g_columnar_large_projections_threshold;
3438 }
3439 
3446 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
3447  for (const auto target_expr : ra_exe_unit.target_exprs) {
3448  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3449  return false;
3450  }
3451  }
3452  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
3453  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
3454  return true;
3455  }
3456  return false;
3457 }
3458 
3459 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
3460  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
3461  ra_exe_unit.simple_quals.empty());
3462 }
3463 
3464 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
3465  const RelAlgExecutionUnit& ra_exe_unit_in,
3466  const std::vector<InputTableInfo>& table_infos,
3467  const Executor* executor,
3468  const ExecutorDeviceType device_type_in,
3469  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3470  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
3471  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
3472  const auto target_expr = ra_exe_unit.target_exprs[i];
3473  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3474  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
3475  continue;
3476  }
3477  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3478  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
3479  CHECK(arg);
3480  const auto& arg_ti = arg->get_type_info();
3481  // Avoid calling getExpressionRange for variable length types (string and array),
3482  // it'd trigger an assertion since that API expects to be called only for types
3483  // for which the notion of range is well-defined. A bit of a kludge, but the
3484  // logic to reject these types anyway is at lower levels in the stack and not
3485  // really worth pulling into a separate function for now.
3486  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3487  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
3488  continue;
3489  }
3490  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
3491  if (arg_range.getType() != ExpressionRangeType::Integer) {
3492  continue;
3493  }
3494  // When running distributed, the threshold for using the precise implementation
3495  // must be consistent across all leaves, otherwise we could have a mix of precise
3496  // and approximate bitmaps and we cannot aggregate them.
3497  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
3498  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3499  const auto sub_bitmap_count =
3500  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
3501  int64_t approx_bitmap_sz_bits{0};
3502  const auto error_rate = static_cast<Analyzer::AggExpr*>(target_expr)->get_arg1();
3503  if (error_rate) {
3504  CHECK(error_rate->get_type_info().get_type() == kINT);
3505  CHECK_GE(error_rate->get_constval().intval, 1);
3506  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
3507  } else {
3508  approx_bitmap_sz_bits = g_hll_precision_bits;
3509  }
3510  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
3511  arg_range.getIntMin(),
3512  approx_bitmap_sz_bits,
3513  true,
3514  device_type,
3515  sub_bitmap_count};
3516  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
3517  arg_range.getIntMin(),
3518  bitmap_sz_bits,
3519  false,
3520  device_type,
3521  sub_bitmap_count};
3522  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3523  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3524  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3525  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
3526  target_exprs_owned.push_back(precise_count_distinct);
3527  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
3528  }
3529  }
3530  return ra_exe_unit;
3531 }
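// Illustrative sizing comparison (assumed values): for APPROX_COUNT_DISTINCT(x)
// with x ranging over [0, 255], a precise COUNT(DISTINCT) bitmap needs only 256
// bits, typically no larger than the approximate (HLL-based) buffer, so the
// rewrite above swaps in an exact kCOUNT distinct aggregate; for a range spanning
// millions of values the precise bitmap dominates and the approximate descriptor
// is kept.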
3532 
3533 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
3534  const CompilationOptions& co,
3535  const ExecutionOptions& eo) {
3536  return g_enable_bump_allocator && co.device_type == ExecutorDeviceType::GPU &&
3537  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
3538 }
3539 
3540 } // namespace
3541 
3542 ExecutionResult RelAlgExecutor::executeWorkUnit(
3543  const RelAlgExecutor::WorkUnit& work_unit,
3544  const std::vector<TargetMetaInfo>& targets_meta,
3545  const bool is_agg,
3546  const CompilationOptions& co_in,
3547  const ExecutionOptions& eo_in,
3548  RenderInfo* render_info,
3549  const int64_t queue_time_ms,
3550  const std::optional<size_t> previous_count) {
3551  INJECT_TIMER(executeWorkUnit);
3552  auto timer = DEBUG_TIMER(__func__);
3553  auto query_exec_time_begin = timer_start();
3554 
3555  const auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
3556  check_none_encoded_string_cast_tuple_limit(query_infos, work_unit.exe_unit);
3557 
3558  auto co = co_in;
3559  auto eo = eo_in;
3560  ColumnCacheMap column_cache;
3561  ScopeGuard clearWindowContextIfNecessary = [&]() {
3562  if (is_window_execution_unit(work_unit.exe_unit)) {
3563  WindowProjectNodeContext::reset(executor_);
3564  }
3565  };
3566  if (is_window_execution_unit(work_unit.exe_unit)) {
3568  throw std::runtime_error("Window functions support is disabled");
3569  }
3570  co.device_type = ExecutorDeviceType::CPU;
3571  co.allow_lazy_fetch = false;
3572  computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3573  }
3574  if (!eo.just_explain && eo.find_push_down_candidates) {
3575  // find potential candidates:
3576  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3577  if (!selected_filters.empty() || eo.just_calcite_explain) {
3578  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3579  }
3580  }
3581  if (render_info && render_info->isPotentialInSituRender()) {
3582  co.allow_lazy_fetch = false;
3583  }
3584  const auto body = work_unit.body;
3585  CHECK(body);
3586  auto it = leaf_results_.find(body->getId());
3587  VLOG(3) << "body->getId()=" << body->getId()
3588  << " body->toString()=" << body->toString(RelRexToStringConfig::defaults())
3589  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3590  if (it != leaf_results_.end()) {
3591  executor_->addTransientStringLiterals(work_unit.exe_unit,
3592  executor_->row_set_mem_owner_);
3593  auto& aggregated_result = it->second;
3594  auto& result_rows = aggregated_result.rs;
3595  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3596  body->setOutputMetainfo(aggregated_result.targets_meta);
3597  if (render_info) {
3598  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3599  }
3600  return result;
3601  }
3602  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3603 
3604  auto ra_exe_unit = decide_approx_count_distinct_implementation(
3605  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3606 
3607  // register query hint if query_dag_ is valid
3608  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3609  if (query_dag_) {
3610  auto candidate = query_dag_->getQueryHint(body);
3611  if (candidate) {
3612  ra_exe_unit.query_hint = *candidate;
3613  }
3614  }
3615  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3616  if (is_window_execution_unit(ra_exe_unit)) {
3617  CHECK_EQ(table_infos.size(), size_t(1));
3618  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3619  max_groups_buffer_entry_guess =
3620  table_infos.front().info.fragments.front().getNumTuples();
3621  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3622  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3623  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3624  ra_exe_unit.scan_limit = *previous_count;
3625  } else {
3626  // TODO(adb): enable bump allocator path for render queries
3627  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3628  ra_exe_unit.scan_limit = 0;
3629  ra_exe_unit.use_bump_allocator = true;
3630  } else if (eo.executor_type == ::ExecutorType::Extern) {
3631  ra_exe_unit.scan_limit = 0;
3632  } else if (!eo.just_explain) {
3633  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
3634  if (filter_count_all) {
3635  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3636  }
3637  }
3638  }
3639  }
3640 
3641  // when output_columnar_hint is true here, it means either 1) the columnar output
3642  // configuration is on or 2) a user hint requested columnar output; either way, we
3643  // have to disable it if some requirements are not satisfied
3644  if (can_output_columnar(ra_exe_unit, render_info, body)) {
3645  if (!eo.output_columnar_hint && should_output_columnar(ra_exe_unit)) {
3646  VLOG(1) << "Using columnar layout for projection as output size of "
3647  << ra_exe_unit.scan_limit << " rows exceeds threshold of "
3648  << g_columnar_large_projections_threshold;
3649  eo.output_columnar_hint = true;
3650  }
3651  } else {
3652  eo.output_columnar_hint = false;
3653  }
3654 
3655  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3656  co.device_type,
3657  QueryMemoryDescriptor(),
3658  nullptr,
3659  executor_->getCatalog(),
3660  executor_->blockSize(),
3661  executor_->gridSize()),
3662  {}};
3663 
3664  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
3665  const bool has_cardinality_estimation,
3666  const bool has_ndv_estimation) -> ExecutionResult {
3667  // Note that the groups buffer entry guess may be modified during query execution.
3668  // Create a local copy so we can track those changes if we need to attempt a retry
3669  // due to OOM
3670  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3671  try {
3672  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3673  is_agg,
3674  table_infos,
3675  ra_exe_unit,
3676  co,
3677  eo,
3678  cat_,
3679  render_info,
3680  has_cardinality_estimation,
3681  column_cache),
3682  targets_meta};
3683  } catch (const QueryExecutionError& e) {
3684  if (!has_ndv_estimation && e.getErrorCode() < 0) {
3685  throw CardinalityEstimationRequired(/*range=*/0);
3686  }
3687  handlePersistentError(e.getErrorCode());
3688  return handleOutOfMemoryRetry(
3689  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
3690  targets_meta,
3691  is_agg,
3692  co,
3693  eo,
3694  render_info,
3695  e.wasMultifragKernelLaunch(),
3696  queue_time_ms);
3697  }
3698  };
3699 
3700  auto use_resultset_cache = canUseResultsetCache(eo, render_info);
3701  for (const auto& table_info : table_infos) {
3702  const auto td = cat_.getMetadataForTable(table_info.table_id);
3703  if (td && (td->isTemporaryTable() || td->isView)) {
3704  use_resultset_cache = false;
3705  if (eo.keep_result) {
3706  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has either "
3707  "temporary table or view";
3708  }
3709  }
3710  }
3711 
3712  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
3713  try {
3714  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3715  auto card = cached_cardinality.second;
3716  if (cached_cardinality.first && card >= 0) {
3717  result = execute_and_handle_errors(
3718  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3719  } else {
3720  result = execute_and_handle_errors(
3721  max_groups_buffer_entry_guess,
3722  groups_approx_upper_bound(table_infos) <= g_big_group_threshold,
3723  /*has_ndv_estimation=*/false);
3724  }
3725  } catch (const CardinalityEstimationRequired& e) {
3726  // check the cardinality cache
3727  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3728  auto card = cached_cardinality.second;
3729  if (cached_cardinality.first && card >= 0) {
3730  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3731  } else {
3732  const auto ndv_groups_estimation =
3733  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3734  const auto estimated_groups_buffer_entry_guess =
3735  ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3736  : std::min(groups_approx_upper_bound(table_infos),
3737  g_estimator_failure_max_groupby_size);
3738  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3739  result = execute_and_handle_errors(
3740  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3741  if (!(eo.just_validate || eo.just_explain)) {
3742  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3743  }
3744  }
3745  }
3746 
3747  result.setQueueTime(queue_time_ms);
3748  if (render_info) {
3749  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3750  if (render_info->isPotentialInSituRender()) {
3751  // return an empty result (with the same queue time, and zero render time)
3752  return {std::make_shared<ResultSet>(
3753  queue_time_ms,
3754  0,
3755  executor_->row_set_mem_owner_
3756  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3757  : nullptr),
3758  {}};
3759  }
3760  }
3761 
3762  for (auto& target_info : result.getTargetsMeta()) {
3763  if (target_info.get_type_info().is_string() &&
3764  !target_info.get_type_info().is_dict_encoded_string()) {
3765  // currently, we do not support resultset caching if a non-encoded string is projected
3766  use_resultset_cache = false;
3767  if (eo.keep_result) {
3768  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has non-encoded "
3769  "string column projection";
3770  }
3771  }
3772  }
3773 
3774  const auto res = result.getDataPtr();
3775  auto allow_auto_caching_resultset =
3776  res && res->hasValidBuffer() && g_allow_auto_resultset_caching &&
3777  res->getBufferSizeBytes(co.device_type) <= g_auto_resultset_caching_threshold;
3778  if (use_resultset_cache && (eo.keep_result || allow_auto_caching_resultset) &&
3779  !work_unit.exe_unit.sort_info.limit_delivered) {
3780  auto query_exec_time = timer_stop(query_exec_time_begin);
3781  res->setExecTime(query_exec_time);
3782  res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3783  res->setTargetMetaInfo(body->getOutputMetainfo());
3784  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
3785  res->setInputTableKeys(std::move(input_table_keys));
3786  if (allow_auto_caching_resultset) {
3787  VLOG(1) << "Automatically keep query resultset to recycler";
3788  }
3789  res->setUseSpeculativeTopNSort(
3790  use_speculative_top_n(ra_exe_unit, res->getQueryMemDesc()));
3791  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
3792  ra_exe_unit.query_plan_dag_hash,
3793  res->getInputTableKeys(),
3794  res,
3795  res->getBufferSizeBytes(co.device_type),
3796  executor_);
3797  } else {
3798  if (eo.keep_result) {
3799  if (g_cluster) {
3800  VLOG(1) << "Query hint \'keep_result\' is ignored since we do not support "
3801  "resultset recycling on distributed mode";
3802  } else if (hasStepForUnion()) {
3803  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has union-(all) "
3804  "operator";
3805  } else if (render_info && render_info->isPotentialInSituRender()) {
3806  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is classified as "
3807  "a in-situ rendering query";
3808  } else if (is_validate_or_explain_query(eo)) {
3809  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is either "
3810  "validate or explain query";
3811  } else {
3812  VLOG(1) << "Query hint \'keep_result\' is ignored";
3813  }
3814  }
3815  }
3816 
3817  return result;
3818 }
3819 
3820 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
3821  const bool is_agg,
3822  const CompilationOptions& co,
3823  const ExecutionOptions& eo) {
3824  const auto count =
3825  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
3826  kCOUNT,
3827  nullptr,
3828  false,
3829  nullptr);
3830  const auto count_all_exe_unit =
3831  work_unit.exe_unit.createCountAllExecutionUnit(count.get());
3832  size_t one{1};
3833  ResultSetPtr count_all_result;
3834  try {
3835  ColumnCacheMap column_cache;
3836  count_all_result =
3837  executor_->executeWorkUnit(one,
3838  is_agg,
3839  get_table_infos(work_unit.exe_unit, executor_),
3840  count_all_exe_unit,
3841  co,
3842  eo,
3843  cat_,
3844  nullptr,
3845  false,
3846  column_cache);
3847  } catch (const foreign_storage::ForeignStorageException& error) {
3848  throw error;
3849  } catch (const QueryMustRunOnCpu&) {
3850  // force a retry of the top level query on CPU
3851  throw;
3852  } catch (const std::exception& e) {
3853  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
3854  return std::nullopt;
3855  }
3856  const auto count_row = count_all_result->getNextRow(false, false);
3857  CHECK_EQ(size_t(1), count_row.size());
3858  const auto& count_tv = count_row.front();
3859  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
3860  CHECK(count_scalar_tv);
3861  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
3862  CHECK(count_ptr);
3863  CHECK_GE(*count_ptr, 0);
3864  auto count_upper_bound = static_cast<size_t>(*count_ptr);
3865  return std::max(count_upper_bound, size_t(1));
3866 }
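// Illustrative use of the pre-flight count: for a projection such as
// `SELECT x, y FROM t WHERE x > 5`, executeWorkUnit() may first run the rewritten
// `SELECT COUNT(*) FROM t WHERE x > 5` built here, so the output buffer can be
// sized to the number of passing rows instead of the table's upper bound.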
3867 
3868 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
3869  const auto& ra_exe_unit = work_unit.exe_unit;
3870  if (ra_exe_unit.input_descs.size() != 1) {
3871  return false;
3872  }
3873  const auto& table_desc = ra_exe_unit.input_descs.front();
3874  if (table_desc.getSourceType() != InputSourceType::TABLE) {
3875  return false;
3876  }
3877  const int table_id = table_desc.getTableId();
3878  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
3879  const auto comp_expr =
3880  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3881  if (!comp_expr || comp_expr->get_optype() != kEQ) {
3882  return false;
3883  }
3884  const auto lhs = comp_expr->get_left_operand();
3885  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3886  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3887  return false;
3888  }
3889  const auto rhs = comp_expr->get_right_operand();
3890  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3891  if (!rhs_const) {
3892  return false;
3893  }
3894  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
3895  if (cd->isVirtualCol) {
3896  CHECK_EQ("rowid", cd->columnName);
3897  return true;
3898  }
3899  }
3900  return false;
3901 }
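// Example (illustrative): `SELECT * FROM t WHERE rowid = 42` qualifies -- a single
// physical input table, an equality simple qual, and the virtual `rowid` column on
// the left-hand side -- so the caller can skip output-buffer size probing.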
3902 
3903 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
3904  const RelAlgExecutor::WorkUnit& work_unit,
3905  const std::vector<TargetMetaInfo>& targets_meta,
3906  const bool is_agg,
3907  const CompilationOptions& co,
3908  const ExecutionOptions& eo,
3909  RenderInfo* render_info,
3910  const bool was_multifrag_kernel_launch,
3911  const int64_t queue_time_ms) {
3912  // Disable the bump allocator
3913  // Note that this will have basically the same effect as using the bump allocator for
3914  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
3915  // path and the bump allocator path for kernel per fragment execution.
3916  auto ra_exe_unit_in = work_unit.exe_unit;
3917  ra_exe_unit_in.use_bump_allocator = false;
3918 
3919  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3920  co.device_type,
3922  nullptr,
3923  executor_->getCatalog(),
3924  executor_->blockSize(),
3925  executor_->gridSize()),
3926  {}};
3927 
3928  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
3929  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3930  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
3931  eo.keep_result,
3932  false,
3933  false,
3934  eo.allow_loop_joins,
3935  eo.with_watchdog,
3936  eo.jit_debug,
3937  false,
3938  eo.with_dynamic_watchdog,
3939  eo.dynamic_watchdog_time_limit,
3940  false,
3941  false,
3942  eo.gpu_input_mem_limit_percent,
3943  eo.allow_runtime_query_interrupt,
3944  eo.running_query_interrupt_freq,
3945  eo.pending_query_interrupt_freq,
3946  eo.executor_type,
3947  eo.outer_fragment_indices};
3948 
3949  if (was_multifrag_kernel_launch) {
3950  try {
3951  // Attempt to retry using the kernel per fragment path. The smaller input size
3952  // required may allow the entire kernel to execute in GPU memory.
3953  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
3954  "kernels disabled.";
3955  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3956  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
3957  ColumnCacheMap column_cache;
3958  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3959  is_agg,
3960  table_infos,
3961  ra_exe_unit,
3962  co,
3963  eo_no_multifrag,
3964  cat_,
3965  nullptr,
3966  true,
3967  column_cache),
3968  targets_meta};
3969  result.setQueueTime(queue_time_ms);
3970  } catch (const QueryExecutionError& e) {
3972  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
3973  }
3974  }
3975 
3976  if (render_info) {
3977  render_info->setForceNonInSituData();
3978  }
3979 
3980  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
3981  // Only reset the group buffer entry guess if we ran out of slots, which suggests
3982  // a highly pathological input that prevented a good estimation of the distinct
3983  // tuple count. For projection queries, this will force a per-fragment scan limit,
3984  // which is compatible with the CPU path.
3985 
3986  VLOG(1) << "Resetting max groups buffer entry guess.";
3987  max_groups_buffer_entry_guess = 0;
3988 
3989  int iteration_ctr = -1;
3990  while (true) {
3991  iteration_ctr++;
3992  auto ra_exe_unit = decide_approx_count_distinct_implementation(
3993  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
3994  ColumnCacheMap column_cache;
3995  try {
3996  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3997  is_agg,
3998  table_infos,
3999  ra_exe_unit,
4000  co_cpu,
4001  eo_no_multifrag,
4002  cat_,
4003  nullptr,
4004  true,
4005  column_cache),
4006  targets_meta};
4007  } catch (const QueryExecutionError& e) {
4008  // Ran out of slots
4009  if (e.getErrorCode() < 0) {
4010  // Even the conservative guess failed; it should only happen when we group
4011  // by a huge cardinality array. Maybe we should throw an exception instead?
4012  // Such a heavy query is entirely capable of exhausting all the host memory.
4013  CHECK(max_groups_buffer_entry_guess);
4014  // Only allow two iterations of increasingly large entry guesses up to a maximum
4015  // of 512MB per column per kernel
4016  if (g_enable_watchdog || iteration_ctr > 1) {
4017  throw std::runtime_error("Query ran out of output slots in the result");
4018  }
4019  max_groups_buffer_entry_guess *= 2;
4020  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
4021  "groups buffer entry "
4022  "guess equal to "
4023  << max_groups_buffer_entry_guess;
4024  } else {
4025  handlePersistentError(e.getErrorCode());
4026  }
4027  continue;
4028  }
4029  result.setQueueTime(queue_time_ms);
4030  return result;
4031  }
4032  return result;
4033 }
4034 
4035 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
4036  LOG(ERROR) << "Query execution failed with error "
4037  << getErrorMessageFromCode(error_code);
4038  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
4039  // We ran out of GPU memory, this doesn't count as an error if the query is
4040  // allowed to continue on CPU because retry on CPU is explicitly allowed through
4041  // --allow-cpu-retry.
4042  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
4043  if (!g_allow_cpu_retry) {
4044  throw std::runtime_error(
4045  "Query ran out of GPU memory, unable to automatically retry on CPU");
4046  }
4047  return;
4048  }
4049  throw std::runtime_error(getErrorMessageFromCode(error_code));
4050 }
4051 
4052 namespace {
4053 struct ErrorInfo {
4054  const char* code{nullptr};
4055  const char* description{nullptr};
4056 };
4057 ErrorInfo getErrorDescription(const int32_t error_code) {
4058  // 'designated initializers' don't compile on Windows for std 17
4059  // They require /std:c++20. They have been removed for the Windows port.
4060  switch (error_code) {
4062  return {"ERR_DIV_BY_ZERO", "Division by zero"};
4064  return {"ERR_OUT_OF_GPU_MEM",
4065 
4066  "Query couldn't keep the entire working set of columns in GPU memory"};
4068  return {"ERR_UNSUPPORTED_SELF_JOIN", "Self joins not supported yet"};
4070  return {"ERR_OUT_OF_CPU_MEM", "Not enough host memory to execute the query"};
4072  return {"ERR_OVERFLOW_OR_UNDERFLOW", "Overflow or underflow"};
4074  return {"ERR_OUT_OF_TIME", "Query execution has exceeded the time limit"};
4076  return {"ERR_INTERRUPTED", "Query execution has been interrupted"};
4078  return {"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
4079  "Columnar conversion not supported for variable length types"};
4081  return {"ERR_TOO_MANY_LITERALS", "Too many literals in the query"};
4083  return {"ERR_STRING_CONST_IN_RESULTSET",
4084 
4085  "NONE ENCODED String types are not supported as input result set."};
4087  return {"ERR_OUT_OF_RENDER_MEM",
4088 
4089  "Insufficient GPU memory for query results in render output buffer "
4090  "sized by render-mem-bytes"};
4092  return {"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
4093  "Streaming-Top-N not supported in Render Query"};
4095  return {"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
4096  "Multiple distinct values encountered"};
4097  case Executor::ERR_GEOS:
4098  return {"ERR_GEOS", "ERR_GEOS"};
4100  return {"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
4101 
4102  "Arguments of WIDTH_BUCKET function does not satisfy the condition"};
4103  default:
4104  return {nullptr, nullptr};
4105  }
4106 }
4107 
4108 } // namespace
4109 
4110 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
4111  if (error_code < 0) {
4112  return "Ran out of slots in the query output buffer";
4113  }
4114  const auto errorInfo = getErrorDescription(error_code);
4115 
4116  if (errorInfo.code) {
4117  return errorInfo.code + ": "s + errorInfo.description;
4118  } else {
4119  return "Other error: code "s + std::to_string(error_code);
4120  }
4121 }
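// Example outputs (illustrative):
//   getErrorMessageFromCode(-3);                         // "Ran out of slots in the query output buffer"
//   getErrorMessageFromCode(Executor::ERR_DIV_BY_ZERO);  // "ERR_DIV_BY_ZERO: Division by zero"
//   getErrorMessageFromCode(12345);                      // "Other error: code 12345"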
4122 
4125  VLOG(1) << "Running post execution callback.";
4126  (*post_execution_callback_)();
4127  }
4128 }
4129 
4130 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
4131  const SortInfo& sort_info,
4132  const ExecutionOptions& eo) {
4133  const auto compound = dynamic_cast<const RelCompound*>(node);
4134  if (compound) {
4135  return createCompoundWorkUnit(compound, sort_info, eo);
4136  }
4137  const auto project = dynamic_cast<const RelProject*>(node);
4138  if (project) {
4139  return createProjectWorkUnit(project, sort_info, eo);
4140  }
4141  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4142  if (aggregate) {
4143  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4144  }
4145  const auto filter = dynamic_cast<const RelFilter*>(node);
4146  if (filter) {
4147  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4148  }
4149  LOG(FATAL) << "Unhandled node type: "
4150  << node->toString(RelRexToStringConfig::defaults());
4151  return {};
4152 }
4153 
4154 namespace {
4155 
4156 JoinType get_join_type(const RelAlgNode* ra) {
4157  auto sink = get_data_sink(ra);
4158  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
4159  return join->getJoinType();
4160  }
4161  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
4162  return JoinType::INNER;
4163  }
4164 
4165  return JoinType::INVALID;
4166 }
4167 
4168 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
4169  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4170  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
4171  return nullptr;
4172  }
4173  const auto equi_join_condition =
4174  dynamic_cast<const RexOperator*>(condition->getOperand(0));
4175  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
4176  return nullptr;
4177  }
4178  const auto both_are_null_condition =
4179  dynamic_cast<const RexOperator*>(condition->getOperand(1));
4180  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
4181  both_are_null_condition->size() != 2) {
4182  return nullptr;
4183  }
4184  const auto lhs_is_null =
4185  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
4186  const auto rhs_is_null =
4187  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
4188  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
4189  rhs_is_null->getOperator() != kISNULL) {
4190  return nullptr;
4191  }
4192  CHECK_EQ(size_t(1), lhs_is_null->size());
4193  CHECK_EQ(size_t(1), rhs_is_null->size());
4194  CHECK_EQ(size_t(2), equi_join_condition->size());
4195  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
4196  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
4197  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
4198  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
4199  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4200  return nullptr;
4201  }
4202  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
4203  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4204  RexDeepCopyVisitor deep_copy_visitor;
4205  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
4206  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
4207  eq_operands.emplace_back(lhs_op_copy.release());
4208  eq_operands.emplace_back(rhs_op_copy.release());
4209  return boost::make_unique<const RexOperator>(
4210  kBW_EQ, eq_operands, equi_join_condition->getType());
4211  }
4212  return nullptr;
4213 }
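// Example (illustrative): the join predicate
//   t1.x = t2.x OR (t1.x IS NULL AND t2.x IS NULL)
// matches the pattern above and is rewritten into the single NULL-safe operator
//   t1.x BW_EQ t2.x
// which remains eligible for hash join qualification.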
4214 
4215 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
4216  const RexScalar* scalar) {
4217  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4218  if (condition && condition->getOperator() == kAND) {
4219  CHECK_GE(condition->size(), size_t(2));
4220  auto acc = get_bitwise_equals(condition->getOperand(0));
4221  if (!acc) {
4222  return nullptr;
4223  }
4224  for (size_t i = 1; i < condition->size(); ++i) {
4225  std::vector<std::unique_ptr<const RexScalar>> and_operands;
4226  and_operands.emplace_back(std::move(acc));
4227  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
4228  acc =
4229  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
4230  }
4231  return acc;
4232  }
4233  return get_bitwise_equals(scalar);
4234 }
4235 
4236 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
4237  CHECK_GE(left_deep_join->inputCount(), size_t(2));
4238  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
4239  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
4240  ++nesting_level) {
4241  if (left_deep_join->getOuterCondition(nesting_level)) {
4242  join_types[nesting_level - 1] = JoinType::LEFT;
4243  }
4244  auto cur_level_join_type = left_deep_join->getJoinType(nesting_level);
4245  if (cur_level_join_type == JoinType::SEMI || cur_level_join_type == JoinType::ANTI) {
4246  join_types[nesting_level - 1] = cur_level_join_type;
4247  }
4248  }
4249  return join_types;
4250 }
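// Example (illustrative): for a left-deep tree joining a, b, c, d where the second
// level carries an outer condition and the third level is a semi join (e.g. produced
// from an IN subquery), the returned vector is {INNER, LEFT, SEMI} -- one entry per
// nesting level.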
4251 
4252 template <class RA>
4253 std::vector<size_t> do_table_reordering(
4254  std::vector<InputDescriptor>& input_descs,
4255  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4256  const JoinQualsPerNestingLevel& left_deep_join_quals,
4257  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4258  const RA* node,
4259  const std::vector<InputTableInfo>& query_infos,
4260  const Executor* executor) {
4261  if (g_cluster) {
4262  // Disable table reordering in distributed mode. The aggregator does not have enough
4263  // information to break ties
4264  return {};
4265  }
4266  const auto& cat = *executor->getCatalog();
4267  for (const auto& table_info : query_infos) {
4268  if (table_info.table_id < 0) {
4269  continue;
4270  }
4271  const auto td = cat.getMetadataForTable(table_info.table_id);
4272  CHECK(td);
4273  if (table_is_replicated(td)) {
4274  return {};
4275  }
4276  }
4277  const auto input_permutation =
4278  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
4279  input_to_nest_level = get_input_nest_levels(node, input_permutation);
4280  std::tie(input_descs, input_col_descs, std::ignore) =
4281  get_input_desc(node, input_to_nest_level, input_permutation, cat);
4282  return input_permutation;
4283 }
4284 
4285 std::vector<size_t> get_left_deep_join_input_sizes(
4286  const RelLeftDeepInnerJoin* left_deep_join) {
4287  std::vector<size_t> input_sizes;
4288  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
4289  const auto inputs = get_node_output(left_deep_join->getInput(i));
4290  input_sizes.push_back(inputs.size());
4291  }
4292  return input_sizes;
4293 }
4294 
4295 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
4296  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4297  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4298  for (const auto& qual : quals) {
4299  const auto rewritten_qual = rewrite_expr(qual.get());
4300  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4301  }
4302  return rewritten_quals;
4303 }
4304 
4305 } // namespace
4306 
4307 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
4308  const RelCompound* compound,
4309  const SortInfo& sort_info,
4310  const ExecutionOptions& eo) {
4311  std::vector<InputDescriptor> input_descs;
4312  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4313  auto input_to_nest_level = get_input_nest_levels(compound, {});
4314  std::tie(input_descs, input_col_descs, std::ignore) =
4315  get_input_desc(compound, input_to_nest_level, {}, cat_);
4316  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4317  const auto query_infos = get_table_infos(input_descs, executor_);
4318  CHECK_EQ(size_t(1), compound->inputCount());
4319  const auto left_deep_join =
4320  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4321  JoinQualsPerNestingLevel left_deep_join_quals;
4322  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4323  : std::vector<JoinType>{get_join_type(compound)};
4324  std::vector<size_t> input_permutation;
4325  std::vector<size_t> left_deep_join_input_sizes;
4326  std::optional<unsigned> left_deep_tree_id;
4327  if (left_deep_join) {
4328  left_deep_tree_id = left_deep_join->getId();
4329  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4330  left_deep_join_quals = translateLeftDeepJoinFilter(
4331  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4332  if (g_from_table_reordering &&
4333  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4334  join_types.end()) {
4335  input_permutation = do_table_reordering(input_descs,
4336  input_col_descs,
4337  left_deep_join_quals,
4338  input_to_nest_level,
4339  compound,
4340  query_infos,
4341  executor_);
4342  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4343  std::tie(input_descs, input_col_descs, std::ignore) =
4344  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
4345  left_deep_join_quals = translateLeftDeepJoinFilter(
4346  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4347  }
4348  }
4349  RelAlgTranslator translator(cat_,
4350  query_state_,
4351  executor_,
4352  input_to_nest_level,
4353  join_types,
4354  now_,
4355  eo.just_explain);
4356  const auto scalar_sources =
4357  translate_scalar_sources(compound, translator, eo.executor_type);
4358  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4359  const auto quals_cf = translate_quals(compound, translator);
4360  const auto target_exprs = translate_targets(target_exprs_owned_,
4361  scalar_sources,
4362  groupby_exprs,
4363  compound,
4364  translator,
4365  eo.executor_type);
4366 
4367  auto query_hint = RegisteredQueryHint::defaults();
4368  if (query_dag_) {
4369  auto candidate = query_dag_->getQueryHint(compound);
4370  if (candidate) {
4371  query_hint = *candidate;
4372  }
4373  }
4374  CHECK_EQ(compound->size(), target_exprs.size());
4375  const RelAlgExecutionUnit exe_unit = {input_descs,
4376  input_col_descs,
4377  quals_cf.simple_quals,
4378  rewrite_quals(quals_cf.quals),
4379  left_deep_join_quals,
4380  groupby_exprs,
4381  target_exprs,
4382  nullptr,
4383  sort_info,
4384  0,
4385  query_hint,
4386  compound->getQueryPlanDagHash(),
4387  {},
4388  {},
4389  false,
4390  std::nullopt,
4391  query_state_};
4392  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4393  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4394  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4395  compound->setOutputMetainfo(targets_meta);
4396  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4397  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4398  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4399  rewritten_exe_unit.join_quals);
4400  }
4401  if (has_valid_query_plan_dag(compound)) {
4402  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4403  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4404  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4405  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4406  }
4407  return {rewritten_exe_unit,
4408  compound,
4409  g_default_max_groups_buffer_entry_guess,
4410  std::move(query_rewriter),
4411  input_permutation,
4412  left_deep_join_input_sizes};
4413 }
4414 
4415 std::shared_ptr<RelAlgTranslator> RelAlgExecutor::getRelAlgTranslator(
4416  const RelAlgNode* node) {
4417  auto input_to_nest_level = get_input_nest_levels(node, {});
4418  const auto left_deep_join =
4419  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
4420  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4421  : std::vector<JoinType>{get_join_type(node)};
4422  return std::make_shared<RelAlgTranslator>(
4423  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, false);
4424 }
4425 
4426 namespace {
4427 
4428 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
4429  CHECK(qual_expr);
4430  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
4431  if (!bin_oper || bin_oper->getOperator() != kAND) {
4432  return {qual_expr};
4433  }
4434  CHECK_GE(bin_oper->size(), size_t(2));
4435  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
4436  for (size_t i = 1; i < bin_oper->size(); ++i) {
4437  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
4438  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4439  }
4440  return lhs_cf;
4441 }
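// Example (illustrative): a condition shaped like `a AND (b AND c)` flattens to the
// conjunct list {a, b, c}; a non-AND root such as `a OR b` is returned unchanged as
// the single-element list {a OR b}.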
4442 
4443 std::shared_ptr<Analyzer::Expr> build_logical_expression(
4444  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4445  const SQLOps sql_op) {
4446  CHECK(!factors.empty());
4447  auto acc = factors.front();
4448  for (size_t i = 1; i < factors.size(); ++i) {
4449  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
4450  }
4451  return acc;
4452 }
4453 
4454 template <class QualsList>
4455 bool list_contains_expression(const QualsList& haystack,
4456  const std::shared_ptr<Analyzer::Expr>& needle) {
4457  for (const auto& qual : haystack) {
4458  if (*qual == *needle) {
4459  return true;
4460  }
4461  }
4462  return false;
4463 }
4464 
4465 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
4466 // evaluations of `p` and allows use of the original form in joins if `p`
4467 // can be used for hash joins.
4468 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
4469  const std::shared_ptr<Analyzer::Expr>& expr) {
4470  const auto expr_terms = qual_to_disjunctive_form(expr);
4471  CHECK_GE(expr_terms.size(), size_t(1));
4472  const auto& first_term = expr_terms.front();
4473  const auto first_term_factors = qual_to_conjunctive_form(first_term);
4474  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4475  // First, collect the conjunctive components common to all the disjunctive components.
4476  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
4477  for (const auto& first_term_factor : first_term_factors.quals) {
4478  bool is_common =
4479  expr_terms.size() > 1; // Only report common factors for disjunction.
4480  for (size_t i = 1; i < expr_terms.size(); ++i) {
4481  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
4482  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
4483  is_common = false;
4484  break;
4485  }
4486  }
4487  if (is_common) {
4488  common_factors.push_back(first_term_factor);
4489  }
4490  }
4491  if (common_factors.empty()) {
4492  return expr;
4493  }
4494  // Now that the common expressions are known, collect the remaining expressions.
4495  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4496  for (const auto& term : expr_terms) {
4497  const auto term_cf = qual_to_conjunctive_form(term);
4498  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4499  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4500  for (const auto& qual : term_cf.quals) {
4501  if (!list_contains_expression(common_factors, qual)) {
4502  remaining_quals.push_back(qual);
4503  }
4504  }
4505  if (!remaining_quals.empty()) {
4506  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
4507  }
4508  }
4509  // Reconstruct the expression with the transformation applied.
4510  const auto common_expr = build_logical_expression(common_factors, kAND);
4511  if (remaining_terms.empty()) {
4512  return common_expr;
4513  }
4514  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
4515  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
4516 }
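// Example (illustrative): `(t.x = 1 AND t.y > 0) OR (t.x = 1 AND t.z < 5)` shares
// the factor `t.x = 1` across both disjuncts and is rebuilt as
// `t.x = 1 AND (t.y > 0 OR t.z < 5)`; with no common factor, the original
// expression is returned untouched.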
4517 
4518 } // namespace
4519 
4520 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
4521  const RexScalar* join_condition,
4522  const std::vector<JoinType>& join_types,
4523  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4524  const bool just_explain) const {
4525  RelAlgTranslator translator(
4526  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
4527  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
4528  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4529  for (const auto rex_condition_component : rex_condition_cf) {
4530  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
4531  const auto join_condition =
4532  reverse_logical_distribution(translator.translate(
4533  bw_equals ? bw_equals.get() : rex_condition_component));
4534  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
4535 
4536  auto append_folded_cf_quals = [&join_condition_quals](const auto& cf_quals) {
4537  for (const auto& cf_qual : cf_quals) {
4538  join_condition_quals.emplace_back(fold_expr(cf_qual.get()));
4539  }
4540  };
4541 
4542  append_folded_cf_quals(join_condition_cf.quals);
4543  append_folded_cf_quals(join_condition_cf.simple_quals);
4544  }
4545  return combine_equi_join_conditions(join_condition_quals);
4546 }
4547 
4548 // Translate left deep join filter and separate the conjunctive form qualifiers
4549 // per nesting level. The code generated for hash table lookups on each level
4550 // must dominate its uses in deeper nesting levels.
4551 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
4552  const RelLeftDeepInnerJoin* join,
4553  const std::vector<InputDescriptor>& input_descs,
4554  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4555  const bool just_explain) {
4556  const auto join_types = left_deep_join_types(join);
4557  const auto join_condition_quals = makeJoinQuals(
4558  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
4559  MaxRangeTableIndexVisitor rte_idx_visitor;
4560  JoinQualsPerNestingLevel result(input_descs.size() - 1);
4561  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4562  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4563  const auto outer_condition = join->getOuterCondition(rte_idx);
4564  if (outer_condition) {
4565  result[rte_idx - 1].quals =
4566  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4567  CHECK_LE(rte_idx, join_types.size());
4568  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
4569  result[rte_idx - 1].type = JoinType::LEFT;
4570  continue;
4571  }
4572  for (const auto& qual : join_condition_quals) {
4573  if (visited_quals.count(qual)) {
4574  continue;
4575  }
4576  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4577  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4578  const auto it_ok = visited_quals.emplace(qual);
4579  CHECK(it_ok.second);
4580  result[rte_idx - 1].quals.push_back(qual);
4581  }
4582  }
4583  CHECK_LE(rte_idx, join_types.size());
4584  CHECK(join_types[rte_idx - 1] == JoinType::INNER ||
4585  join_types[rte_idx - 1] == JoinType::SEMI ||
4586  join_types[rte_idx - 1] == JoinType::ANTI);
4587  result[rte_idx - 1].type = join_types[rte_idx - 1];
4588  }
4589  return result;
4590 }
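// Editorial sketch: for a left-deep plan such as
//
//   A JOIN B ON a.x = b.x JOIN C ON b.y = c.y
//
// the qual a.x = b.x references range-table indices {0, 1} and is assigned
// to result[0], while b.y = c.y (max index 2) lands in result[1]. Each qual
// is placed at the shallowest nesting level that covers every table it
// references, which is what keeps the hash table lookup code for a level
// dominating its uses in deeper levels.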
4591 
4592 namespace {
4593 
4594 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
4595  const RelAlgNode* ra_node,
4596  const size_t nest_level,
4597  const std::vector<TargetMetaInfo>& in_metainfo,
4598  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4599  CHECK_LE(size_t(1), ra_node->inputCount());
4600  CHECK_GE(size_t(2), ra_node->inputCount());
4601  const auto input = ra_node->getInput(nest_level);
4602  const auto it_rte_idx = input_to_nest_level.find(input);
4603  CHECK(it_rte_idx != input_to_nest_level.end());
4604  const int rte_idx = it_rte_idx->second;
4605  const int table_id = table_id_from_ra(input);
4606  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4607  const auto scan_ra = dynamic_cast<const RelScan*>(input);
4608  int input_idx = 0;
4609  for (const auto& input_meta : in_metainfo) {
4610  inputs.push_back(
4611  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
4612  table_id,
4613  scan_ra ? input_idx + 1 : input_idx,
4614  rte_idx));
4615  ++input_idx;
4616  }
4617  return inputs;
4618 }
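// Editorial note: the `scan_ra ? input_idx + 1 : input_idx` above reflects
// that catalog column ids of physical table scans are 1-based, while the
// positional outputs of intermediate relational nodes are 0-based.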
4619 
4620 std::vector<Analyzer::Expr*> get_raw_pointers(
4621  std::vector<std::shared_ptr<Analyzer::Expr>> const& input) {
4622  std::vector<Analyzer::Expr*> output(input.size());
4623  auto const raw_ptr = [](auto& shared_ptr) { return shared_ptr.get(); };
4624  std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4625  return output;
4626 }
4627 
4628 } // namespace
4629 
4630 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
4631  const RelAggregate* aggregate,
4632  const SortInfo& sort_info,
4633  const bool just_explain) {
4634  std::vector<InputDescriptor> input_descs;
4635  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4636  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4637  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4638  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4639  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
4640  const auto join_type = get_join_type(aggregate);
4641 
4642  RelAlgTranslator translator(cat_,
4643  query_state_,
4644  executor_,
4645  input_to_nest_level,
4646  {join_type},
4647  now_,
4648  just_explain);
4649  CHECK_EQ(size_t(1), aggregate->inputCount());
4650  const auto source = aggregate->getInput(0);
4651  const auto& in_metainfo = source->getOutputMetainfo();
4652  const auto scalar_sources =
4653  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4654  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4655  const auto target_exprs = translate_targets(
4656  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
4657 
4658  const auto query_infos = get_table_infos(input_descs, executor_);
4659 
4660  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4661  aggregate->setOutputMetainfo(targets_meta);
4662  auto query_hint = RegisteredQueryHint::defaults();
4663  if (query_dag_) {
4664  auto candidate = query_dag_->getQueryHint(aggregate);
4665  if (candidate) {
4666  query_hint = *candidate;
4667  }
4668  }
4669  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4670  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4671  return {RelAlgExecutionUnit{input_descs,
4672  input_col_descs,
4673  {},
4674  {},
4675  {},
4676  groupby_exprs,
4677  target_exprs,
4678  nullptr,
4679  sort_info,
4680  0,
4681  query_hint,
4682  aggregate->getQueryPlanDagHash(),
4683  join_info.hash_table_plan_dag,
4684  join_info.table_id_to_node_map,
4685  false,
4686  std::nullopt,
4687  query_state_},
4688  aggregate,
4689  max_groups_buffer_entry_default(executor_),
4690  nullptr};
4691 }
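// Editorial sketch: a query such as
//
//   SELECT x, COUNT(*) FROM t GROUP BY x;
//
// reaches here as a RelAggregate over the node producing t's columns.
// synthesize_inputs() manufactures ColumnVars for the source outputs,
// groupby_exprs receives the translated `x`, and target_exprs the COUNT(*)
// aggregate; the empty initializer lists in the RelAlgExecutionUnit above
// are the (absent) filter and join qualifiers.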
4692 
4693 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
4694  const RelProject* project,
4695  const SortInfo& sort_info,
4696  const ExecutionOptions& eo) {
4697  std::vector<InputDescriptor> input_descs;
4698  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4699  auto input_to_nest_level = get_input_nest_levels(project, {});
4700  std::tie(input_descs, input_col_descs, std::ignore) =
4701  get_input_desc(project, input_to_nest_level, {}, cat_);
4702  const auto query_infos = get_table_infos(input_descs, executor_);
4703 
4704  const auto left_deep_join =
4705  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4706  JoinQualsPerNestingLevel left_deep_join_quals;
4707  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4708  : std::vector<JoinType>{get_join_type(project)};
4709  std::vector<size_t> input_permutation;
4710  std::vector<size_t> left_deep_join_input_sizes;
4711  std::optional<unsigned> left_deep_tree_id;
4712  if (left_deep_join) {
4713  left_deep_tree_id = left_deep_join->getId();
4714  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4715  const auto query_infos = get_table_infos(input_descs, executor_);
4716  left_deep_join_quals = translateLeftDeepJoinFilter(
4717  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4718  if (g_from_table_reordering) {
4719  input_permutation = do_table_reordering(input_descs,
4720  input_col_descs,
4721  left_deep_join_quals,
4722  input_to_nest_level,
4723  project,
4724  query_infos,
4725  executor_);
4726  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4727  std::tie(input_descs, input_col_descs, std::ignore) =
4728  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
4729  left_deep_join_quals = translateLeftDeepJoinFilter(
4730  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4731  }
4732  }
4733 
4734  RelAlgTranslator translator(cat_,
4735  query_state_,
4736  executor_,
4737  input_to_nest_level,
4738  join_types,
4739  now_,
4740  eo.just_explain);
4741  const auto target_exprs_owned =
4742  translate_scalar_sources(project, translator, eo.executor_type);
4743 
4744  target_exprs_owned_.insert(
4745  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4746  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4747  auto query_hint = RegisteredQueryHint::defaults();
4748  if (query_dag_) {
4749  auto candidate = query_dag_->getQueryHint(project);
4750  if (candidate) {
4751  query_hint = *candidate;
4752  }
4753  }
4754  const RelAlgExecutionUnit exe_unit = {input_descs,
4755  input_col_descs,
4756  {},
4757  {},
4758  left_deep_join_quals,
4759  {nullptr},
4760  target_exprs,
4761  nullptr,
4762  sort_info,
4763  0,
4764  query_hint,
4765  project->getQueryPlanDagHash(),
4766  {},
4767  {},
4768  false,
4769  std::nullopt,
4770  query_state_};
4771  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4772  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4773  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
4774  project->setOutputMetainfo(targets_meta);
4775  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4776  if (left_deep_tree_id.has_value()) {
4777  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4778  rewritten_exe_unit.join_quals);
4779  }
4780  if (has_valid_query_plan_dag(project)) {
4781  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4782  project, left_deep_tree_id, left_deep_trees_info, executor_);
4783  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4784  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4785  }
4786  return {rewritten_exe_unit,
4787  project,
4788  max_groups_buffer_entry_default(executor_),
4789  std::move(query_rewriter),
4790  input_permutation,
4791  left_deep_join_input_sizes};
4792 }
4793 
4794 namespace {
4795 
4796 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
4797  RelAlgNode const* input_node) {
4798  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
4799  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
4800  const int negative_node_id = -input_node->getId();
4801  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4802  target_exprs.reserve(tmis.size());
4803  for (size_t i = 0; i < tmis.size(); ++i) {
4804  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4805  tmis[i].get_type_info(), negative_node_id, i, 0));
4806  }
4807  return target_exprs;
4808 }
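// Editorial note: negating the node id yields a synthetic table id that can
// never collide with a real (positive) catalog table id, so downstream code
// can address each UNION input positionally as if it were a scanned table.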
4809 
4810 } // namespace
4811 
4812 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
4813  const RelLogicalUnion* logical_union,
4814  const SortInfo& sort_info,
4815  const ExecutionOptions& eo) {
4816  std::vector<InputDescriptor> input_descs;
4817  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4818  // Map ra input ptr to index (0, 1).
4819  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
4820  std::tie(input_descs, input_col_descs, std::ignore) =
4821  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
4822  const auto query_infos = get_table_infos(input_descs, executor_);
4823  auto const max_num_tuples =
4824  std::accumulate(query_infos.cbegin(),
4825  query_infos.cend(),
4826  size_t(0),
4827  [](auto max, auto const& query_info) {
4828  return std::max(max, query_info.info.getNumTuples());
4829  });
4830 
4831  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
4832  for (auto& pair : input_to_nest_level) {
4833  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
4834  << pair.second << ')';
4835  }
4836 
4837  RelAlgTranslator translator(
4838  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
4839 
4840  // For UNION queries, we need to keep the target_exprs from both subqueries since they
4841  // may differ on StringDictionaries.
4842  std::vector<Analyzer::Expr*> target_exprs_pair[2];
4843  for (unsigned i = 0; i < 2; ++i) {
4844  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
4845  CHECK(!input_exprs_owned.empty())
4846  << "No metainfo found for input node(" << i << ") "
4847  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
4848  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
4849  for (auto& input_expr : input_exprs_owned) {
4850  VLOG(3) << " " << input_expr->toString();
4851  }
4852  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
4853  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
4854  }
4855 
4856  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
4857  << " input_col_descs=" << shared::printContainer(input_col_descs)
4858  << " target_exprs.size()=" << target_exprs_pair[0].size()
4859  << " max_num_tuples=" << max_num_tuples;
4860 
4861  const RelAlgExecutionUnit exe_unit = {input_descs,
4862  input_col_descs,
4863  {}, // quals_cf.simple_quals,
4864  {}, // rewrite_quals(quals_cf.quals),
4865  {},
4866  {nullptr},
4867  target_exprs_pair[0],
4868  nullptr,
4869  sort_info,
4870  max_num_tuples,
4871  RegisteredQueryHint::defaults(),
4872  EMPTY_HASHED_PLAN_DAG_KEY,
4873  {},
4874  {},
4875  false,
4876  logical_union->isAll(),
4877  query_state_,
4878  target_exprs_pair[1]};
4879  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4880  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4881 
4882  RelAlgNode const* input0 = logical_union->getInput(0);
4883  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
4884  logical_union->setOutputMetainfo(
4885  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4886  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
4887  logical_union->setOutputMetainfo(
4888  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4889  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
4890  logical_union->setOutputMetainfo(
4891  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4892  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4893  logical_union->setOutputMetainfo(
4894  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4895  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
4896  logical_union->setOutputMetainfo(
4897  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4898  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
4899  logical_union->setOutputMetainfo(
4900  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4901  } else if (dynamic_cast<const RelSort*>(input0)) {
4902  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
4903  } else {
4904  throw QueryNotSupported("Unsupported input type: " +
4905  input0->toString(RelRexToStringConfig::defaults()));
4906  }
4907  VLOG(3) << "logical_union->getOutputMetainfo()="
4908  << shared::printContainer(logical_union->getOutputMetainfo())
4909  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
4910  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
4911 
4912  return {rewritten_exe_unit,
4913  logical_union,
4914  max_groups_buffer_entry_default(executor_),
4915  std::move(query_rewriter)};
4916 }
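// Editorial note: each branch above makes the same-looking call, but
// get_targets_meta() appears to be selected per concrete node type (it is
// parameterized on the node class), hence the dynamic_cast chain; only a
// RelSort input is rejected outright, because LIMIT/OFFSET under UNION is
// not supported.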
4917 
4918 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
4919  const RelTableFunction* rel_table_func,
4920  const bool just_explain,
4921  const bool is_gpu) {
4922  std::vector<InputDescriptor> input_descs;
4923  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4924  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
4925  std::tie(input_descs, input_col_descs, std::ignore) =
4926  get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
4927  const auto query_infos = get_table_infos(input_descs, executor_);
4928  RelAlgTranslator translator(
4929  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
4930  const auto input_exprs_owned = translate_scalar_sources(
4931  rel_table_func, translator, ::ExecutorType::TableFunctions);
4932  target_exprs_owned_.insert(
4933  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
4934  auto input_exprs = get_raw_pointers(input_exprs_owned);
4935 
4936  const auto table_function_impl_and_type_infos = [=]() {
4937  if (is_gpu) {
4938  try {
4939  return bind_table_function(
4940  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4941  } catch (ExtensionFunctionBindingError& e) {
4942  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
4943  << " Redirecting " << rel_table_func->getFunctionName()
4944  << " step to run on CPU.";
4945  throw QueryMustRunOnCpu();
4946  }
4947  } else {
4948  try {
4949  return bind_table_function(
4950  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4951  } catch (ExtensionFunctionBindingError& e) {
4952  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
4953  throw;
4954  }
4955  }
4956  }();
4957  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4958  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4959 
4960  size_t output_row_sizing_param = 0;
4961  if (table_function_impl
4962  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
4963  const auto parameter_index =
4964  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
4965  CHECK_GT(parameter_index, size_t(0));
4966  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
4967  const auto parameter_expr =
4968  rel_table_func->getTableFuncInputAt(parameter_index - 1);
4969  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
4970  if (!parameter_expr_literal) {
4971  throw std::runtime_error(
4972  "Provided output buffer sizing parameter is not a literal. Only literal "
4973  "values are supported with output buffer sizing configured table "
4974  "functions.");
4975  }
4976  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
4977  if (literal_val < 0) {
4978  throw std::runtime_error("Provided output sizing parameter " +
4979  std::to_string(literal_val) +
4980  " must be positive integer.");
4981  }
4982  output_row_sizing_param = static_cast<size_t>(literal_val);
4983  } else {
4984  // RowMultiplier was not specified in the SQL query; default it to 1.
4985  output_row_sizing_param = 1; // default value for RowMultiplier
4986  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
4987  static auto DEFAULT_ROW_MULTIPLIER_EXPR =
4988  makeExpr<Analyzer::Constant>(kINT, false, d);
4989  // Push the constant 1 to input_exprs
4990  input_exprs.insert(input_exprs.begin() + parameter_index - 1,
4991  DEFAULT_ROW_MULTIPLIER_EXPR.get());
4992  }
4993  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
4994  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
4995  } else {
4996  UNREACHABLE();
4997  }
4998 
4999  std::vector<Analyzer::ColumnVar*> input_col_exprs;
5000  size_t input_index = 0;
5001  size_t arg_index = 0;
5002  const auto table_func_args = table_function_impl.getInputArgs();
5003  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5004  for (const auto& ti : table_function_type_infos) {
5005  if (ti.is_column_list()) {
5006  for (int i = 0; i < ti.get_dimension(); i++) {
5007  auto& input_expr = input_exprs[input_index];
5008  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
5009  CHECK(col_var);
5010 
5011  // avoid setting type info to ti here since ti doesn't have all the
5012  // properties correctly set
5013  auto type_info = input_expr->get_type_info();
5014  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5015  type_info.set_type(ti.get_type()); // set type to column list
5016  type_info.set_dimension(ti.get_dimension());
5017  input_expr->set_type_info(type_info);
5018 
5019  input_col_exprs.push_back(col_var);
5020  input_index++;
5021  }
5022  } else if (ti.is_column()) {
5023  auto& input_expr = input_exprs[input_index];
5024  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
5025  CHECK(col_var);
5026 
5027  // same here! avoid setting type info to ti since it doesn't have all the
5028  // properties correctly set
5029  auto type_info = input_expr->get_type_info();
5030  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5031  type_info.set_type(ti.get_type()); // set type to column
5032  input_expr->set_type_info(type_info);
5033 
5034  input_col_exprs.push_back(col_var);
5035  input_index++;
5036  } else {
5037  auto input_expr = input_exprs[input_index];
5038  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
5039  if (ext_func_arg_ti != input_expr->get_type_info()) {
5040  input_exprs[input_index] = input_expr->add_cast(ext_func_arg_ti).get();
5041  }
5042  input_index++;
5043  }
5044  arg_index++;
5045  }
5046  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
5047  std::vector<Analyzer::Expr*> table_func_outputs;
5048  constexpr int32_t transient_pos{-1};
5049  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5050  auto ti = table_function_impl.getOutputSQLType(i);
5051  if (ti.is_dict_encoded_string()) {
5052  auto p = table_function_impl.getInputID(i);
5053 
5054  int32_t input_pos = p.first;
5055  if (input_pos == transient_pos) {
5056  ti.set_comp_param(TRANSIENT_DICT_ID);
5057  } else {
5058  // Iterate over the list of arguments to compute the offset. Use this offset to
5059  // get the corresponding input
5060  int32_t offset = 0;
5061  for (int j = 0; j < input_pos; j++) {
5062  const auto ti = table_function_type_infos[j];
5063  offset += ti.is_column_list() ? ti.get_dimension() : 1;
5064  }
5065  input_pos = offset + p.second;
5066 
5067  CHECK_LT(input_pos, input_exprs.size());
5068  int32_t comp_param =
5069  input_exprs_owned[input_pos]->get_type_info().get_comp_param();
5070  ti.set_comp_param(comp_param);
5071  }
5072  }
5073  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
5074  table_func_outputs.push_back(target_exprs_owned_.back().get());
5075  }
5076  const TableFunctionExecutionUnit exe_unit = {
5077  input_descs,
5078  input_col_descs,
5079  input_exprs, // table function inputs
5080  input_col_exprs, // table function column inputs (duplicates w/ above)
5081  table_func_outputs, // table function projected exprs
5082  output_row_sizing_param, // output buffer sizing param
5083  table_function_impl};
5084  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
5085  rel_table_func->setOutputMetainfo(targets_meta);
5086  return {exe_unit, rel_table_func};
5087 }
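// Editorial sketch (hypothetical SQL, assuming a registered UDTF named
// row_copier whose sizing argument is a RowMultiplier):
//
//   SELECT * FROM TABLE(row_copier(CURSOR(SELECT x FROM t), 2));
//
// the literal 2 is validated above and becomes output_row_sizing_param; if
// the multiplier is omitted, a constant 1 is synthesized and inserted into
// input_exprs at the parameter's position instead.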
5088 
5089 namespace {
5090 
5091 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
5092 get_inputs_meta(const RelFilter* filter,
5093  const RelAlgTranslator& translator,
5094  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
5095  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
5096  std::vector<TargetMetaInfo> in_metainfo;
5097  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
5098  const auto data_sink_node = get_data_sink(filter);
5099  auto input_it = inputs_owned.begin();
5100  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
5101  const auto source = data_sink_node->getInput(nest_level);
5102  const auto scan_source = dynamic_cast<const RelScan*>(source);
5103  if (scan_source) {
5104  CHECK(source->getOutputMetainfo().empty());
5105  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
5106  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
5107  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
5108  }
5109  const auto source_metadata =
5110  get_targets_meta(scan_source, get_raw_pointers(scalar_sources_owned));
5111  in_metainfo.insert(
5112  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5113  exprs_owned.insert(
5114  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5115  } else {
5116  const auto& source_metadata = source->getOutputMetainfo();
5117  input_it += source_metadata.size();
5118  in_metainfo.insert(
5119  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5120  const auto scalar_sources_owned = synthesize_inputs(
5121  data_sink_node, nest_level, source_metadata, input_to_nest_level);
5122  exprs_owned.insert(
5123  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5124  }
5125  }
5126  return std::make_pair(in_metainfo, exprs_owned);
5127 }
5128 
5129 } // namespace
5130 
5131 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
5132  const SortInfo& sort_info,
5133  const bool just_explain) {
5134  CHECK_EQ(size_t(1), filter->inputCount());
5135  std::vector<InputDescriptor> input_descs;
5136  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5137  std::vector<TargetMetaInfo> in_metainfo;
5138  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5139  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5140 
5141  const auto input_to_nest_level = get_input_nest_levels(filter, {});
5142  std::tie(input_descs, input_col_descs, used_inputs_owned) =
5143  get_input_desc(filter, input_to_nest_level, {}, cat_);
5144  const auto join_type = get_join_type(filter);
5145  RelAlgTranslator translator(cat_,
5146  query_state_,
5147  executor_,
5148  input_to_nest_level,
5149  {join_type},
5150  now_,
5151  just_explain);
5152  std::tie(in_metainfo, target_exprs_owned) =
5153  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
5154  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
5155  const auto query_infos = get_table_infos(input_descs, executor_);
5156 
5157  const auto qual = fold_expr(filter_expr.get());
5158  target_exprs_owned_.insert(
5159  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
5160 
5161  const auto target_exprs = get_raw_pointers(target_exprs_owned);
5162  filter->setOutputMetainfo(in_metainfo);
5163  const auto rewritten_qual = rewrite_expr(qual.get());
5164  auto query_hint = RegisteredQueryHint::defaults();
5165  if (query_dag_) {
5166  auto candidate = query_dag_->getQueryHint(filter);
5167  if (candidate) {
5168  query_hint = *candidate;
5169  }
5170  }
5171  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5172  filter, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
5173  return {{input_descs,
5174  input_col_descs,
5175  {},
5176  {rewritten_qual ? rewritten_qual : qual},
5177  {},
5178  {nullptr},
5179  target_exprs,
5180  nullptr,
5181  sort_info,
5182  0,
5183  query_hint,
5184  filter->getQueryPlanDagHash(),
5185  join_info.hash_table_plan_dag,
5186  join_info.table_id_to_node_map},
5187  filter,
5189  nullptr};
5190 }
5191 
5192 SpeculativeTopNBlacklist RelAlgExecutor::speculative_topn_blacklist_;
5193 
5194 void RelAlgExecutor::initializeParallelismHints() {
5195  if (auto foreign_storage_mgr =
5196  cat_.getDataMgr().getPersistentStorageMgr()->getForeignStorageMgr()) {
5197  // Parallelism hints need to be reset to empty so that we don't accidentally
5198  // re-use them; stale hints can cause attempts to fetch strings that do not
5199  // shard to the correct node in distributed mode.
5200  foreign_storage_mgr->setParallelismHints({});
5201  }
5202 }
5203 
5204 void RelAlgExecutor::setupCaching(const RelAlgNode* ra) {
5205  CHECK(executor_);
5206  const auto phys_inputs = get_physical_inputs(cat_, ra);
5207  const auto phys_table_ids = get_physical_table_inputs(ra);
5208  executor_->setCatalog(&cat_);
5209  executor_->setupCaching(phys_inputs, phys_table_ids);
5210 }
5211 
5212 void RelAlgExecutor::prepareForeignTables() {
5213  const auto& ra = query_dag_->getRootNode();
5214  prepare_foreign_table_for_execution(ra, cat_);
5215 }
5216 
5217 std::unordered_set<int> RelAlgExecutor::getPhysicalTableIds() const {
5218  return get_physical_table_inputs(&getRootRelAlgNode());
5219 }
5220 
5223 }