RelAlgExecutor.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "RelAlgExecutor.h"
#include "Parser/ParserNode.h"
#include "QueryEngine/RelAlgDag.h"
#include "QueryEngine/RexVisitor.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/shard_key.h"

#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/reversed.hpp>

#include <algorithm>
#include <functional>
#include <numeric>

bool g_enable_interop{false};
bool g_enable_union{true};  // DEPRECATED

extern bool g_enable_watchdog;
extern bool g_enable_bump_allocator;
extern bool g_enable_system_tables;

namespace {

bool node_is_aggregate(const RelAlgNode* ra) {
  const auto compound = dynamic_cast<const RelCompound*>(ra);
  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
  return ((compound && compound->isAggregate()) || aggregate);
}

std::unordered_set<PhysicalInput> get_physical_inputs_with_spi_col_id(
    const RelAlgNode* ra) {
  const auto phys_inputs = get_physical_inputs(ra);
  std::unordered_set<PhysicalInput> phys_inputs2;
  for (auto& phi : phys_inputs) {
    const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(phi.db_id);
    CHECK(catalog);
    phys_inputs2.insert(PhysicalInput{
        catalog->getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id, phi.db_id});
  }
  return phys_inputs2;
}

void set_parallelism_hints(const RelAlgNode& ra_node) {
  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
      parallelism_hints_per_table;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    const auto table_id = physical_input.table_id;
    const auto catalog =
        Catalog_Namespace::SysCatalog::instance().getCatalog(physical_input.db_id);
    CHECK(catalog);
    const auto table = catalog->getMetadataForTable(table_id, true);
    if (table && table->storageType == StorageType::FOREIGN_TABLE &&
        !table->is_in_memory_system_table) {
      const auto col_id = catalog->getColumnIdBySpi(table_id, physical_input.col_id);
      const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
      const auto foreign_table = catalog->getForeignTable(table_id);
      for (const auto& fragment :
           foreign_table->fragmenter->getFragmentsForQuery().fragments) {
        Chunk_NS::Chunk chunk{col_desc};
        ChunkKey chunk_key = {
            physical_input.db_id, table_id, col_id, fragment.fragmentId};

        // Parallelism hints should not include fragments that are not mapped to the
        // current node, otherwise we will try to prefetch them and run into trouble.
        if (key_does_not_shard_to_leaf(chunk_key)) {
          continue;
        }

        // do not include chunk hints that are in CPU memory
        if (!chunk.isChunkOnDevice(
                &catalog->getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
          parallelism_hints_per_table[{physical_input.db_id, table_id}].insert(
              foreign_storage::ForeignStorageMgr::ParallelismHint{col_id,
                                                                  fragment.fragmentId});
        }
      }
    }
  }
  if (!parallelism_hints_per_table.empty()) {
    auto foreign_storage_mgr = Catalog_Namespace::SysCatalog::instance()
                                   .getDataMgr()
                                   .getPersistentStorageMgr()
                                   ->getForeignStorageMgr();
    CHECK(foreign_storage_mgr);
    foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
  }
}
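
// Illustrative sketch (not part of the original source): for a foreign table
// {db_id=1, table_id=2} whose column 3 has two fragments not yet resident in
// CPU memory, the map built above would contain
//   parallelism_hints_per_table[{1, 2}] == {{3, 0}, {3, 1}}  // {col_id, fragment_id}
// which ForeignStorageMgr may then use to prefetch those chunks in parallel.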

void prepare_string_dictionaries(const RelAlgNode& ra_node) {
  for (const auto [col_id, table_id, db_id] : get_physical_inputs(&ra_node)) {
    const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
    CHECK(catalog);
    const auto table = catalog->getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE) {
      const auto spi_col_id = catalog->getColumnIdBySpi(table_id, col_id);
      foreign_storage::populate_string_dictionary(table_id, spi_col_id, db_id);
    }
  }
}

void prepare_foreign_table_for_execution(const RelAlgNode& ra_node) {
  // Iterate through ra_node inputs for types that need to be loaded pre-execution.
  // If they do not have valid metadata, load them into CPU memory to generate
  // the metadata and leave them ready to be used by the query
  set_parallelism_hints(ra_node);
  prepare_string_dictionaries(ra_node);
}

void prepare_for_system_table_execution(const RelAlgNode& ra_node,
                                        const CompilationOptions& co) {
  const auto info_schema_catalog =
      Catalog_Namespace::SysCatalog::instance().getCatalog(shared::kInfoSchemaDbName);
  CHECK(info_schema_catalog);
  std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    if (info_schema_catalog->getDatabaseId() != physical_input.db_id) {
      continue;
    }
    const auto table_id = physical_input.table_id;
    const auto table = info_schema_catalog->getMetadataForTable(table_id, false);
    CHECK(table);
    if (table->is_in_memory_system_table) {
      const auto column_id =
          info_schema_catalog->getColumnIdBySpi(table_id, physical_input.col_id);
      system_table_columns_by_table_id[table_id].emplace_back(column_id);
    }
  }
  // Execute on CPU for queries involving system tables
  if (!system_table_columns_by_table_id.empty() &&
      co.device_type != ExecutorDeviceType::CPU) {
    throw QueryMustRunOnCpu();
  }

  for (const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
    // Clear any previously cached data, since system tables depend on point in
    // time data snapshots.
    info_schema_catalog->getDataMgr().deleteChunksWithPrefix(
        ChunkKey{info_schema_catalog->getDatabaseId(), table_id},
        Data_Namespace::CPU_LEVEL);

    // TODO(Misiu): This prefetching can be removed if we can add support for
    // ExpressionRanges to reduce invalid with valid ranges (right now prefetching
    // causes us to fetch the chunks twice). Right now if we do not prefetch (i.e. if
    // we remove the code below) some nodes will return valid ranges and others will
    // return unknown because they only use placeholder metadata and the LeafAggregator
    // has no idea how to reduce the two.
    const auto td = info_schema_catalog->getMetadataForTable(table_id);
    CHECK(td);
    CHECK(td->fragmenter);
    auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
    CHECK_LE(fragment_count, static_cast<size_t>(1))
        << "In-memory system tables are expected to have a single fragment.";
    if (fragment_count > 0) {
      for (auto column_id : column_ids) {
        // Prefetch system table chunks in order to force chunk statistics metadata
        // computation.
        const auto cd = info_schema_catalog->getMetadataForColumn(table_id, column_id);
        const ChunkKey chunk_key{
            info_schema_catalog->getDatabaseId(), table_id, column_id, 0};
        Chunk_NS::Chunk::getChunk(cd,
                                  &(info_schema_catalog->getDataMgr()),
                                  chunk_key,
                                  Data_Namespace::CPU_LEVEL,
                                  0,
                                  0,
                                  0);
      }
    }
  }
}
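
// Illustrative sketch (not part of the original source): a query against an
// in-memory system table, e.g.
//   SELECT * FROM information_schema.tables;
// reaches this function with co.device_type == ExecutorDeviceType::GPU and
// throws QueryMustRunOnCpu(), which callers catch to retry the query with
// CompilationOptions::makeCpuOnly(co).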

bool has_valid_query_plan_dag(const RelAlgNode* node) {
  return node->getQueryPlanDagHash() != EMPTY_HASHED_PLAN_DAG_KEY;
}

// TODO(alex): Once we're fully migrated to the relational algebra model, change
// the executor interface to use the collation directly and remove this conversion.
std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
  std::list<Analyzer::OrderEntry> result;
  for (size_t i = 0; i < sort->collationCount(); ++i) {
    const auto sort_field = sort->getCollation(i);
    result.emplace_back(sort_field.getField() + 1,
                        sort_field.getSortDir() == SortDirection::Descending,
                        sort_field.getNullsPosition() == NullSortedPosition::First);
  }
  return result;
}
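
// Illustrative sketch (not part of the original source): for
//   SELECT x, y FROM t ORDER BY y DESC NULLS FIRST;
// the collation {field=1, Descending, NullsFirst} becomes
//   Analyzer::OrderEntry(2, /*desc=*/true, /*nulls_first=*/true)
// where 2 is the 1-based target list index (getField() + 1).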

void build_render_targets(RenderInfo& render_info,
                          const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
                          const std::vector<TargetMetaInfo>& targets_meta) {
  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
  render_info.targets.clear();
  for (size_t i = 0; i < targets_meta.size(); ++i) {
    render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
        targets_meta[i].get_resname(),
        work_unit_target_exprs[i]->get_shared_ptr(),
        false));
  }
}

bool is_validate_or_explain_query(const ExecutionOptions& eo) {
  return eo.just_validate || eo.just_explain || eo.just_calcite_explain;
}

class RelLeftDeepTreeIdsCollector : public RelAlgVisitor<std::vector<unsigned>> {
 public:
  std::vector<unsigned> visitLeftDeepInnerJoin(
      const RelLeftDeepInnerJoin* left_deep_join_tree) const override {
    return {left_deep_join_tree->getId()};
  }

 protected:
  std::vector<unsigned> aggregateResult(
      const std::vector<unsigned>& aggregate,
      const std::vector<unsigned>& next_result) const override {
    auto result = aggregate;
    std::copy(next_result.begin(), next_result.end(), std::back_inserter(result));
    return result;
  }
};
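
// Illustrative usage sketch (not part of the original source):
//   RelLeftDeepTreeIdsCollector collector;
//   const std::vector<unsigned> join_tree_ids = collector.visit(root_node);
// walks the node tree and concatenates, via aggregateResult(), the id of
// every RelLeftDeepInnerJoin it encounters.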

struct TextEncodingCastCounts {
  size_t text_decoding_casts;
  size_t text_encoding_casts;
  TextEncodingCastCounts() : text_decoding_casts(0UL), text_encoding_casts(0UL) {}
  TextEncodingCastCounts(const size_t text_decoding_casts,
                         const size_t text_encoding_casts)
      : text_decoding_casts(text_decoding_casts)
      , text_encoding_casts(text_encoding_casts) {}
};

class TextEncodingCastCountVisitor : public ScalarExprVisitor<TextEncodingCastCounts> {
 public:
  TextEncodingCastCountVisitor(const bool default_disregard_casts_to_none_encoding)
      : default_disregard_casts_to_none_encoding_(
            default_disregard_casts_to_none_encoding) {}

 protected:
  TextEncodingCastCounts visitUOper(const Analyzer::UOper* u_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    // Save state of input disregard_casts_to_none_encoding_ as child node traversal
    // will reset it
    const bool disregard_casts_to_none_encoding = disregard_casts_to_none_encoding_;
    result = aggregateResult(result, visit(u_oper->get_operand()));
    if (u_oper->get_optype() != kCAST) {
      return result;
    }
    const auto& operand_ti = u_oper->get_operand()->get_type_info();
    const auto& casted_ti = u_oper->get_type_info();
    if (!operand_ti.is_string() && casted_ti.is_dict_encoded_string()) {
      return aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
    }
    if (!casted_ti.is_string()) {
      return result;
    }
    const bool literals_only = u_oper->get_operand()->get_num_column_vars(true) == 0UL;
    if (literals_only) {
      return result;
    }
    if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
      return aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
    }
    if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
      if (!disregard_casts_to_none_encoding) {
        return aggregateResult(result, TextEncodingCastCounts(1UL, 0UL));
      } else {
        return result;
      }
    }
    return result;
  }

  TextEncodingCastCounts visitStringOper(
      const Analyzer::StringOper* string_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    if (string_oper->getArity() > 0) {
      result = aggregateResult(result, visit(string_oper->getArg(0)));
    }
    if (string_op_returns_string(string_oper->get_kind()) &&
        string_oper->hasNoneEncodedTextArg()) {
      result = aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
    }
    return result;
  }

  TextEncodingCastCounts visitBinOper(const Analyzer::BinOper* bin_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    // Currently the join framework handles casts between string types, and
    // casts to none-encoded strings should be considered spurious, except
    // when the join predicate is not a =/<> operator, in which case
    // for both join predicates and all other instances we have to decode
    // to a none-encoded string to do the comparison. The logic below essentially
    // overrides the logic such as to always count none-encoded casts on strings
    // that are children of binary operators other than =/<>
    if (bin_oper->get_optype() != kEQ && bin_oper->get_optype() != kNE) {
      // Override the global override so that join opers don't skip
      // the check when there is an actual cast to none-encoded string
      const auto prev_disregard_casts_to_none_encoding_state =
          disregard_casts_to_none_encoding_;
      const auto left_u_oper =
          dynamic_cast<const Analyzer::UOper*>(bin_oper->get_left_operand());
      if (left_u_oper && left_u_oper->get_optype() == kCAST) {
        disregard_casts_to_none_encoding_ = false;
        result = aggregateResult(result, visitUOper(left_u_oper));
      } else {
        disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
        result = aggregateResult(result, visit(bin_oper->get_left_operand()));
      }

      const auto right_u_oper =
          dynamic_cast<const Analyzer::UOper*>(bin_oper->get_right_operand());
      if (right_u_oper && right_u_oper->get_optype() == kCAST) {
        disregard_casts_to_none_encoding_ = false;
        result = aggregateResult(result, visitUOper(right_u_oper));
      } else {
        disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
        result = aggregateResult(result, visit(bin_oper->get_right_operand()));
      }
      disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
    } else {
      result = aggregateResult(result, visit(bin_oper->get_left_operand()));
      result = aggregateResult(result, visit(bin_oper->get_right_operand()));
    }
    return result;
  }

  TextEncodingCastCounts visitLikeExpr(const Analyzer::LikeExpr* like) const override {
    TextEncodingCastCounts result = defaultResult();
    const auto u_oper = dynamic_cast<const Analyzer::UOper*>(like->get_arg());
    const auto prev_disregard_casts_to_none_encoding_state =
        disregard_casts_to_none_encoding_;
    if (u_oper && u_oper->get_optype() == kCAST) {
      disregard_casts_to_none_encoding_ = true;
      result = aggregateResult(result, visitUOper(u_oper));
      disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
    } else {
      result = aggregateResult(result, visit(like->get_arg()));
    }
    result = aggregateResult(result, visit(like->get_like_expr()));
    if (like->get_escape_expr()) {
      result = aggregateResult(result, visit(like->get_escape_expr()));
    }
    return result;
  }

  TextEncodingCastCounts aggregateResult(
      const TextEncodingCastCounts& aggregate,
      const TextEncodingCastCounts& next_result) const override {
    auto result = aggregate;
    result.text_decoding_casts += next_result.text_decoding_casts;
    result.text_encoding_casts += next_result.text_encoding_casts;
    return result;
  }

  void visitBegin() const override {
    disregard_casts_to_none_encoding_ = default_disregard_casts_to_none_encoding_;
  }

  TextEncodingCastCounts defaultResult() const override {
    return TextEncodingCastCounts();
  }

 private:
  mutable bool disregard_casts_to_none_encoding_ = false;
  const bool default_disregard_casts_to_none_encoding_;
};

TextEncodingCastCounts get_text_cast_counts(const RelAlgExecutionUnit& ra_exe_unit) {
  TextEncodingCastCounts cast_counts;

  auto check_node_for_text_casts = [&cast_counts](
                                       const Analyzer::Expr* expr,
                                       const bool disregard_casts_to_none_encoding) {
    if (!expr) {
      return;
    }
    TextEncodingCastCountVisitor visitor(disregard_casts_to_none_encoding);
    const auto this_node_cast_counts = visitor.visit(expr);
    cast_counts.text_encoding_casts += this_node_cast_counts.text_encoding_casts;
    cast_counts.text_decoding_casts += this_node_cast_counts.text_decoding_casts;
  };

  for (const auto& qual : ra_exe_unit.quals) {
    check_node_for_text_casts(qual.get(), false);
  }
  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
    check_node_for_text_casts(simple_qual.get(), false);
  }
  for (const auto& groupby_expr : ra_exe_unit.groupby_exprs) {
    check_node_for_text_casts(groupby_expr.get(), false);
  }
  for (const auto& target_expr : ra_exe_unit.target_exprs) {
    check_node_for_text_casts(target_expr, false);
  }
  for (const auto& join_condition : ra_exe_unit.join_quals) {
    for (const auto& join_qual : join_condition.quals) {
      // We currently need to not count casts to none-encoded strings for join quals,
      // as the analyzer will generate these but our join framework disregards them.
      // Some investigation was done on having the analyzer issue the correct
      // inter-string dictionary casts, but this actually causes them to get executed
      // and so the same work gets done twice.
      check_node_for_text_casts(join_qual.get(),
                                true /* disregard_casts_to_none_encoding */);
    }
  }
  return cast_counts;
}
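
// Illustrative sketch (not part of the original source): for a filter such as
//   WHERE none_encoded_col = dict_encoded_col
// the analyzer casts the dictionary-encoded side to a none-encoded string, so
// this function reports text_decoding_casts == 1; the same predicate used as
// an equijoin qual is visited with disregard_casts_to_none_encoding == true
// and contributes no counts, per the comment above.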

void check_none_encoded_string_cast_tuple_limit(
    const std::vector<InputTableInfo>& query_infos,
    const RelAlgExecutionUnit& ra_exe_unit) {
  if (!g_enable_watchdog) {
    return;
  }
  auto const tuples_upper_bound =
      std::accumulate(query_infos.cbegin(),
                      query_infos.cend(),
                      size_t(0),
                      [](auto max, auto const& query_info) {
                        return std::max(max, query_info.info.getNumTuples());
                      });
  if (tuples_upper_bound <= g_watchdog_none_encoded_string_translation_limit) {
    return;
  }

  const auto& text_cast_counts = get_text_cast_counts(ra_exe_unit);
  const bool has_text_casts =
      text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;

  if (!has_text_casts) {
    return;
  }
  std::ostringstream oss;
  oss << "Query requires one or more casts between none-encoded and dictionary-encoded "
      << "strings, and the estimated table size (" << tuples_upper_bound << " rows) "
      << "exceeds the configured watchdog none-encoded string translation limit of "
      << g_watchdog_none_encoded_string_translation_limit << " rows.";
  throw std::runtime_error(oss.str());
}
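
// Illustrative sketch (not part of the original source): if
// g_watchdog_none_encoded_string_translation_limit were 1000000, a query whose
// largest input has 5000000 rows and contains even one none-encoded <->
// dictionary-encoded cast throws here, while the same query over 500000 rows
// (or with g_enable_watchdog == false) proceeds.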

}  // namespace

bool RelAlgExecutor::canUseResultsetCache(const ExecutionOptions& eo,
                                          RenderInfo* render_info) const {
  auto validate_or_explain_query = is_validate_or_explain_query(eo);
  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
  return g_enable_data_recycler && g_use_query_resultset_cache && eo.keep_result &&
         !validate_or_explain_query && !hasStepForUnion() &&
         !query_for_partial_outer_frag &&
         (!render_info || (render_info && !render_info->isInSitu()));
}

size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
                                             const ExecutionOptions& eo) {
  if (eo.find_push_down_candidates) {
    return 0;
  }

  if (eo.just_explain) {
    return 0;
  }

  CHECK(query_dag_);

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  auto lock = executor_->acquireExecuteMutex();
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (!getSubqueries().empty()) {
    return 0;
  }

  CHECK(!ed_seq.empty());
  if (ed_seq.size() > 1) {
    return 0;
  }

  decltype(temporary_tables_)().swap(temporary_tables_);
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  executor_->temporary_tables_ = &temporary_tables_;

  auto exec_desc_ptr = ed_seq.getDescriptor(0);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    return 0;
  }

  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
    return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableKey(),
                                   executor_);
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      return 0;
    } else if (compound->isUpdateViaSelect()) {
      return 0;
    } else {
      if (compound->isAggregate()) {
        return 0;
      }

      const auto work_unit =
          createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);

      return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableKey(),
                                     executor_);
    }
  }

  return 0;
}

ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
                                                   const ExecutionOptions& eo,
                                                   const bool just_explain_plan,
                                                   RenderInfo* render_info) {
  CHECK(query_dag_);
  CHECK_EQ(query_dag_->getBuildState(), RelAlgDag::BuildState::kBuiltOptimized)
      << static_cast<int>(query_dag_->getBuildState());

  auto timer = DEBUG_TIMER(__func__);
  INJECT_TIMER(executeRelAlgQuery);

  auto run_query = [&](const CompilationOptions& co_in) {
    auto execution_result =
        executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);

    constexpr bool vlog_result_set_summary{false};
    if constexpr (vlog_result_set_summary) {
      VLOG(1) << execution_result.getRows()->summaryToString();
    }

    if (post_execution_callback_) {
      VLOG(1) << "Running post execution callback.";
      (*post_execution_callback_)();
    }
    return execution_result;
  };

  try {
    return run_query(co);
  } catch (const QueryMustRunOnCpu&) {
    if (!g_allow_cpu_retry) {
      throw;
    }
  }
  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
  auto co_cpu = CompilationOptions::makeCpuOnly(co);

  if (render_info) {
    render_info->forceNonInSitu();
  }
  return run_query(co_cpu);
}
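
// Illustrative sketch (not part of the original source) of the retry contract
// implemented above:
//   try { return run_query(co); }            // e.g. co.device_type == GPU
//   catch (const QueryMustRunOnCpu&) { ... } // rethrown if !g_allow_cpu_retry
//   return run_query(CompilationOptions::makeCpuOnly(co));
// so a CPU-only requirement discovered during compilation costs one extra
// compilation pass rather than failing the query.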

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
                                                          const ExecutionOptions& eo,
                                                          const bool just_explain_plan,
                                                          RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryNoRetry);
  auto timer = DEBUG_TIMER(__func__);
  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  std::string query_session{""};
  std::string query_str{"N/A"};
  std::string query_submitted_time{""};
  // gather the necessary query info
  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
    query_session = query_state_->getConstSessionInfo()->get_session_id();
    query_str = query_state_->getQueryStr();
    query_submitted_time = query_state_->getQuerySubmittedTime();
  }

  auto validate_or_explain_query =
      just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
  auto interruptable = !render_info && !query_session.empty() &&
                       eo.allow_runtime_query_interrupt && !validate_or_explain_query;
  if (interruptable) {
    // if we reach here, the current query, which was waiting for an idle executor
    // within the dispatch queue, is now scheduled to a specific executor
    // (not UNITARY_EXECUTOR),
    // so we update the query session's status with the executor that takes this query
    std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
        query_session, query_str, query_submitted_time);

    // now the query is going to be executed, so update the status as
    // "RUNNING_QUERY_KERNEL"
    executor_->updateQuerySessionStatus(
        query_session,
        query_submitted_time,
        QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
  }

  // so it should do cleanup of the session info after finishing its execution
  ScopeGuard clearQuerySessionInfo =
      [this, &query_session, &interruptable, &query_submitted_time] {
        // reset the runtime query interrupt status after the end of query execution
        if (interruptable) {
          // cleanup running session's info
          executor_->clearQuerySessionStatus(query_session, query_submitted_time);
        }
      };

  auto acquire_execute_mutex = [](Executor* executor) -> auto {
    auto ret = executor->acquireExecuteMutex();
    return ret;
  };
  // now we acquire the executor lock here to make sure that this executor holds
  // all necessary resources and at the same time protect them against other executors
  auto lock = acquire_execute_mutex(executor_);

  if (interruptable) {
    // check whether this query session is "already" interrupted;
    // this case occurs when there is a very short gap between being interrupted and
    // taking the execute lock.
    // if so, we have to remove "all" queries initiated by this session, and we do it
    // here without running the query
    try {
      executor_->checkPendingQueryStatus(query_session);
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw std::runtime_error("Query execution has been interrupted (pending query)");
      }
      throw e;
    } catch (...) {
      throw std::runtime_error("Checking pending query status failed: unknown error");
    }
  }
  int64_t queue_time_ms = timer_stop(clock_begin);

  // Notify foreign tables to load prior to caching
  prepare_foreign_table_for_execution(ra);

  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (just_explain_plan) {
    std::stringstream ss;
    std::vector<const RelAlgNode*> nodes;
    for (size_t i = 0; i < ed_seq.size(); i++) {
      nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
    }
    size_t ctr_node_id_in_plan = nodes.size();
    for (auto& body : boost::adaptors::reverse(nodes)) {
      // we set each node's id in the query plan in advance before calling the toString
      // method to properly represent the query plan
      auto node_id_in_plan_tree = ctr_node_id_in_plan--;
      body->setIdInPlanTree(node_id_in_plan_tree);
    }
    size_t ctr = nodes.size();
    size_t tab_ctr = 0;
    RelRexToStringConfig config;
    config.skip_input_nodes = true;
    for (auto& body : boost::adaptors::reverse(nodes)) {
      const auto index = ctr--;
      const auto tabs = std::string(tab_ctr++, '\t');
      CHECK(body);
      ss << tabs << std::to_string(index) << " : " << body->toString(config) << "\n";
      if (auto sort = dynamic_cast<const RelSort*>(body)) {
        ss << tabs << " : " << sort->getInput(0)->toString(config) << "\n";
      }
      if (dynamic_cast<const RelProject*>(body) ||
          dynamic_cast<const RelCompound*>(body)) {
        if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
          ss << tabs << " : " << join->toString(config) << "\n";
        }
      }
    }
    const auto& subqueries = getSubqueries();
    if (!subqueries.empty()) {
      ss << "Subqueries: "
         << "\n";
      for (const auto& subquery : subqueries) {
        const auto ra = subquery->getRelAlg();
        ss << "\t" << ra->toString(config) << "\n";
      }
    }
    auto rs = std::make_shared<ResultSet>(ss.str());
    return {rs, {}};
  }

  if (eo.find_push_down_candidates) {
    // this extra logic is mainly due to current limitations on multi-step queries
    // and/or subqueries.
    return executeRelAlgQueryWithFilterPushDown(
        ed_seq, co, eo, render_info, queue_time_ms);
  }
  timer_setup.stop();

  // Dispatch the subqueries first
  const auto global_hints = getGlobalQueryHint();
  for (auto subquery : getSubqueries()) {
    const auto subquery_ra = subquery->getRelAlg();
    CHECK(subquery_ra);
    if (subquery_ra->hasContextData()) {
      continue;
    }
    // Execute the subquery and cache the result.
    RelAlgExecutor subquery_executor(executor_, query_state_);
    // Propagate global and local query hints if necessary
    const auto local_hints = getParsedQueryHint(subquery_ra);
    if (global_hints || local_hints) {
      const auto subquery_rel_alg_dag = subquery_executor.getRelAlgDag();
      if (global_hints) {
        subquery_rel_alg_dag->setGlobalQueryHints(*global_hints);
      }
      if (local_hints) {
        subquery_rel_alg_dag->registerQueryHint(subquery_ra, *local_hints);
      }
    }
    RaExecutionSequence subquery_seq(subquery_ra, executor_);
    auto result = subquery_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
    subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
  }
  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
}

AggregatedColRange RelAlgExecutor::computeColRangesCache() {
  AggregatedColRange agg_col_range_cache;
  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
  return executor_->computeColRangesCache(phys_inputs);
}

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
  return executor_->computeStringDictionaryGenerations(phys_inputs);
}

TableGenerations RelAlgExecutor::computeTableGenerations() {
  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
  return executor_->computeTableGenerations(phys_table_ids);
}

Executor* RelAlgExecutor::getExecutor() const {
  return executor_;
}

void RelAlgExecutor::cleanupPostExecution() {
  CHECK(executor_);
  executor_->row_set_mem_owner_ = nullptr;
}

std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
RelAlgExecutor::getJoinInfo(const RelAlgNode* root_node) {
  auto sort_node = dynamic_cast<const RelSort*>(root_node);
  if (sort_node) {
    // we assume that a test query needing join info does not contain a sort node
    return {};
  }
  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
  RelLeftDeepTreeIdsCollector visitor;
  auto left_deep_tree_ids = visitor.visit(root_node);
  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
}

namespace {

void check_sort_node_source_constraint(const RelSort* sort) {
  CHECK_EQ(size_t(1), sort->inputCount());
  const auto source = sort->getInput(0);
  if (dynamic_cast<const RelSort*>(source)) {
    throw std::runtime_error("Sort node not supported as input to another sort");
  }
}

}  // namespace

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQueryStep(
    const RaExecutionSequence& seq,
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryStep);

  auto exe_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exe_desc_ptr);
  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());

  size_t shard_count{0};
  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
    return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
  };

  if (sort) {
    check_sort_node_source_constraint(sort);
    auto order_entries = get_order_entries(sort);
    const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
    shard_count =
        GroupByAndAggregate::shard_count_for_top_groups(source_work_unit.exe_unit);
    if (!shard_count) {
      // No point in sorting on the leaf, only execute the input to the sort node.
      CHECK_EQ(size_t(1), sort->inputCount());
      const auto source = sort->getInput(0);
      if (sort->collationCount() || node_is_aggregate(source)) {
        auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
        CHECK_EQ(temp_seq.size(), size_t(1));
        ExecutionOptions eo_copy = eo;
        eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
        // Use subseq to avoid clearing existing temporary tables
        return {
            executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
            merge_type(source),
            source->getId(),
            false};
      }
    }
  }
  QueryStepExecutionResult query_step_result{ExecutionResult{},
                                             merge_type(exe_desc_ptr->getBody()),
                                             exe_desc_ptr->getBody()->getId(),
                                             false};
  try {
    query_step_result.result = executeRelAlgSubSeq(
        seq, std::make_pair(step_idx, step_idx + 1), co, eo, render_info, queue_time_ms_);
  } catch (QueryMustRunOnCpu const& e) {
    auto copied_co = co;
    copied_co.device_type = ExecutorDeviceType::CPU;
    LOG(INFO) << "Retry the query via CPU mode";
    query_step_result.result = executeRelAlgSubSeq(seq,
                                                   std::make_pair(step_idx, step_idx + 1),
                                                   copied_co,
                                                   eo,
                                                   render_info,
                                                   queue_time_ms_);
  }
  if (post_execution_callback_) {
    VLOG(1) << "Running post execution callback.";
    (*post_execution_callback_)();
  }
  return query_step_result;
}

void RelAlgExecutor::prepareLeafExecution(
    const AggregatedColRange& agg_col_range,
    const StringDictionaryGenerations& string_dictionary_generations,
    const TableGenerations& table_generations) {
  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  queue_time_ms_ = timer_stop(clock_begin);
  executor_->row_set_mem_owner_ =
      std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
  executor_->table_generations_ = table_generations;
  executor_->agg_col_range_cache_ = agg_col_range;
}

ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
                                                 const CompilationOptions& co,
                                                 const ExecutionOptions& eo,
                                                 RenderInfo* render_info,
                                                 const int64_t queue_time_ms,
                                                 const bool with_existing_temp_tables) {
  CHECK(query_dag_);
  auto timer = DEBUG_TIMER(__func__);
  if (!with_existing_temp_tables) {
    decltype(temporary_tables_)().swap(temporary_tables_);
  }
  decltype(left_deep_join_info_)().swap(left_deep_join_info_);
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!seq.empty());

  auto get_descriptor_count = [&seq, &eo]() -> size_t {
    if (eo.just_explain) {
      if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
        // run the logical values descriptor to generate the result set, then the next
        // descriptor to generate the explain
        CHECK_GE(seq.size(), size_t(2));
        return 2;
      } else {
        return 1;
      }
    } else {
      return seq.size();
    }
  };

  const auto exec_desc_count = get_descriptor_count();
  auto eo_copied = eo;
  if (seq.hasQueryStepForUnion()) {
    // we currently do not support resultset recycling when an input query
    // contains a union (all) operation
    eo_copied.keep_result = false;
  }

  // we have to register the resultset(s) of the skipped query step(s) as temporary
  // tables before executing the remaining query steps,
  // since they may be required during query processing,
  // i.e., to get metadata of the target expression from the skipped query step
  if (g_allow_query_step_skipping) {
    for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
      const auto cached_res =
          executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
      CHECK(cached_res);
      addTemporaryTable(kv.first, cached_res);
    }
  }

  const auto num_steps = exec_desc_count - 1;
  for (size_t i = 0; i < exec_desc_count; i++) {
    VLOG(1) << "Executing query step " << i << " / " << num_steps;
    try {
      executeRelAlgStep(
          seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == num_steps) {
        // only render on the last step
        render_info->forceNonInSitu();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo_copied,
                        (i == num_steps) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const NativeExecutionError&) {
      if (!g_enable_interop) {
        throw;
      }
      auto eo_extern = eo_copied;
      eo_extern.executor_type = ::ExecutorType::Extern;
      auto exec_desc_ptr = seq.getDescriptor(i);
      const auto body = exec_desc_ptr->getBody();
      const auto compound = dynamic_cast<const RelCompound*>(body);
      if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
        LOG(INFO) << "Also failed to run the query using interoperability";
        throw;
      }
      executeRelAlgStep(
          seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    }
  }

  return seq.getDescriptor(num_steps)->getResult();
}
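
// Illustrative sketch (not part of the original source): for a three-step
// sequence, the loop above executes steps 0, 1, and 2, passing render_info
// only to the final step (i == num_steps); a QueryMustRunOnCpu thrown by a
// step triggers a per-step CPU retry only when g_allow_query_step_cpu_retry
// is enabled and we are not in distributed mode (g_cluster).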

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
    const RaExecutionSequence& seq,
    const std::pair<size_t, size_t> interval,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  executor_->temporary_tables_ = &temporary_tables_;
  decltype(left_deep_join_info_)().swap(left_deep_join_info_);
  time(&now_);
  for (size_t i = interval.first; i < interval.second; i++) {
    // only render on the last step
    try {
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == interval.second - 1) {
        render_info->forceNonInSitu();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    }
  }

  return seq.getDescriptor(interval.second - 1)->getResult();
}

void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
                                       const size_t step_idx,
                                       const CompilationOptions& co,
                                       const ExecutionOptions& eo,
                                       RenderInfo* render_info,
                                       const int64_t queue_time_ms) {
  INJECT_TIMER(executeRelAlgStep);
  auto timer = DEBUG_TIMER(__func__);
  auto exec_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    handleNop(exec_desc);
    return;
  }
  ExecutionOptions eo_work_unit = eo;
  eo_work_unit.with_watchdog =
      eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body));
  eo_work_unit.outer_fragment_indices =
      step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>();

  auto handle_hint = [co,
                      eo_work_unit,
                      body,
                      this]() -> std::pair<CompilationOptions, ExecutionOptions> {
    ExecutionOptions eo_hint_applied = eo_work_unit;
    CompilationOptions co_hint_applied = co;
    auto target_node = body;
    if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
      target_node = sort_body->getInput(0);
    }
    auto query_hints = getParsedQueryHint(target_node);
    auto columnar_output_hint_enabled = false;
    auto rowwise_output_hint_enabled = false;
    if (query_hints) {
      if (query_hints->isHintRegistered(QueryHint::kCpuMode)) {
        VLOG(1) << "A user forces to run the query on the CPU execution mode";
        co_hint_applied.device_type = ExecutorDeviceType::CPU;
      }
      if (query_hints->isHintRegistered(QueryHint::kKeepResult)) {
        if (!g_enable_data_recycler) {
          VLOG(1) << "A user enables keeping query resultset but is skipped since data "
                     "recycler is disabled";
        }
        if (!g_use_query_resultset_cache) {
          VLOG(1) << "A user enables keeping query resultset but is skipped since query "
                     "resultset recycler is disabled";
        } else {
          VLOG(1) << "A user enables keeping query resultset";
          eo_hint_applied.keep_result = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kKeepTableFuncResult)) {
        // we use this hint within the function 'executeTableFunction'
        if (!g_enable_data_recycler) {
          VLOG(1) << "A user enables keeping table function's resultset but is skipped "
                     "since data recycler is disabled";
        }
        if (!g_use_query_resultset_cache) {
          VLOG(1) << "A user enables keeping table function's resultset but is skipped "
                     "since query resultset recycler is disabled";
        } else {
          VLOG(1) << "A user enables keeping table function's resultset";
          eo_hint_applied.keep_result = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kWatchdog)) {
        if (!eo_hint_applied.with_watchdog) {
          VLOG(1) << "A user enables watchdog for this query";
          eo_hint_applied.with_watchdog = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kWatchdogOff)) {
        if (eo_hint_applied.with_watchdog) {
          VLOG(1) << "A user disables watchdog for this query";
          eo_hint_applied.with_watchdog = false;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kDynamicWatchdog)) {
        if (!eo_hint_applied.with_dynamic_watchdog) {
          VLOG(1) << "A user enables dynamic watchdog for this query";
          eo_hint_applied.with_dynamic_watchdog = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kDynamicWatchdogOff)) {
        if (eo_hint_applied.with_dynamic_watchdog) {
          VLOG(1) << "A user disables dynamic watchdog for this query";
          eo_hint_applied.with_dynamic_watchdog = false;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kQueryTimeLimit)) {
        std::ostringstream oss;
        oss << "A user sets query time limit to " << query_hints->query_time_limit
            << " ms";
        eo_hint_applied.dynamic_watchdog_time_limit = query_hints->query_time_limit;
        if (!eo_hint_applied.with_dynamic_watchdog) {
          eo_hint_applied.with_dynamic_watchdog = true;
          oss << " (and system automatically enables dynamic watchdog to activate the "
                 "given \"query_time_limit\" hint)";
        }
        VLOG(1) << oss.str();
      }
      if (query_hints->isHintRegistered(QueryHint::kAllowLoopJoin)) {
        VLOG(1) << "A user enables loop join";
        eo_hint_applied.allow_loop_joins = true;
      }
      if (query_hints->isHintRegistered(QueryHint::kDisableLoopJoin)) {
        VLOG(1) << "A user disables loop join";
        eo_hint_applied.allow_loop_joins = false;
      }
      if (query_hints->isHintRegistered(QueryHint::kMaxJoinHashTableSize)) {
        eo_hint_applied.max_join_hash_table_size = query_hints->max_join_hash_table_size;
        VLOG(1) << "A user forces the maximum size of a join hash table as "
                << eo_hint_applied.max_join_hash_table_size << " bytes";
      }
      if (query_hints->isHintRegistered(QueryHint::kOptCudaBlockAndGridSizes)) {
        if (query_hints->isHintRegistered(QueryHint::kCudaGridSize) ||
            query_hints->isHintRegistered(QueryHint::kCudaBlockSize)) {
          VLOG(1) << "Skip query hint \"opt_cuda_grid_and_block_size\" when at least one "
                     "of the following query hints are given simultaneously: "
                     "\"cuda_block_size\" and \"cuda_grid_size_multiplier\"";
        } else {
          VLOG(1) << "A user enables optimization of cuda block and grid sizes";
          eo_hint_applied.optimize_cuda_block_and_grid_sizes = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kColumnarOutput)) {
        VLOG(1) << "A user forces the query to run with columnar output";
        columnar_output_hint_enabled = true;
      } else if (query_hints->isHintRegistered(QueryHint::kRowwiseOutput)) {
        VLOG(1) << "A user forces the query to run with rowwise output";
        rowwise_output_hint_enabled = true;
      }
    }
    auto columnar_output_enabled = eo_work_unit.output_columnar_hint
                                       ? !rowwise_output_hint_enabled
                                       : columnar_output_hint_enabled;
    if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
      LOG(INFO) << "Currently, we do not support applying query hint to change query "
                   "output layout in distributed mode.";
    }
    eo_hint_applied.output_columnar_hint = columnar_output_enabled;
    return std::make_pair(co_hint_applied, eo_hint_applied);
  };

  auto hint_applied = handle_hint();

  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
    if (auto cached_resultset =
            executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
                body->getQueryPlanDagHash())) {
      VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
              << " from resultset cache";
      body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
      if (render_info) {
        std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
            executor_->getRecultSetRecyclerHolder().getTargetExprs(
                body->getQueryPlanDagHash());
        std::vector<Analyzer::Expr*> copied_target_exprs;
        for (const auto& expr : cached_target_exprs) {
          copied_target_exprs.push_back(expr.get());
        }
        build_render_targets(
            *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
      }
      exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
      addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
      return;
    }
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (compound->isUpdateViaSelect()) {
      executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      exec_desc.setResult(executeCompound(
          compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
      VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
              << static_cast<int>(-compound->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    if (project->isDeleteViaSelect()) {
      executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (project->isUpdateViaSelect()) {
      executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      std::optional<size_t> prev_count;
      // Disabling the intermediate count optimization in distributed, as the previous
      // execution descriptor will likely not hold the aggregated result.
      if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
        // If the previous node produced a reliable count, skip the pre-flight count.
        RelAlgNode const* const prev_body = project->getInput(0);
        if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
          if (RaExecutionDesc const* const prev_exec_desc =
                  prev_body->hasContextData()
                      ? prev_body->getContextData()
                      : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
            const auto& prev_exe_result = prev_exec_desc->getResult();
            const auto prev_result = prev_exe_result.getRows();
            if (prev_result) {
              prev_count = prev_result->rowCount();
              VLOG(3) << "Setting output row count for projection node to previous node ("
                      << prev_exec_desc->getBody()->toString(
                             RelRexToStringConfig::defaults())
                      << ") to " << *prev_count;
            }
          }
        }
      }
      exec_desc.setResult(executeProject(project,
                                         hint_applied.first,
                                         hint_applied.second,
                                         render_info,
                                         queue_time_ms,
                                         prev_count));
      VLOG(3) << "Returned from executeProject(), addTemporaryTable("
              << static_cast<int>(-project->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
  if (aggregate) {
    exec_desc.setResult(executeAggregate(
        aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto filter = dynamic_cast<const RelFilter*>(body);
  if (filter) {
    exec_desc.setResult(executeFilter(
        filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto sort = dynamic_cast<const RelSort*>(body);
  if (sort) {
    exec_desc.setResult(executeSort(
        sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    if (exec_desc.getResult().isFilterPushDownEnabled()) {
      return;
    }
    addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
  if (logical_values) {
    exec_desc.setResult(executeLogicalValues(logical_values, hint_applied.second));
    addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto modify = dynamic_cast<const RelModify*>(body);
  if (modify) {
    exec_desc.setResult(executeModify(modify, hint_applied.second));
    return;
  }
  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
  if (logical_union) {
    exec_desc.setResult(executeUnion(logical_union,
                                     seq,
                                     hint_applied.first,
                                     hint_applied.second,
                                     render_info,
                                     queue_time_ms));
    addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
  if (table_func) {
    exec_desc.setResult(executeTableFunction(
        table_func, hint_applied.first, hint_applied.second, queue_time_ms));
    addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  LOG(FATAL) << "Unhandled body type: "
             << body->toString(RelRexToStringConfig::defaults());
}

void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
  // just set the result of the previous node as the result of no op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up temp table as it could be used by the outer query or next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}
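
// Illustrative sketch (not part of the original source): temporary tables are
// keyed by the negated node id, so when a nop aggregate node with id 7 wraps
// an input node with id 5, handleNop() effectively aliases the entry:
//   temporary_tables_[-7] = temporary_tables_[-5];  // same result set, no copy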

namespace {

class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:
  RexUsedInputsVisitor() : RexVisitor() {}

  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        const auto cd =
            scan_ra->getCatalog().getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
};

const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
    return table_func;
  }
  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
    CHECK_EQ(size_t(2), join->inputCount());
    return join;
  }
  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_EQ(size_t(1), ra_node->inputCount());
  }
  auto only_src = ra_node->getInput(0);
  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
                       dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
  return is_join ? only_src : ra_node;
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelCompound* compound) {
  RexUsedInputsVisitor visitor;
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelAggregate* aggregate) {
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelProject* project) {
  RexUsedInputsVisitor visitor;
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelTableFunction* table_func) {
  RexUsedInputsVisitor visitor;
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelFilter* filter) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelLogicalUnion* logical_union) {
  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  used_inputs_owned.reserve(logical_union->inputCount());
  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
  auto const n_inputs = logical_union->inputCount();
  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
    auto input = logical_union->getInput(nest_level);
    for (size_t i = 0; i < input->size(); ++i) {
      used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
      used_inputs.insert(used_inputs_owned.back().get());
    }
  }
  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
}
1558 
1559 shared::TableKey table_key_from_ra(const RelAlgNode* ra_node) {
1560  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
1561  shared::TableKey table_key{0, int32_t(-ra_node->getId())};
1562  if (scan_ra) {
1563  table_key.db_id = scan_ra->getCatalog().getDatabaseId();
1564  const auto td = scan_ra->getTableDescriptor();
1565  CHECK(td);
1566  table_key.table_id = td->tableId;
1567  }
1568  return table_key;
1569 }
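// Note: non-scan inputs (e.g. the result of a previous execution step) have no
// physical table, so they are keyed by the negated node id under database id 0.
// This matches how intermediate results are registered as temporary tables
// elsewhere in this file, e.g. addTemporaryTable(-body->getId(), ...) and
// get_temporary_table(&temporary_tables_, -input_ra->getId()).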
1570 
1571 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
1572  const RelAlgNode* ra_node,
1573  const std::vector<size_t>& input_permutation) {
1574  const auto data_sink_node = get_data_sink(ra_node);
1575  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1576  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1577  const auto input_node_idx =
1578  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1579  const auto input_ra = data_sink_node->getInput(input_node_idx);
1580  // Having a non-zero mapped value (input_idx) results in the query being interpreted
1581  // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
1582  // mapped value (input_idx) which originates here. This would be incorrect for UNION.
1583  size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
1584  const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1585  CHECK(it_ok.second);
1586  LOG_IF(DEBUG1, !input_permutation.empty())
1587  << "Assigned input " << input_ra->toString(RelRexToStringConfig::defaults())
1588  << " to nest level " << input_idx;
1589  }
1590  return input_to_nest_level;
1591 }
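// For illustration: given `SELECT ... FROM t1 JOIN t2 ON ...` the data sink is
// the join node, so the map above is {t1 -> 0, t2 -> 1}; for `t1 UNION ALL t2`
// it is {t1 -> 0, t2 -> 0}, since (per the comment above) a non-zero nest
// level would make codegen treat the query as a JOIN.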
1592 
1593 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1594 get_join_source_used_inputs(const RelAlgNode* ra_node) {
1595  const auto data_sink_node = get_data_sink(ra_node);
1596  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1597  CHECK_EQ(join->inputCount(), 2u);
1598  const auto condition = join->getCondition();
1599  RexUsedInputsVisitor visitor;
1600  auto condition_inputs = visitor.visit(condition);
1601  std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1602  visitor.get_inputs_owned());
1603  return std::make_pair(condition_inputs, condition_inputs_owned);
1604  }
1605 
1606  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1607  CHECK_GE(left_deep_join->inputCount(), 2u);
1608  const auto condition = left_deep_join->getInnerCondition();
1609  RexUsedInputsVisitor visitor;
1610  auto result = visitor.visit(condition);
1611  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1612  ++nesting_level) {
1613  const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1614  if (outer_condition) {
1615  const auto outer_result = visitor.visit(outer_condition);
1616  result.insert(outer_result.begin(), outer_result.end());
1617  }
1618  }
1619  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
1620  return std::make_pair(result, used_inputs_owned);
1621  }
1622 
1623  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1624  CHECK_GT(ra_node->inputCount(), 1u)
1625      << ra_node->toString(RelRexToStringConfig::defaults());
1626  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1627  // no-op
1628  CHECK_GE(ra_node->inputCount(), 0u)
1629      << ra_node->toString(RelRexToStringConfig::defaults());
1630  } else {
1631  CHECK_EQ(ra_node->inputCount(), 1u)
1632      << ra_node->toString(RelRexToStringConfig::defaults());
1633  }
1634  return std::make_pair(std::unordered_set<const RexInput*>{},
1635  std::vector<std::shared_ptr<RexInput>>{});
1636 }
1637 
1638 void collect_used_input_desc(
1639  std::vector<InputDescriptor>& input_descs,
1640  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1641  const RelAlgNode* ra_node,
1642  const std::unordered_set<const RexInput*>& source_used_inputs,
1643  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1644  VLOG(3) << "ra_node=" << ra_node->toString(RelRexToStringConfig::defaults())
1645  << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
1646  << " source_used_inputs.size()=" << source_used_inputs.size();
1647  for (const auto used_input : source_used_inputs) {
1648  const auto input_ra = used_input->getSourceNode();
1649  const auto table_key = table_key_from_ra(input_ra);
1650  auto col_id = used_input->getIndex();
1651  auto it = input_to_nest_level.find(input_ra);
1652  if (it != input_to_nest_level.end()) {
1653  const int nest_level = it->second;
1654  if (auto rel_scan = dynamic_cast<const RelScan*>(input_ra)) {
1655  const auto& catalog = rel_scan->getCatalog();
1656  col_id = catalog.getColumnIdBySpi(table_key.table_id, col_id + 1);
1657  }
1658  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1659  col_id, table_key.table_id, table_key.db_id, nest_level));
1660  } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1661  throw std::runtime_error("Bushy joins not supported");
1662  }
1663  }
1664 }
1665 
1666 template <class RA>
1667 std::pair<std::vector<InputDescriptor>,
1668  std::list<std::shared_ptr<const InputColDescriptor>>>
1669 get_input_desc_impl(const RA* ra_node,
1670  const std::unordered_set<const RexInput*>& used_inputs,
1671  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1672  const std::vector<size_t>& input_permutation) {
1673  std::vector<InputDescriptor> input_descs;
1674  const auto data_sink_node = get_data_sink(ra_node);
1675  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1676  const auto input_node_idx =
1677  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1678  auto input_ra = data_sink_node->getInput(input_node_idx);
1679  const auto table_key = table_key_from_ra(input_ra);
1680  input_descs.emplace_back(table_key.db_id, table_key.table_id, input_idx);
1681  }
1682  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1683  collect_used_input_desc(input_descs,
1684  input_col_descs_unique, // modified
1685  ra_node,
1686  used_inputs,
1687  input_to_nest_level);
1688  std::unordered_set<const RexInput*> join_source_used_inputs;
1689  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1690  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1691  get_join_source_used_inputs(ra_node);
1692  collect_used_input_desc(input_descs,
1693  input_col_descs_unique, // modified
1694  ra_node,
1695  join_source_used_inputs,
1696  input_to_nest_level);
1697  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1698  input_col_descs_unique.begin(), input_col_descs_unique.end());
1699 
1700  std::sort(input_col_descs.begin(),
1701  input_col_descs.end(),
1702  [](std::shared_ptr<const InputColDescriptor> const& lhs,
1703  std::shared_ptr<const InputColDescriptor> const& rhs) {
1704  return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1705  lhs->getColId(),
1706  lhs->getScanDesc().getTableKey()) <
1707  std::make_tuple(rhs->getScanDesc().getNestLevel(),
1708  rhs->getColId(),
1709  rhs->getScanDesc().getTableKey());
1710  });
1711  return {input_descs,
1712  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1713  input_col_descs.end())};
1714 }
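// Note: the explicit sort above (nest level, then column id, then table key)
// makes the resulting input column list deterministic, since the descriptors
// were collected into an unordered_set whose iteration order is unspecified.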
1715 
1716 template <class RA>
1717 std::tuple<std::vector<InputDescriptor>,
1718  std::list<std::shared_ptr<const InputColDescriptor>>,
1719  std::vector<std::shared_ptr<RexInput>>>
1720 get_input_desc(const RA* ra_node,
1721  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1722  const std::vector<size_t>& input_permutation) {
1723  std::unordered_set<const RexInput*> used_inputs;
1724  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1725  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node);
1726  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1727  auto input_desc_pair =
1728  get_input_desc_impl(ra_node, used_inputs, input_to_nest_level, input_permutation);
1729  return std::make_tuple(
1730  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1731 }
1732 
1733 size_t get_scalar_sources_size(const RelCompound* compound) {
1734  return compound->getScalarSourcesSize();
1735 }
1736 
1737 size_t get_scalar_sources_size(const RelProject* project) {
1738  return project->size();
1739 }
1740 
1741 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1742  return table_func->getTableFuncInputsSize();
1743 }
1744 
1745 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1746  return compound->getScalarSource(i);
1747 }
1748 
1749 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1750  return project->getProjectAt(i);
1751 }
1752 
1753 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1754  return table_func->getTableFuncInputAt(i);
1755 }
1756 
1757 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1758  const std::shared_ptr<Analyzer::Expr> expr) {
1759  const auto& ti = expr->get_type_info();
1760  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1761  return expr;
1762  }
1763  auto transient_dict_ti = ti;
1764  transient_dict_ti.set_compression(kENCODING_DICT);
1765  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1766  transient_dict_ti.set_fixed_size();
1767  return expr->add_cast(transient_dict_ti);
1768 }
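// For illustration (hypothetical query): in `SELECT str_col, COUNT(*) FROM t
// GROUP BY str_col` with a none-encoded TEXT column, the cast added above lets
// grouping run on fixed-size transient dictionary ids instead of
// variable-length strings; non-string or already-encoded expressions are
// returned unchanged.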
1769 
1770 void set_transient_dict_maybe(
1771  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1772  const std::shared_ptr<Analyzer::Expr>& expr) {
1773  try {
1774  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1775  } catch (...) {
1776  scalar_sources.push_back(fold_expr(expr.get()));
1777  }
1778 }
1779 
1780 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1781  const std::shared_ptr<Analyzer::Expr>& input) {
1782  const auto& input_ti = input->get_type_info();
1783  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1784  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1785  }
1786  return input;
1787 }
1788 
1789 template <class RA>
1790 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1791  const RA* ra_node,
1792  const RelAlgTranslator& translator,
1793  const ::ExecutorType executor_type) {
1794  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1795  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1796  VLOG(3) << "get_scalar_sources_size("
1797  << ra_node->toString(RelRexToStringConfig::defaults())
1798  << ") = " << scalar_sources_size;
1799  for (size_t i = 0; i < scalar_sources_size; ++i) {
1800  const auto scalar_rex = scalar_at(i, ra_node);
1801  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1802  // RexRefs are synthetic scalars appended at the end of the real ones
1803  // for the sake of taking memory ownership; no real work is needed here.
1804  continue;
1805  }
1806 
1807  const auto scalar_expr =
1808  rewrite_array_elements(translator.translate(scalar_rex).get());
1809  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1810  if (executor_type == ExecutorType::Native) {
1811  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1812  } else if (executor_type == ExecutorType::TableFunctions) {
1813  scalar_sources.push_back(fold_expr(rewritten_expr.get()));
1814  } else {
1815  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1816  }
1817  }
1818 
1819  return scalar_sources;
1820 }
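// Summary of the branches above: Native execution keeps strings
// dictionary-encoded (adding a transient dictionary where possible, see
// set_transient_dict_maybe), the TableFunctions path only constant-folds, and
// any other executor type receives dictionary-encoded strings cast back to
// none-encoded TEXT via cast_dict_to_none.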
1821 
1822 template <class RA>
1823 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1824  const RA* ra_node,
1825  const RelAlgTranslator& translator,
1826  int32_t tableId,
1827  const Catalog_Namespace::Catalog& cat,
1828  const ColumnNameList& colNames,
1829  size_t starting_projection_column_idx) {
1830  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1831  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1832  const auto scalar_rex = scalar_at(i, ra_node);
1833  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1834  // RexRefs are synthetic scalars appended at the end of the real ones
1835  // for the sake of taking memory ownership; no real work is needed here.
1836  continue;
1837  }
1838 
1839  std::shared_ptr<Analyzer::Expr> translated_expr;
1840  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1841  translated_expr = cast_to_column_type(translator.translate(scalar_rex),
1842  tableId,
1843  cat,
1844  colNames[i - starting_projection_column_idx]);
1845  } else {
1846  translated_expr = translator.translate(scalar_rex);
1847  }
1848  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1849  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1850  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1851  }
1852 
1853  return scalar_sources;
1854 }
1855 
1856 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1857  const RelCompound* compound,
1858  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1859  if (!compound->isAggregate()) {
1860  return {nullptr};
1861  }
1862  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1863  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1864  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1865  }
1866  return groupby_exprs;
1867 }
1868 
1869 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1870  const RelAggregate* aggregate,
1871  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1872  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1873  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1874  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1875  }
1876  return groupby_exprs;
1877 }
1878 
1879 QualsConjunctiveForm translate_quals(const RelCompound* compound,
1880  const RelAlgTranslator& translator) {
1881  const auto filter_rex = compound->getFilterExpr();
1882  const auto filter_expr = filter_rex ? translator.translate(filter_rex) : nullptr;
1883  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1884      : QualsConjunctiveForm();
1885 }
1886 
1887 namespace {
1888 // If an encoded type is used in the context of COUNT(DISTINCT ...) then don't
1889 // bother decoding it. This is done by changing the sql type to an integer.
1890 void conditionally_change_arg_to_int_type(
1891  size_t target_expr_idx,
1892  std::shared_ptr<Analyzer::Expr>& target_expr,
1893  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos) {
1894  auto* agg_expr = dynamic_cast<Analyzer::AggExpr*>(target_expr.get());
1895  CHECK(agg_expr);
1896  if (agg_expr->get_is_distinct()) {
1897  SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1898  if (ti.get_type() != kARRAY && ti.get_compression() == kENCODING_DATE_IN_DAYS) {
1899  target_exprs_type_infos.emplace(target_expr_idx, ti);
1900  target_expr = target_expr->deep_copy();
1901  auto* arg = dynamic_cast<Analyzer::AggExpr*>(target_expr.get())->get_arg();
1902  arg->set_type_info(SQLTypeInfo(get_int_type_by_size(ti.get_size()), ti.get_notnull()));
1903  return;
1904  }
1905  }
1906  target_exprs_type_infos.emplace(target_expr_idx, target_expr->get_type_info());
1907 }
1908 } // namespace
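// Worked example (hypothetical column): for COUNT(DISTINCT d) where d is a
// DATE stored with kENCODING_DATE_IN_DAYS, distinctness of the encoded day
// numbers equals distinctness of the dates, so the argument is retyped to a
// same-size integer and its original type is recorded in
// target_exprs_type_infos so results can still be reported as dates.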
1909 
1910 std::vector<Analyzer::Expr*> translate_targets(
1911  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1912  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1913  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1914  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1915  const RelCompound* compound,
1916  const RelAlgTranslator& translator,
1917  const ExecutorType executor_type) {
1918  std::vector<Analyzer::Expr*> target_exprs;
1919  for (size_t i = 0; i < compound->size(); ++i) {
1920  const auto target_rex = compound->getTargetExpr(i);
1921  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1922  std::shared_ptr<Analyzer::Expr> target_expr;
1923  if (target_rex_agg) {
1924  target_expr =
1925  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1926  conditionally_change_arg_to_int_type(i, target_expr, target_exprs_type_infos);
1927  } else {
1928  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1929  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1930  if (target_rex_ref) {
1931  const auto ref_idx = target_rex_ref->getIndex();
1932  CHECK_GE(ref_idx, size_t(1));
1933  CHECK_LE(ref_idx, groupby_exprs.size());
1934  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1935  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1936  } else {
1937  target_expr =
1938  rewrite_array_elements(translator.translate(target_rex_scalar).get());
1939  auto rewritten_expr = rewrite_expr(target_expr.get());
1940  target_expr = fold_expr(rewritten_expr.get());
1941  if (executor_type == ExecutorType::Native) {
1942  try {
1943  target_expr = set_transient_dict(target_expr);
1944  } catch (...) {
1945  // noop
1946  }
1947  } else {
1948  target_expr = cast_dict_to_none(target_expr);
1949  }
1950  }
1951  target_exprs_type_infos.emplace(i, target_expr->get_type_info());
1952  }
1953  CHECK(target_expr);
1954  target_exprs_owned.push_back(target_expr);
1955  target_exprs.push_back(target_expr.get());
1956  }
1957  return target_exprs;
1958 }
1959 
1960 std::vector<Analyzer::Expr*> translate_targets(
1961  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1962  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1963  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1964  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1965  const RelAggregate* aggregate,
1966  const RelAlgTranslator& translator) {
1967  std::vector<Analyzer::Expr*> target_exprs;
1968  size_t group_key_idx = 1;
1969  for (const auto& groupby_expr : groupby_exprs) {
1970  auto target_expr =
1971  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1972  target_exprs_owned.push_back(target_expr);
1973  target_exprs.push_back(target_expr.get());
1974  }
1975 
1976  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1977  auto target_expr =
1978  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1979  CHECK(target_expr);
1980  target_expr = fold_expr(target_expr.get());
1981  target_exprs_owned.push_back(target_expr);
1982  target_exprs.push_back(target_expr.get());
1983  }
1984  return target_exprs;
1985 }
1986 
1987 bool is_count_distinct(const Analyzer::Expr* expr) {
1988  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1989  return agg_expr && agg_expr->get_is_distinct();
1990 }
1991 
1992 bool is_agg(const Analyzer::Expr* expr) {
1993  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1994  if (agg_expr && agg_expr->get_contains_agg()) {
1995  auto agg_type = agg_expr->get_aggtype();
1996  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1997  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kSUM_IF ||
1998  agg_type == SQLAgg::kAVG) {
1999  return true;
2000  }
2001  }
2002  return false;
2003 }
2004 
2005 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
2006  if (is_count_distinct(&expr)) {
2007  return SQLTypeInfo(kBIGINT, false);
2008  } else if (is_agg(&expr)) {
2009  return get_nullable_logical_type_info(expr.get_type_info());
2010  }
2011  return get_logical_type_info(expr.get_type_info());
2012 }
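// Effect of the mapping above: COUNT(DISTINCT x) is always reported as BIGINT
// regardless of the argument type; MIN/MAX/SUM/SUM_IF/AVG report a nullable
// logical type (an empty group yields NULL); all other expressions keep their
// plain logical type info.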
2013 
2014 template <class RA>
2015 std::vector<TargetMetaInfo> get_targets_meta(
2016  const RA* ra_node,
2017  const std::vector<Analyzer::Expr*>& target_exprs) {
2018  std::vector<TargetMetaInfo> targets_meta;
2019  CHECK_EQ(ra_node->size(), target_exprs.size());
2020  for (size_t i = 0; i < ra_node->size(); ++i) {
2021  CHECK(target_exprs[i]);
2022  // TODO(alex): remove the count distinct type fixup.
2023  targets_meta.emplace_back(ra_node->getFieldName(i),
2024  get_logical_type_for_expr(*target_exprs[i]),
2025  target_exprs[i]->get_type_info());
2026  }
2027  return targets_meta;
2028 }
2029 
2030 template <>
2031 std::vector<TargetMetaInfo> get_targets_meta(
2032  const RelFilter* filter,
2033  const std::vector<Analyzer::Expr*>& target_exprs) {
2034  RelAlgNode const* input0 = filter->getInput(0);
2035  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
2036  return get_targets_meta(input, target_exprs);
2037  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
2038  return get_targets_meta(input, target_exprs);
2039  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
2040  return get_targets_meta(input, target_exprs);
2041  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
2042  return get_targets_meta(input, target_exprs);
2043  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
2044  return get_targets_meta(input, target_exprs);
2045  } else if (auto const* input = dynamic_cast<RelLogicalValues const*>(input0)) {
2046  return get_targets_meta(input, target_exprs);
2047  }
2048  UNREACHABLE() << "Unhandled node type: "
2049                << input0->toString(RelRexToStringConfig::defaults());
2050  return {};
2051 }
2052 
2053 } // namespace
2054 
2055 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
2056  const CompilationOptions& co_in,
2057  const ExecutionOptions& eo_in,
2058  const int64_t queue_time_ms) {
2059  CHECK(node);
2060  auto timer = DEBUG_TIMER(__func__);
2061 
2062  auto co = co_in;
2063  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
2064  // encoded string updates
2065 
2066  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
2067  auto& work_unit,
2068  const bool is_aggregate) {
2069  auto table_descriptor = node->getModifiedTableDescriptor();
2070  CHECK(table_descriptor);
2071  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
2072  throw std::runtime_error(
2073  "UPDATE queries involving variable length columns are only supported on tables "
2074  "with the vacuum attribute set to 'delayed'");
2075  }
2076 
2077  auto catalog = node->getModifiedTableCatalog();
2078  CHECK(catalog);
2079  Executor::clearExternalCaches(true, table_descriptor, catalog->getDatabaseId());
2080 
2081  dml_transaction_parameters_ =
2082  std::make_unique<UpdateTransactionParameters>(table_descriptor,
2083  *catalog,
2084  node->getTargetColumns(),
2085  node->getOutputMetainfo(),
2086  node->isVarlenUpdateRequired());
2087 
2088  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2089 
2090  auto execute_update_ra_exe_unit =
2091  [this, &co, &eo_in, &table_infos, &table_descriptor, &node, catalog](
2092  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
2093  CompilationOptions co_project = CompilationOptions::makeCpuOnly(co);
2094 
2095  auto eo = eo_in;
2096  if (dml_transaction_parameters_->tableIsTemporary()) {
2097  eo.output_columnar_hint = true;
2098  co_project.allow_lazy_fetch = false;
2099  co_project.filter_on_deleted_column =
2100  false; // project the entire delete column for columnar update
2101  }
2102 
2103  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
2104  dml_transaction_parameters_.get());
2105  CHECK(update_transaction_parameters);
2106  update_transaction_parameters->setInputSourceNode(node);
2107  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
2108  try {
2109  auto table_update_metadata =
2110  executor_->executeUpdate(ra_exe_unit,
2111  table_infos,
2112  table_descriptor,
2113  co_project,
2114  eo,
2115  *catalog,
2116  executor_->row_set_mem_owner_,
2117  update_callback,
2118  is_aggregate);
2119  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2120  dml_transaction_parameters_->finalizeTransaction(*catalog);
2121  TableOptimizer table_optimizer{
2122  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2123  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2124  };
2125  } catch (const QueryExecutionError& e) {
2126  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2127  }
2128  };
2129 
2130  if (dml_transaction_parameters_->tableIsTemporary()) {
2131  // hold owned target exprs during execution if rewriting
2132  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2133  // Rewrite temp table updates to generate the full column by moving the where
2134  // clause into a case statement; if such a rewrite is not possible, bail on
2135  // the update operation. Then build an expr for the update target.
2136  auto update_transaction_params =
2137  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
2138  CHECK(update_transaction_params);
2139  const auto td = update_transaction_params->getTableDescriptor();
2140  CHECK(td);
2141  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2142  if (update_column_names.size() > 1) {
2143  throw std::runtime_error(
2144  "Multi-column update is not yet supported for temporary tables.");
2145  }
2146 
2147  const auto cd =
2148  catalog->getMetadataForColumn(td->tableId, update_column_names.front());
2149  CHECK(cd);
2150  auto projected_column_to_update = makeExpr<Analyzer::ColumnVar>(
2151  cd->columnType,
2152  shared::ColumnKey{catalog->getDatabaseId(), td->tableId, cd->columnId},
2153  0);
2154  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2155  work_unit.exe_unit, projected_column_to_update);
2156  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2157  throw std::runtime_error(
2158  "Variable length updates not yet supported on temporary tables.");
2159  }
2160  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2161  } else {
2162  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2163  }
2164  };
2165 
2166  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2167  auto work_unit =
2168  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2169 
2170  execute_update_for_node(compound, work_unit, compound->isAggregate());
2171  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2172  auto work_unit =
2173  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2174 
2175  if (project->isSimple()) {
2176  CHECK_EQ(size_t(1), project->inputCount());
2177  const auto input_ra = project->getInput(0);
2178  if (dynamic_cast<const RelSort*>(input_ra)) {
2179  const auto& input_table =
2180  get_temporary_table(&temporary_tables_, -input_ra->getId());
2181  CHECK(input_table);
2182  work_unit.exe_unit.scan_limit = input_table->rowCount();
2183  }
2184  }
2185  if (project->hasWindowFunctionExpr() || project->hasPushedDownWindowExpr()) {
2186  // The first condition means this project node has at least one window function,
2187  // and the second condition indicates that this project node falls into
2188  // one of the following cases:
2189  // 1) window function expression on a multi-fragmented table
2190  // 2) window function expression too complex to evaluate without codegen,
2191  // e.g., sum(x+y+z) instead of sum(x); we currently do not support codegen for
2192  // such complex window function expressions
2193  // 3) nested window function expression
2194  // But update on a multi-fragmented table having a window function is not yet
2195  // supported, so the second condition only refers to a non-fragmented table with
2196  // case 2) or 3).
2197  // If at least one of the two conditions is satisfied, we must compute the window
2198  // context before entering `execute_update_for_node` to properly update the table.
2199  if (!leaf_results_.empty()) {
2200  throw std::runtime_error(
2201  "Update query having window function is not yet supported in distributed "
2202  "mode.");
2203  }
2204  ColumnCacheMap column_cache;
2205  co.device_type = ExecutorDeviceType::CPU;
2206  computeWindow(work_unit, co, eo_in, column_cache, queue_time_ms);
2207  }
2208  execute_update_for_node(project, work_unit, false);
2209  } else {
2210  throw std::runtime_error("Unsupported parent node for update: " +
2211  node->toString(RelRexToStringConfig::defaults()));
2212  }
2213 }
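// For illustration (hypothetical query): on a temporary table, an update such
// as `UPDATE tmp SET x = 1 WHERE y > 0` cannot be vacuumed in place, so
// rewriteColumnarUpdate turns it into a full-column projection, conceptually
// `SELECT CASE WHEN y > 0 THEN 1 ELSE x END FROM tmp`, whose result is written
// back as a new columnar buffer.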
2214 
2215 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
2216  const CompilationOptions& co,
2217  const ExecutionOptions& eo_in,
2218  const int64_t queue_time_ms) {
2219  CHECK(node);
2220  auto timer = DEBUG_TIMER(__func__);
2221 
2222  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2223  auto& work_unit,
2224  const bool is_aggregate) {
2225  auto* table_descriptor = node->getModifiedTableDescriptor();
2226  CHECK(table_descriptor);
2227  if (!table_descriptor->hasDeletedCol) {
2228  throw std::runtime_error(
2229  "DELETE queries are only supported on tables with the vacuum attribute set to "
2230  "'delayed'");
2231  }
2232 
2233  const auto catalog = node->getModifiedTableCatalog();
2234  CHECK(catalog);
2235  Executor::clearExternalCaches(false, table_descriptor, catalog->getDatabaseId());
2236 
2237  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2238 
2239  auto execute_delete_ra_exe_unit =
2240  [this, &table_infos, &table_descriptor, &eo_in, &co, catalog](
2241  const auto& exe_unit, const bool is_aggregate) {
2242  dml_transaction_parameters_ =
2243  std::make_unique<DeleteTransactionParameters>(table_descriptor, *catalog);
2244  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2245  dml_transaction_parameters_.get());
2246  CHECK(delete_params);
2247  auto delete_callback = yieldDeleteCallback(*delete_params);
2248  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
2249 
2250  auto eo = eo_in;
2251  if (dml_transaction_parameters_->tableIsTemporary()) {
2252  eo.output_columnar_hint = true;
2253  co_delete.filter_on_deleted_column =
2254  false; // project the entire delete column for columnar update
2255  } else {
2256  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2257  }
2258 
2259  try {
2260  auto table_update_metadata =
2261  executor_->executeUpdate(exe_unit,
2262  table_infos,
2263  table_descriptor,
2264  co_delete,
2265  eo,
2266  *catalog,
2267  executor_->row_set_mem_owner_,
2268  delete_callback,
2269  is_aggregate);
2270  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2271  dml_transaction_parameters_->finalizeTransaction(*catalog);
2272  TableOptimizer table_optimizer{
2273  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2274  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2275  };
2276  } catch (const QueryExecutionError& e) {
2277  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2278  }
2279  };
2280 
2281  if (table_is_temporary(table_descriptor)) {
2282  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2283  const auto cd = catalog->getDeletedColumn(table_descriptor);
2284  CHECK(cd);
2285  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2286  cd->columnType,
2287  shared::ColumnKey{
2288  catalog->getDatabaseId(), table_descriptor->tableId, cd->columnId},
2289  0);
2290  const auto rewritten_exe_unit =
2291  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2292  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2293  } else {
2294  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2295  }
2296  };
2297 
2298  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2299  const auto work_unit =
2300  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2301  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2302  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2303  auto work_unit =
2304  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2305  if (project->isSimple()) {
2306  CHECK_EQ(size_t(1), project->inputCount());
2307  const auto input_ra = project->getInput(0);
2308  if (dynamic_cast<const RelSort*>(input_ra)) {
2309  const auto& input_table =
2310  get_temporary_table(&temporary_tables_, -input_ra->getId());
2311  CHECK(input_table);
2312  work_unit.exe_unit.scan_limit = input_table->rowCount();
2313  }
2314  }
2315  execute_delete_for_node(project, work_unit, false);
2316  } else {
2317  throw std::runtime_error("Unsupported parent node for delete: " +
2318  node->toString(RelRexToStringConfig::defaults()));
2319  }
2320 }
2321 
2322 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
2323  const CompilationOptions& co,
2324  const ExecutionOptions& eo,
2325  RenderInfo* render_info,
2326  const int64_t queue_time_ms) {
2327  auto timer = DEBUG_TIMER(__func__);
2328  const auto work_unit =
2329  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
2330  CompilationOptions co_compound = co;
2331  return executeWorkUnit(work_unit,
2332  compound->getOutputMetainfo(),
2333  compound->isAggregate(),
2334  co_compound,
2335  eo,
2336  render_info,
2337  queue_time_ms);
2338 }
2339 
2340 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
2341  const CompilationOptions& co,
2342  const ExecutionOptions& eo,
2343  RenderInfo* render_info,
2344  const int64_t queue_time_ms) {
2345  auto timer = DEBUG_TIMER(__func__);
2346  const auto work_unit = createAggregateWorkUnit(
2347  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2348  return executeWorkUnit(work_unit,
2349  aggregate->getOutputMetainfo(),
2350  true,
2351  co,
2352  eo,
2353  render_info,
2354  queue_time_ms);
2355 }
2356 
2357 namespace {
2358 
2359 // Returns true iff the execution unit contains window functions.
2360 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
2361  return std::any_of(ra_exe_unit.target_exprs.begin(),
2362  ra_exe_unit.target_exprs.end(),
2363  [](const Analyzer::Expr* expr) {
2364  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
2365  });
2366 }
2367 
2368 } // namespace
2369 
2370 ExecutionResult RelAlgExecutor::executeProject(
2371  const RelProject* project,
2372  const CompilationOptions& co,
2373  const ExecutionOptions& eo,
2374  RenderInfo* render_info,
2375  const int64_t queue_time_ms,
2376  const std::optional<size_t> previous_count) {
2377  auto timer = DEBUG_TIMER(__func__);
2378  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
2379  CompilationOptions co_project = co;
2380  if (project->isSimple()) {
2381  CHECK_EQ(size_t(1), project->inputCount());
2382  const auto input_ra = project->getInput(0);
2383  if (dynamic_cast<const RelSort*>(input_ra)) {
2384  co_project.device_type = ExecutorDeviceType::CPU;
2385  const auto& input_table =
2386  get_temporary_table(&temporary_tables_, -input_ra->getId());
2387  CHECK(input_table);
2388  work_unit.exe_unit.scan_limit =
2389  std::min(input_table->getLimit(), input_table->rowCount());
2390  }
2391  }
2392  return executeWorkUnit(work_unit,
2393  project->getOutputMetainfo(),
2394  false,
2395  co_project,
2396  eo,
2397  render_info,
2398  queue_time_ms,
2399  previous_count);
2400 }
2401 
2402 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
2403  const CompilationOptions& co_in,
2404  const ExecutionOptions& eo,
2405  const int64_t queue_time_ms) {
2406  INJECT_TIMER(executeTableFunction);
2407  auto timer = DEBUG_TIMER(__func__);
2408 
2409  auto co = co_in;
2410 
2411  if (g_cluster) {
2412  throw std::runtime_error("Table functions not supported in distributed mode yet");
2413  }
2414  if (!g_enable_table_functions) {
2415  throw std::runtime_error("Table function support is disabled");
2416  }
2417  auto table_func_work_unit = createTableFunctionWorkUnit(
2418  table_func,
2419  eo.just_explain,
2420  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2421  const auto body = table_func_work_unit.body;
2422  CHECK(body);
2423 
2424  const auto table_infos =
2425  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2426 
2427  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2428  co.device_type,
2429  QueryMemoryDescriptor(),
2430  nullptr,
2431  executor_->blockSize(),
2432  executor_->gridSize()),
2433  {}};
2434 
2435  auto global_hint = getGlobalQueryHint();
2436  auto use_resultset_recycler = canUseResultsetCache(eo, nullptr);
2437  if (use_resultset_recycler && has_valid_query_plan_dag(table_func)) {
2438  auto cached_resultset =
2439  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
2440  table_func->getQueryPlanDagHash());
2441  if (cached_resultset) {
2442  VLOG(1) << "recycle table function's resultset of the root node "
2443  << table_func->getRelNodeDagId() << " from resultset cache";
2444  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2445  addTemporaryTable(-body->getId(), result.getDataPtr());
2446  return result;
2447  }
2448  }
2449 
2450  auto query_exec_time_begin = timer_start();
2451  try {
2452  result = {executor_->executeTableFunction(
2453  table_func_work_unit.exe_unit, table_infos, co, eo),
2454  body->getOutputMetainfo()};
2455  } catch (const QueryExecutionError& e) {
2456  handlePersistentError(e.getErrorCode());
2457  CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
2458  throw std::runtime_error("Table function ran out of memory during execution");
2459  }
2460  auto query_exec_time = timer_stop(query_exec_time_begin);
2461  result.setQueueTime(queue_time_ms);
2462  auto resultset_ptr = result.getDataPtr();
2463  auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2464  g_allow_auto_resultset_caching &&
2465  resultset_ptr->getBufferSizeBytes(co.device_type) <=
2466  g_auto_resultset_caching_threshold;
2467  bool keep_result = global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult);
2468  if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2469  !hasStepForUnion()) {
2470  resultset_ptr->setExecTime(query_exec_time);
2471  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2472  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2473  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2474  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2475  if (allow_auto_caching_resultset) {
2476  VLOG(1) << "Automatically keep table function's query resultset to recycler";
2477  }
2478  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
2479  table_func_work_unit.exe_unit.query_plan_dag_hash,
2480  resultset_ptr->getInputTableKeys(),
2481  resultset_ptr,
2482  resultset_ptr->getBufferSizeBytes(co.device_type),
2483  target_exprs_owned_);
2484  } else {
2485  if (eo.keep_result) {
2486  if (g_cluster) {
2487  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2488  "support resultset recycling on distributed mode";
2489  } else if (hasStepForUnion()) {
2490  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2491  "has union-(all) operator";
2492  } else if (is_validate_or_explain_query(eo)) {
2493  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2494  "is either validate or explain query";
2495  } else {
2496  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2497  }
2498  }
2499  }
2500 
2501  return result;
2502 }
2503 
2504 namespace {
2505 
2506 // Creates a new expression which has the range table index set to 1. This is needed to
2507 // reuse the hash join construction helpers to generate a hash table for the window
2508 // function partition: create an equals expression with left and right sides identical
2509 // except for the range table index.
2510 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
2511  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
2512  if (tuple) {
2513  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2514  for (const auto& element : tuple->getTuple()) {
2515  transformed_tuple.push_back(transform_to_inner(element.get()));
2516  }
2517  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2518  }
2519  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
2520  if (!col) {
2521  throw std::runtime_error("Only columns supported in the window partition for now");
2522  }
2523  return makeExpr<Analyzer::ColumnVar>(col->get_type_info(), col->getColumnKey(), 1);
2524 }
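// For illustration: with `SUM(x) OVER (PARTITION BY a, b)`, the caller builds
// the tautological condition `(a, b) BW_EQ (a', b')`, where the right side
// comes from transform_to_inner and differs only in its range table index
// (1 instead of 0); running this through the hash join machinery yields a
// OneToMany hash table whose buckets are exactly the window partitions.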
2525 
2526 } // namespace
2527 
2528 void RelAlgExecutor::computeWindow(const WorkUnit& work_unit,
2529  const CompilationOptions& co,
2530  const ExecutionOptions& eo,
2531  ColumnCacheMap& column_cache_map,
2532  const int64_t queue_time_ms) {
2533  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2534  CHECK_EQ(query_infos.size(), size_t(1));
2535  if (query_infos.front().info.fragments.size() != 1) {
2536  throw std::runtime_error(
2537  "Only single fragment tables supported for window functions for now");
2538  }
2539  if (eo.executor_type == ::ExecutorType::Extern) {
2540  return;
2541  }
2542  query_infos.push_back(query_infos.front());
2543  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2544  // A query may hold multiple window functions having the same partition-by
2545  // condition; after building the first hash partition we can reuse it for the
2546  // rest of the window functions.
2547  // A cached partition can be shared across multiple window function contexts
2548  // as-is, but a sorted partition should be copied for reuse since we use it as
2549  // an (intermediate) output buffer.
2550  // todo (yoonmin) : support recycler for window function computation?
2551  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2552  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2553  sorted_partition_cache;
2554  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2555  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2556  window_function_context_map;
2557  std::unordered_map<QueryPlanHash, AggregateTreeForWindowFraming> aggregate_tree_map;
2558  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2559  ++target_index) {
2560  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2561  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2562  if (!window_func) {
2563  continue;
2564  }
2565  // Always use baseline layout hash tables for now, make the expression a tuple.
2566  const auto& partition_keys = window_func->getPartitionKeys();
2567  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2568  if (partition_keys.size() >= 1) {
2569  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2570  if (partition_keys.size() > 1) {
2571  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2572  } else {
2573  CHECK_EQ(partition_keys.size(), size_t(1));
2574  partition_key_tuple = partition_keys.front();
2575  }
2576  // Creates a tautology equality with the partition expression on both sides.
2577  partition_key_cond =
2578  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2579  kBW_EQ,
2580  kONE,
2581  partition_key_tuple,
2582  transform_to_inner(partition_key_tuple.get()));
2583  }
2584  auto context =
2585  createWindowFunctionContext(window_func,
2586  partition_key_cond /*nullptr if no partition key*/,
2587  partition_cache,
2588  sorted_partition_key_ref_count_map,
2589  work_unit,
2590  query_infos,
2591  co,
2592  column_cache_map,
2593  executor_->getRowSetMemoryOwner());
2594  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2595  }
2596 
2597  for (auto& kv : window_function_context_map) {
2598  kv.second->compute(
2599  sorted_partition_key_ref_count_map, sorted_partition_cache, aggregate_tree_map);
2600  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2601  }
2602 }
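// Example of the reuse described above (hypothetical query):
//   SELECT SUM(x) OVER (PARTITION BY k ORDER BY t),
//          AVG(y) OVER (PARTITION BY k ORDER BY t) FROM ...;
// both targets partition on the same key, so the first loop iteration builds
// and caches the partition hash table keyed by the hashed partition condition,
// the second finds it in partition_cache, and the sorted partition for (k, t)
// is likewise shared through sorted_partition_cache during compute().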
2603 
2604 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
2605  const Analyzer::WindowFunction* window_func,
2606  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2607  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2608  std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2609  const WorkUnit& work_unit,
2610  const std::vector<InputTableInfo>& query_infos,
2611  const CompilationOptions& co,
2612  ColumnCacheMap& column_cache_map,
2613  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2614  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2615  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2616      ? Data_Namespace::GPU_LEVEL
2617      : Data_Namespace::CPU_LEVEL;
2618  std::unique_ptr<WindowFunctionContext> context;
2619  auto partition_cache_key = work_unit.body->getQueryPlanDagHash();
2620  if (partition_key_cond) {
2621  auto partition_cond_str = partition_key_cond->toString();
2622  auto partition_key_hash = boost::hash_value(partition_cond_str);
2623  boost::hash_combine(partition_cache_key, partition_key_hash);
2624  std::shared_ptr<HashJoin> partition_ptr;
2625  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2626  if (cached_hash_table_it != partition_cache.end()) {
2627  partition_ptr = cached_hash_table_it->second;
2628  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2629  << partition_cache_key << ", partition condition: " << partition_cond_str
2630  << ")";
2631  } else {
2632  JoinType window_partition_type = window_func->isFrameNavigateWindowFunction()
2633      ? JoinType::WINDOW_FUNCTION_FRAMING
2634      : JoinType::WINDOW_FUNCTION;
2635  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2636  partition_key_cond,
2637  query_infos,
2638  memory_level,
2639  window_partition_type,
2640  HashType::OneToMany,
2641  column_cache_map,
2642  work_unit.exe_unit.hash_table_build_plan_dag,
2643  work_unit.exe_unit.query_hint,
2644  work_unit.exe_unit.table_id_to_node_map);
2645  if (!hash_table_or_err.fail_reason.empty()) {
2646  throw std::runtime_error(hash_table_or_err.fail_reason);
2647  }
2648  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2649  partition_ptr = hash_table_or_err.hash_table;
2650  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2651  .second);
2652  VLOG(1) << "Put a generated hash table for computing window function context to "
2653  "cache (key: "
2654  << partition_cache_key << ", partition condition: " << partition_cond_str
2655  << ")";
2656  }
2657  CHECK(partition_ptr);
2658  auto aggregate_tree_fanout = g_window_function_aggregation_tree_fanout;
2659  if (work_unit.exe_unit.query_hint.aggregate_tree_fanout != aggregate_tree_fanout) {
2660  aggregate_tree_fanout = work_unit.exe_unit.query_hint.aggregate_tree_fanout;
2661  VLOG(1) << "Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2662  }
2663  context = std::make_unique<WindowFunctionContext>(window_func,
2664  partition_cache_key,
2665  partition_ptr,
2666  elem_count,
2667  co.device_type,
2668  row_set_mem_owner,
2669  aggregate_tree_fanout);
2670  } else {
2671  context = std::make_unique<WindowFunctionContext>(
2672  window_func, elem_count, co.device_type, row_set_mem_owner);
2673  }
2674  const auto& order_keys = window_func->getOrderKeys();
2675  if (!order_keys.empty()) {
2676  auto sorted_partition_cache_key = partition_cache_key;
2677  for (auto& order_key : order_keys) {
2678  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2679  }
2680  for (auto& collation : window_func->getCollation()) {
2681  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2682  }
2683  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2684  auto cache_key_cnt_it =
2685  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2686  if (!cache_key_cnt_it.second) {
2687  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2688  cache_key_cnt_it.first->second + 1;
2689  }
2690 
2691  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2692  for (const auto& order_key : order_keys) {
2693  const auto order_col =
2694  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2695  if (!order_col) {
2696  throw std::runtime_error("Only order by columns supported for now");
2697  }
2698  auto const [column, col_elem_count] =
2699  ColumnFetcher::getOneColumnFragment(executor_,
2700  *order_col,
2701  query_infos.front().info.fragments.front(),
2702  memory_level,
2703  0,
2704  nullptr,
2705  /*thread_idx=*/0,
2706  chunks_owner,
2707  column_cache_map);
2708 
2709  CHECK_EQ(col_elem_count, elem_count);
2710  context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2711  }
2712  }
2713  if (context->getWindowFunction()->hasFraming()) {
2714  // todo (yoonmin) : if we try to support generic window function expression without
2715  // extra project node, we need to revisit here b/c the current logic assumes that
2716  // window function expression has a single input source
2717  auto& window_function_expression_args = window_func->getArgs();
2718  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2719  for (auto& expr : window_function_expression_args) {
2720  if (const auto arg_col_var =
2721  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2722  auto const [column, col_elem_count] = ColumnFetcher::getOneColumnFragment(
2723  executor_,
2724  *arg_col_var,
2725  query_infos.front().info.fragments.front(),
2726  memory_level,
2727  0,
2728  nullptr,
2729  /*thread_idx=*/0,
2730  chunks_owner,
2731  column_cache_map);
2732  CHECK_EQ(col_elem_count, elem_count);
2733  context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2734  }
2735  }
2736  }
2737  return context;
2738 }
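// Note on the cache keys above: the sorted-partition key starts from the
// partition cache key and hash-combines every order key and collation flag,
// so window functions that agree on both PARTITION BY and ORDER BY share one
// sorted partition, with sorted_partition_key_ref_count_map counting how many
// contexts reference each key.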
2739 
2740 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
2741  const CompilationOptions& co,
2742  const ExecutionOptions& eo,
2743  RenderInfo* render_info,
2744  const int64_t queue_time_ms) {
2745  auto timer = DEBUG_TIMER(__func__);
2746  const auto work_unit =
2747  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2748  return executeWorkUnit(
2749  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2750 }
2751 
2752 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
2753  std::vector<TargetMetaInfo> const& rhs) {
2754  if (lhs.size() == rhs.size()) {
2755  for (size_t i = 0; i < lhs.size(); ++i) {
2756  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2757  return false;
2758  }
2759  }
2760  return true;
2761  }
2762  return false;
2763 }
2764 
2765 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2766  return target_meta_info.get_type_info().is_geometry();
2767 }
2768 
2769 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
2770  const RaExecutionSequence& seq,
2771  const CompilationOptions& co,
2772  const ExecutionOptions& eo,
2773  RenderInfo* render_info,
2774  const int64_t queue_time_ms) {
2775  auto timer = DEBUG_TIMER(__func__);
2776  if (!logical_union->isAll()) {
2777  throw std::runtime_error("UNION without ALL is not supported yet.");
2778  }
2779  // Will throw a std::runtime_error if types don't match.
2780  logical_union->setOutputMetainfo(logical_union->getCompatibleMetainfoTypes());
2781  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2782  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2783  }
2784  auto work_unit =
2785  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2786  return executeWorkUnit(work_unit,
2787  logical_union->getOutputMetainfo(),
2788  false,
2789  CompilationOptions::makeCpuOnly(co),
2790  eo,
2791  render_info,
2792  queue_time_ms);
2793 }
2794 
2795 ExecutionResult RelAlgExecutor::executeLogicalValues(
2796  const RelLogicalValues* logical_values,
2797  const ExecutionOptions& eo) {
2798  auto timer = DEBUG_TIMER(__func__);
2799  QueryMemoryDescriptor query_mem_desc(executor_,
2800  logical_values->getNumRows(),
2801  QueryDescriptionType::Projection,
2802  /*is_table_function=*/false);
2803 
2804  auto tuple_type = logical_values->getTupleType();
2805  for (size_t i = 0; i < tuple_type.size(); ++i) {
2806  auto& target_meta_info = tuple_type[i];
2807  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2808  // replace w/ bigint
2809  tuple_type[i] =
2810  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2811  }
2812  query_mem_desc.addColSlotInfo(
2813  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2814  }
2815  logical_values->setOutputMetainfo(tuple_type);
2816 
2817  std::vector<TargetInfo> target_infos;
2818  for (const auto& tuple_type_component : tuple_type) {
2819  target_infos.emplace_back(TargetInfo{false,
2820  kCOUNT,
2821  tuple_type_component.get_type_info(),
2822  SQLTypeInfo(kNULLT, false),
2823  false,
2824  false,
2825  /*is_varlen_projection=*/false});
2826  }
2827 
2828  std::shared_ptr<ResultSet> rs{
2829  ResultSetLogicalValuesBuilder{logical_values,
2830  target_infos,
2831  ExecutorDeviceType::CPU,
2832  query_mem_desc,
2833  executor_->getRowSetMemoryOwner(),
2834  executor_}
2835  .build()};
2836 
2837  return {rs, tuple_type};
2838 }
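// For illustration (hypothetical query): `SELECT 1, NULL;` reaches this path
// as a RelLogicalValues node whose second tuple component has type kNULLT;
// the loop above rewrites that slot to a nullable BIGINT before the
// CPU-resident ResultSet is built.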
2839 
2840 namespace {
2841 
2842 template <class T>
2843 int64_t insert_one_dict_str(T* col_data,
2844  const std::string& columnName,
2845  const SQLTypeInfo& columnType,
2846  const Analyzer::Constant* col_cv,
2847  const Catalog_Namespace::Catalog& catalog) {
2848  if (col_cv->get_is_null()) {
2849  *col_data = inline_fixed_encoding_null_val(columnType);
2850  } else {
2851  const int dict_id = columnType.get_comp_param();
2852  const auto col_datum = col_cv->get_constval();
2853  const auto& str = *col_datum.stringval;
2854  const auto dd = catalog.getMetadataForDict(dict_id);
2855  CHECK(dd && dd->stringDict);
2856  int32_t str_id = dd->stringDict->getOrAdd(str);
2857  if (!dd->dictIsTemp) {
2858  const auto checkpoint_ok = dd->stringDict->checkpoint();
2859  if (!checkpoint_ok) {
2860  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2861  columnName);
2862  }
2863  }
2864  const bool invalid = str_id > max_valid_int_value<T>();
2865  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2866  if (invalid) {
2867  LOG(ERROR) << "Could not encode string: " << str
2868  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2869  << " bits. Will store NULL instead.";
2870  }
2871  str_id = inline_fixed_encoding_null_val(columnType);
2872  }
2873  *col_data = str_id;
2874  }
2875  return *col_data;
2876 }
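// Worked example (hypothetical column): inserting 'foo' into a column declared
// TEXT ENCODING DICT(8) calls getOrAdd on the column's string dictionary with
// T = int8_t; if the returned id exceeds max_valid_int_value<int8_t>() (the
// 8-bit dictionary is exhausted), the branch above logs an ERROR and stores
// the fixed-encoding NULL sentinel instead.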
2877 
2878 template <class T>
2879 int64_t insert_one_dict_str(T* col_data,
2880  const ColumnDescriptor* cd,
2881  const Analyzer::Constant* col_cv,
2882  const Catalog_Namespace::Catalog& catalog) {
2883  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2884 }
2885 
2886 } // namespace
2887 
2888 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2889  const ExecutionOptions& eo) {
2890  auto timer = DEBUG_TIMER(__func__);
2891  if (eo.just_explain) {
2892  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2893  }
2894 
2895  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2896  ExecutorDeviceType::CPU,
2897  QueryMemoryDescriptor(),
2898  executor_->getRowSetMemoryOwner(),
2899  executor_->blockSize(),
2900  executor_->gridSize());
2901 
2902  std::vector<TargetMetaInfo> empty_targets;
2903  return {rs, empty_targets};
2904 }
2905 
2906 ExecutionResult RelAlgExecutor::executeSimpleInsert(
2907  const Analyzer::Query& query,
2908  Fragmenter_Namespace::InsertDataLoader& inserter,
2909  const Catalog_Namespace::SessionInfo& session) {
2910  // Note: We currently obtain an executor for this method, but we do not need it.
2911  // Therefore, we skip the executor state setup in the regular execution path. In the
2912  // future, we will likely want to use the executor to evaluate expressions in the insert
2913  // statement.
2914 
2915  const auto& values_lists = query.get_values_lists();
2916  const int table_id = query.get_result_table_id();
2917  const auto& col_id_list = query.get_result_col_list();
2918  size_t rows_number = values_lists.size();
2919  size_t leaf_count = inserter.getLeafCount();
2920  const auto& catalog = session.getCatalog();
2921  const auto td = catalog.getMetadataForTable(table_id);
2922  CHECK(td);
2923  size_t rows_per_leaf = rows_number;
2924  if (td->nShards == 0) {
2925  rows_per_leaf =
2926  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2927  }
2928  auto max_number_of_rows_per_package =
2929  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
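  // Worked example (hypothetical numbers): inserting 200,000 rows through 2
  // leaves into an unsharded table gives rows_per_leaf = ceil(200000 / 2) =
  // 100000, so max_number_of_rows_per_package = min(100000, 64 * 1024) =
  // 65536, and the per-column buffers below are sized for 65536 rows.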
2930 
2931  std::vector<const ColumnDescriptor*> col_descriptors;
2932  std::vector<int> col_ids;
2933  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2934  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2935  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2936  std::unordered_map<int, int> sequential_ids;
2937 
2938  for (const int col_id : col_id_list) {
2939  const auto cd = get_column_descriptor({catalog.getDatabaseId(), table_id, col_id});
2940  const auto col_enc = cd->columnType.get_compression();
2941  if (cd->columnType.is_string()) {
2942  switch (col_enc) {
2943  case kENCODING_NONE: {
2944  auto it_ok =
2945  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2946  CHECK(it_ok.second);
2947  break;
2948  }
2949  case kENCODING_DICT: {
2950  const auto dd = catalog.getMetadataForDict(cd->columnType.get_comp_param());
2951  CHECK(dd);
2952  const auto it_ok = col_buffers.emplace(
2953  col_id,
2954  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2955  max_number_of_rows_per_package));
2956  CHECK(it_ok.second);
2957  break;
2958  }
2959  default:
2960  CHECK(false);
2961  }
2962  } else if (cd->columnType.is_geometry()) {
2963  auto it_ok =
2964  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2965  CHECK(it_ok.second);
2966  } else if (cd->columnType.is_array()) {
2967  auto it_ok =
2968  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2969  CHECK(it_ok.second);
2970  } else {
2971  const auto it_ok = col_buffers.emplace(
2972  col_id,
2973  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2974  max_number_of_rows_per_package]()));
2975  CHECK(it_ok.second);
2976  }
2977  col_descriptors.push_back(cd);
2978  sequential_ids[col_id] = col_ids.size();
2979  col_ids.push_back(col_id);
2980  }
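  // Example (illustrative): for col_id_list = {12, 14, 17}, this loop leaves
  // sequential_ids = {12 -> 0, 14 -> 1, 17 -> 2}, so the data block for catalog
  // column 14 is later placed in insert_data.data[1] regardless of the physical
  // column numbering.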
2981 
2982  // mark the target table's cached item as dirty
2983  std::vector<int> table_chunk_key_prefix{catalog.getCurrentDB().dbId, table_id};
2984  auto table_key = boost::hash_value(table_chunk_key_prefix);
2985   ResultSetCacheInvalidator::invalidateCachesByTable(table_key);
2986   UpdateTriggeredCacheInvalidator::invalidateCachesByTable(table_key);
2987 
2988  size_t start_row = 0;
2989  size_t rows_left = rows_number;
2990  while (rows_left != 0) {
2991  // clear the buffers
2992  for (const auto& kv : col_buffers) {
2993  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2994  }
2995  for (auto& kv : str_col_buffers) {
2996  kv.second.clear();
2997  }
2998  for (auto& kv : arr_col_buffers) {
2999  kv.second.clear();
3000  }
3001 
3002  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
3003     // Note: if there are use cases with batch inserts of many rows, it might be
3004     // more efficient to run the loops below column by column instead of row by
3005     // row. For now such a refactoring does not seem worth it, as there are more
3006     // efficient ways to insert many rows anyway.
3007  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
3008  const auto& values_list = values_lists[row_idx + start_row];
3009  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
3010  CHECK(values_list.size() == col_descriptors.size());
3011  auto col_cv =
3012  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
3013  if (!col_cv) {
3014  auto col_cast =
3015  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
3016  CHECK(col_cast);
3017  CHECK_EQ(kCAST, col_cast->get_optype());
3018  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
3019  }
3020  CHECK(col_cv);
3021  const auto cd = col_descriptors[col_idx];
3022  auto col_datum = col_cv->get_constval();
3023  auto col_type = cd->columnType.get_type();
3024  uint8_t* col_data_bytes{nullptr};
3025  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
3026  (!cd->columnType.is_string() ||
3027  cd->columnType.get_compression() == kENCODING_DICT)) {
3028  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
3029  CHECK(col_data_bytes_it != col_buffers.end());
3030  col_data_bytes = col_data_bytes_it->second.get();
3031  }
3032  switch (col_type) {
3033  case kBOOLEAN: {
3034  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
3035  auto null_bool_val =
3036  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
3037  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
3038  ? inline_fixed_encoding_null_val(cd->columnType)
3039  : (col_datum.boolval ? 1 : 0);
3040  break;
3041  }
3042  case kTINYINT: {
3043  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
3044  col_data[row_idx] = col_cv->get_is_null()
3045  ? inline_fixed_encoding_null_val(cd->columnType)
3046  : col_datum.tinyintval;
3047  break;
3048  }
3049  case kSMALLINT: {
3050  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
3051  col_data[row_idx] = col_cv->get_is_null()
3052  ? inline_fixed_encoding_null_val(cd->columnType)
3053  : col_datum.smallintval;
3054  break;
3055  }
3056  case kINT: {
3057  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
3058  col_data[row_idx] = col_cv->get_is_null()
3059  ? inline_fixed_encoding_null_val(cd->columnType)
3060  : col_datum.intval;
3061  break;
3062  }
3063  case kBIGINT:
3064  case kDECIMAL:
3065  case kNUMERIC: {
3066  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3067  col_data[row_idx] = col_cv->get_is_null()
3068  ? inline_fixed_encoding_null_val(cd->columnType)
3069  : col_datum.bigintval;
3070  break;
3071  }
3072  case kFLOAT: {
3073  auto col_data = reinterpret_cast<float*>(col_data_bytes);
3074  col_data[row_idx] = col_datum.floatval;
3075  break;
3076  }
3077  case kDOUBLE: {
3078  auto col_data = reinterpret_cast<double*>(col_data_bytes);
3079  col_data[row_idx] = col_datum.doubleval;
3080  break;
3081  }
3082  case kTEXT:
3083  case kVARCHAR:
3084  case kCHAR: {
3085  switch (cd->columnType.get_compression()) {
3086  case kENCODING_NONE:
3087  str_col_buffers[col_ids[col_idx]].push_back(
3088  col_datum.stringval ? *col_datum.stringval : "");
3089  break;
3090  case kENCODING_DICT: {
3091  switch (cd->columnType.get_size()) {
3092  case 1:
3093                     insert_one_dict_str(
3094                         &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3095  cd,
3096  col_cv,
3097  catalog);
3098  break;
3099  case 2:
3100                     insert_one_dict_str(
3101                         &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3102  cd,
3103  col_cv,
3104  catalog);
3105  break;
3106  case 4:
3107                     insert_one_dict_str(
3108                         &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3109  cd,
3110  col_cv,
3111  catalog);
3112  break;
3113  default:
3114  CHECK(false);
3115  }
3116  break;
3117  }
3118  default:
3119  CHECK(false);
3120  }
3121  break;
3122  }
3123  case kTIME:
3124  case kTIMESTAMP:
3125  case kDATE: {
3126  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3127  col_data[row_idx] = col_cv->get_is_null()
3128  ? inline_fixed_encoding_null_val(cd->columnType)
3129  : col_datum.bigintval;
3130  break;
3131  }
3132  case kARRAY: {
3133  const auto is_null = col_cv->get_is_null();
3134  const auto size = cd->columnType.get_size();
3135  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
3136  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
3137  const auto is_point_coords =
3138  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
3139  if (is_null && !is_point_coords) {
3140  if (size > 0) {
3141  int8_t* buf = (int8_t*)checked_malloc(size);
3142  put_null_array(static_cast<void*>(buf), elem_ti, "");
3143  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
3144  p += elem_ti.get_size()) {
3145  put_null(static_cast<void*>(p), elem_ti, "");
3146  }
3147  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
3148  } else {
3149  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
3150  }
3151  break;
3152  }
3153  const auto l = col_cv->get_value_list();
3154  size_t len = l.size() * elem_ti.get_size();
3155  if (size > 0 && static_cast<size_t>(size) != len) {
3156  throw std::runtime_error("Array column " + cd->columnName + " expects " +
3157  std::to_string(size / elem_ti.get_size()) +
3158  " values, " + "received " +
3159  std::to_string(l.size()));
3160  }
3161  if (elem_ti.is_string()) {
3162  CHECK(kENCODING_DICT == elem_ti.get_compression());
3163  CHECK(4 == elem_ti.get_size());
3164 
3165  int8_t* buf = (int8_t*)checked_malloc(len);
3166  int32_t* p = reinterpret_cast<int32_t*>(buf);
3167 
3168  int elemIndex = 0;
3169  for (auto& e : l) {
3170  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3171  CHECK(c);
3172             insert_one_dict_str(
3173                 &p[elemIndex], cd->columnName, elem_ti, c.get(), catalog);
3174  elemIndex++;
3175  }
3176  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3177 
3178  } else {
3179  int8_t* buf = (int8_t*)checked_malloc(len);
3180  int8_t* p = buf;
3181  for (auto& e : l) {
3182  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3183  CHECK(c);
3184  p = append_datum(p, c->get_constval(), elem_ti);
3185  CHECK(p);
3186  }
3187  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3188  }
3189  break;
3190  }
3191  case kPOINT:
3192  case kMULTIPOINT:
3193  case kLINESTRING:
3194  case kMULTILINESTRING:
3195  case kPOLYGON:
3196  case kMULTIPOLYGON:
3197  str_col_buffers[col_ids[col_idx]].push_back(
3198  col_datum.stringval ? *col_datum.stringval : "");
3199  break;
3200  default:
3201  CHECK(false);
3202  }
3203  }
3204  }
3205  start_row += package_size;
3206  rows_left -= package_size;
3207 
3208     Fragmenter_Namespace::InsertData insert_data;
3209     insert_data.databaseId = catalog.getCurrentDB().dbId;
3210  insert_data.tableId = table_id;
3211  insert_data.data.resize(col_ids.size());
3212  insert_data.columnIds = col_ids;
3213  for (const auto& kv : col_buffers) {
3214  DataBlockPtr p;
3215  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
3216  insert_data.data[sequential_ids[kv.first]] = p;
3217  }
3218  for (auto& kv : str_col_buffers) {
3219  DataBlockPtr p;
3220  p.stringsPtr = &kv.second;
3221  insert_data.data[sequential_ids[kv.first]] = p;
3222  }
3223  for (auto& kv : arr_col_buffers) {
3224  DataBlockPtr p;
3225  p.arraysPtr = &kv.second;
3226  insert_data.data[sequential_ids[kv.first]] = p;
3227  }
3228  insert_data.numRows = package_size;
3229  auto data_memory_holder = import_export::fill_missing_columns(&catalog, insert_data);
3230  inserter.insertData(session, insert_data);
3231  }
3232 
3233   auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3234                                         ExecutorDeviceType::CPU,
3235                                         QueryMemoryDescriptor(),
3236                                         executor_->getRowSetMemoryOwner(),
3237  0,
3238  0);
3239  std::vector<TargetMetaInfo> empty_targets;
3240  return {rs, empty_targets};
3241 }
3242 
3243 namespace {
3244 
3245 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
3246  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
3247  if (aggregate) {
3248  return 0;
3249  }
3250  const auto compound = dynamic_cast<const RelCompound*>(ra);
3251  return (compound && compound->isAggregate()) ? 0 : limit;
3252 }
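// Example (illustrative): for `SELECT x FROM t LIMIT 10` the limit can be pushed
// into the scan, so get_scan_limit returns 10; for an aggregate such as
// `SELECT COUNT(*) FROM t LIMIT 10` the LIMIT applies only after aggregation,
// so the function returns 0 and the scan remains unbounded.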
3253 
3254 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
3255  return !order_entries.empty() && order_entries.front().is_desc;
3256 }
3257 
3258 } // namespace
3259 
3260 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
3261                                             const CompilationOptions& co,
3262  const ExecutionOptions& eo,
3263  RenderInfo* render_info,
3264  const int64_t queue_time_ms) {
3265  auto timer = DEBUG_TIMER(__func__);
3266   check_sort_node_source_constraint(sort);
3267   const auto source = sort->getInput(0);
3268  const bool is_aggregate = node_is_aggregate(source);
3269  auto it = leaf_results_.find(sort->getId());
3270  auto order_entries = get_order_entries(sort);
3271  if (it != leaf_results_.end()) {
3272  // Add any transient string literals to the sdp on the agg
3273  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3274  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3275  executor_->row_set_mem_owner_);
3276  // Handle push-down for LIMIT for multi-node
3277  auto& aggregated_result = it->second;
3278  auto& result_rows = aggregated_result.rs;
3279  const size_t limit = sort->getLimit();
3280  const size_t offset = sort->getOffset();
3281  if (limit || offset) {
3282  if (!order_entries.empty()) {
3283  result_rows->sort(order_entries, limit + offset, executor_);
3284  }
3285  result_rows->dropFirstN(offset);
3286  if (limit) {
3287  result_rows->keepFirstN(limit);
3288  }
3289  }
3290 
3291  if (render_info) {
3292  // We've hit a sort step that is the very last step
3293  // in a distributed render query. We'll fill in the render targets
3294  // since we have all that data needed to do so. This is normally
3295  // done in executeWorkUnit, but that is bypassed in this case.
3296  build_render_targets(*render_info,
3297  source_work_unit.exe_unit.target_exprs,
3298  aggregated_result.targets_meta);
3299  }
3300 
3301  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3302  sort->setOutputMetainfo(aggregated_result.targets_meta);
3303 
3304  return result;
3305  }
3306 
3307  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3308  bool is_desc{false};
3309  bool use_speculative_top_n_sort{false};
3310 
3311  auto execute_sort_query = [this,
3312  sort,
3313  &source,
3314  &is_aggregate,
3315  &eo,
3316  &co,
3317  render_info,
3318  queue_time_ms,
3319  &groupby_exprs,
3320  &is_desc,
3321  &order_entries,
3322  &use_speculative_top_n_sort]() -> ExecutionResult {
3323  const size_t limit = sort->getLimit();
3324  const size_t offset = sort->getOffset();
3325  // check whether sort's input is cached
3326  auto source_node = sort->getInput(0);
3327  CHECK(source_node);
3328  ExecutionResult source_result{nullptr, {}};
3329  auto source_query_plan_dag = source_node->getQueryPlanDagHash();
3330  bool enable_resultset_recycler = canUseResultsetCache(eo, render_info);
3331  if (enable_resultset_recycler && has_valid_query_plan_dag(source_node) &&
3332  !sort->isEmptyResult()) {
3333  if (auto cached_resultset =
3334  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
3335  source_query_plan_dag)) {
3336  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3337  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3338  << " from resultset cache";
3339  source_result =
3340  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3341  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3342  addTemporaryTable(-source_node->getId(), cached_resultset);
3343  }
3344  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3345  co.device_type == ExecutorDeviceType::GPU;
3346  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3347  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3348  }
3349  }
3350  if (!source_result.getDataPtr()) {
3351  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3352  is_desc = first_oe_is_desc(order_entries);
3353  ExecutionOptions eo_copy = {
3354         eo.output_columnar_hint,
3355         eo.keep_result,
3356  eo.allow_multifrag,
3357  eo.just_explain,
3358  eo.allow_loop_joins,
3359  eo.with_watchdog,
3360  eo.jit_debug,
3361  eo.just_validate || sort->isEmptyResult(),
3362  eo.with_dynamic_watchdog,
3363  eo.dynamic_watchdog_time_limit,
3364  eo.find_push_down_candidates,
3365  eo.just_calcite_explain,
3366  eo.gpu_input_mem_limit_percent,
3367  eo.allow_runtime_query_interrupt,
3368  eo.running_query_interrupt_freq,
3369  eo.pending_query_interrupt_freq,
3370  eo.optimize_cuda_block_and_grid_sizes,
3371  eo.max_join_hash_table_size,
3372  eo.executor_type,
3373  };
3374 
3375  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3376  source_result = executeWorkUnit(source_work_unit,
3377  source->getOutputMetainfo(),
3378  is_aggregate,
3379  co,
3380  eo_copy,
3381  render_info,
3382  queue_time_ms);
3383  use_speculative_top_n_sort =
3384  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3385  use_speculative_top_n(source_work_unit.exe_unit,
3386  source_result.getRows()->getQueryMemDesc());
3387  }
3388  if (render_info && render_info->isInSitu()) {
3389  return source_result;
3390  }
3391  if (source_result.isFilterPushDownEnabled()) {
3392  return source_result;
3393  }
3394  auto rows_to_sort = source_result.getRows();
3395  if (eo.just_explain) {
3396  return {rows_to_sort, {}};
3397  }
3398  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3399  !use_speculative_top_n_sort) {
3400  const size_t top_n = limit == 0 ? 0 : limit + offset;
3401  rows_to_sort->sort(order_entries, top_n, executor_);
3402  }
3403  if (limit || offset) {
3404  if (g_cluster && sort->collationCount() == 0) {
3405  if (offset >= rows_to_sort->rowCount()) {
3406  rows_to_sort->dropFirstN(offset);
3407  } else {
3408  rows_to_sort->keepFirstN(limit + offset);
3409  }
3410  } else {
3411  rows_to_sort->dropFirstN(offset);
3412  if (limit) {
3413  rows_to_sort->keepFirstN(limit);
3414  }
3415  }
3416  }
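    // Example (illustrative): with ORDER BY ... LIMIT 10 OFFSET 5 on a single
    // node, the sort above uses top_n = 15, then dropFirstN(5) discards the
    // offset rows and keepFirstN(10) trims the result to the requested page.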
3417  return {rows_to_sort, source_result.getTargetsMeta()};
3418  };
3419 
3420  try {
3421  return execute_sort_query();
3422  } catch (const SpeculativeTopNFailed& e) {
3423  CHECK_EQ(size_t(1), groupby_exprs.size());
3424  CHECK(groupby_exprs.front());
3425  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3426  return execute_sort_query();
3427  }
3428 }
3429 
3431  const RelSort* sort,
3432  std::list<Analyzer::OrderEntry>& order_entries,
3433  const ExecutionOptions& eo) {
3434  const auto source = sort->getInput(0);
3435  const size_t limit = sort->getLimit();
3436  const size_t offset = sort->getOffset();
3437  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3438  const size_t scan_total_limit =
3439  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
3440  size_t max_groups_buffer_entry_guess{
3441  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3442   SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3443   SortInfo sort_info{
3444  order_entries, sort_algorithm, limit, offset, sort->isLimitDelivered()};
3445  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3446  const auto& source_exe_unit = source_work_unit.exe_unit;
3447 
3448  // we do not allow sorting geometry or array types
3449  for (auto order_entry : order_entries) {
3450     CHECK_GT(order_entry.tle_no, 0);  // tle_no is a 1-based index
3451  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3452  const auto& ti = get_target_info(te, false);
3453  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3454  throw std::runtime_error(
3455  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3456  }
3457  }
3458 
3459  if (source_exe_unit.groupby_exprs.size() == 1) {
3460  if (!source_exe_unit.groupby_exprs.front()) {
3461  sort_algorithm = SortAlgorithm::StreamingTopN;
3462  } else {
3463  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3464  first_oe_is_desc(order_entries))) {
3465  sort_algorithm = SortAlgorithm::Default;
3466  }
3467  }
3468  }
3469 
3470  sort->setOutputMetainfo(source->getOutputMetainfo());
3471  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3472  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
3473  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3474  std::move(source_exe_unit.input_col_descs),
3475  source_exe_unit.simple_quals,
3476  source_exe_unit.quals,
3477  source_exe_unit.join_quals,
3478  source_exe_unit.groupby_exprs,
3479  source_exe_unit.target_exprs,
3480  source_exe_unit.target_exprs_original_type_infos,
3481  nullptr,
3482  {sort_info.order_entries,
3483  sort_algorithm,
3484  limit,
3485  offset,
3486  sort_info.limit_delivered},
3487  scan_total_limit,
3488  source_exe_unit.query_hint,
3489  source_exe_unit.query_plan_dag_hash,
3490  source_exe_unit.hash_table_build_plan_dag,
3491  source_exe_unit.table_id_to_node_map,
3492  source_exe_unit.use_bump_allocator,
3493  source_exe_unit.union_all,
3494  source_exe_unit.query_state},
3495  source,
3496  max_groups_buffer_entry_guess,
3497  std::move(source_work_unit.query_rewriter),
3498  source_work_unit.input_permutation,
3499  source_work_unit.left_deep_join_input_sizes};
3500 }
3501 
3502 namespace {
3503 
3504 /**
3505  * Upper bound estimation for the number of groups. Not strictly correct and not
3506  * tight, but if the tables involved are really small we shouldn't waste time
3507  * doing the NDV estimation. We don't account for cross-joins and / or group by
3508  * unnested array, which is the reason this estimation isn't entirely reliable.
3509  */
3510 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
3511  CHECK(!table_infos.empty());
3512  const auto& first_table = table_infos.front();
3513  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3514  for (const auto& table_info : table_infos) {
3515  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3516  max_num_groups = table_info.info.getNumTuplesUpperBound();
3517  }
3518  }
3519  return std::max(max_num_groups, size_t(1));
3520 }
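// Worked example (illustrative): joining a 1,000,000-row fact table with a
// 10,000-row dimension table yields max(1000000, 10000) = 1,000,000 as the
// group-count upper bound; the std::max with 1 keeps the guess sane for empty
// tables.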
3521 
3522 bool is_projection(const RelAlgExecutionUnit& ra_exe_unit) {
3523  return ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front();
3524 }
3525 
3526 bool can_output_columnar(const RelAlgExecutionUnit& ra_exe_unit,
3527                          const RenderInfo* render_info,
3528  const RelAlgNode* body) {
3529  if (!is_projection(ra_exe_unit)) {
3530  return false;
3531  }
3532  if (render_info && render_info->isInSitu()) {
3533  return false;
3534  }
3535  if (!ra_exe_unit.sort_info.order_entries.empty()) {
3536     // disable columnar output when the query has a top-n sort node
3537  return false;
3538  }
3539  for (const auto& target_expr : ra_exe_unit.target_exprs) {
3540  // We don't currently support varlen columnar projections, so
3541  // return false if we find one
3542  if (target_expr->get_type_info().is_varlen()) {
3543  return false;
3544  }
3545  }
3546  if (auto top_project = dynamic_cast<const RelProject*>(body)) {
3547  if (top_project->isRowwiseOutputForced()) {
3548  return false;
3549  }
3550  }
3551  return true;
3552 }
3553 
3554 bool should_output_columnar(const RelAlgExecutionUnit& ra_exe_unit) {
3555   return g_columnar_large_projections &&
3556          ra_exe_unit.scan_limit >= g_columnar_large_projections_threshold;
3557 }
3558 
3559 /**
3560  * Determines whether a query needs to compute the size of its output buffer.
3561  * Returns true for projection queries with no LIMIT or a LIMIT that exceeds the
3562  * high scan limit threshold (meaning it would be cheaper to compute the number
3563  * of rows passing or use the bump allocator than allocate the current scan
3564  * limit per GPU)
3565  */
3566 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
3567   for (const auto target_expr : ra_exe_unit.target_exprs) {
3567  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3568  return false;
3569  }
3570  }
3571  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
3572  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
3573  return true;
3574  }
3575  return false;
3576 }
3577 
3578 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
3579  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
3580  ra_exe_unit.simple_quals.empty());
3581 }
3582 
3583 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
3584     const RelAlgExecutionUnit& ra_exe_unit_in,
3585  const std::vector<InputTableInfo>& table_infos,
3586  const Executor* executor,
3587  const ExecutorDeviceType device_type_in,
3588  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3589  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
3590  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
3591  const auto target_expr = ra_exe_unit.target_exprs[i];
3592  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3593  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
3594  continue;
3595  }
3596  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3597  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
3598  CHECK(arg);
3599  const auto& arg_ti = arg->get_type_info();
3600  // Avoid calling getExpressionRange for variable length types (string and array),
3601  // it'd trigger an assertion since that API expects to be called only for types
3602  // for which the notion of range is well-defined. A bit of a kludge, but the
3603  // logic to reject these types anyway is at lower levels in the stack and not
3604  // really worth pulling into a separate function for now.
3605  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3606  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
3607  continue;
3608  }
3609  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
3610  if (arg_range.getType() != ExpressionRangeType::Integer) {
3611  continue;
3612  }
3613  // When running distributed, the threshold for using the precise implementation
3614  // must be consistent across all leaves, otherwise we could have a mix of precise
3615  // and approximate bitmaps and we cannot aggregate them.
3616  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
3617  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3618  const auto sub_bitmap_count =
3619  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
3620  int64_t approx_bitmap_sz_bits{0};
3621  const auto error_rate_expr = static_cast<Analyzer::AggExpr*>(target_expr)->get_arg1();
3622  if (error_rate_expr) {
3623  CHECK(error_rate_expr->get_type_info().get_type() == kINT);
3624  auto const error_rate =
3625  dynamic_cast<Analyzer::Constant const*>(error_rate_expr.get());
3626  CHECK(error_rate);
3627  CHECK_GE(error_rate->get_constval().intval, 1);
3628  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
3629  } else {
3630  approx_bitmap_sz_bits = g_hll_precision_bits;
3631  }
3632  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
3633  arg_range.getIntMin(),
3634  approx_bitmap_sz_bits,
3635  true,
3636  device_type,
3637  sub_bitmap_count};
3638  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
3639  arg_range.getIntMin(),
3640  bitmap_sz_bits,
3641  false,
3642  device_type,
3643  sub_bitmap_count};
3644  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3645  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3646  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3647  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
3648  target_exprs_owned.push_back(precise_count_distinct);
3649  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
3650  }
3651  }
3652  return ra_exe_unit;
3653 }
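// Worked example (illustrative): for APPROX_COUNT_DISTINCT over a column whose
// expression range is [0, 9999], the precise bitmap needs 10,000 bits (~1.25 KB
// padded), while the HLL-based approximate buffer implied by g_hll_precision_bits
// is typically at least as large; in that case the loop above substitutes an
// exact COUNT(DISTINCT), giving a precise answer at no extra memory cost.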
3654 
3655 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
3656  const CompilationOptions& co,
3657  const ExecutionOptions& eo) {
3658   return g_enable_bump_allocator && co.device_type == ExecutorDeviceType::GPU &&
3659          !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
3660 }
3661 
3662 } // namespace
3663 
3664 ExecutionResult RelAlgExecutor::executeWorkUnit(
3665     const RelAlgExecutor::WorkUnit& work_unit,
3666  const std::vector<TargetMetaInfo>& targets_meta,
3667  const bool is_agg,
3668  const CompilationOptions& co_in,
3669  const ExecutionOptions& eo_in,
3670  RenderInfo* render_info,
3671  const int64_t queue_time_ms,
3672  const std::optional<size_t> previous_count) {
3673   INJECT_TIMER(executeWorkUnit);
3674   auto timer = DEBUG_TIMER(__func__);
3675  auto query_exec_time_begin = timer_start();
3676 
3677  const auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
3678  check_none_encoded_string_cast_tuple_limit(query_infos, work_unit.exe_unit);
3679 
3680  auto co = co_in;
3681  auto eo = eo_in;
3682  ColumnCacheMap column_cache;
3683  ScopeGuard clearWindowContextIfNecessary = [&]() {
3684  if (is_window_execution_unit(work_unit.exe_unit)) {
3685       WindowProjectNodeContext::reset(executor_);
3686     }
3687  };
3688  if (is_window_execution_unit(work_unit.exe_unit)) {
3689     if (!g_enable_window_functions) {
3690       throw std::runtime_error("Window functions support is disabled");
3691  }
3692  co.device_type = ExecutorDeviceType::CPU;
3693  co.allow_lazy_fetch = false;
3694  computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3695  }
3696  if (!eo.just_explain && eo.find_push_down_candidates) {
3697  // find potential candidates:
3698  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3699  if (!selected_filters.empty() || eo.just_calcite_explain) {
3700  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3701  }
3702  }
3703  if (render_info && render_info->isInSitu()) {
3704  co.allow_lazy_fetch = false;
3705  }
3706  const auto body = work_unit.body;
3707  CHECK(body);
3708  auto it = leaf_results_.find(body->getId());
3709  VLOG(3) << "body->getId()=" << body->getId()
3710  << " body->toString()=" << body->toString(RelRexToStringConfig::defaults())
3711  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3712  if (it != leaf_results_.end()) {
3713  executor_->addTransientStringLiterals(work_unit.exe_unit,
3714  executor_->row_set_mem_owner_);
3715  auto& aggregated_result = it->second;
3716  auto& result_rows = aggregated_result.rs;
3717  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3718  body->setOutputMetainfo(aggregated_result.targets_meta);
3719  if (render_info) {
3720  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3721  }
3722  return result;
3723  }
3724  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3725 
3726   auto ra_exe_unit = decide_approx_count_distinct_implementation(
3727       work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3728 
3729  // register query hint if query_dag_ is valid
3730  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3731  if (query_dag_) {
3732  auto candidate = query_dag_->getQueryHint(body);
3733  if (candidate) {
3734  ra_exe_unit.query_hint = *candidate;
3735  }
3736  }
3737 
3738  const auto& query_hints = ra_exe_unit.query_hint;
3739  ScopeGuard reset_cuda_block_grid_sizes = [&,
3740  orig_block_size = executor_->blockSize(),
3741  orig_grid_size = executor_->gridSize()]() {
3742  if (executor_->getDataMgr()->getCudaMgr()) {
3743  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3744  if (orig_block_size) {
3745  executor_->setBlockSize(orig_block_size);
3746  } else {
3747  executor_->resetBlockSize();
3748  }
3749  }
3750  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3751  if (orig_grid_size) {
3752  executor_->setGridSize(orig_grid_size);
3753  } else {
3754  executor_->resetGridSize();
3755  }
3756  }
3757  }
3758  };
3759 
3760  if (co.device_type == ExecutorDeviceType::GPU) {
3761  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3762  if (!executor_->getDataMgr()->getCudaMgr()) {
3763  VLOG(1) << "Skip CUDA grid size query hint: cannot detect CUDA device";
3764  } else {
3765  const auto num_sms = executor_->cudaMgr()->getMinNumMPsForAllDevices();
3766  const auto new_grid_size = static_cast<unsigned>(
3767  std::max(1.0, std::round(num_sms * query_hints.cuda_grid_size_multiplier)));
3768  const auto default_grid_size = executor_->gridSize();
3769  if (new_grid_size != default_grid_size) {
3770  VLOG(1) << "Change CUDA grid size: " << default_grid_size
3771  << " (default_grid_size) -> " << new_grid_size << " (# SMs * "
3772  << query_hints.cuda_grid_size_multiplier << ")";
3773  // todo (yoonmin): do we need to check a hard limit?
3774  executor_->setGridSize(new_grid_size);
3775  } else {
3776  VLOG(1) << "Skip CUDA grid size query hint: invalid grid size";
3777  }
3778  }
3779  }
3780  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3781  if (!executor_->getDataMgr()->getCudaMgr()) {
3782  VLOG(1) << "Skip CUDA block size query hint: cannot detect CUDA device";
3783  } else {
3784  int cuda_block_size = query_hints.cuda_block_size;
3785  int warp_size = executor_->warpSize();
3786  if (cuda_block_size >= warp_size) {
3787  cuda_block_size = (cuda_block_size + warp_size - 1) / warp_size * warp_size;
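          // Worked example (illustrative): with warp_size = 32 and a
          // cuda_block_size hint of 100, (100 + 31) / 32 * 32 rounds the block
          // size up to 128 threads, the next multiple of the warp size.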
3788  VLOG(1) << "Change CUDA block size w.r.t warp size (" << warp_size
3789  << "): " << executor_->blockSize() << " -> " << cuda_block_size;
3790  } else {
3791  VLOG(1) << "Change CUDA block size: " << executor_->blockSize() << " -> "
3792  << cuda_block_size;
3793  }
3794  executor_->setBlockSize(cuda_block_size);
3795  }
3796  }
3797  }
3798 
3799  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3800  if (is_window_execution_unit(ra_exe_unit)) {
3801  CHECK_EQ(table_infos.size(), size_t(1));
3802  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3803  max_groups_buffer_entry_guess =
3804  table_infos.front().info.fragments.front().getNumTuples();
3805  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3806  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3807  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3808  ra_exe_unit.scan_limit = *previous_count;
3809  } else {
3810  // TODO(adb): enable bump allocator path for render queries
3811  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3812  ra_exe_unit.scan_limit = 0;
3813  ra_exe_unit.use_bump_allocator = true;
3814  } else if (eo.executor_type == ::ExecutorType::Extern) {
3815  ra_exe_unit.scan_limit = 0;
3816  } else if (!eo.just_explain) {
3817  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
3818  if (filter_count_all) {
3819  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3820  }
3821  }
3822  }
3823  }
3824 
3825   // when output_columnar_hint is true here, it means either 1) the columnar output
3826   // configuration is enabled or 2) a user hint requested it; in both cases we must
3827   // disable it if the requirements checked below are not satisfied
3828  if (can_output_columnar(ra_exe_unit, render_info, body)) {
3829  if (!eo.output_columnar_hint && should_output_columnar(ra_exe_unit)) {
3830  VLOG(1) << "Using columnar layout for projection as output size of "
3831  << ra_exe_unit.scan_limit << " rows exceeds threshold of "
3832               << g_columnar_large_projections_threshold;
3833       eo.output_columnar_hint = true;
3834  }
3835  } else {
3836  eo.output_columnar_hint = false;
3837  }
3838 
3839  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3840  co.device_type,
3841                                                      QueryMemoryDescriptor(),
3842                                                      nullptr,
3843  executor_->blockSize(),
3844  executor_->gridSize()),
3845  {}};
3846 
3847  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
3848  const bool has_cardinality_estimation,
3849  const bool has_ndv_estimation) -> ExecutionResult {
3850  // Note that the groups buffer entry guess may be modified during query execution.
3851  // Create a local copy so we can track those changes if we need to attempt a retry
3852  // due to OOM
3853  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3854  try {
3855  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3856  is_agg,
3857  table_infos,
3858  ra_exe_unit,
3859  co,
3860  eo,
3861  render_info,
3862  has_cardinality_estimation,
3863  column_cache),
3864  targets_meta};
3865  } catch (const QueryExecutionError& e) {
3866  if (!has_ndv_estimation && e.getErrorCode() < 0) {
3867  throw CardinalityEstimationRequired(/*range=*/0);
3868  }
3869       handlePersistentError(e.getErrorCode());
3870       return handleOutOfMemoryRetry(
3871  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
3872  targets_meta,
3873  is_agg,
3874  co,
3875  eo,
3876  render_info,
3877           /*was_multifrag_kernel_launch=*/eo.allow_multifrag,
3878           queue_time_ms);
3879  }
3880  };
3881 
3882  auto use_resultset_cache = canUseResultsetCache(eo, render_info);
3883  for (const auto& table_info : table_infos) {
3884  const auto db_id = table_info.table_key.db_id;
3885  if (db_id > 0) {
3886  const auto td = Catalog_Namespace::get_metadata_for_table(table_info.table_key);
3887  if (td && (td->isTemporaryTable() || td->isView)) {
3888  use_resultset_cache = false;
3889  if (eo.keep_result) {
3890  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has either "
3891  "temporary table or view";
3892  }
3893  }
3894  }
3895  }
3896 
3897  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
3898  try {
3899  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3900  auto card = cached_cardinality.second;
3901  if (cached_cardinality.first && card >= 0) {
3902  result = execute_and_handle_errors(
3903  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3904  } else {
3905  result = execute_and_handle_errors(
3906  max_groups_buffer_entry_guess,
3907           groups_approx_upper_bound(table_infos) <= g_big_group_threshold,
3908           /*has_ndv_estimation=*/false);
3909  }
3910  } catch (const CardinalityEstimationRequired& e) {
3911  // check the cardinality cache
3912  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3913  auto card = cached_cardinality.second;
3914  if (cached_cardinality.first && card >= 0) {
3915  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3916  } else {
3917  const auto ndv_groups_estimation =
3918  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3919  const auto estimated_groups_buffer_entry_guess =
3920  ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3921  : std::min(groups_approx_upper_bound(table_infos),
3922                                      g_estimator_failure_max_groupby_size);
3923       CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3924  result = execute_and_handle_errors(
3925  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3926  if (!(eo.just_validate || eo.just_explain)) {
3927  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3928  }
3929  }
3930  }
3931 
3932  result.setQueueTime(queue_time_ms);
3933  if (render_info) {
3934  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3935  if (render_info->isInSitu()) {
3936  // return an empty result (with the same queue time, and zero render time)
3937  return {std::make_shared<ResultSet>(
3938  queue_time_ms,
3939  0,
3940  executor_->row_set_mem_owner_
3941  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3942  : nullptr),
3943  {}};
3944  }
3945  }
3946 
3947  for (auto& target_info : result.getTargetsMeta()) {
3948  if (target_info.get_type_info().is_string() &&
3949  !target_info.get_type_info().is_dict_encoded_string()) {
3950       // currently, we do not support resultset caching if a non-encoded string is
3951       // projected
3951  use_resultset_cache = false;
3952  if (eo.keep_result) {
3953  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has non-encoded "
3954  "string column projection";
3955  }
3956  }
3957  }
3958 
3959  const auto res = result.getDataPtr();
3960  auto allow_auto_caching_resultset =
3961  res && res->hasValidBuffer() && g_allow_auto_resultset_caching &&
3962  res->getBufferSizeBytes(co.device_type) <= g_auto_resultset_caching_threshold;
3963  if (use_resultset_cache && (eo.keep_result || allow_auto_caching_resultset) &&
3964  !work_unit.exe_unit.sort_info.limit_delivered) {
3965  auto query_exec_time = timer_stop(query_exec_time_begin);
3966  res->setExecTime(query_exec_time);
3967  res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3968  res->setTargetMetaInfo(body->getOutputMetainfo());
3969  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
3970  res->setInputTableKeys(std::move(input_table_keys));
3971  if (allow_auto_caching_resultset) {
3972  VLOG(1) << "Automatically keep query resultset to recycler";
3973  }
3974  res->setUseSpeculativeTopNSort(
3975  use_speculative_top_n(ra_exe_unit, res->getQueryMemDesc()));
3976  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
3977  ra_exe_unit.query_plan_dag_hash,
3978  res->getInputTableKeys(),
3979  res,
3980  res->getBufferSizeBytes(co.device_type),
3982  } else {
3983  if (eo.keep_result) {
3984  if (g_cluster) {
3985  VLOG(1) << "Query hint \'keep_result\' is ignored since we do not support "
3986  "resultset recycling on distributed mode";
3987  } else if (hasStepForUnion()) {
3988  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has union-(all) "
3989  "operator";
3990  } else if (render_info && render_info->isInSitu()) {
3991  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is classified as "
3992  "a in-situ rendering query";
3993  } else if (is_validate_or_explain_query(eo)) {
3994  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is either "
3995  "validate or explain query";
3996  } else {
3997  VLOG(1) << "Query hint \'keep_result\' is ignored";
3998  }
3999  }
4000  }
4001 
4002  return result;
4003 }
4004 
4005 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
4006  const bool is_agg,
4007  const CompilationOptions& co,
4008  const ExecutionOptions& eo) {
4009  const auto count =
4010  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
4011  kCOUNT,
4012  nullptr,
4013  false,
4014  nullptr);
4015  const auto count_all_exe_unit =
4016  work_unit.exe_unit.createCountAllExecutionUnit(count.get());
4017  size_t one{1};
4018  ResultSetPtr count_all_result;
4019  try {
4020  ColumnCacheMap column_cache;
4021  count_all_result =
4022  executor_->executeWorkUnit(one,
4023  is_agg,
4024  get_table_infos(work_unit.exe_unit, executor_),
4025  count_all_exe_unit,
4026  co,
4027  eo,
4028  nullptr,
4029  false,
4030  column_cache);
4031  } catch (const foreign_storage::ForeignStorageException& error) {
4032  throw error;
4033  } catch (const QueryMustRunOnCpu&) {
4034  // force a retry of the top level query on CPU
4035  throw;
4036  } catch (const std::exception& e) {
4037  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
4038  return std::nullopt;
4039  }
4040  const auto count_row = count_all_result->getNextRow(false, false);
4041  CHECK_EQ(size_t(1), count_row.size());
4042  const auto& count_tv = count_row.front();
4043  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
4044  CHECK(count_scalar_tv);
4045  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
4046  CHECK(count_ptr);
4047  CHECK_GE(*count_ptr, 0);
4048  auto count_upper_bound = static_cast<size_t>(*count_ptr);
4049  return std::max(count_upper_bound, size_t(1));
4050 }
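// Example (illustrative): for a projection such as
// `SELECT x, y FROM t WHERE x > 0`, the rewritten unit first runs
// `SELECT COUNT(*) FROM t WHERE x > 0`; the returned upper bound is then used as
// ra_exe_unit.scan_limit in executeWorkUnit so the projection buffer can be
// sized exactly instead of assuming the full table row count.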
4051 
4052 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
4053  const auto& ra_exe_unit = work_unit.exe_unit;
4054  if (ra_exe_unit.input_descs.size() != 1) {
4055  return false;
4056  }
4057  const auto& table_desc = ra_exe_unit.input_descs.front();
4058  if (table_desc.getSourceType() != InputSourceType::TABLE) {
4059  return false;
4060  }
4061  const auto& table_key = table_desc.getTableKey();
4062  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
4063  const auto comp_expr =
4064  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4065  if (!comp_expr || comp_expr->get_optype() != kEQ) {
4066  return false;
4067  }
4068  const auto lhs = comp_expr->get_left_operand();
4069  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4070  if (!lhs_col || !lhs_col->getTableKey().table_id || lhs_col->get_rte_idx()) {
4071  return false;
4072  }
4073  const auto rhs = comp_expr->get_right_operand();
4074  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4075  if (!rhs_const) {
4076  return false;
4077  }
4078  const auto cd = get_column_descriptor(
4079  {table_key.db_id, table_key.table_id, lhs_col->getColumnKey().column_id});
4080  if (cd->isVirtualCol) {
4081  CHECK_EQ("rowid", cd->columnName);
4082  return true;
4083  }
4084  }
4085  return false;
4086 }
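// Example (illustrative): a point lookup such as
//   SELECT * FROM t WHERE rowid = 42;
// yields a single simple qual `rowid = 42` on the virtual rowid column, so
// isRowidLookup returns true and executeWorkUnit skips the pre-flight filtered
// count when sizing the output buffer.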
4087 
4089  const RelAlgExecutor::WorkUnit& work_unit,
4090  const std::vector<TargetMetaInfo>& targets_meta,
4091  const bool is_agg,
4092  const CompilationOptions& co,
4093  const ExecutionOptions& eo,
4094  RenderInfo* render_info,
4095  const bool was_multifrag_kernel_launch,
4096  const int64_t queue_time_ms) {
4097  // Disable the bump allocator
4098   // Note that this will have basically the same effect as using the bump allocator for
4099  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
4100  // path and the bump allocator path for kernel per fragment execution.
4101  auto ra_exe_unit_in = work_unit.exe_unit;
4102  ra_exe_unit_in.use_bump_allocator = false;
4103 
4104  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4105  co.device_type,
4106                                                             QueryMemoryDescriptor(),
4107                                                             nullptr,
4108  executor_->blockSize(),
4109  executor_->gridSize()),
4110  {}};
4111 
4112  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
4113  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
4114  auto eo_no_multifrag = eo;
4115  eo_no_multifrag.setNoExplainExecutionOptions(true);
4116  eo_no_multifrag.allow_multifrag = false;
4117  eo_no_multifrag.find_push_down_candidates = false;
4118  if (was_multifrag_kernel_launch) {
4119  try {
4120  // Attempt to retry using the kernel per fragment path. The smaller input size
4121  // required may allow the entire kernel to execute in GPU memory.
4122  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
4123  "kernels disabled.";
4124  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
4125  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
4126  ColumnCacheMap column_cache;
4127  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4128  is_agg,
4129  table_infos,
4130  ra_exe_unit,
4131  co,
4132  eo_no_multifrag,
4133  nullptr,
4134  true,
4135  column_cache),
4136  targets_meta};
4137  result.setQueueTime(queue_time_ms);
4138  } catch (const QueryExecutionError& e) {
4140  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
4141  }
4142  }
4143 
4144  if (render_info) {
4145  render_info->forceNonInSitu();
4146  }
4147 
4148  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
4149   // Only reset the group buffer entry guess if we ran out of slots, which
4150   // suggests a highly pathological input that prevented a good estimation of
4151   // the distinct tuple count. For projection queries, this will force a
4152   // per-fragment scan limit, which is compatible with the CPU path.
4154  VLOG(1) << "Resetting max groups buffer entry guess.";
4155  max_groups_buffer_entry_guess = 0;
4156 
4157  int iteration_ctr = -1;
4158  while (true) {
4159  iteration_ctr++;
4160     const auto ra_exe_unit = decide_approx_count_distinct_implementation(
4161         ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
4162  ColumnCacheMap column_cache;
4163  try {
4164  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4165  is_agg,
4166  table_infos,
4167  ra_exe_unit,
4168  co_cpu,
4169  eo_no_multifrag,
4170  nullptr,
4171  true,
4172  column_cache),
4173  targets_meta};
4174  } catch (const QueryExecutionError& e) {
4175  // Ran out of slots
4176  if (e.getErrorCode() < 0) {
4177  // Even the conservative guess failed; it should only happen when we group
4178  // by a huge cardinality array. Maybe we should throw an exception instead?
4179  // Such a heavy query is entirely capable of exhausting all the host memory.
4180  CHECK(max_groups_buffer_entry_guess);
4181  // Only allow two iterations of increasingly large entry guesses up to a maximum
4182  // of 512MB per column per kernel
4183  if (g_enable_watchdog || iteration_ctr > 1) {
4184  throw std::runtime_error("Query ran out of output slots in the result");
4185  }
4186  max_groups_buffer_entry_guess *= 2;
4187  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
4188  "groups buffer entry "
4189  "guess equal to "
4190  << max_groups_buffer_entry_guess;
4191  } else {
4192         handlePersistentError(e.getErrorCode());
4193       }
4194  continue;
4195  }
4196  result.setQueueTime(queue_time_ms);
4197  return result;
4198  }
4199  return result;
4200 }
4201 
4202 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
4203  LOG(ERROR) << "Query execution failed with error "
4204  << getErrorMessageFromCode(error_code);
4205  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
4206  // We ran out of GPU memory, this doesn't count as an error if the query is
4207  // allowed to continue on CPU because retry on CPU is explicitly allowed through
4208  // --allow-cpu-retry.
4209  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
4210  if (!g_allow_cpu_retry) {
4211  throw std::runtime_error(
4212  "Query ran out of GPU memory, unable to automatically retry on CPU");
4213  }
4214  return;
4215  }
4216  throw std::runtime_error(getErrorMessageFromCode(error_code));
4217 }
4218 
4219 namespace {
4220 struct ErrorInfo {
4221  const char* code{nullptr};
4222  const char* description{nullptr};
4223 };
4224 ErrorInfo getErrorDescription(const int32_t error_code) {
4225  // 'designated initializers' don't compile on Windows for std 17
4226   // They require /std:c++20, so they have been removed for the Windows port.
4227   switch (error_code) {
4228     case Executor::ERR_DIV_BY_ZERO:
4229       return {"ERR_DIV_BY_ZERO", "Division by zero"};
4230     case Executor::ERR_OUT_OF_GPU_MEM:
4231       return {"ERR_OUT_OF_GPU_MEM",
4233               "Query couldn't keep the entire working set of columns in GPU memory"};
4234     case Executor::ERR_UNSUPPORTED_SELF_JOIN:
4235       return {"ERR_UNSUPPORTED_SELF_JOIN", "Self joins not supported yet"};
4236     case Executor::ERR_OUT_OF_CPU_MEM:
4237       return {"ERR_OUT_OF_CPU_MEM", "Not enough host memory to execute the query"};
4238     case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
4239       return {"ERR_OVERFLOW_OR_UNDERFLOW", "Overflow or underflow"};
4240     case Executor::ERR_OUT_OF_TIME:
4241       return {"ERR_OUT_OF_TIME", "Query execution has exceeded the time limit"};
4242     case Executor::ERR_INTERRUPTED:
4243       return {"ERR_INTERRUPTED", "Query execution has been interrupted"};
4244     case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
4245       return {"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
4246               "Columnar conversion not supported for variable length types"};
4247     case Executor::ERR_TOO_MANY_LITERALS:
4248       return {"ERR_TOO_MANY_LITERALS", "Too many literals in the query"};
4249     case Executor::ERR_STRING_CONST_IN_RESULTSET:
4250       return {"ERR_STRING_CONST_IN_RESULTSET",
4252               "NONE ENCODED String types are not supported as input result set."};
4253     case Executor::ERR_OUT_OF_RENDER_MEM:
4254       return {"ERR_OUT_OF_RENDER_MEM",
4256               "Insufficient GPU memory for query results in render output buffer "
4257               "sized by render-mem-bytes"};
4258     case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
4259       return {"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
4260               "Streaming-Top-N not supported in Render Query"};
4261     case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
4262       return {"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
4263               "Multiple distinct values encountered"};
4264     case Executor::ERR_GEOS:
4265       return {"ERR_GEOS", "ERR_GEOS"};
4266     case Executor::ERR_WIDTH_BUCKET_INVALID_ARGUMENT:
4267       return {"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
4269               "Arguments of WIDTH_BUCKET function do not satisfy the condition"};
4270     default:
4271       return {nullptr, nullptr};
4272   }
4273 }
4274 
4275 } // namespace
4276 
4277 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
4278  if (error_code < 0) {
4279  return "Ran out of slots in the query output buffer";
4280  }
4281  const auto errorInfo = getErrorDescription(error_code);
4282 
4283  if (errorInfo.code) {
4284  return errorInfo.code + ": "s + errorInfo.description;
4285  } else {
4286  return "Other error: code "s + std::to_string(error_code);
4287  }
4288 }
4289 
4292  VLOG(1) << "Running post execution callback.";
4293  (*post_execution_callback_)();
4294  }
4295 }
4296 
4297 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
4298                                                         const SortInfo& sort_info,
4299  const ExecutionOptions& eo) {
4300  const auto compound = dynamic_cast<const RelCompound*>(node);
4301  if (compound) {
4302  return createCompoundWorkUnit(compound, sort_info, eo);
4303  }
4304  const auto project = dynamic_cast<const RelProject*>(node);
4305  if (project) {
4306  return createProjectWorkUnit(project, sort_info, eo);
4307  }
4308  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4309  if (aggregate) {
4310  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4311  }
4312  const auto filter = dynamic_cast<const RelFilter*>(node);
4313  if (filter) {
4314  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4315  }
4316  LOG(FATAL) << "Unhandled node type: "
4317              << node->toString(RelRexToStringConfig::defaults());
4318   return {};
4319 }
4320 
4321 namespace {
4322 
4323 JoinType get_join_type(const RelAlgNode* ra) {
4324   auto sink = get_data_sink(ra);
4325  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
4326  return join->getJoinType();
4327  }
4328  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
4329  return JoinType::INNER;
4330  }
4331 
4332  return JoinType::INVALID;
4333 }
4334 
4335 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
4336  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4337  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
4338  return nullptr;
4339  }
4340  const auto equi_join_condition =
4341  dynamic_cast<const RexOperator*>(condition->getOperand(0));
4342  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
4343  return nullptr;
4344  }
4345  const auto both_are_null_condition =
4346  dynamic_cast<const RexOperator*>(condition->getOperand(1));
4347  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
4348  both_are_null_condition->size() != 2) {
4349  return nullptr;
4350  }
4351  const auto lhs_is_null =
4352  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
4353  const auto rhs_is_null =
4354  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
4355  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
4356  rhs_is_null->getOperator() != kISNULL) {
4357  return nullptr;
4358  }
4359  CHECK_EQ(size_t(1), lhs_is_null->size());
4360  CHECK_EQ(size_t(1), rhs_is_null->size());
4361  CHECK_EQ(size_t(2), equi_join_condition->size());
4362  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
4363  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
4364  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
4365  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
4366  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4367  return nullptr;
4368  }
4369  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
4370  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4371  RexDeepCopyVisitor deep_copy_visitor;
4372  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
4373  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
4374  eq_operands.emplace_back(lhs_op_copy.release());
4375  eq_operands.emplace_back(rhs_op_copy.release());
4376  return boost::make_unique<const RexOperator>(
4377  kBW_EQ, eq_operands, equi_join_condition->getType());
4378  }
4379  return nullptr;
4380 }
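// Example (illustrative): the pattern recognized above is the NULL-safe
// equality Calcite emits for `a IS NOT DISTINCT FROM b`, i.e.
//   a = b OR (a IS NULL AND b IS NULL)
// which is collapsed into the single bitwise-equality operator kBW_EQ so it can
// still be used as a hashable equi-join condition.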
4381 
4382 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
4383  const RexScalar* scalar) {
4384  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4385  if (condition && condition->getOperator() == kAND) {
4386  CHECK_GE(condition->size(), size_t(2));
4387  auto acc = get_bitwise_equals(condition->getOperand(0));
4388  if (!acc) {
4389  return nullptr;
4390  }
4391  for (size_t i = 1; i < condition->size(); ++i) {
4392  std::vector<std::unique_ptr<const RexScalar>> and_operands;
4393  and_operands.emplace_back(std::move(acc));
4394  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
4395  acc =
4396  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
4397  }
4398  return acc;
4399  }
4400  return get_bitwise_equals(scalar);
4401 }
4402 
4403 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
4404  CHECK_GE(left_deep_join->inputCount(), size_t(2));
4405  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
4406  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
4407  ++nesting_level) {
4408  if (left_deep_join->getOuterCondition(nesting_level)) {
4409  join_types[nesting_level - 1] = JoinType::LEFT;
4410  }
4411  auto cur_level_join_type = left_deep_join->getJoinType(nesting_level);
4412  if (cur_level_join_type == JoinType::SEMI || cur_level_join_type == JoinType::ANTI) {
4413  join_types[nesting_level - 1] = cur_level_join_type;
4414  }
4415  }
4416  return join_types;
4417 }
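// Example (illustrative): for a left-deep tree over
// `t0 LEFT JOIN t1 ... INNER JOIN t2 ... SEMI JOIN t3`, this returns
// {LEFT, INNER, SEMI}: one entry per nesting level, where the presence of an
// outer condition marks a level as LEFT and SEMI/ANTI levels keep their type.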
4418 
4419 template <class RA>
4420 std::vector<size_t> do_table_reordering(
4421  std::vector<InputDescriptor>& input_descs,
4422  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4423  const JoinQualsPerNestingLevel& left_deep_join_quals,
4424  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4425  const RA* node,
4426  const std::vector<InputTableInfo>& query_infos,
4427  const Executor* executor) {
4428  if (g_cluster) {
4429  // Disable table reordering in distributed mode. The aggregator does not have enough
4430  // information to break ties
4431  return {};
4432  }
4433  for (const auto& table_info : query_infos) {
4434  if (table_info.table_key.table_id < 0) {
4435  continue;
4436  }
4437  const auto td = Catalog_Namespace::get_metadata_for_table(table_info.table_key);
4438  CHECK(td);
4439  if (table_is_replicated(td)) {
4440  return {};
4441  }
4442  }
4443  const auto input_permutation =
4444  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
4445  input_to_nest_level = get_input_nest_levels(node, input_permutation);
4446  std::tie(input_descs, input_col_descs, std::ignore) =
4447  get_input_desc(node, input_to_nest_level, input_permutation);
4448  return input_permutation;
4449 }
4450 
4451 std::vector<size_t> get_left_deep_join_input_sizes(
4452     const RelLeftDeepInnerJoin* left_deep_join) {
4453  std::vector<size_t> input_sizes;
4454  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
4455  const auto inputs = get_node_output(left_deep_join->getInput(i));
4456  input_sizes.push_back(inputs.size());
4457  }
4458  return input_sizes;
4459 }
4460 
4461 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
4462  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4463  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4464  for (const auto& qual : quals) {
4465  const auto rewritten_qual = rewrite_expr(qual.get());
4466  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4467  }
4468  return rewritten_quals;
4469 }
4470 
4471 } // namespace
4472 
4473 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
4474  const RelCompound* compound,
4475  const SortInfo& sort_info,
4476  const ExecutionOptions& eo) {
4477  std::vector<InputDescriptor> input_descs;
4478  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4479  auto input_to_nest_level = get_input_nest_levels(compound, {});
4480  std::tie(input_descs, input_col_descs, std::ignore) =
4481  get_input_desc(compound, input_to_nest_level, {});
4482  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4483  const auto query_infos = get_table_infos(input_descs, executor_);
4484  CHECK_EQ(size_t(1), compound->inputCount());
4485  const auto left_deep_join =
4486  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4487  JoinQualsPerNestingLevel left_deep_join_quals;
4488  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4489  : std::vector<JoinType>{get_join_type(compound)};
4490  std::vector<size_t> input_permutation;
4491  std::vector<size_t> left_deep_join_input_sizes;
4492  std::optional<unsigned> left_deep_tree_id;
4493  if (left_deep_join) {
4494  left_deep_tree_id = left_deep_join->getId();
4495  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4496  left_deep_join_quals = translateLeftDeepJoinFilter(
4497  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4498  if (g_from_table_reordering &&
4499  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4500  join_types.end()) {
4501  input_permutation = do_table_reordering(input_descs,
4502  input_col_descs,
4503  left_deep_join_quals,
4504  input_to_nest_level,
4505  compound,
4506  query_infos,
4507  executor_);
4508  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4509  std::tie(input_descs, input_col_descs, std::ignore) =
4510  get_input_desc(compound, input_to_nest_level, input_permutation);
4511  left_deep_join_quals = translateLeftDeepJoinFilter(
4512  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4513  }
4514  }
4515  RelAlgTranslator translator(
4516  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4517  const auto scalar_sources =
4518  translate_scalar_sources(compound, translator, eo.executor_type);
4519  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4520  const auto quals_cf = translate_quals(compound, translator);
4521  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4522  const auto target_exprs = translate_targets(target_exprs_owned_,
4523  target_exprs_type_infos,
4524  scalar_sources,
4525  groupby_exprs,
4526  compound,
4527  translator,
4528  eo.executor_type);
4529 
4530  auto query_hint = RegisteredQueryHint::defaults();
4531  if (query_dag_) {
4532  auto candidate = query_dag_->getQueryHint(compound);
4533  if (candidate) {
4534  query_hint = *candidate;
4535  }
4536  }
4537  CHECK_EQ(compound->size(), target_exprs.size());
4538  const RelAlgExecutionUnit exe_unit = {input_descs,
4539  input_col_descs,
4540  quals_cf.simple_quals,
4541  rewrite_quals(quals_cf.quals),
4542  left_deep_join_quals,
4543  groupby_exprs,
4544  target_exprs,
4545  target_exprs_type_infos,
4546  nullptr,
4547  sort_info,
4548  0,
4549  query_hint,
4550  compound->getQueryPlanDagHash(),
4551  {},
4552  {},
4553  false,
4554  std::nullopt,
4555  query_state_};
4556  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4557  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4558  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4559  compound->setOutputMetainfo(targets_meta);
4560  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4561  if (left_deep_tree_id.has_value()) {
4562  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4563  rewritten_exe_unit.join_quals);
4564  }
4565  if (has_valid_query_plan_dag(compound)) {
4566  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4567  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4568  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4569  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4570  }
4571  return {rewritten_exe_unit,
4572  compound,
4573  max_groups_buffer_entry_default_guess,
4574  std::move(query_rewriter),
4575  input_permutation,
4576  left_deep_join_input_sizes};
4577 }
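// A sketch of the positional fields in the RelAlgExecutionUnit initializers
// used by the work-unit builders in this file (order assumed from the
// struct's declaration): input_descs, input_col_descs, simple_quals, quals,
// join_quals, groupby_exprs, target_exprs, target_exprs_original_type_infos,
// estimator, sort_info, scan_limit, query_hint, query_plan_dag_hash,
// hash_table_build_plan_dag, table_id_to_node_map, use_bump_allocator,
// union_all, query_state.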
4578 
4579 std::shared_ptr<RelAlgTranslator> RelAlgExecutor::getRelAlgTranslator(
4580  const RelAlgNode* node) {
4581  auto input_to_nest_level = get_input_nest_levels(node, {});
4582  const auto left_deep_join =
4583  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
4584  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4585  : std::vector<JoinType>{get_join_type(node)};
4586  return std::make_shared<RelAlgTranslator>(
4587  query_state_, executor_, input_to_nest_level, join_types, now_, false);
4588 }
4589 
4590 namespace {
4591 
4592 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
4593  CHECK(qual_expr);
4594  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
4595  if (!bin_oper || bin_oper->getOperator() != kAND) {
4596  return {qual_expr};
4597  }
4598  CHECK_GE(bin_oper->size(), size_t(2));
4599  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
4600  for (size_t i = 1; i < bin_oper->size(); ++i) {
4601  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
4602  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4603  }
4604  return lhs_cf;
4605 }
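// e.g. AND(AND(a, b), c) flattens to {a, b, c}; any non-AND expression
// (including a top-level OR) is returned as a single-element list, since
// only conjuncts can be split into independent qualifiers.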
4606 
4607 std::shared_ptr<Analyzer::Expr> build_logical_expression(
4608  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4609  const SQLOps sql_op) {
4610  CHECK(!factors.empty());
4611  auto acc = factors.front();
4612  for (size_t i = 1; i < factors.size(); ++i) {
4613  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
4614  }
4615  return acc;
4616 }
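// e.g. build_logical_expression({p, q, r}, kAND) left-folds to
// ((p AND q) AND r); kONE is the unquantified comparison qualifier
// expected by Parser::OperExpr::normalize.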
4617 
4618 template <class QualsList>
4619 bool list_contains_expression(const QualsList& haystack,
4620  const std::shared_ptr<Analyzer::Expr>& needle) {
4621  for (const auto& qual : haystack) {
4622  if (*qual == *needle) {
4623  return true;
4624  }
4625  }
4626  return false;
4627 }
4628 
4629 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
4630 // evaluations of `p` and allows use of the original form in joins if `p`
4631 // can be used for hash joins.
4632 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
4633  const std::shared_ptr<Analyzer::Expr>& expr) {
4634  const auto expr_terms = qual_to_disjunctive_form(expr);
4635  CHECK_GE(expr_terms.size(), size_t(1));
4636  const auto& first_term = expr_terms.front();
4637  const auto first_term_factors = qual_to_conjunctive_form(first_term);
4638  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4639  // First, collect the conjunctive components common to all the disjunctive components.
4640  // Don't do it for simple qualifiers; we only care about expensive or join qualifiers.
4641  for (const auto& first_term_factor : first_term_factors.quals) {
4642  bool is_common =
4643  expr_terms.size() > 1; // Only report common factors for disjunction.
4644  for (size_t i = 1; i < expr_terms.size(); ++i) {
4645  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
4646  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
4647  is_common = false;
4648  break;
4649  }
4650  }
4651  if (is_common) {
4652  common_factors.push_back(first_term_factor);
4653  }
4654  }
4655  if (common_factors.empty()) {
4656  return expr;
4657  }
4658  // Now that the common expressions are known, collect the remaining expressions.
4659  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4660  for (const auto& term : expr_terms) {
4661  const auto term_cf = qual_to_conjunctive_form(term);
4662  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4663  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4664  for (const auto& qual : term_cf.quals) {
4665  if (!list_contains_expression(common_factors, qual)) {
4666  remaining_quals.push_back(qual);
4667  }
4668  }
4669  if (!remaining_quals.empty()) {
4670  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
4671  }
4672  }
4673  // Reconstruct the expression with the transformation applied.
4674  const auto common_expr = build_logical_expression(common_factors, kAND);
4675  if (remaining_terms.empty()) {
4676  return common_expr;
4677  }
4678  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
4679  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
4680 }
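// Worked example (assuming p, q, r are non-simple qualifiers): for
// (p AND q) OR (p AND r),
//   expr_terms         = {p AND q, p AND r}
//   first_term_factors = {p, q}; only p appears in every term, so
//   common_factors     = {p} and remaining_terms = {q, r}.
// The result is normalize(kAND, kONE, p, (q OR r)), i.e. p AND (q OR r).
// If nothing remains besides the common factors (e.g. p OR p), common_expr
// alone is returned.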
4681 
4682 } // namespace
4683 
4684 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
4685  const RexScalar* join_condition,
4686  const std::vector<JoinType>& join_types,
4687  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4688  const bool just_explain) const {
4689  RelAlgTranslator translator(
4690  query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
4691  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
4692  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4693  for (const auto rex_condition_component : rex_condition_cf) {
4694  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
4695  const auto join_condition = reverse_logical_distribution(
4696  translator.translate(bw_equals ? bw_equals.get() : rex_condition_component));
4697  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
4698 
4699  auto append_folded_cf_quals = [&join_condition_quals](const auto& cf_quals) {
4700  for (const auto& cf_qual : cf_quals) {
4701  join_condition_quals.emplace_back(fold_expr(cf_qual.get()));
4702  }
4703  };
4704 
4705  append_folded_cf_quals(join_condition_cf.quals);
4706  append_folded_cf_quals(join_condition_cf.simple_quals);
4707  }
4708  return combine_equi_join_conditions(join_condition_quals);
4709 }
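// e.g. for ON a.x = b.x AND a.y > 5, rex_to_conjunctive_form splits the two
// conjuncts; each is translated and constant-folded, and
// combine_equi_join_conditions can then merge adjacent single-column
// equalities (a.x = b.x, a.z = b.z, ...) into a composite-key equality
// that hash join layouts consume directly.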
4710 
4711 // Translate left deep join filter and separate the conjunctive form qualifiers
4712 // per nesting level. The code generated for hash table lookups on each level
4713 // must dominate its uses in deeper nesting levels.
4714 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
4715  const RelLeftDeepInnerJoin* join,
4716  const std::vector<InputDescriptor>& input_descs,
4717  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4718  const bool just_explain) {
4719  const auto join_types = left_deep_join_types(join);
4720  const auto join_condition_quals = makeJoinQuals(
4721  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
4722  MaxRangeTableIndexVisitor rte_idx_visitor;
4723  JoinQualsPerNestingLevel result(input_descs.size() - 1);
4724  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4725  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4726  const auto outer_condition = join->getOuterCondition(rte_idx);
4727  if (outer_condition) {
4728  result[rte_idx - 1].quals =
4729  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4730  CHECK_LE(rte_idx, join_types.size());
4731  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
4732  result[rte_idx - 1].type = JoinType::LEFT;
4733  continue;
4734  }
4735  for (const auto& qual : join_condition_quals) {
4736  if (visited_quals.count(qual)) {
4737  continue;
4738  }
4739  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4740  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4741  const auto it_ok = visited_quals.emplace(qual);
4742  CHECK(it_ok.second);
4743  result[rte_idx - 1].quals.push_back(qual);
4744  }
4745  }
4746  CHECK_LE(rte_idx, join_types.size());
4747  CHECK(join_types[rte_idx - 1] == JoinType::INNER ||
4748  join_types[rte_idx - 1] == JoinType::SEMI ||
4749  join_types[rte_idx - 1] == JoinType::ANTI);
4750  result[rte_idx - 1].type = join_types[rte_idx - 1];
4751  }
4752  return result;
4753 }
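// Placement sketch: a qualifier whose maximum range-table index is 1 (it
// references only nest levels 0 and 1) is attached to result[0]; one that
// also touches nest level 2 lands in result[1], and so on. visited_quals
// guarantees each qualifier is emitted exactly once, at the shallowest
// level where all of its inputs are available.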
4754 
4755 namespace {
4756 
4757 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
4758  const RelAlgNode* ra_node,
4759  const size_t nest_level,
4760  const std::vector<TargetMetaInfo>& in_metainfo,
4761  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4762  CHECK_LE(size_t(1), ra_node->inputCount());
4763  CHECK_GE(size_t(2), ra_node->inputCount());
4764  const auto input = ra_node->getInput(nest_level);
4765  const auto it_rte_idx = input_to_nest_level.find(input);
4766  CHECK(it_rte_idx != input_to_nest_level.end());
4767  const int rte_idx = it_rte_idx->second;
4768  const auto table_key = table_key_from_ra(input);
4769  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4770  const auto scan_ra = dynamic_cast<const RelScan*>(input);
4771  int input_idx = 0;
4772  for (const auto& input_meta : in_metainfo) {
4773  inputs.push_back(std::make_shared<Analyzer::ColumnVar>(
4774  input_meta.get_type_info(),
4775  shared::ColumnKey{table_key, scan_ra ? input_idx + 1 : input_idx},
4776  rte_idx));
4777  ++input_idx;
4778  }
4779  return inputs;
4780 }
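// Note the column-id convention: physical scans are 1-based, so a RelScan
// input maps output position i to ColumnKey column id i + 1, while
// synthetic intermediate inputs keep the 0-based position.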
4781 
4782 std::vector<Analyzer::Expr*> get_raw_pointers(
4783  std::vector<std::shared_ptr<Analyzer::Expr>> const& input) {
4784  std::vector<Analyzer::Expr*> output(input.size());
4785  auto const raw_ptr = [](auto& shared_ptr) { return shared_ptr.get(); };
4786  std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4787  return output;
4788 }
4789 
4790 } // namespace
4791 
4792 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
4793  const RelAggregate* aggregate,
4794  const SortInfo& sort_info,
4795  const bool just_explain) {
4796  std::vector<InputDescriptor> input_descs;
4797  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4798  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4799  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4800  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4801  get_input_desc(aggregate, input_to_nest_level, {});
4802  const auto join_type = get_join_type(aggregate);
4803 
4804  RelAlgTranslator translator(
4805  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
4806  CHECK_EQ(size_t(1), aggregate->inputCount());
4807  const auto source = aggregate->getInput(0);
4808  const auto& in_metainfo = source->getOutputMetainfo();
4809  const auto scalar_sources =
4810  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4811  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4812  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4813  const auto target_exprs = translate_targets(target_exprs_owned_,
4814  target_exprs_type_infos,
4815  scalar_sources,
4816  groupby_exprs,
4817  aggregate,
4818  translator);
4819 
4820  const auto query_infos = get_table_infos(input_descs, executor_);
4821 
4822  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4823  aggregate->setOutputMetainfo(targets_meta);
4824  auto query_hint = RegisteredQueryHint::defaults();
4825  if (query_dag_) {
4826  auto candidate = query_dag_->getQueryHint(aggregate);
4827  if (candidate) {
4828  query_hint = *candidate;
4829  }
4830  }
4831  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4832  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4833  return {RelAlgExecutionUnit{input_descs,
4834  input_col_descs,
4835  {},
4836  {},
4837  {},
4838  groupby_exprs,
4839  target_exprs,
4840  target_exprs_type_infos,
4841  nullptr,
4842  sort_info,
4843  0,
4844  query_hint,
4845  aggregate->getQueryPlanDagHash(),
4846  join_info.hash_table_plan_dag,
4847  join_info.table_id_to_node_map,
4848  false,
4849  std::nullopt,
4850  query_state_},
4851  aggregate,
4852  max_groups_buffer_entry_default_guess,
4853  nullptr};
4854 }
4855 
4856 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
4857  const RelProject* project,
4858  const SortInfo& sort_info,
4859  const ExecutionOptions& eo) {
4860  std::vector<InputDescriptor> input_descs;
4861  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4862  auto input_to_nest_level = get_input_nest_levels(project, {});
4863  std::tie(input_descs, input_col_descs, std::ignore) =
4864  get_input_desc(project, input_to_nest_level, {});
4865  const auto query_infos = get_table_infos(input_descs, executor_);
4866 
4867  const auto left_deep_join =
4868  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4869  JoinQualsPerNestingLevel left_deep_join_quals;
4870  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4871  : std::vector<JoinType>{get_join_type(project)};
4872  std::vector<size_t> input_permutation;
4873  std::vector<size_t> left_deep_join_input_sizes;
4874  std::optional<unsigned> left_deep_tree_id;
4875  if (left_deep_join) {
4876  left_deep_tree_id = left_deep_join->getId();
4877  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4878  const auto query_infos = get_table_infos(input_descs, executor_);
4879  left_deep_join_quals = translateLeftDeepJoinFilter(
4880  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4881  if (g_from_table_reordering) {
4882  input_permutation = do_table_reordering(input_descs,
4883  input_col_descs,
4884  left_deep_join_quals,
4885  input_to_nest_level,
4886  project,
4887  query_infos,
4888  executor_);
4889  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4890  std::tie(input_descs, input_col_descs, std::ignore) =
4891  get_input_desc(project, input_to_nest_level, input_permutation);
4892  left_deep_join_quals = translateLeftDeepJoinFilter(
4893  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4894  }
4895  }
4896 
4897  RelAlgTranslator translator(
4898  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4899  const auto target_exprs_owned =
4900  translate_scalar_sources(project, translator, eo.executor_type);
4901 
4902  target_exprs_owned_.insert(
4903  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4904  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4905  auto query_hint = RegisteredQueryHint::defaults();
4906  if (query_dag_) {
4907  auto candidate = query_dag_->getQueryHint(project);
4908  if (candidate) {
4909  query_hint = *candidate;
4910  }
4911  }
4912  const RelAlgExecutionUnit exe_unit = {input_descs,
4913  input_col_descs,
4914  {},
4915  {},
4916  left_deep_join_quals,
4917  {nullptr},
4918  target_exprs,
4919  {},
4920  nullptr,
4921  sort_info,
4922  0,
4923  query_hint,
4924  project->getQueryPlanDagHash(),
4925  {},
4926  {},
4927  false,
4928  std::nullopt,
4929  query_state_};
4930  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4931  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4932  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
4933  project->setOutputMetainfo(targets_meta);
4934  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4935  if (left_deep_tree_id.has_value()) {
4936  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4937  rewritten_exe_unit.join_quals);
4938  }
4939  if (has_valid_query_plan_dag(project)) {
4940  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4941  project, left_deep_tree_id, left_deep_trees_info, executor_);
4942  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4943  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4944  }
4945  return {rewritten_exe_unit,
4946  project,
4947  max_groups_buffer_entry_default_guess,
4948  std::move(query_rewriter),
4949  input_permutation,
4950  left_deep_join_input_sizes};
4951 }
4952 
4953 namespace {
4954 
4955 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
4956  RelAlgNode const* input_node) {
4957  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
4958  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
4959  const int negative_node_id = -input_node->getId();
4960  int32_t db_id{0};
4961  if (auto rel_scan = dynamic_cast<const RelScan*>(input_node)) {
4962  db_id = rel_scan->getCatalog().getDatabaseId();
4963  }
4964  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4965  target_exprs.reserve(tmis.size());
4966  for (size_t i = 0; i < tmis.size(); ++i) {
4967  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4968  tmis[i].get_type_info(),
4969  shared::ColumnKey{db_id, negative_node_id, int32_t(i)},
4970  0));
4971  }
4972  return target_exprs;
4973 }
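// e.g. a union input node with id 42 and outputs (TEXT, INT) produces
// ColumnVars keyed {db_id, -42, 0} and {db_id, -42, 1} at nest level 0;
// the negated node id acts as a synthetic table id that cannot collide
// with a real (positive) table id.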
4974 
4975 } // namespace
4976 
4977 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
4978  const RelLogicalUnion* logical_union,
4979  const SortInfo& sort_info,
4980  const ExecutionOptions& eo) {
4981  std::vector<InputDescriptor> input_descs;
4982  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4983  // Map ra input ptr to index (0, 1).
4984  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
4985  std::tie(input_descs, input_col_descs, std::ignore) =
4986  get_input_desc(logical_union, input_to_nest_level, {});
4987  const auto query_infos = get_table_infos(input_descs, executor_);
4988  auto const max_num_tuples =
4989  std::accumulate(query_infos.cbegin(),
4990  query_infos.cend(),
4991  size_t(0),
4992  [](auto max, auto const& query_info) {
4993  return std::max(max, query_info.info.getNumTuples());
4994  });
4995 
4996  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
4997  for (auto& pair : input_to_nest_level) {
4998  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
4999  << pair.second << ')';
5000  }
5001 
5002  // For UNION queries, we need to keep the target_exprs from both subqueries since they
5003  // may differ on StringDictionaries.
5004  std::vector<Analyzer::Expr*> target_exprs_pair[2];
5005  for (unsigned i = 0; i < 2; ++i) {
5006  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
5007  CHECK(!input_exprs_owned.empty())
5008  << "No metainfo found for input node(" << i << ") "
5009  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
5010  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
5011  for (auto& input_expr : input_exprs_owned) {
5012  VLOG(3) << " " << input_expr->toString();
5013  }
5014  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
5015  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
5016  }
5017 
5018  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
5019  << " input_col_descs=" << shared::printContainer(input_col_descs)
5020  << " target_exprs.size()=" << target_exprs_pair[0].size()
5021  << " max_num_tuples=" << max_num_tuples;
5022 
5023  const RelAlgExecutionUnit exe_unit = {input_descs,
5024  input_col_descs,
5025  {}, // quals_cf.simple_quals,
5026  {}, // rewrite_quals(quals_cf.quals),
5027  {},
5028  {nullptr},
5029  target_exprs_pair[0],
5030  {},
5031  nullptr,
5032  sort_info,
5033  max_num_tuples,
5034  RegisteredQueryHint::defaults(),
5035  EMPTY_HASHED_PLAN_DAG_KEY,
5036  {},
5037  {},
5038  false,
5039  logical_union->isAll(),
5040  query_state_,
5041  target_exprs_pair[1]};
5042  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5043  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5044 
5045  RelAlgNode const* input0 = logical_union->getInput(0);
5046  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
5047  logical_union->setOutputMetainfo(
5048  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5049  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
5050  logical_union->setOutputMetainfo(
5051  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5052  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
5053  logical_union->setOutputMetainfo(
5054  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5055  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
5056  logical_union->setOutputMetainfo(
5057  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5058  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
5059  logical_union->setOutputMetainfo(
5060  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5061  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
5062  logical_union->setOutputMetainfo(
5063  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5064  } else if (auto const* node = dynamic_cast<const RelLogicalValues*>(input0)) {
5065  logical_union->setOutputMetainfo(
5066  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5067  } else if (dynamic_cast<const RelSort*>(input0)) {
5068  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
5069  } else {
5070  throw QueryNotSupported("Unsupported input type: " +
5071  input0->toString(RelRexToStringConfig::defaults()));
5072  }
5073  VLOG(3) << "logical_union->getOutputMetainfo()="
5074  << shared::printContainer(logical_union->getOutputMetainfo())
5075  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey()="
5076  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey();
5077 
5078  return {rewritten_exe_unit,
5079  logical_union,
5080  max_groups_buffer_entry_default_guess,
5081  std::move(query_rewriter)};
5082 }
5083 
5084 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
5085  const RelTableFunction* rel_table_func,
5086  const bool just_explain,
5087  const bool is_gpu) {
5088  std::vector<InputDescriptor> input_descs;
5089  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5090  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
5091  std::tie(input_descs, input_col_descs, std::ignore) =
5092  get_input_desc(rel_table_func, input_to_nest_level, {});
5093  const auto query_infos = get_table_infos(input_descs, executor_);
5094  RelAlgTranslator translator(
5095  query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
5096  auto input_exprs_owned = translate_scalar_sources(
5097  rel_table_func, translator, ::ExecutorType::TableFunctions);
5098  target_exprs_owned_.insert(
5099  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
5100 
5101  const auto table_function_impl_and_type_infos = [=]() {
5102  if (is_gpu) {
5103  try {
5104  return bind_table_function(
5105  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5106  } catch (ExtensionFunctionBindingError& e) {
5107  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
5108  << " Redirecting " << rel_table_func->getFunctionName()
5109  << " step to run on CPU.";
5110  throw QueryMustRunOnCpu();
5111  }
5112  } else {
5113  try {
5114  return bind_table_function(
5115  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5116  } catch (ExtensionFunctionBindingError& e) {
5117  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
5118  throw;
5119  }
5120  }
5121  }();
5122  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
5123  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
5124  size_t output_row_sizing_param = 0;
5125  if (table_function_impl
5126  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
5127  const auto parameter_index =
5128  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
5129  CHECK_GT(parameter_index, size_t(0));
5130  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
5131  const auto parameter_expr =
5132  rel_table_func->getTableFuncInputAt(parameter_index - 1);
5133  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
5134  if (!parameter_expr_literal) {
5135  throw std::runtime_error(
5136  "Provided output buffer sizing parameter is not a literal. Only literal "
5137  "values are supported with output buffer sizing configured table "
5138  "functions.");
5139  }
5140  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
5141  if (literal_val < 0) {
5142  throw std::runtime_error("Provided output sizing parameter " +
5143  std::to_string(literal_val) +
5144  " must be positive integer.");
5145  }
5146  output_row_sizing_param = static_cast<size_t>(literal_val);
5147  } else {
5148  // RowMultiplier not specified in the SQL query. Set it to 1
5149  output_row_sizing_param = 1; // default value for RowMultiplier
5150  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
5151  static Analyzer::ExpressionPtr DEFAULT_ROW_MULTIPLIER_EXPR =
5152  makeExpr<Analyzer::Constant>(kINT, false, d);
5153  // Push the constant 1 to input_exprs
5154  input_exprs_owned.insert(input_exprs_owned.begin() + parameter_index - 1,
5155  DEFAULT_ROW_MULTIPLIER_EXPR);
5156  }
5157  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5158  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
5159  } else {
5160  UNREACHABLE();
5161  }
5162 
5163  std::vector<Analyzer::ColumnVar*> input_col_exprs;
5164  size_t input_index = 0;
5165  size_t arg_index = 0;
5166  const auto table_func_args = table_function_impl.getInputArgs();
5167  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5168  for (const auto& ti : table_function_type_infos) {
5169  if (ti.is_column_list()) {
5170  for (int i = 0; i < ti.get_dimension(); i++) {
5171  auto& input_expr = input_exprs_owned[input_index];
5172  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5173  CHECK(col_var);
5174 
5175  // avoid setting type info to ti here since ti doesn't have all the
5176  // properties correctly set
5177  auto type_info = input_expr->get_type_info();
5178  if (ti.is_column_array()) {
5179  type_info.set_compression(kENCODING_ARRAY);
5180  type_info.set_subtype(type_info.get_subtype()); // keep existing array subtype
5181  } else {
5182  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5183  }
5184  type_info.set_type(ti.get_type()); // set type to column list
5185  type_info.set_dimension(ti.get_dimension());
5186  input_expr->set_type_info(type_info);
5187 
5188  input_col_exprs.push_back(col_var);
5189  input_index++;
5190  }
5191  } else if (ti.is_column()) {
5192  auto& input_expr = input_exprs_owned[input_index];
5193  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5194  CHECK(col_var);
5195  // same here! avoid setting type info to ti since it doesn't have all the
5196  // properties correctly set
5197  auto type_info = input_expr->get_type_info();
5198  if (ti.is_column_array()) {
5199  type_info.set_compression(kENCODING_ARRAY);
5200  type_info.set_subtype(type_info.get_subtype()); // keep existing array subtype
5201  } else {
5202  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5203  }
5204  type_info.set_type(ti.get_type()); // set type to column
5205  input_expr->set_type_info(type_info);
5206  input_col_exprs.push_back(col_var);
5207  input_index++;
5208  } else {
5209  auto input_expr = input_exprs_owned[input_index];
5210  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
5211  if (ext_func_arg_ti != input_expr->get_type_info()) {
5212  input_exprs_owned[input_index] = input_expr->add_cast(ext_func_arg_ti);
5213  }
5214  input_index++;
5215  }
5216  arg_index++;
5217  }
5218  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
5219  std::vector<Analyzer::Expr*> table_func_outputs;
5220  constexpr int32_t transient_pos{-1};
5221  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5222  auto ti = table_function_impl.getOutputSQLType(i);
5223  if (ti.is_dict_encoded_string() || ti.is_text_encoding_dict_array()) {
5224  auto p = table_function_impl.getInputID(i);
5225 
5226  int32_t input_pos = p.first;
5227  if (input_pos == transient_pos) {
5228  ti.set_comp_param(TRANSIENT_DICT_ID);
5229  } else {
5230  // Iterate over the list of arguments to compute the offset. Use this offset to
5231  // get the corresponding input
5232  int32_t offset = 0;
5233  for (int j = 0; j < input_pos; j++) {
5234  const auto ti = table_function_type_infos[j];
5235  offset += ti.is_column_list() ? ti.get_dimension() : 1;
5236  }
5237  input_pos = offset + p.second;
5238 
5239  CHECK_LT(input_pos, input_exprs_owned.size());
5240  const auto& dict_key =
5241  input_exprs_owned[input_pos]->get_type_info().getStringDictKey();
5242  ti.set_comp_param(dict_key.dict_id);
5243  ti.setStringDictKey(dict_key);
5244  }
5245  }
5246  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(
5247  ti, shared::ColumnKey{0, 0, int32_t(i)}, -1));
5248  table_func_outputs.push_back(target_exprs_owned_.back().get());
5249  }
5250  auto input_exprs = get_raw_pointers(input_exprs_owned);
5251  const TableFunctionExecutionUnit exe_unit = {
5252  input_descs,
5253  input_col_descs,
5254  input_exprs, // table function inputs
5255  input_col_exprs, // table function column inputs (duplicates w/ above)
5256  table_func_outputs, // table function projected exprs
5257  output_row_sizing_param, // output buffer sizing param
5258  table_function_impl};
5259  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
5260  rel_table_func->setOutputMetainfo(targets_meta);
5261  return {exe_unit, rel_table_func};
5262 }
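// Offset example for the dict-encoded output mapping above: with inputs
// (Column<INT>, ColumnList<TEXT> of dimension 3, Column<TEXT>) and
// getInputID(i) == {2, 0}, the flattened position is 1 + 3 + 0 = 4, so the
// output column inherits the string dictionary key of input_exprs_owned[4].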
5263 
5264 namespace {
5265 
5266 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
5267 get_inputs_meta(const RelFilter* filter,
5268  const RelAlgTranslator& translator,
5269  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
5270  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
5271  std::vector<TargetMetaInfo> in_metainfo;
5272  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
5273  const auto data_sink_node = get_data_sink(filter);
5274  auto input_it = inputs_owned.begin();
5275  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
5276  const auto source = data_sink_node->getInput(nest_level);
5277  const auto scan_source = dynamic_cast<const RelScan*>(source);
5278  if (scan_source) {
5279  CHECK(source->getOutputMetainfo().empty());
5280  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
5281  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
5282  scalar_sources_owned.push_back(translator.translate(input_it->get()));
5283  }
5284  const auto source_metadata =
5285  get_targets_meta(scan_source, get_raw_pointers(scalar_sources_owned));
5286  in_metainfo.insert(
5287  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5288  exprs_owned.insert(
5289  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5290  } else {
5291  const auto& source_metadata = source->getOutputMetainfo();
5292  input_it += source_metadata.size();
5293  in_metainfo.insert(
5294  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5295  const auto scalar_sources_owned = synthesize_inputs(
5296  data_sink_node, nest_level, source_metadata, input_to_nest_level);
5297  exprs_owned.insert(
5298  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5299  }
5300  }
5301  return std::make_pair(in_metainfo, exprs_owned);
5302 }
5303 
5304 } // namespace