OmniSciDB 72c90bc290
RelAlgExecutor.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "RelAlgExecutor.h"
#include "Parser/ParserNode.h"
#include "QueryEngine/RelAlgDag.h"
#include "QueryEngine/RexVisitor.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/shard_key.h"

#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/reversed.hpp>

#include <algorithm>
#include <functional>
#include <numeric>

bool g_enable_interop{false};
bool g_enable_union{true};  // DEPRECATED

extern bool g_enable_watchdog;
extern bool g_enable_bump_allocator;
extern bool g_enable_system_tables;

namespace {

bool node_is_aggregate(const RelAlgNode* ra) {
  const auto compound = dynamic_cast<const RelCompound*>(ra);
  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
  return ((compound && compound->isAggregate()) || aggregate);
}

std::unordered_set<PhysicalInput> get_physical_inputs_with_spi_col_id(
    const RelAlgNode* ra) {
  const auto phys_inputs = get_physical_inputs(ra);
  std::unordered_set<PhysicalInput> phys_inputs2;
  for (auto& phi : phys_inputs) {
    const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(phi.db_id);
    CHECK(catalog);
    phys_inputs2.insert(PhysicalInput{
        catalog->getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id, phi.db_id});
  }
  return phys_inputs2;
}

void set_parallelism_hints(const RelAlgNode& ra_node) {
  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
      parallelism_hints_per_table;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    const auto table_id = physical_input.table_id;
    const auto catalog =
        Catalog_Namespace::SysCatalog::instance().getCatalog(physical_input.db_id);
    CHECK(catalog);
    const auto table = catalog->getMetadataForTable(table_id, true);
    if (table && table->storageType == StorageType::FOREIGN_TABLE &&
        !table->is_in_memory_system_table) {
      const auto col_id = catalog->getColumnIdBySpi(table_id, physical_input.col_id);
      const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
      const auto foreign_table = catalog->getForeignTable(table_id);
      for (const auto& fragment :
           foreign_table->fragmenter->getFragmentsForQuery().fragments) {
        Chunk_NS::Chunk chunk{col_desc};
        ChunkKey chunk_key = {
            physical_input.db_id, table_id, col_id, fragment.fragmentId};

        // Parallelism hints should not include fragments that are not mapped to the
        // current node, otherwise we will try to prefetch them and run into trouble.
        // (guard reconstructed; the original line is elided in this listing)
        if (key_does_not_shard_to_leaf(chunk_key)) {
          continue;
        }

        // do not include chunk hints that are in CPU memory
        if (!chunk.isChunkOnDevice(
                &catalog->getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
          parallelism_hints_per_table[{physical_input.db_id, table_id}].insert(
              foreign_storage::ForeignStorageMgr::ParallelismHint{col_id,
                                                                  fragment.fragmentId});
        }
      }
    }
  }
  if (!parallelism_hints_per_table.empty()) {
    auto foreign_storage_mgr = Catalog_Namespace::SysCatalog::instance()
                                   .getDataMgr()
                                   .getPersistentStorageMgr()
                                   ->getForeignStorageMgr();
    CHECK(foreign_storage_mgr);
    foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
  }
}
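
// The hints built above are keyed by (db_id, table_id) and name
// (column, fragment) pairs whose chunks are not yet resident in CPU memory,
// letting the foreign storage manager prefetch those chunks in parallel.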

void prepare_string_dictionaries(const RelAlgNode& ra_node) {
  for (const auto [col_id, table_id, db_id] : get_physical_inputs(&ra_node)) {
    const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
    CHECK(catalog);
    const auto table = catalog->getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE) {
      const auto spi_col_id = catalog->getColumnIdBySpi(table_id, col_id);
      foreign_storage::populate_string_dictionary(table_id, spi_col_id, db_id);
    }
  }
}

void prepare_foreign_table_for_execution(const RelAlgNode& ra_node) {
  // Iterate through ra_node inputs for types that need to be loaded pre-execution
  // If they do not have valid metadata, load them into CPU memory to generate
  // the metadata and leave them ready to be used by the query
  set_parallelism_hints(ra_node);
  prepare_string_dictionaries(ra_node);
}

void prepare_for_system_table_execution(const RelAlgNode& ra_node,
                                        const CompilationOptions& co) {
  const auto info_schema_catalog =
      Catalog_Namespace::SysCatalog::instance().getCatalog(shared::kInfoSchemaDbName);
  CHECK(info_schema_catalog);
  std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    if (info_schema_catalog->getDatabaseId() != physical_input.db_id) {
      continue;
    }
    const auto table_id = physical_input.table_id;
    const auto table = info_schema_catalog->getMetadataForTable(table_id, false);
    CHECK(table);
    if (table->is_in_memory_system_table) {
      const auto column_id =
          info_schema_catalog->getColumnIdBySpi(table_id, physical_input.col_id);
      system_table_columns_by_table_id[table_id].emplace_back(column_id);
    }
  }
  // Execute on CPU for queries involving system tables
  if (!system_table_columns_by_table_id.empty() &&
      co.device_type == ExecutorDeviceType::GPU) {
    throw QueryMustRunOnCpu();
  }

  for (const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
    // Clear any previously cached data, since system tables depend on point in
    // time data snapshots.
    info_schema_catalog->getDataMgr().deleteChunksWithPrefix(
        ChunkKey{info_schema_catalog->getDatabaseId(), table_id},
        Data_Namespace::CPU_LEVEL);

    // TODO(Misiu): This prefetching can be removed if we can add support for
    // ExpressionRanges to reduce invalid with valid ranges (right now prefetching
    // causes us to fetch the chunks twice). Right now if we do not prefetch (i.e. if
    // we remove the code below) some nodes will return valid ranges and others will
    // return unknown because they only use placeholder metadata and the LeafAggregator
    // has no idea how to reduce the two.
    const auto td = info_schema_catalog->getMetadataForTable(table_id);
    CHECK(td);
    CHECK(td->fragmenter);
    auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
    CHECK_LE(fragment_count, static_cast<size_t>(1))
        << "In-memory system tables are expected to have a single fragment.";
    if (fragment_count > 0) {
      for (auto column_id : column_ids) {
        // Prefetch system table chunks in order to force chunk statistics metadata
        // computation.
        const auto cd = info_schema_catalog->getMetadataForColumn(table_id, column_id);
        const ChunkKey chunk_key{
            info_schema_catalog->getDatabaseId(), table_id, column_id, 0};
        Chunk_NS::Chunk::getChunk(cd,
                                  &(info_schema_catalog->getDataMgr()),
                                  chunk_key,
                                  Data_Namespace::CPU_LEVEL,
                                  0,
                                  0,
                                  0);
      }
    }
  }
}

// (helper reconstructed from its use in executeRelAlgStep() below; the body of
// this function is elided in the original listing)
bool has_valid_query_plan_dag(const RelAlgNode* node) {
  return node->getQueryPlanDagHash() != EMPTY_HASHED_PLAN_DAG_KEY;
}

} // namespace

void build_render_targets(RenderInfo& render_info,
                          const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
                          const std::vector<TargetMetaInfo>& targets_meta) {
  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
  render_info.targets.clear();
  for (size_t i = 0; i < targets_meta.size(); ++i) {
    render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
        targets_meta[i].get_resname(),
        work_unit_target_exprs[i]->get_shared_ptr(),
        false));
  }
}

bool is_validate_or_explain_query(const ExecutionOptions& eo) {
  return eo.just_validate || eo.just_explain || eo.just_calcite_explain;
}
241 
242 class RelLeftDeepTreeIdsCollector : public RelAlgVisitor<std::vector<unsigned>> {
243  public:
244  std::vector<unsigned> visitLeftDeepInnerJoin(
245  const RelLeftDeepInnerJoin* left_deep_join_tree) const override {
246  return {left_deep_join_tree->getId()};
247  }
248 
249  protected:
250  std::vector<unsigned> aggregateResult(
251  const std::vector<unsigned>& aggregate,
252  const std::vector<unsigned>& next_result) const override {
253  auto result = aggregate;
254  std::copy(next_result.begin(), next_result.end(), std::back_inserter(result));
255  return result;
256  }
257 };
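
// Typical use (as in getJoinInfo() below): visit the DAG root and collect the
// node id of every left-deep inner-join subtree, e.g.
//   RelLeftDeepTreeIdsCollector visitor;
//   const auto left_deep_tree_ids = visitor.visit(root_node);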

struct TextEncodingCastCounts {
  size_t text_decoding_casts;
  size_t text_encoding_casts;

  TextEncodingCastCounts() : text_decoding_casts(0UL), text_encoding_casts(0UL) {}

  TextEncodingCastCounts(const size_t text_decoding_casts,
                         const size_t text_encoding_casts)
      : text_decoding_casts(text_decoding_casts)
      , text_encoding_casts(text_encoding_casts) {}
};

class TextEncodingCastCountVisitor : public ScalarExprVisitor<TextEncodingCastCounts> {
 public:
  TextEncodingCastCountVisitor(const bool default_disregard_casts_to_none_encoding)
      : default_disregard_casts_to_none_encoding_(
            default_disregard_casts_to_none_encoding) {}

 protected:
  TextEncodingCastCounts visitUOper(const Analyzer::UOper* u_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    // Save state of input disregard_casts_to_none_encoding_ as child node traversal
    // will reset it
    const bool disregard_casts_to_none_encoding = disregard_casts_to_none_encoding_;
    result = aggregateResult(result, visit(u_oper->get_operand()));
    if (u_oper->get_optype() != kCAST) {
      return result;
    }
    const auto& operand_ti = u_oper->get_operand()->get_type_info();
    const auto& casted_ti = u_oper->get_type_info();
    if (!operand_ti.is_string() && casted_ti.is_dict_encoded_string()) {
      return aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
    }
    if (!casted_ti.is_string()) {
      return result;
    }
    const bool literals_only = u_oper->get_operand()->get_num_column_vars(true) == 0UL;
    if (literals_only) {
      return result;
    }
    if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
      return aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
    }
    if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
      if (!disregard_casts_to_none_encoding) {
        return aggregateResult(result, TextEncodingCastCounts(1UL, 0UL));
      } else {
        return result;
      }
    }
    return result;
  }

  TextEncodingCastCounts visitStringOper(
      const Analyzer::StringOper* string_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    if (string_oper->getArity() > 0) {
      result = aggregateResult(result, visit(string_oper->getArg(0)));
    }
    if (string_op_returns_string(string_oper->get_kind()) &&
        string_oper->hasNoneEncodedTextArg()) {
      result = aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
    }
    return result;
  }

  TextEncodingCastCounts visitBinOper(const Analyzer::BinOper* bin_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    // Currently the join framework handles casts between string types, and
    // casts to none-encoded strings should be considered spurious, except
    // when the join predicate is not a =/<> operator, in which case
    // for both join predicates and all other instances we have to decode
    // to a none-encoded string to do the comparison. The logic below essentially
    // overrides the logic such as to always count none-encoded casts on strings
    // that are children of binary operators other than =/<>
    if (bin_oper->get_optype() != kEQ && bin_oper->get_optype() != kNE) {
      // Override the global override so that join opers don't skip
      // the check when there is an actual cast to none-encoded string
      const auto prev_disregard_casts_to_none_encoding_state =
          disregard_casts_to_none_encoding_;
      const auto left_u_oper =
          dynamic_cast<const Analyzer::UOper*>(bin_oper->get_left_operand());
      if (left_u_oper && left_u_oper->get_optype() == kCAST) {
        disregard_casts_to_none_encoding_ = false;
        result = aggregateResult(result, visitUOper(left_u_oper));
      } else {
        disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
        result = aggregateResult(result, visit(bin_oper->get_left_operand()));
      }

      const auto right_u_oper =
          dynamic_cast<const Analyzer::UOper*>(bin_oper->get_right_operand());
      if (right_u_oper && right_u_oper->get_optype() == kCAST) {
        disregard_casts_to_none_encoding_ = false;
        result = aggregateResult(result, visitUOper(right_u_oper));
      } else {
        disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
        result = aggregateResult(result, visit(bin_oper->get_right_operand()));
      }
      disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
    } else {
      result = aggregateResult(result, visit(bin_oper->get_left_operand()));
      result = aggregateResult(result, visit(bin_oper->get_right_operand()));
    }
    return result;
  }

  TextEncodingCastCounts visitLikeExpr(const Analyzer::LikeExpr* like) const override {
    TextEncodingCastCounts result = defaultResult();
    const auto u_oper = dynamic_cast<const Analyzer::UOper*>(like->get_arg());
    const auto prev_disregard_casts_to_none_encoding_state =
        disregard_casts_to_none_encoding_;
    if (u_oper && u_oper->get_optype() == kCAST) {
      disregard_casts_to_none_encoding_ = true;
      result = aggregateResult(result, visitUOper(u_oper));
      disregard_casts_to_none_encoding_ = prev_disregard_casts_to_none_encoding_state;
    } else {
      result = aggregateResult(result, visit(like->get_arg()));
    }
    result = aggregateResult(result, visit(like->get_like_expr()));
    if (like->get_escape_expr()) {
      result = aggregateResult(result, visit(like->get_escape_expr()));
    }
    return result;
  }

  TextEncodingCastCounts aggregateResult(
      const TextEncodingCastCounts& aggregate,
      const TextEncodingCastCounts& next_result) const override {
    auto result = aggregate;
    result.text_decoding_casts += next_result.text_decoding_casts;
    result.text_encoding_casts += next_result.text_encoding_casts;
    return result;
  }

  void visitBegin() const override {
    disregard_casts_to_none_encoding_ = default_disregard_casts_to_none_encoding_;
  }

  TextEncodingCastCounts defaultResult() const override {
    return TextEncodingCastCounts();
  }

 private:
  mutable bool disregard_casts_to_none_encoding_ = false;
  const bool default_disregard_casts_to_none_encoding_;
};
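
// Net effect of the visitor: a cast from a none-encoded string to a
// dictionary-encoded one counts as an encoding cast, the reverse counts as a
// decoding cast, and literal-only casts or casts the join/LIKE frameworks
// handle natively are not counted.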

TextEncodingCastCounts get_text_cast_counts(const RelAlgExecutionUnit& ra_exe_unit) {
  TextEncodingCastCounts cast_counts;

  auto check_node_for_text_casts = [&cast_counts](
                                       const Analyzer::Expr* expr,
                                       const bool disregard_casts_to_none_encoding) {
    if (!expr) {
      return;
    }
    TextEncodingCastCountVisitor visitor(disregard_casts_to_none_encoding);
    const auto this_node_cast_counts = visitor.visit(expr);
    cast_counts.text_encoding_casts += this_node_cast_counts.text_encoding_casts;
    cast_counts.text_decoding_casts += this_node_cast_counts.text_decoding_casts;
  };

  for (const auto& qual : ra_exe_unit.quals) {
    check_node_for_text_casts(qual.get(), false);
  }
  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
    check_node_for_text_casts(simple_qual.get(), false);
  }
  for (const auto& groupby_expr : ra_exe_unit.groupby_exprs) {
    check_node_for_text_casts(groupby_expr.get(), false);
  }
  for (const auto& target_expr : ra_exe_unit.target_exprs) {
    check_node_for_text_casts(target_expr, false);
  }
  for (const auto& join_condition : ra_exe_unit.join_quals) {
    for (const auto& join_qual : join_condition.quals) {
      // We currently need to not count casts to none-encoded strings for join quals,
      // as analyzer will generate these but our join framework disregards them.
      // Some investigation was done on having analyzer issue the correct inter-string
      // dictionary casts, but this actually causes them to get executed and so the same
      // work gets done twice.
      check_node_for_text_casts(join_qual.get(),
                                true /* disregard_casts_to_none_encoding */);
    }
  }
  return cast_counts;
}

namespace {

void check_none_encoded_string_cast_tuple_limit(
    const std::vector<InputTableInfo>& query_infos,
    const RelAlgExecutionUnit& ra_exe_unit) {
  if (!g_enable_watchdog) {
    return;
  }
  auto const tuples_upper_bound =
      std::accumulate(query_infos.cbegin(),
                      query_infos.cend(),
                      size_t(0),
                      [](auto max, auto const& query_info) {
                        return std::max(max, query_info.info.getNumTuples());
                      });
  if (tuples_upper_bound <= g_watchdog_none_encoded_string_translation_limit) {
    return;
  }

  const auto& text_cast_counts = get_text_cast_counts(ra_exe_unit);
  const bool has_text_casts =
      text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;

  if (!has_text_casts) {
    return;
  }
  std::ostringstream oss;
  oss << "Query requires one or more casts between none-encoded and dictionary-encoded "
      << "strings, and the estimated table size (" << tuples_upper_bound << " rows) "
      << "exceeds the configured watchdog none-encoded string translation limit of "
      << g_watchdog_none_encoded_string_translation_limit << " rows";
  throw std::runtime_error(oss.str());
}

} // namespace
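
// For example, comparing a dictionary-encoded column against a none-encoded
// one makes the analyzer insert a per-row translation; on inputs larger than
// the configured limit the watchdog above rejects the query up front rather
// than translating millions of strings.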

bool RelAlgExecutor::canUseResultsetCache(const ExecutionOptions& eo,
                                          RenderInfo* render_info) const {
  auto validate_or_explain_query = is_validate_or_explain_query(eo);
  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
  return g_enable_data_recycler && g_use_query_resultset_cache && eo.keep_result &&
         !validate_or_explain_query && !hasStepForUnion() &&
         !query_for_partial_outer_frag &&
         (!render_info || (render_info && !render_info->isInSitu()));
}

size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
                                             const ExecutionOptions& eo) {
  if (eo.find_push_down_candidates) {
    return 0;
  }

  if (eo.just_explain) {
    return 0;
  }

  CHECK(query_dag_);

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  auto lock = executor_->acquireExecuteMutex();
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (!getSubqueries().empty()) {
    return 0;
  }

  CHECK(!ed_seq.empty());
  if (ed_seq.size() > 1) {
    return 0;
  }

  executor_->temporary_tables_ = &temporary_tables_;

  auto exec_desc_ptr = ed_seq.getDescriptor(0);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    return 0;
  }

  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    auto work_unit = createProjectWorkUnit(project, SortInfo(), eo);
    return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableKey(),
                                   executor_);
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      return 0;
    } else if (compound->isUpdateViaSelect()) {
      return 0;
    } else {
      if (compound->isAggregate()) {
        return 0;
      }

      const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo);

      return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableKey(),
                                     executor_);
    }
  }

  return 0;
}

ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
                                                   const ExecutionOptions& eo,
                                                   const bool just_explain_plan,
                                                   const bool explain_verbose,
                                                   RenderInfo* render_info) {
  CHECK(query_dag_);
  CHECK_EQ(query_dag_->getBuildState(), RelAlgDag::BuildState::kBuiltOptimized)
      << static_cast<int>(query_dag_->getBuildState());

  auto timer = DEBUG_TIMER(__func__);

  auto run_query = [&](const CompilationOptions& co_in) {
    auto execution_result = executeRelAlgQueryNoRetry(
        co_in, eo, just_explain_plan, explain_verbose, render_info);

    constexpr bool vlog_result_set_summary{false};
    if constexpr (vlog_result_set_summary) {
      VLOG(1) << execution_result.getRows()->summaryToString();
    }

    if (post_execution_callback_) {
      VLOG(1) << "Running post execution callback.";
      (*post_execution_callback_)();
    }
    return execution_result;
  };

  try {
    return run_query(co);
  } catch (const QueryMustRunOnCpu&) {
    if (!g_allow_cpu_retry) {
      throw;
    }
  }
  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
  auto co_cpu = CompilationOptions::makeCpuOnly(co);

  if (render_info) {
    render_info->forceNonInSitu();
  }
  return run_query(co_cpu);
}

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
                                                          const ExecutionOptions& eo,
                                                          const bool just_explain_plan,
                                                          const bool explain_verbose,
                                                          RenderInfo* render_info) {
  auto timer = DEBUG_TIMER(__func__);
  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  std::string query_session{""};
  std::string query_str{"N/A"};
  std::string query_submitted_time{""};
  // gather necessary query's info
  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
    query_session = query_state_->getConstSessionInfo()->get_session_id();
    query_str = query_state_->getQueryStr();
    query_submitted_time = query_state_->getQuerySubmittedTime();
  }

  auto validate_or_explain_query =
      just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
  auto interruptable = !render_info && !query_session.empty() &&
                       eo.allow_runtime_query_interrupt && !validate_or_explain_query;
  if (interruptable) {
    // if we reach here, the current query which was waiting an idle executor
    // within the dispatch queue is now scheduled to the specific executor
    // (not UNITARY_EXECUTOR)
    // so we update the query session's status with the executor that takes this query
    std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
        query_session, query_str, query_submitted_time);

    // now the query is going to be executed, so update the status as
    // "RUNNING_QUERY_KERNEL"
    executor_->updateQuerySessionStatus(
        query_session,
        query_submitted_time,
        QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
  }

  // so it should do cleanup session info after finishing its execution
  ScopeGuard clearQuerySessionInfo =
      [this, &query_session, &interruptable, &query_submitted_time] {
        // reset the runtime query interrupt status after the end of query execution
        if (interruptable) {
          // cleanup running session's info
          executor_->clearQuerySessionStatus(query_session, query_submitted_time);
        }
      };

  auto acquire_execute_mutex = [](Executor* executor) -> auto {
    auto ret = executor->acquireExecuteMutex();
    return ret;
  };
  // now we acquire executor lock in here to make sure that this executor holds
  // all necessary resources and at the same time protect them against other executor
  auto lock = acquire_execute_mutex(executor_);

  if (interruptable) {
    // check whether this query session is "already" interrupted
    // this case occurs when there is very short gap between being interrupted and
    // taking the execute lock
    // if so we have to remove "all" queries initiated by this session and we do in here
    // without running the query
    try {
      executor_->checkPendingQueryStatus(query_session);
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw std::runtime_error("Query execution has been interrupted (pending query)");
      }
      throw e;
    } catch (...) {
      throw std::runtime_error("Checking pending query status failed: unknown error");
    }
  }
  int64_t queue_time_ms = timer_stop(clock_begin);

  // Notify foreign tables to load prior to caching
  prepare_foreign_table_for_execution(ra);

  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (just_explain_plan) {
    const auto& ra = getRootRelAlgNode();
    auto ed_seq = RaExecutionSequence(&ra, executor_);
    std::vector<const RelAlgNode*> steps;
    for (size_t i = 0; i < ed_seq.size(); i++) {
      steps.emplace_back(ed_seq.getDescriptor(i)->getBody());
      steps.back()->setStepNumber(i + 1);
    }
    std::stringstream ss;
    auto& nodes = getRelAlgDag()->getNodes();
    RelAlgDagViewer dagv(false, explain_verbose);
    dagv.handleQueryEngineVector(nodes);
    ss << dagv;
    auto rs = std::make_shared<ResultSet>(ss.str());
    return {rs, {}};
  }

  if (eo.find_push_down_candidates) {
    // this extra logic is mainly due to current limitations on multi-step queries
    // and/or subqueries.
    return executeRelAlgSeqWithFilterPushDown(
        ed_seq, co, eo, render_info, queue_time_ms);
  }
  timer_setup.stop();

  // Dispatch the subqueries first
  const auto global_hints = getGlobalQueryHint();
  for (auto subquery : getSubqueries()) {
    const auto subquery_ra = subquery->getRelAlg();
    CHECK(subquery_ra);
    if (subquery_ra->hasContextData()) {
      continue;
    }
    // Execute the subquery and cache the result.
    RelAlgExecutor subquery_executor(executor_, query_state_);
    // Propagate global and local query hint if necessary
    const auto local_hints = getParsedQueryHint(subquery_ra);
    if (global_hints || local_hints) {
      const auto subquery_rel_alg_dag = subquery_executor.getRelAlgDag();
      if (global_hints) {
        subquery_rel_alg_dag->setGlobalQueryHints(*global_hints);
      }
      if (local_hints) {
        subquery_rel_alg_dag->registerQueryHint(subquery_ra, *local_hints);
      }
    }
    RaExecutionSequence subquery_seq(subquery_ra, executor_);
    auto result = subquery_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
    subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
  }
  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
}

AggregatedColRange RelAlgExecutor::computeColRangesCache() {
  AggregatedColRange agg_col_range_cache;
  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
  return executor_->computeColRangesCache(phys_inputs);
}

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
  return executor_->computeStringDictionaryGenerations(phys_inputs);
}

TableGenerations RelAlgExecutor::computeTableGenerations() {
  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
  return executor_->computeTableGenerations(phys_table_ids);
}

Executor* RelAlgExecutor::getExecutor() const {
  return executor_;
}

void RelAlgExecutor::cleanupPostExecution() {
  CHECK(executor_);
  executor_->row_set_mem_owner_ = nullptr;
}

std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
RelAlgExecutor::getJoinInfo(const RelAlgNode* root_node) {
  auto sort_node = dynamic_cast<const RelSort*>(root_node);
  if (sort_node) {
    // we assume that a test query needing join info does not contain any sort node
    return {};
  }
  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
  RelLeftDeepTreeIdsCollector visitor;
  auto left_deep_tree_ids = visitor.visit(root_node);
  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
}

namespace {

void check_sort_node_source_constraint(const RelSort* sort) {
  CHECK_EQ(size_t(1), sort->inputCount());
  const auto source = sort->getInput(0);
  if (dynamic_cast<const RelSort*>(source)) {
    throw std::runtime_error("Sort node not supported as input to another sort");
  }
}

} // namespace

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
    const RaExecutionSequence& seq,
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryStep);

  auto exe_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exe_desc_ptr);
  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());

  size_t shard_count{0};
  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
    return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
  };

  if (sort) {
    check_sort_node_source_constraint(sort);
    auto order_entries = sort->getOrderEntries();
    const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
    shard_count =
        GroupByAndAggregate::shard_count_for_top_groups(source_work_unit.exe_unit);
    if (!shard_count) {
      // No point in sorting on the leaf, only execute the input to the sort node.
      CHECK_EQ(size_t(1), sort->inputCount());
      const auto source = sort->getInput(0);
      if (sort->collationCount() || node_is_aggregate(source)) {
        auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
        CHECK_EQ(temp_seq.size(), size_t(1));
        ExecutionOptions eo_copy = eo;
        eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
        // Use subseq to avoid clearing existing temporary tables
        return {
            executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
            merge_type(source),
            source->getId(),
            false};
      }
    }
  }
  QueryStepExecutionResult query_step_result{ExecutionResult{},
                                             merge_type(exe_desc_ptr->getBody()),
                                             exe_desc_ptr->getBody()->getId(),
                                             false};
  try {
    query_step_result.result = executeRelAlgSubSeq(
        seq, std::make_pair(step_idx, step_idx + 1), co, eo, render_info, queue_time_ms_);
  } catch (QueryMustRunOnCpu const& e) {
    auto copied_co = co;
    copied_co.device_type = ExecutorDeviceType::CPU;
    LOG(INFO) << "Retry the query via CPU mode";
    query_step_result.result =
        executeRelAlgSubSeq(seq,
                            std::make_pair(step_idx, step_idx + 1),
                            copied_co,
                            eo,
                            render_info,
                            queue_time_ms_);
  }
  if (post_execution_callback_) {
    VLOG(1) << "Running post execution callback.";
    (*post_execution_callback_)();
  }
  return query_step_result;
}
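
// Leaf results are merged according to MergeType: Reduce when the step is an
// unsharded aggregate (per-leaf partial aggregates must be reduced into one
// result), Union when per-leaf results can simply be concatenated.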

void RelAlgExecutor::prepareLeafExecution(
    const AggregatedColRange& agg_col_range,
    const StringDictionaryGenerations& string_dictionary_generations,
    const TableGenerations& table_generations) {
  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  queue_time_ms_ = timer_stop(clock_begin);
  // (constructor arguments reconstructed; the original line is elided in this listing)
  executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(
      Executor::getArenaBlockSize(), executor_->executor_id_, cpu_threads());
  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
  executor_->table_generations_ = table_generations;
  executor_->agg_col_range_cache_ = agg_col_range;
}

ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
                                                 const CompilationOptions& co,
                                                 const ExecutionOptions& eo,
                                                 RenderInfo* render_info,
                                                 const int64_t queue_time_ms,
                                                 const bool with_existing_temp_tables) {
  auto timer = DEBUG_TIMER(__func__);
  if (!with_existing_temp_tables) {
    decltype(temporary_tables_)().swap(temporary_tables_);
  }
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!seq.empty());

  auto get_descriptor_count = [&seq, &eo]() -> size_t {
    if (eo.just_explain) {
      if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
        // run the logical values descriptor to generate the result set, then the next
        // descriptor to generate the explain
        CHECK_GE(seq.size(), size_t(2));
        return 2;
      } else {
        return 1;
      }
    } else {
      return seq.size();
    }
  };

  const auto exec_desc_count = get_descriptor_count();
  auto eo_copied = eo;
  if (seq.hasQueryStepForUnion()) {
    // we currently do not support resultset recycling when an input query
    // contains union (all) operation
    eo_copied.keep_result = false;
  }

  // we have to register resultset(s) of the skipped query step(s) as temporary table
  // before executing the remaining query steps
  // since they may be required during the query processing
  // i.e., get metadata of the target expression from the skipped query step
  if (g_enable_data_recycler && g_use_query_resultset_cache) {
    for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
      const auto cached_res =
          executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
      CHECK(cached_res);
      addTemporaryTable(kv.first, cached_res);
    }
  }

  const auto num_steps = exec_desc_count - 1;
  for (size_t i = 0; i < exec_desc_count; i++) {
    VLOG(1) << "Executing query step " << i << " / " << num_steps;
    try {
      executeRelAlgStep(
          seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == num_steps) {
        // only render on the last step
        render_info->forceNonInSitu();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo_copied,
                        (i == num_steps) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const NativeExecutionError&) {
      if (!g_enable_interop) {
        throw;
      }
      auto eo_extern = eo_copied;
      eo_extern.executor_type = ::ExecutorType::Extern;
      auto exec_desc_ptr = seq.getDescriptor(i);
      const auto body = exec_desc_ptr->getBody();
      const auto compound = dynamic_cast<const RelCompound*>(body);
      if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
        LOG(INFO) << "Also failed to run the query using interoperability";
        throw;
      }
      executeRelAlgStep(
          seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    }
  }

  return seq.getDescriptor(num_steps)->getResult();
}

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
    const RaExecutionSequence& seq,
    const std::pair<size_t, size_t> interval,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  executor_->temporary_tables_ = &temporary_tables_;
  time(&now_);
  for (size_t i = interval.first; i < interval.second; i++) {
    // only render on the last step
    try {
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == interval.second - 1) {
        render_info->forceNonInSitu();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    }
  }

  return seq.getDescriptor(interval.second - 1)->getResult();
}

namespace {
void handle_query_hint(RegisteredQueryHint const& query_hints,
                       ExecutionOptions& eo,
                       CompilationOptions& co) {
  auto columnar_output_hint_enabled = false;
  auto rowwise_output_hint_enabled = false;
  if (query_hints.isHintRegistered(QueryHint::kCpuMode)) {
    VLOG(1) << "A user forces to run the query on the CPU execution mode";
    co.device_type = ExecutorDeviceType::CPU;
  }
  if (query_hints.isHintRegistered(QueryHint::kKeepResult)) {
    if (!g_enable_data_recycler) {
      VLOG(1) << "A user enables keeping query resultset but it is skipped since data "
                 "recycler is disabled";
    } else if (!g_use_query_resultset_cache) {
      VLOG(1) << "A user enables keeping query resultset but it is skipped since query "
                 "resultset recycler is disabled";
    } else {
      VLOG(1) << "A user enables keeping query resultset";
      eo.keep_result = true;
    }
  }
  if (query_hints.isHintRegistered(QueryHint::kKeepTableFuncResult)) {
    // we use this hint within the function 'executeTableFunction'
    if (!g_enable_data_recycler) {
      VLOG(1) << "A user enables keeping table function's resultset but it is skipped "
                 "since data recycler is disabled";
    } else if (!g_use_query_resultset_cache) {
      VLOG(1) << "A user enables keeping table function's resultset but it is skipped "
                 "since query resultset recycler is disabled";
    } else {
      VLOG(1) << "A user enables keeping table function's resultset";
      eo.keep_result = true;
    }
  }
  if (query_hints.isHintRegistered(QueryHint::kWatchdog)) {
    if (!eo.with_watchdog) {
      VLOG(1) << "A user enables watchdog for this query";
      eo.with_watchdog = true;
    }
  }
  if (query_hints.isHintRegistered(QueryHint::kWatchdogOff)) {
    if (eo.with_watchdog) {
      VLOG(1) << "A user disables watchdog for this query";
      eo.with_watchdog = false;
    }
  }
  if (query_hints.isHintRegistered(QueryHint::kDynamicWatchdog)) {
    if (!eo.with_dynamic_watchdog) {
      VLOG(1) << "A user enables dynamic watchdog for this query";
      eo.with_dynamic_watchdog = true;
    }
  }
  if (query_hints.isHintRegistered(QueryHint::kDynamicWatchdogOff)) {
    if (eo.with_dynamic_watchdog) {
      VLOG(1) << "A user disables dynamic watchdog for this query";
      eo.with_dynamic_watchdog = false;
    }
  }
  if (query_hints.isHintRegistered(QueryHint::kQueryTimeLimit)) {
    std::ostringstream oss;
    oss << "A user sets query time limit to " << query_hints.query_time_limit << " ms";
    eo.dynamic_watchdog_time_limit = query_hints.query_time_limit;
    if (!eo.with_dynamic_watchdog) {
      eo.with_dynamic_watchdog = true;
      oss << " (and system automatically enables dynamic watchdog to activate the "
             "given \"query_time_limit\" hint)";
    }
    VLOG(1) << oss.str();
  }
  if (query_hints.isHintRegistered(QueryHint::kAllowLoopJoin)) {
    VLOG(1) << "A user enables loop join";
    eo.allow_loop_joins = true;
  }
  if (query_hints.isHintRegistered(QueryHint::kDisableLoopJoin)) {
    VLOG(1) << "A user disables loop join";
    eo.allow_loop_joins = false;
  }
  if (query_hints.isHintRegistered(QueryHint::kMaxJoinHashTableSize)) {
    eo.max_join_hash_table_size = query_hints.max_join_hash_table_size;
    VLOG(1) << "A user forces the maximum size of a join hash table as "
            << eo.max_join_hash_table_size << " bytes";
  }
  if (query_hints.isHintRegistered(QueryHint::kOptCudaBlockAndGridSizes)) {
    if (query_hints.isHintRegistered(QueryHint::kCudaGridSize) ||
        query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
      VLOG(1) << "Skip query hint \"opt_cuda_grid_and_block_size\" when at least one "
                 "of the following query hints are given simultaneously: "
                 "\"cuda_block_size\" and \"cuda_grid_size_multiplier\"";
    } else {
      VLOG(1) << "A user enables optimization of cuda block and grid sizes";
      co.optimize_cuda_block_and_grid_sizes = true;
    }
  }
  if (query_hints.isHintRegistered(QueryHint::kColumnarOutput)) {
    VLOG(1) << "A user forces the query to run with columnar output";
    columnar_output_hint_enabled = true;
  } else if (query_hints.isHintRegistered(QueryHint::kRowwiseOutput)) {
    VLOG(1) << "A user forces the query to run with rowwise output";
    rowwise_output_hint_enabled = true;
  }
  auto columnar_output_enabled = eo.output_columnar_hint ? !rowwise_output_hint_enabled
                                                         : columnar_output_hint_enabled;
  if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
    LOG(INFO) << "Currently, we do not support applying query hint to change query "
                 "output layout in distributed mode.";
  }
  eo.output_columnar_hint = columnar_output_enabled;
}
} // namespace
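
// Hints reach handle_query_hint() as a comment on the SELECT clause, e.g.
//   SELECT /*+ cpu_mode, columnar_output */ x, COUNT(*) FROM t GROUP BY x;
// which registers QueryHint::kCpuMode and QueryHint::kColumnarOutput before
// the step is compiled.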

void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
                                       const size_t step_idx,
                                       const CompilationOptions& co,
                                       const ExecutionOptions& eo,
                                       RenderInfo* render_info,
                                       const int64_t queue_time_ms) {
  auto timer = DEBUG_TIMER(__func__);
  auto exec_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    handleNop(exec_desc);
    return;
  }
  ExecutionOptions eo_copied = eo;
  CompilationOptions co_copied = co;
  eo_copied.with_watchdog =
      eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body));
  eo_copied.outer_fragment_indices =
      step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>();

  auto target_node = body;
  auto query_plan_dag_hash = body->getQueryPlanDagHash();
  if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
    target_node = sort_body->getInput(0);
    query_plan_dag_hash = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
        target_node->getQueryPlanDagHash(), SortInfo::createFromSortNode(sort_body));
  } else {
    query_plan_dag_hash = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
        target_node->getQueryPlanDagHash(), SortInfo());
  }
  auto query_hints = getParsedQueryHint(target_node);
  if (query_hints) {
    handle_query_hint(*query_hints, eo_copied, co_copied);
  }

  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
    if (auto cached_resultset =
            executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
                query_plan_dag_hash)) {
      VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
              << " from resultset cache";
      body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
      if (render_info) {
        std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
            executor_->getResultSetRecyclerHolder().getTargetExprs(query_plan_dag_hash);
        std::vector<Analyzer::Expr*> copied_target_exprs;
        for (const auto& expr : cached_target_exprs) {
          copied_target_exprs.push_back(expr.get());
        }
        build_render_targets(
            *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
      }
      exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
      addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
      return;
    }
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      executeDelete(compound, co_copied, eo_copied, queue_time_ms);
    } else if (compound->isUpdateViaSelect()) {
      executeUpdate(compound, co_copied, eo_copied, queue_time_ms);
    } else {
      exec_desc.setResult(
          executeCompound(compound, co_copied, eo_copied, render_info, queue_time_ms));
      VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
              << static_cast<int>(-compound->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    if (project->isDeleteViaSelect()) {
      executeDelete(project, co_copied, eo_copied, queue_time_ms);
    } else if (project->isUpdateViaSelect()) {
      executeUpdate(project, co_copied, eo_copied, queue_time_ms);
    } else {
      std::optional<size_t> prev_count;
      // Disabling the intermediate count optimization in distributed, as the previous
      // execution descriptor will likely not hold the aggregated result.
      if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
        // If the previous node produced a reliable count, skip the pre-flight count.
        RelAlgNode const* const prev_body = project->getInput(0);
        if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
          if (RaExecutionDesc const* const prev_exec_desc =
                  prev_body->hasContextData()
                      ? prev_body->getContextData()
                      : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
            const auto& prev_exe_result = prev_exec_desc->getResult();
            const auto prev_result = prev_exe_result.getRows();
            if (prev_result) {
              prev_count = prev_result->rowCount();
              VLOG(3) << "Setting output row count for projection node to previous node ("
                      << prev_exec_desc->getBody()->toString(
                             RelRexToStringConfig::defaults())
                      << ") to " << *prev_count;
            }
          }
        }
      }
      exec_desc.setResult(executeProject(
          project, co_copied, eo_copied, render_info, queue_time_ms, prev_count));
      VLOG(3) << "Returned from executeProject(), addTemporaryTable("
              << static_cast<int>(-project->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
  if (aggregate) {
    exec_desc.setResult(
        executeAggregate(aggregate, co_copied, eo_copied, render_info, queue_time_ms));
    addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto filter = dynamic_cast<const RelFilter*>(body);
  if (filter) {
    exec_desc.setResult(
        executeFilter(filter, co_copied, eo_copied, render_info, queue_time_ms));
    addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto sort = dynamic_cast<const RelSort*>(body);
  if (sort) {
    exec_desc.setResult(
        executeSort(sort, co_copied, eo_copied, render_info, queue_time_ms));
    if (exec_desc.getResult().isFilterPushDownEnabled()) {
      return;
    }
    addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
  if (logical_values) {
    exec_desc.setResult(executeLogicalValues(logical_values, eo_copied));
    addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto modify = dynamic_cast<const RelModify*>(body);
  if (modify) {
    exec_desc.setResult(executeModify(modify, eo_copied));
    return;
  }
  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
  if (logical_union) {
    exec_desc.setResult(executeUnion(
        logical_union, seq, co_copied, eo_copied, render_info, queue_time_ms));
    addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
  if (table_func) {
    exec_desc.setResult(
        executeTableFunction(table_func, co_copied, eo_copied, queue_time_ms));
    addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  LOG(FATAL) << "Unhandled body type: "
             << body->toString(RelRexToStringConfig::defaults());
}

void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
  // just set the result of the previous node as the result of no op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up temp table as it could be used by the outer query or next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}

namespace {

class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:
  RexUsedInputsVisitor() : RexVisitor() {}

  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        const auto cd =
            scan_ra->getCatalog().getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
};
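
// When the referenced column is geospatial, visitInput() above substitutes the
// column's physical companions (e.g. the coords array backing a POINT) via
// SPIMAP_GEO_PHYSICAL_INPUT, so scans fetch the columns that actually hold the
// data rather than the logical geo column.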

const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
    return table_func;
  }
  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
    CHECK_EQ(size_t(2), join->inputCount());
    return join;
  }
  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_EQ(size_t(1), ra_node->inputCount());
  }
  auto only_src = ra_node->getInput(0);
  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
                       dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
  return is_join ? only_src : ra_node;
}
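
// e.g. for Compound -> LeftDeepInnerJoin(Scan a, Scan b), the data sink is the
// join node, so used-input collection walks the join's inputs rather than the
// compound node itself.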

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelCompound* compound) {
  RexUsedInputsVisitor visitor;
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelAggregate* aggregate) {
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelProject* project) {
  RexUsedInputsVisitor visitor;
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelTableFunction* table_func) {
  RexUsedInputsVisitor visitor;
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelFilter* filter) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelLogicalUnion* logical_union) {
  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  used_inputs_owned.reserve(logical_union->inputCount());
  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
  auto const n_inputs = logical_union->inputCount();
  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
    auto input = logical_union->getInput(nest_level);
    for (size_t i = 0; i < input->size(); ++i) {
      used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
      used_inputs.insert(used_inputs_owned.back().get());
    }
  }
  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
}

shared::TableKey table_key_from_ra(const RelAlgNode* ra_node) {
  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
  shared::TableKey table_key{0, int32_t(-ra_node->getId())};
  if (scan_ra) {
    table_key.db_id = scan_ra->getCatalog().getDatabaseId();
    const auto td = scan_ra->getTableDescriptor();
    CHECK(td);
    table_key.table_id = td->tableId;
  }
  return table_key;
}
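
// Non-scan inputs get a negative table id (-ra_node->getId()), matching the
// keys under which intermediate results are registered via addTemporaryTable()
// in executeRelAlgStep() above.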

std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
    const RelAlgNode* ra_node,
    const std::vector<size_t>& input_permutation) {
  const auto data_sink_node = get_data_sink(ra_node);
  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    const auto input_ra = data_sink_node->getInput(input_node_idx);
    // Having a non-zero mapped value (input_idx) results in the query being interpreted
    // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
    // mapped value (input_idx) which originates here. This would be incorrect for UNION.
    size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
    const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
    CHECK(it_ok.second);
    LOG_IF(DEBUG1, !input_permutation.empty())
        << "Assigned input " << input_ra->toString(RelRexToStringConfig::defaults())
        << " to nest level " << input_idx;
  }
  return input_to_nest_level;
}
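
// For a left-deep join over three tables the inputs receive nest levels
// 0, 1, 2, which become the rte_idx of each table's column references during
// code generation; for UNION every input is pinned to level 0, as noted above.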
1550 
1551 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1553  const auto data_sink_node = get_data_sink(ra_node);
1554  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1555  CHECK_EQ(join->inputCount(), 2u);
1556  const auto condition = join->getCondition();
1557  RexUsedInputsVisitor visitor;
1558  auto condition_inputs = visitor.visit(condition);
1559  std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1560  visitor.get_inputs_owned());
1561  return std::make_pair(condition_inputs, condition_inputs_owned);
1562  }
1563 
1564  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1565  CHECK_GE(left_deep_join->inputCount(), 2u);
1566  const auto condition = left_deep_join->getInnerCondition();
1567  RexUsedInputsVisitor visitor;
1568  auto result = visitor.visit(condition);
1569  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1570  ++nesting_level) {
1571  const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1572  if (outer_condition) {
1573  const auto outer_result = visitor.visit(outer_condition);
1574  result.insert(outer_result.begin(), outer_result.end());
1575  }
1576  }
1577  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
1578  return std::make_pair(result, used_inputs_owned);
1579  }
1580 
1581  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1582  CHECK_GT(ra_node->inputCount(), 1u)
1583  << ra_node->toString(RelRexToStringConfig::defaults());
1584  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1585  // no-op
1586  CHECK_GE(ra_node->inputCount(), 0u)
1587  << ra_node->toString(RelRexToStringConfig::defaults());
1588  } else {
1589  CHECK_EQ(ra_node->inputCount(), 1u)
1590  << ra_node->toString(RelRexToStringConfig::defaults());
1591  }
1592  return std::make_pair(std::unordered_set<const RexInput*>{},
1593  std::vector<std::shared_ptr<RexInput>>{});
1594 }
1595 
1596 void collect_used_input_desc(
1597  std::vector<InputDescriptor>& input_descs,
1598  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1599  const RelAlgNode* ra_node,
1600  const std::unordered_set<const RexInput*>& source_used_inputs,
1601  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1602  VLOG(3) << "ra_node=" << ra_node->toString(RelRexToStringConfig::defaults())
1603  << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
1604  << " source_used_inputs.size()=" << source_used_inputs.size();
1605  for (const auto used_input : source_used_inputs) {
1606  const auto input_ra = used_input->getSourceNode();
1607  const auto table_key = table_key_from_ra(input_ra);
1608  auto col_id = used_input->getIndex();
1609  auto it = input_to_nest_level.find(input_ra);
1610  if (it != input_to_nest_level.end()) {
1611  const int nest_level = it->second;
1612  if (auto rel_scan = dynamic_cast<const RelScan*>(input_ra)) {
1613  const auto& catalog = rel_scan->getCatalog();
1614  col_id = catalog.getColumnIdBySpi(table_key.table_id, col_id + 1);
1615  }
1616  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1617  col_id, table_key.table_id, table_key.db_id, nest_level));
1618  } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1619  throw std::runtime_error("Bushy joins not supported");
1620  }
1621  }
1622 }
1623 
1624 template <class RA>
1625 std::pair<std::vector<InputDescriptor>,
1626  std::list<std::shared_ptr<const InputColDescriptor>>>
1627 get_input_desc_impl(const RA* ra_node,
1628  const std::unordered_set<const RexInput*>& used_inputs,
1629  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1630  const std::vector<size_t>& input_permutation) {
1631  std::vector<InputDescriptor> input_descs;
1632  const auto data_sink_node = get_data_sink(ra_node);
1633  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1634  const auto input_node_idx =
1635  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1636  auto input_ra = data_sink_node->getInput(input_node_idx);
1637  const auto table_key = table_key_from_ra(input_ra);
1638  input_descs.emplace_back(table_key.db_id, table_key.table_id, input_idx);
1639  }
1640  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1641  collect_used_input_desc(input_descs,
1642  input_col_descs_unique, // modified
1643  ra_node,
1644  used_inputs,
1645  input_to_nest_level);
1646  std::unordered_set<const RexInput*> join_source_used_inputs;
1647  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1648  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1649  get_join_source_used_inputs(ra_node);
1650  collect_used_input_desc(input_descs,
1651  input_col_descs_unique, // modified
1652  ra_node,
1653  join_source_used_inputs,
1654  input_to_nest_level);
1655  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1656  input_col_descs_unique.begin(), input_col_descs_unique.end());
1657 
1658  std::sort(input_col_descs.begin(),
1659  input_col_descs.end(),
1660  [](std::shared_ptr<const InputColDescriptor> const& lhs,
1661  std::shared_ptr<const InputColDescriptor> const& rhs) {
1662  return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1663  lhs->getColId(),
1664  lhs->getScanDesc().getTableKey()) <
1665  std::make_tuple(rhs->getScanDesc().getNestLevel(),
1666  rhs->getColId(),
1667  rhs->getScanDesc().getTableKey());
1668  });
1669  return {input_descs,
1670  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1671  input_col_descs.end())};
1672 }
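// --- Editor's sketch (not part of the source file) ---------------------------
// The std::sort call above relies on std::tuple's lexicographic operator< to
// order input column descriptors by (nest level, column id, table key) without
// a hand-written comparator. A self-contained demonstration of the idiom, with
// a hypothetical Desc struct standing in for InputColDescriptor:
#include <algorithm>
#include <cassert>
#include <tuple>
#include <vector>

struct Desc {
  int nest_level;
  int col_id;
  int table_id;
};

int main() {
  std::vector<Desc> descs{{1, 2, 7}, {0, 5, 7}, {0, 2, 9}, {0, 2, 7}};
  std::sort(descs.begin(), descs.end(), [](const Desc& lhs, const Desc& rhs) {
    return std::make_tuple(lhs.nest_level, lhs.col_id, lhs.table_id) <
           std::make_tuple(rhs.nest_level, rhs.col_id, rhs.table_id);
  });
  // Sorted by nest level first, then column id, then table id:
  // {0,2,7}, {0,2,9}, {0,5,7}, {1,2,7}
  assert(descs.front().nest_level == 0 && descs.front().col_id == 2 &&
         descs.front().table_id == 7);
  assert(descs.back().nest_level == 1);
}
// -----------------------------------------------------------------------------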
1673 
1674 template <class RA>
1675 std::tuple<std::vector<InputDescriptor>,
1676  std::list<std::shared_ptr<const InputColDescriptor>>,
1677  std::vector<std::shared_ptr<RexInput>>>
1678 get_input_desc(const RA* ra_node,
1679  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1680  const std::vector<size_t>& input_permutation) {
1681  std::unordered_set<const RexInput*> used_inputs;
1682  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1683  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node);
1684  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1685  auto input_desc_pair =
1686  get_input_desc_impl(ra_node, used_inputs, input_to_nest_level, input_permutation);
1687  return std::make_tuple(
1688  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1689 }
1690 
1691 size_t get_scalar_sources_size(const RelCompound* compound) {
1692  return compound->getScalarSourcesSize();
1693 }
1694 
1695 size_t get_scalar_sources_size(const RelProject* project) {
1696  return project->size();
1697 }
1698 
1699 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1700  return table_func->getTableFuncInputsSize();
1701 }
1702 
1703 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1704  return compound->getScalarSource(i);
1705 }
1706 
1707 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1708  return project->getProjectAt(i);
1709 }
1710 
1711 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1712  return table_func->getTableFuncInputAt(i);
1713 }
1714 
1715 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1716  const std::shared_ptr<Analyzer::Expr> expr) {
1717  const auto& ti = expr->get_type_info();
1718  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1719  return expr;
1720  }
1721  auto transient_dict_ti = ti;
1722  transient_dict_ti.set_compression(kENCODING_DICT);
1723  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1724  transient_dict_ti.setStringDictKey(shared::StringDictKey::kTransientDictKey);
1725  transient_dict_ti.set_fixed_size();
1726  return expr->add_cast(transient_dict_ti);
1727 }
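// --- Editor's sketch (not part of the source file) ---------------------------
// set_transient_dict() above promotes only none-encoded strings to a transient
// dictionary, so that downstream group-by and comparisons operate on 32-bit ids
// instead of raw strings. A simplified model of that guard, with hypothetical
// TypeInfo/Encoding types in place of the engine's SQLTypeInfo:
#include <cassert>

enum class Encoding { kNone, kDict };

struct TypeInfo {
  bool is_string;
  Encoding compression;
};

TypeInfo promote_to_transient_dict(TypeInfo ti) {
  if (!ti.is_string || ti.compression != Encoding::kNone) {
    return ti;  // not a string, or already dictionary-encoded: leave unchanged
  }
  ti.compression = Encoding::kDict;  // transient dict: ids minted at query time
  return ti;
}

int main() {
  assert(promote_to_transient_dict({true, Encoding::kNone}).compression ==
         Encoding::kDict);
  assert(promote_to_transient_dict({true, Encoding::kDict}).compression ==
         Encoding::kDict);
  assert(promote_to_transient_dict({false, Encoding::kNone}).compression ==
         Encoding::kNone);
}
// -----------------------------------------------------------------------------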
1728 
1729 void set_transient_dict_maybe(
1730  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1731  const std::shared_ptr<Analyzer::Expr>& expr) {
1732  try {
1733  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1734  } catch (...) {
1735  scalar_sources.push_back(fold_expr(expr.get()));
1736  }
1737 }
1738 
1739 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1740  const std::shared_ptr<Analyzer::Expr>& input) {
1741  const auto& input_ti = input->get_type_info();
1742  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1743  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1744  }
1745  return input;
1746 }
1747 
1748 template <class RA>
1749 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1750  const RA* ra_node,
1751  const RelAlgTranslator& translator,
1752  const ::ExecutorType executor_type) {
1753  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1754  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1755  VLOG(3) << "get_scalar_sources_size("
1756  << ra_node->toString(RelRexToStringConfig::defaults())
1757  << ") = " << scalar_sources_size;
1758  for (size_t i = 0; i < scalar_sources_size; ++i) {
1759  const auto scalar_rex = scalar_at(i, ra_node);
1760  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1761  // RexRefs are synthetic scalars we append at the end of the real ones
1762  // for the sake of taking memory ownership; no real work is needed here.
1763  continue;
1764  }
1765 
1766  const auto scalar_expr =
1767  rewrite_array_elements(translator.translate(scalar_rex).get());
1768  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1769  if (executor_type == ExecutorType::Native) {
1770  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1771  } else if (executor_type == ExecutorType::TableFunctions) {
1772  scalar_sources.push_back(fold_expr(rewritten_expr.get()));
1773  } else {
1774  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1775  }
1776  }
1777 
1778  return scalar_sources;
1779 }
1780 
1781 template <class RA>
1782 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1783  const RA* ra_node,
1784  const RelAlgTranslator& translator,
1785  int32_t tableId,
1786  const Catalog_Namespace::Catalog& cat,
1787  const ColumnNameList& colNames,
1788  size_t starting_projection_column_idx) {
1789  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1790  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1791  const auto scalar_rex = scalar_at(i, ra_node);
1792  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1793  // RexRefs are synthetic scalars we append at the end of the real ones
1794  // for the sake of taking memory ownership; no real work is needed here.
1795  continue;
1796  }
1797 
1798  std::shared_ptr<Analyzer::Expr> translated_expr;
1799  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1800  translated_expr = cast_to_column_type(translator.translate(scalar_rex),
1801  tableId,
1802  cat,
1803  colNames[i - starting_projection_column_idx]);
1804  } else {
1805  translated_expr = translator.translate(scalar_rex);
1806  }
1807  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1808  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1809  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1810  }
1811 
1812  return scalar_sources;
1813 }
1814 
1815 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1816  const RelCompound* compound,
1817  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1818  if (!compound->isAggregate()) {
1819  return {nullptr};
1820  }
1821  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1822  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1823  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1824  }
1825  return groupby_exprs;
1826 }
1827 
1828 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1829  const RelAggregate* aggregate,
1830  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1831  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1832  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1833  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1834  }
1835  return groupby_exprs;
1836 }
1837 
1838 std::list<std::shared_ptr<Analyzer::Expr>> translate_quals(const RelCompound* compound,
1839  const RelAlgTranslator& translator) {
1840  const auto filter_rex = compound->getFilterExpr();
1841  const auto filter_expr = filter_rex ? translator.translate(filter_rex) : nullptr;
1842  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1843  : std::list<std::shared_ptr<Analyzer::Expr>>{};
1844 }
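// --- Editor's sketch (not part of the source file) ---------------------------
// translate_quals() above hands the folded filter to qual_to_conjunctive_form,
// whose job is to split a nested AND tree into independent conjuncts. A toy,
// self-contained version of that flattening (the Expr struct here is invented;
// the engine works on Analyzer expressions instead):
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Expr {
  bool is_and = false;
  std::string leaf;                // leaf predicate, e.g. "a > 1"
  std::shared_ptr<Expr> lhs, rhs;  // set only when is_and is true
};

void flatten_conjunction(const std::shared_ptr<Expr>& e,
                         std::vector<std::string>& quals) {
  if (e->is_and) {
    flatten_conjunction(e->lhs, quals);
    flatten_conjunction(e->rhs, quals);
  } else {
    quals.push_back(e->leaf);
  }
}

int main() {
  auto leaf = [](std::string s) {
    auto e = std::make_shared<Expr>();
    e->leaf = std::move(s);
    return e;
  };
  auto conj = [](std::shared_ptr<Expr> l, std::shared_ptr<Expr> r) {
    auto e = std::make_shared<Expr>();
    e->is_and = true;
    e->lhs = std::move(l);
    e->rhs = std::move(r);
    return e;
  };
  // (a > 1 AND (b < 2 AND c = 3))  -->  {"a > 1", "b < 2", "c = 3"}
  std::vector<std::string> quals;
  flatten_conjunction(conj(leaf("a > 1"), conj(leaf("b < 2"), leaf("c = 3"))), quals);
  assert(quals.size() == 3 && quals[0] == "a > 1" && quals[2] == "c = 3");
}
// -----------------------------------------------------------------------------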
1845 
1846 namespace {
1847 // If an encoded type is used in the context of COUNT(DISTINCT ...) then don't
1848 // bother decoding it. This is done by changing the sql type to an integer.
1849 void conditionally_change_arg_to_int_type(
1850  size_t target_expr_idx,
1851  std::shared_ptr<Analyzer::Expr>& target_expr,
1852  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos) {
1853  auto* agg_expr = dynamic_cast<Analyzer::AggExpr*>(target_expr.get());
1854  CHECK(agg_expr);
1855  if (agg_expr->get_is_distinct()) {
1856  SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1857  if (ti.get_type() != kARRAY && ti.get_compression() == kENCODING_DATE_IN_DAYS) {
1858  target_exprs_type_infos.emplace(target_expr_idx, ti);
1859  target_expr = target_expr->deep_copy();
1860  auto* arg = dynamic_cast<Analyzer::AggExpr*>(target_expr.get())->get_arg();
1861  arg->set_type_info(SQLTypeInfo(get_int_type_by_size(ti.get_size()), ti.get_notnull()));
1862  return;
1863  }
1864  }
1865  target_exprs_type_infos.emplace(target_expr_idx, target_expr->get_type_info());
1866 }
1867 } // namespace
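// --- Editor's sketch (not part of the source file) ---------------------------
// The COUNT(DISTINCT ...) rewrite above keeps date-in-days values encoded and
// merely relabels the argument as an integer of the same byte width, since
// distinctness is preserved under the encoding. get_int_type_by_size() is
// assumed to behave like this stand-in (the enum values here are hypothetical):
#include <cassert>
#include <cstddef>

enum FakeSqlType { kTINYINT8, kSMALLINT16, kINT32, kBIGINT64 };

FakeSqlType int_type_by_size(size_t bytes) {
  switch (bytes) {
    case 1:
      return kTINYINT8;
    case 2:
      return kSMALLINT16;
    case 4:
      return kINT32;
    default:
      return kBIGINT64;  // 8 bytes
  }
}

int main() {
  // A DATE column encoded as 2-byte day counts can be counted distinct as a
  // 16-bit integer, with no decode to a 64-bit epoch value.
  assert(int_type_by_size(2) == kSMALLINT16);
  assert(int_type_by_size(4) == kINT32);
}
// -----------------------------------------------------------------------------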
1868 
1869 std::vector<Analyzer::Expr*> translate_targets(
1870  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1871  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1872  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1873  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1874  const RelCompound* compound,
1875  const RelAlgTranslator& translator,
1876  const ExecutorType executor_type) {
1877  std::vector<Analyzer::Expr*> target_exprs;
1878  for (size_t i = 0; i < compound->size(); ++i) {
1879  const auto target_rex = compound->getTargetExpr(i);
1880  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1881  std::shared_ptr<Analyzer::Expr> target_expr;
1882  if (target_rex_agg) {
1883  target_expr =
1884  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1885  conditionally_change_arg_to_int_type(i, target_expr, target_exprs_type_infos);
1886  } else {
1887  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1888  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1889  if (target_rex_ref) {
1890  const auto ref_idx = target_rex_ref->getIndex();
1891  CHECK_GE(ref_idx, size_t(1));
1892  CHECK_LE(ref_idx, groupby_exprs.size());
1893  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1894  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1895  } else {
1896  target_expr =
1897  rewrite_array_elements(translator.translate(target_rex_scalar).get());
1898  auto rewritten_expr = rewrite_expr(target_expr.get());
1899  target_expr = fold_expr(rewritten_expr.get());
1900  if (executor_type == ExecutorType::Native) {
1901  try {
1902  target_expr = set_transient_dict(target_expr);
1903  } catch (...) {
1904  // noop
1905  }
1906  } else {
1907  target_expr = cast_dict_to_none(target_expr);
1908  }
1909  }
1910  target_exprs_type_infos.emplace(i, target_expr->get_type_info());
1911  }
1912  CHECK(target_expr);
1913  target_exprs_owned.push_back(target_expr);
1914  target_exprs.push_back(target_expr.get());
1915  }
1916  return target_exprs;
1917 }
1918 
1919 std::vector<Analyzer::Expr*> translate_targets(
1920  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1921  std::unordered_map<size_t, SQLTypeInfo>& target_exprs_type_infos,
1922  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1923  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1924  const RelAggregate* aggregate,
1925  const RelAlgTranslator& translator) {
1926  std::vector<Analyzer::Expr*> target_exprs;
1927  size_t group_key_idx = 1;
1928  for (const auto& groupby_expr : groupby_exprs) {
1929  auto target_expr =
1930  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1931  target_exprs_owned.push_back(target_expr);
1932  target_exprs.push_back(target_expr.get());
1933  }
1934 
1935  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1936  auto target_expr =
1937  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1938  CHECK(target_expr);
1939  target_expr = fold_expr(target_expr.get());
1940  target_exprs_owned.push_back(target_expr);
1941  target_exprs.push_back(target_expr.get());
1942  }
1943  return target_exprs;
1944 }
1945 
1946 bool is_count_distinct(const Analyzer::Expr* expr) {
1947  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1948  return agg_expr && agg_expr->get_is_distinct();
1949 }
1950 
1951 bool is_agg(const Analyzer::Expr* expr) {
1952  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1953  if (agg_expr && agg_expr->get_contains_agg()) {
1954  auto agg_type = agg_expr->get_aggtype();
1955  if (shared::is_any<SQLAgg::kMIN,
1956  SQLAgg::kMAX,
1957  SQLAgg::kSUM,
1958  SQLAgg::kSUM_IF,
1959  SQLAgg::kAVG>(agg_type)) {
1960  return true;
1961  }
1962  }
1963  return false;
1964 }
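// --- Editor's sketch (not part of the source file) ---------------------------
// shared::is_any<...> above tests one runtime value against a compile-time list
// of enumerators. A minimal C++17 re-implementation of the idea with a fold
// expression (a sketch under assumed semantics, not the project's definition):
#include <cassert>

enum class AggKind { kMin, kMax, kSum, kAvg, kCount };

template <AggKind... kinds>
bool is_any_of(AggKind value) {
  return ((value == kinds) || ...);  // expands to value==k1 || value==k2 || ...
}

int main() {
  assert((is_any_of<AggKind::kMin, AggKind::kMax, AggKind::kSum, AggKind::kAvg>(
      AggKind::kSum)));
  assert((!is_any_of<AggKind::kMin, AggKind::kMax>(AggKind::kCount)));
}
// -----------------------------------------------------------------------------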
1965 
1966 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr* expr) {
1967  if (is_count_distinct(expr)) {
1968  return SQLTypeInfo(kBIGINT, true);
1969  } else if (is_agg(expr)) {
1970  return get_nullable_logical_type_info(expr->get_type_info());
1971  }
1972  return get_logical_type_info(expr->get_type_info());
1973 }
1974 
1975 template <class RA>
1976 std::vector<TargetMetaInfo> get_targets_meta(
1977  const RA* ra_node,
1978  const std::vector<Analyzer::Expr*>& target_exprs) {
1979  std::vector<TargetMetaInfo> targets_meta;
1980  CHECK_EQ(ra_node->size(), target_exprs.size());
1981  for (size_t i = 0; i < ra_node->size(); ++i) {
1982  CHECK(target_exprs[i]);
1983  // TODO(alex): remove the count distinct type fixup.
1984  auto ti = get_logical_type_for_expr(target_exprs[i]);
1985  targets_meta.emplace_back(
1986  ra_node->getFieldName(i), ti, target_exprs[i]->get_type_info());
1987  }
1988  return targets_meta;
1989 }
1990 
1991 template <>
1992 std::vector<TargetMetaInfo> get_targets_meta(
1993  const RelFilter* filter,
1994  const std::vector<Analyzer::Expr*>& target_exprs) {
1995  RelAlgNode const* input0 = filter->getInput(0);
1996  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1997  return get_targets_meta(input, target_exprs);
1998  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1999  return get_targets_meta(input, target_exprs);
2000  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
2001  return get_targets_meta(input, target_exprs);
2002  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
2003  return get_targets_meta(input, target_exprs);
2004  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
2005  return get_targets_meta(input, target_exprs);
2006  } else if (auto const* input = dynamic_cast<RelLogicalValues const*>(input0)) {
2007  return get_targets_meta(input, target_exprs);
2008  }
2009  UNREACHABLE() << "Unhandled node type: "
2010  << input0->toString(RelRexToStringConfig::defaults());
2011  return {};
2012 }
2013 
2014 } // namespace
2015 
2016 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
2017  const CompilationOptions& co_in,
2018  const ExecutionOptions& eo_in,
2019  const int64_t queue_time_ms) {
2020  CHECK(node);
2021  auto timer = DEBUG_TIMER(__func__);
2022 
2023  auto co = co_in;
2024  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
2025  // encoded string updates
2026 
2027  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
2028  auto& work_unit,
2029  const bool is_aggregate) {
2030  auto table_descriptor = node->getModifiedTableDescriptor();
2031  CHECK(table_descriptor);
2032  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
2033  throw std::runtime_error(
2034  "UPDATE queries involving variable length columns are only supported on tables "
2035  "with the vacuum attribute set to 'delayed'");
2036  }
2037 
2038  auto catalog = node->getModifiedTableCatalog();
2039  CHECK(catalog);
2040  Executor::clearExternalCaches(true, table_descriptor, catalog->getDatabaseId());
2041 
2042  dml_transaction_parameters_ =
2043  std::make_unique<UpdateTransactionParameters>(table_descriptor,
2044  *catalog,
2045  node->getTargetColumns(),
2046  node->getOutputMetainfo(),
2047  node->isVarlenUpdateRequired());
2048 
2049  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2050 
2051  auto execute_update_ra_exe_unit =
2052  [this, &co, &eo_in, &table_infos, &table_descriptor, &node, catalog](
2053  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
2054  CompilationOptions co_project = CompilationOptions::makeCpuOnly(co);
2055 
2056  auto eo = eo_in;
2057  if (dml_transaction_parameters_->tableIsTemporary()) {
2058  eo.output_columnar_hint = true;
2059  co_project.allow_lazy_fetch = false;
2060  co_project.filter_on_deleted_column =
2061  false; // project the entire delete column for columnar update
2062  }
2063 
2064  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
2065  dml_transaction_parameters_.get());
2066  CHECK(update_transaction_parameters);
2067  update_transaction_parameters->setInputSourceNode(node);
2068  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
2069  try {
2070  auto table_update_metadata =
2071  executor_->executeUpdate(ra_exe_unit,
2072  table_infos,
2073  table_descriptor,
2074  co_project,
2075  eo,
2076  *catalog,
2077  executor_->row_set_mem_owner_,
2078  update_callback,
2079  is_aggregate);
2080  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2081  dml_transaction_parameters_->finalizeTransaction(*catalog);
2082  TableOptimizer table_optimizer{
2083  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2084  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2085  };
2086  } catch (const QueryExecutionError& e) {
2087  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2088  }
2089  };
2090 
2091  if (dml_transaction_parameters_->tableIsTemporary()) {
2092  // hold owned target exprs during execution if rewriting
2093  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2094  // rewrite temp table updates to generate the full column by moving the where
2095  // clause into a case statement; if such a rewrite is not possible, bail on the
2096  // update operation, then build an expr for the update target
2097  auto update_transaction_params =
2098  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
2099  CHECK(update_transaction_params);
2100  const auto td = update_transaction_params->getTableDescriptor();
2101  CHECK(td);
2102  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2103  if (update_column_names.size() > 1) {
2104  throw std::runtime_error(
2105  "Multi-column update is not yet supported for temporary tables.");
2106  }
2107 
2108  const auto cd =
2109  catalog->getMetadataForColumn(td->tableId, update_column_names.front());
2110  CHECK(cd);
2111  auto projected_column_to_update = makeExpr<Analyzer::ColumnVar>(
2112  cd->columnType,
2113  shared::ColumnKey{catalog->getDatabaseId(), td->tableId, cd->columnId},
2114  0);
2115  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2116  work_unit.exe_unit, projected_column_to_update);
2117  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2118  throw std::runtime_error(
2119  "Variable length updates not yet supported on temporary tables.");
2120  }
2121  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2122  } else {
2123  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2124  }
2125  };
2126 
2127  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2128  auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo_in);
2129 
2130  execute_update_for_node(compound, work_unit, compound->isAggregate());
2131  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2132  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo_in);
2133 
2134  if (project->isSimple()) {
2135  CHECK_EQ(size_t(1), project->inputCount());
2136  const auto input_ra = project->getInput(0);
2137  if (dynamic_cast<const RelSort*>(input_ra)) {
2138  const auto& input_table =
2139  get_temporary_table(&temporary_tables_, -input_ra->getId());
2140  CHECK(input_table);
2141  work_unit.exe_unit.scan_limit = input_table->rowCount();
2142  }
2143  }
2144  if (project->hasWindowFunctionExpr() || project->hasPushedDownWindowExpr()) {
2145  // the first condition means this project node has at least one window function
2146  // and the second condition indicates that this project node falls into
2147  // one of the following cases:
2148  // 1) window function expression on a multi-fragmented table
2149  // 2) window function expression is too complex to evaluate without codegen:
2150  // i.e., sum(x+y+z) instead of sum(x) -> we currently do not support codegen to
2151  // evaluate such a complex window function expression
2152  // 3) nested window function expression
2153  // but currently we do not support update on a multi-fragmented table having a
2154  // window function, so the second condition only refers to a non-fragmented table
2155  // with cases 2) or 3)
2156  // if at least one of the two conditions is satisfied, we must compute the
2157  // corresponding window context before entering `execute_update_for_node` to properly update the table
2158  if (!leaf_results_.empty()) {
2159  throw std::runtime_error(
2160  "Update query having window function is not yet supported in distributed "
2161  "mode.");
2162  }
2163  ColumnCacheMap column_cache;
2164  co.device_type = ExecutorDeviceType::CPU;
2165  computeWindow(work_unit, co, eo_in, column_cache, queue_time_ms);
2166  }
2167  execute_update_for_node(project, work_unit, false);
2168  } else {
2169  throw std::runtime_error("Unsupported parent node for update: " +
2170  node->toString(RelRexToStringConfig::defaults()));
2171  }
2172 }
2173 
2174 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
2175  const CompilationOptions& co,
2176  const ExecutionOptions& eo_in,
2177  const int64_t queue_time_ms) {
2178  CHECK(node);
2179  auto timer = DEBUG_TIMER(__func__);
2180 
2181  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2182  auto& work_unit,
2183  const bool is_aggregate) {
2184  auto* table_descriptor = node->getModifiedTableDescriptor();
2185  CHECK(table_descriptor);
2186  if (!table_descriptor->hasDeletedCol) {
2187  throw std::runtime_error(
2188  "DELETE queries are only supported on tables with the vacuum attribute set to "
2189  "'delayed'");
2190  }
2191 
2192  const auto catalog = node->getModifiedTableCatalog();
2193  CHECK(catalog);
2194  Executor::clearExternalCaches(false, table_descriptor, catalog->getDatabaseId());
2195 
2196  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2197 
2198  auto execute_delete_ra_exe_unit =
2199  [this, &table_infos, &table_descriptor, &eo_in, &co, catalog](
2200  const auto& exe_unit, const bool is_aggregate) {
2201  dml_transaction_parameters_ =
2202  std::make_unique<DeleteTransactionParameters>(table_descriptor, *catalog);
2203  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2204  dml_transaction_parameters_.get());
2205  CHECK(delete_params);
2206  auto delete_callback = yieldDeleteCallback(*delete_params);
2207  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
2208 
2209  auto eo = eo_in;
2210  if (dml_transaction_parameters_->tableIsTemporary()) {
2211  eo.output_columnar_hint = true;
2212  co_delete.filter_on_deleted_column =
2213  false; // project the entire delete column for columnar update
2214  } else {
2215  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2216  }
2217 
2218  try {
2219  auto table_update_metadata =
2220  executor_->executeUpdate(exe_unit,
2221  table_infos,
2222  table_descriptor,
2223  co_delete,
2224  eo,
2225  *catalog,
2226  executor_->row_set_mem_owner_,
2227  delete_callback,
2228  is_aggregate);
2229  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2230  dml_transaction_parameters_->finalizeTransaction(*catalog);
2231  TableOptimizer table_optimizer{
2232  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2233  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2234  };
2235  } catch (const QueryExecutionError& e) {
2236  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2237  }
2238  };
2239 
2240  if (table_is_temporary(table_descriptor)) {
2241  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2242  const auto cd = catalog->getDeletedColumn(table_descriptor);
2243  CHECK(cd);
2244  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2245  cd->columnType,
2246  shared::ColumnKey{
2247  catalog->getDatabaseId(), table_descriptor->tableId, cd->columnId},
2248  0);
2249  const auto rewritten_exe_unit =
2250  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2251  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2252  } else {
2253  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2254  }
2255  };
2256 
2257  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2258  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo_in);
2259  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2260  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2261  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo_in);
2262  if (project->isSimple()) {
2263  CHECK_EQ(size_t(1), project->inputCount());
2264  const auto input_ra = project->getInput(0);
2265  if (dynamic_cast<const RelSort*>(input_ra)) {
2266  const auto& input_table =
2267  get_temporary_table(&temporary_tables_, -input_ra->getId());
2268  CHECK(input_table);
2269  work_unit.exe_unit.scan_limit = input_table->rowCount();
2270  }
2271  }
2272  execute_delete_for_node(project, work_unit, false);
2273  } else {
2274  throw std::runtime_error("Unsupported parent node for delete: " +
2275  node->toString(RelRexToStringConfig::defaults()));
2276  }
2277 }
2278 
2279 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
2280  const CompilationOptions& co,
2281  const ExecutionOptions& eo,
2282  RenderInfo* render_info,
2283  const int64_t queue_time_ms) {
2284  auto timer = DEBUG_TIMER(__func__);
2285  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo);
2286  CompilationOptions co_compound = co;
2287  return executeWorkUnit(work_unit,
2288  compound->getOutputMetainfo(),
2289  compound->isAggregate(),
2290  co_compound,
2291  eo,
2292  render_info,
2293  queue_time_ms);
2294 }
2295 
2296 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
2297  const CompilationOptions& co,
2298  const ExecutionOptions& eo,
2299  RenderInfo* render_info,
2300  const int64_t queue_time_ms) {
2301  auto timer = DEBUG_TIMER(__func__);
2302  const auto work_unit = createAggregateWorkUnit(aggregate, SortInfo(), eo.just_explain);
2303  return executeWorkUnit(work_unit,
2304  aggregate->getOutputMetainfo(),
2305  true,
2306  co,
2307  eo,
2308  render_info,
2309  queue_time_ms);
2310 }
2311 
2312 namespace {
2313 
2314 // Returns true iff the execution unit contains window functions.
2315 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
2316  return std::any_of(ra_exe_unit.target_exprs.begin(),
2317  ra_exe_unit.target_exprs.end(),
2318  [](const Analyzer::Expr* expr) {
2319  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
2320  });
2321 }
2322 
2323 } // namespace
2324 
2325 ExecutionResult RelAlgExecutor::executeProject(
2326  const RelProject* project,
2327  const CompilationOptions& co,
2328  const ExecutionOptions& eo,
2329  RenderInfo* render_info,
2330  const int64_t queue_time_ms,
2331  const std::optional<size_t> previous_count) {
2332  auto timer = DEBUG_TIMER(__func__);
2333  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo);
2334  CompilationOptions co_project = co;
2335  if (project->isSimple()) {
2336  CHECK_EQ(size_t(1), project->inputCount());
2337  const auto input_ra = project->getInput(0);
2338  if (dynamic_cast<const RelSort*>(input_ra)) {
2339  co_project.device_type = ExecutorDeviceType::CPU;
2340  const auto& input_table =
2341  get_temporary_table(&temporary_tables_, -input_ra->getId());
2342  CHECK(input_table);
2343  work_unit.exe_unit.scan_limit =
2344  std::min(input_table->getLimit(), input_table->rowCount());
2345  }
2346  }
2347  return executeWorkUnit(work_unit,
2348  project->getOutputMetainfo(),
2349  false,
2350  co_project,
2351  eo,
2352  render_info,
2353  queue_time_ms,
2354  previous_count);
2355 }
2356 
2357 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
2358  const CompilationOptions& co_in,
2359  const ExecutionOptions& eo,
2360  const int64_t queue_time_ms) {
2361  INJECT_TIMER(executeTableFunction);
2362  auto timer = DEBUG_TIMER(__func__);
2363 
2364  auto co = co_in;
2365 
2366  if (g_cluster) {
2367  throw std::runtime_error("Table functions not supported in distributed mode yet");
2368  }
2369  if (!g_enable_table_functions) {
2370  throw std::runtime_error("Table function support is disabled");
2371  }
2372  auto table_func_work_unit = createTableFunctionWorkUnit(
2373  table_func,
2374  eo.just_explain,
2375  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2376  const auto body = table_func_work_unit.body;
2377  CHECK(body);
2378 
2379  const auto table_infos =
2380  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2381 
2382  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2383  co.device_type,
2384  QueryMemoryDescriptor(),
2385  nullptr,
2386  executor_->blockSize(),
2387  executor_->gridSize()),
2388  {}};
2389 
2390  auto global_hint = getGlobalQueryHint();
2391  auto use_resultset_recycler = canUseResultsetCache(eo, nullptr);
2392  if (use_resultset_recycler && has_valid_query_plan_dag(table_func)) {
2393  auto cached_resultset =
2394  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
2395  table_func->getQueryPlanDagHash());
2396  if (cached_resultset) {
2397  VLOG(1) << "recycle table function's resultset of the root node "
2398  << table_func->getRelNodeDagId() << " from resultset cache";
2399  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2400  addTemporaryTable(-body->getId(), result.getDataPtr());
2401  return result;
2402  }
2403  }
2404 
2405  auto query_exec_time_begin = timer_start();
2406  try {
2407  result = {executor_->executeTableFunction(
2408  table_func_work_unit.exe_unit, table_infos, co, eo),
2409  body->getOutputMetainfo()};
2410  } catch (const QueryExecutionError& e) {
2411  handlePersistentError(e.getErrorCode());
2412  CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
2413  throw std::runtime_error("Table function ran out of memory during execution");
2414  }
2415  auto query_exec_time = timer_stop(query_exec_time_begin);
2416  result.setQueueTime(queue_time_ms);
2417  auto resultset_ptr = result.getDataPtr();
2418  auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2419  g_allow_auto_resultset_caching &&
2420  resultset_ptr->getBufferSizeBytes(co.device_type) <=
2421  g_auto_resultset_caching_threshold;
2422  bool keep_result = global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult);
2423  if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2424  !hasStepForUnion()) {
2425  resultset_ptr->setExecTime(query_exec_time);
2426  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2427  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2428  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2429  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2430  if (allow_auto_caching_resultset) {
2431  VLOG(1) << "Automatically keep table function's query resultset to recycler";
2432  }
2433  executor_->getResultSetRecyclerHolder().putQueryResultSetToCache(
2434  table_func_work_unit.exe_unit.query_plan_dag_hash,
2435  resultset_ptr->getInputTableKeys(),
2436  resultset_ptr,
2437  resultset_ptr->getBufferSizeBytes(co.device_type),
2438  target_exprs_owned_);
2439  } else {
2440  if (eo.keep_result) {
2441  if (g_cluster) {
2442  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2443  "support resultset recycling on distributed mode";
2444  } else if (hasStepForUnion()) {
2445  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2446  "has union-(all) operator";
2447  } else if (is_validate_or_explain_query(eo)) {
2448  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2449  "is either validate or explain query";
2450  } else {
2451  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2452  }
2453  }
2454  }
2455 
2456  return result;
2457 }
2458 
2459 namespace {
2460 
2461 // Creates a new expression which has the range table index set to 1. This is needed to
2462 // reuse the hash join construction helpers to generate a hash table for the window
2463 // function partition: create an equals expression with left and right sides identical
2464 // except for the range table index.
2465 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
2466  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
2467  if (tuple) {
2468  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2469  for (const auto& element : tuple->getTuple()) {
2470  transformed_tuple.push_back(transform_to_inner(element.get()));
2471  }
2472  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2473  }
2474  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
2475  if (!col) {
2476  throw std::runtime_error("Only columns supported in the window partition for now");
2477  }
2478  return makeExpr<Analyzer::ColumnVar>(col->get_type_info(), col->getColumnKey(), 1);
2479 }
2480 
2481 } // namespace
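// --- Editor's sketch (not part of the source file) ---------------------------
// The tautological "key = key" condition built from transform_to_inner() drives
// a OneToMany hash table that effectively groups rows by partition key. The
// same grouping, shown standalone with an unordered_multimap and hypothetical
// (dept, row id) data:
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

int main() {
  // Rows tagged with a partition key, as if PARTITION BY dept.
  const std::vector<std::pair<std::string, int>> rows{
      {"sales", 0}, {"eng", 1}, {"sales", 2}, {"eng", 3}, {"eng", 4}};
  std::unordered_multimap<std::string, int> partitions;
  for (const auto& [dept, row_id] : rows) {
    partitions.emplace(dept, row_id);  // one key -> many row ids (OneToMany)
  }
  assert(partitions.count("eng") == 3);
  assert(partitions.count("sales") == 2);
}
// -----------------------------------------------------------------------------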
2482 
2483 void RelAlgExecutor::computeWindow(const WorkUnit& work_unit,
2484  const CompilationOptions& co,
2485  const ExecutionOptions& eo,
2486  ColumnCacheMap& column_cache_map,
2487  const int64_t queue_time_ms) {
2488  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2489  CHECK_EQ(query_infos.size(), size_t(1));
2490  if (query_infos.front().info.fragments.size() != 1) {
2491  throw std::runtime_error(
2492  "Only single fragment tables supported for window functions for now");
2493  }
2494  if (eo.executor_type == ::ExecutorType::Extern) {
2495  return;
2496  }
2497  query_infos.push_back(query_infos.front());
2498  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2499  // a query may hold multiple window functions having the same partition-by condition;
2500  // after building the first hash partition we can reuse it for the rest of
2501  // the window functions
2502  // here, a cached partition can be shared across multiple window function contexts as-is,
2503  // but a sorted partition should be copied for reuse since we use it as an (intermediate)
2504  // output buffer
2505  // todo (yoonmin) : support recycler for window function computation?
2506  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2507  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2508  sorted_partition_cache;
2509  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2510  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2511  window_function_context_map;
2512  std::unordered_map<QueryPlanHash, AggregateTreeForWindowFraming> aggregate_tree_map;
2513  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2514  ++target_index) {
2515  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2516  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2517  if (!window_func) {
2518  continue;
2519  }
2520  // Always use baseline layout hash tables for now, make the expression a tuple.
2521  const auto& partition_keys = window_func->getPartitionKeys();
2522  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2523  if (partition_keys.size() >= 1) {
2524  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2525  if (partition_keys.size() > 1) {
2526  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2527  } else {
2528  CHECK_EQ(partition_keys.size(), size_t(1));
2529  partition_key_tuple = partition_keys.front();
2530  }
2531  // Creates a tautology equality with the partition expression on both sides.
2532  partition_key_cond =
2533  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2534  kBW_EQ,
2535  kONE,
2536  partition_key_tuple,
2537  transform_to_inner(partition_key_tuple.get()));
2538  }
2539  auto context =
2540  createWindowFunctionContext(window_func,
2541  partition_key_cond /*nullptr if no partition key*/,
2542  partition_cache,
2543  sorted_partition_key_ref_count_map,
2544  work_unit,
2545  query_infos,
2546  co,
2547  column_cache_map,
2548  executor_->getRowSetMemoryOwner());
2549  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2550  }
2551 
2552  for (auto& kv : window_function_context_map) {
2553  kv.second->compute(
2554  sorted_partition_key_ref_count_map, sorted_partition_cache, aggregate_tree_map);
2555  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2556  }
2557 }
2558 
2559 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
2560  const Analyzer::WindowFunction* window_func,
2561  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2562  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2563  std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2564  const WorkUnit& work_unit,
2565  const std::vector<InputTableInfo>& query_infos,
2566  const CompilationOptions& co,
2567  ColumnCacheMap& column_cache_map,
2568  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2569  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2570  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2571  ? Data_Namespace::GPU_LEVEL
2572  : Data_Namespace::CPU_LEVEL;
2573  std::unique_ptr<WindowFunctionContext> context;
2574  auto partition_cache_key = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
2575  work_unit.body->getQueryPlanDagHash(), work_unit.exe_unit.sort_info);
2576  JoinType window_partition_type = window_func->isFrameNavigateWindowFunction()
2577  ? JoinType::WINDOW_FUNCTION_FRAMING
2578  : JoinType::WINDOW_FUNCTION;
2579  if (partition_key_cond) {
2580  auto partition_cond_str = partition_key_cond->toString();
2581  auto partition_key_hash = boost::hash_value(partition_cond_str);
2582  boost::hash_combine(partition_cache_key, partition_key_hash);
2583  boost::hash_combine(partition_cache_key, static_cast<int>(window_partition_type));
2584  std::shared_ptr<HashJoin> partition_ptr;
2585  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2586  if (cached_hash_table_it != partition_cache.end()) {
2587  partition_ptr = cached_hash_table_it->second;
2588  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2589  << partition_cache_key << ", partition condition: " << partition_cond_str
2590  << ")";
2591  } else {
2592  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2593  partition_key_cond,
2594  query_infos,
2595  memory_level,
2596  window_partition_type,
2597  HashType::OneToMany,
2598  column_cache_map,
2599  work_unit.exe_unit.hash_table_build_plan_dag,
2600  work_unit.exe_unit.query_hint,
2601  work_unit.exe_unit.table_id_to_node_map);
2602  if (!hash_table_or_err.fail_reason.empty()) {
2603  throw std::runtime_error(hash_table_or_err.fail_reason);
2604  }
2605  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2606  partition_ptr = hash_table_or_err.hash_table;
2607  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2608  .second);
2609  VLOG(1) << "Put a generated hash table for computing window function context to "
2610  "cache (key: "
2611  << partition_cache_key << ", partition condition: " << partition_cond_str
2612  << ")";
2613  }
2614  CHECK(partition_ptr);
2615  auto aggregate_tree_fanout = g_window_function_aggregation_tree_fanout;
2616  if (work_unit.exe_unit.query_hint.aggregate_tree_fanout != aggregate_tree_fanout) {
2617  aggregate_tree_fanout = work_unit.exe_unit.query_hint.aggregate_tree_fanout;
2618  VLOG(1) << "Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2619  }
2620  context = std::make_unique<WindowFunctionContext>(window_func,
2621  partition_cache_key,
2622  partition_ptr,
2623  elem_count,
2624  co.device_type,
2625  row_set_mem_owner,
2626  aggregate_tree_fanout);
2627  } else {
2628  context = std::make_unique<WindowFunctionContext>(
2629  window_func, elem_count, co.device_type, row_set_mem_owner);
2630  }
2631  const auto& order_keys = window_func->getOrderKeys();
2632  if (!order_keys.empty()) {
2633  auto sorted_partition_cache_key = partition_cache_key;
2634  for (auto& order_key : order_keys) {
2635  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2636  }
2637  for (auto& collation : window_func->getCollation()) {
2638  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2639  }
2640  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2641  auto cache_key_cnt_it =
2642  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2643  if (!cache_key_cnt_it.second) {
2644  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2645  cache_key_cnt_it.first->second + 1;
2646  }
2647 
2648  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2649  for (const auto& order_key : order_keys) {
2650  const auto order_col =
2651  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2652  if (!order_col) {
2653  throw std::runtime_error("Only order by columns supported for now");
2654  }
2655  auto const [column, col_elem_count] =
2656  ColumnFetcher::getOneColumnFragment(executor_,
2657  *order_col,
2658  query_infos.front().info.fragments.front(),
2659  memory_level,
2660  0,
2661  nullptr,
2662  /*thread_idx=*/0,
2663  chunks_owner,
2664  column_cache_map);
2665 
2666  CHECK_EQ(col_elem_count, elem_count);
2667  context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2668  }
2669  }
2670  if (context->getWindowFunction()->hasFraming() ||
2671  context->getWindowFunction()->isMissingValueFillingFunction()) {
2672  // todo (yoonmin) : if we try to support a generic window function expression without
2673  // an extra project node, we need to revisit this b/c the current logic assumes that
2674  // a window function expression has a single input source
2675  auto& window_function_expression_args = window_func->getArgs();
2676  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2677  for (auto& expr : window_function_expression_args) {
2678  if (const auto arg_col_var =
2679  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2680  auto const [column, col_elem_count] = ColumnFetcher::getOneColumnFragment(
2681  executor_,
2682  *arg_col_var,
2683  query_infos.front().info.fragments.front(),
2684  memory_level,
2685  0,
2686  nullptr,
2687  /*thread_idx=*/0,
2688  chunks_owner,
2689  column_cache_map);
2690  CHECK_EQ(col_elem_count, elem_count);
2691  context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2692  }
2693  }
2694  }
2695  return context;
2696 }
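// --- Editor's sketch (not part of the source file) ---------------------------
// The partition and sorted-partition cache keys above are built by folding the
// plan DAG hash, the partition condition, and each order key / collation string
// into one value with boost::hash_combine. A self-contained sketch of that
// scheme (the inputs are placeholder strings, not real plan DAG hashes):
#include <boost/functional/hash.hpp>

#include <cassert>
#include <string>
#include <vector>

size_t make_sorted_partition_key(size_t dag_hash,
                                 const std::string& partition_cond,
                                 const std::vector<std::string>& order_keys) {
  size_t key = dag_hash;
  boost::hash_combine(key, partition_cond);
  for (const auto& order_key : order_keys) {
    boost::hash_combine(key, order_key);  // order of combination matters
  }
  return key;
}

int main() {
  const auto k1 = make_sorted_partition_key(42, "dept = dept", {"salary ASC"});
  const auto k2 = make_sorted_partition_key(42, "dept = dept", {"salary ASC"});
  const auto k3 = make_sorted_partition_key(42, "dept = dept", {"salary DESC"});
  assert(k1 == k2);  // identical inputs -> identical key, so the sorted
                     // partition built for k1 can be reused
  assert(k1 != k3);  // a different ordering produces a different cache entry
}
// -----------------------------------------------------------------------------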
2697 
2698 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
2699  const CompilationOptions& co,
2700  const ExecutionOptions& eo,
2701  RenderInfo* render_info,
2702  const int64_t queue_time_ms) {
2703  auto timer = DEBUG_TIMER(__func__);
2704  const auto work_unit = createFilterWorkUnit(filter, SortInfo(), eo.just_explain);
2705  return executeWorkUnit(
2706  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2707 }
2708 
2709 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
2710  std::vector<TargetMetaInfo> const& rhs) {
2711  if (lhs.size() == rhs.size()) {
2712  for (size_t i = 0; i < lhs.size(); ++i) {
2713  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2714  return false;
2715  }
2716  }
2717  return true;
2718  }
2719  return false;
2720 }
2721 
2722 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2723  return target_meta_info.get_type_info().is_geometry();
2724 }
2725 
2726 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
2727  const RaExecutionSequence& seq,
2728  const CompilationOptions& co,
2729  const ExecutionOptions& eo,
2730  RenderInfo* render_info,
2731  const int64_t queue_time_ms) {
2732  auto timer = DEBUG_TIMER(__func__);
2733  if (!logical_union->isAll()) {
2734  throw std::runtime_error("UNION without ALL is not supported yet.");
2735  }
2736  // Will throw a std::runtime_error if types don't match.
2737  logical_union->setOutputMetainfo(logical_union->getCompatibleMetainfoTypes());
2738  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2739  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2740  }
2741  auto work_unit = createUnionWorkUnit(logical_union, SortInfo(), eo);
2742  return executeWorkUnit(work_unit,
2743  logical_union->getOutputMetainfo(),
2744  false,
2745  CompilationOptions::makeCpuOnly(co),
2746  eo,
2747  render_info,
2748  queue_time_ms);
2749 }
2750 
2751 ExecutionResult RelAlgExecutor::executeLogicalValues(
2752  const RelLogicalValues* logical_values,
2753  const ExecutionOptions& eo) {
2754  auto timer = DEBUG_TIMER(__func__);
2755  QueryMemoryDescriptor query_mem_desc(
2756  executor_, leaf_results_.size(), QueryDescriptionType::Projection);
2757 
2758  auto tuple_type = logical_values->getTupleType();
2759  for (size_t i = 0; i < tuple_type.size(); ++i) {
2760  auto& target_meta_info = tuple_type[i];
2761  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2762  // replace w/ bigint
2763  tuple_type[i] =
2764  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2765  }
2766  query_mem_desc.addColSlotInfo(
2767  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2768  }
2769  logical_values->setOutputMetainfo(tuple_type);
2770 
2771  std::vector<TargetInfo> target_infos;
2772  for (const auto& tuple_type_component : tuple_type) {
2773  target_infos.emplace_back(TargetInfo{false,
2774  kCOUNT,
2775  tuple_type_component.get_type_info(),
2776  SQLTypeInfo(kNULLT, false),
2777  false,
2778  false,
2779  /*is_varlen_projection=*/false});
2780  }
2781 
2782  std::shared_ptr<ResultSet> rs{
2783  ResultSetLogicalValuesBuilder{logical_values,
2784  target_infos,
2785  ExecutorDeviceType::CPU,
2786  query_mem_desc,
2787  executor_->getRowSetMemoryOwner(),
2788  executor_}
2789  .build()};
2790 
2791  return {rs, tuple_type};
2792 }
2793 
2794 namespace {
2795 
2796 template <class T>
2797 int64_t insert_one_dict_str(T* col_data,
2798  const std::string& columnName,
2799  const SQLTypeInfo& columnType,
2800  const Analyzer::Constant* col_cv,
2801  const Catalog_Namespace::Catalog& catalog) {
2802  if (col_cv->get_is_null()) {
2803  *col_data = inline_fixed_encoding_null_val(columnType);
2804  } else {
2805  const int dict_id = columnType.get_comp_param();
2806  const auto col_datum = col_cv->get_constval();
2807  const auto& str = *col_datum.stringval;
2808  const auto dd = catalog.getMetadataForDict(dict_id);
2809  CHECK(dd && dd->stringDict);
2810  int32_t str_id = dd->stringDict->getOrAdd(str);
2811  if (!dd->dictIsTemp) {
2812  const auto checkpoint_ok = dd->stringDict->checkpoint();
2813  if (!checkpoint_ok) {
2814  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2815  columnName);
2816  }
2817  }
2818  const bool invalid = str_id > max_valid_int_value<T>();
2819  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2820  if (invalid) {
2821  LOG(ERROR) << "Could not encode string: " << str
2822  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2823  << " bits. Will store NULL instead.";
2824  }
2825  str_id = inline_fixed_encoding_null_val(columnType);
2826  }
2827  *col_data = str_id;
2828  }
2829  return *col_data;
2830 }
2831 
2832 template <class T>
2833 int64_t insert_one_dict_str(T* col_data,
2834  const ColumnDescriptor* cd,
2835  const Analyzer::Constant* col_cv,
2836  const Catalog_Namespace::Catalog& catalog) {
2837  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2838 }
2839 
2840 } // namespace
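// --- Editor's sketch (not part of the source file) ---------------------------
// insert_one_dict_str() above guards against dictionary ids that do not fit the
// column's narrow (1- or 2-byte) encoding and stores NULL instead. The guard in
// isolation, with an assumed max_valid_id in place of max_valid_int_value<T>():
#include <cassert>
#include <cstdint>
#include <limits>

template <class T>
constexpr int64_t max_valid_id() {
  return std::numeric_limits<T>::max();
}

template <class T>
T encode_or_null(int32_t str_id, T null_sentinel) {
  if (str_id > max_valid_id<T>()) {
    return null_sentinel;  // id exceeds what the narrow encoding can represent
  }
  return static_cast<T>(str_id);
}

int main() {
  const int8_t null8 = std::numeric_limits<int8_t>::min();  // assumed sentinel
  assert(encode_or_null<int8_t>(100, null8) == 100);    // fits in 8 bits
  assert(encode_or_null<int8_t>(300, null8) == null8);  // stored as NULL
}
// -----------------------------------------------------------------------------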
2841 
2842 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2843  const ExecutionOptions& eo) {
2844  auto timer = DEBUG_TIMER(__func__);
2845  if (eo.just_explain) {
2846  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2847  }
2848 
2849  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2850  ExecutorDeviceType::CPU,
2851  QueryMemoryDescriptor(),
2852  executor_->getRowSetMemoryOwner(),
2853  executor_->blockSize(),
2854  executor_->gridSize());
2855 
2856  std::vector<TargetMetaInfo> empty_targets;
2857  return {rs, empty_targets};
2858 }
2859 
2860 ExecutionResult RelAlgExecutor::executeSimpleInsert(
2861  const Analyzer::Query& query,
2862  Fragmenter_Namespace::InsertDataLoader& inserter,
2863  const Catalog_Namespace::SessionInfo& session) {
2864  // Note: We currently obtain an executor for this method, but we do not need it.
2865  // Therefore, we skip the executor state setup in the regular execution path. In the
2866  // future, we will likely want to use the executor to evaluate expressions in the insert
2867  // statement.
2868 
2869  const auto& values_lists = query.get_values_lists();
2870  const int table_id = query.get_result_table_id();
2871  const auto& col_id_list = query.get_result_col_list();
2872  size_t rows_number = values_lists.size();
2873  size_t leaf_count = inserter.getLeafCount();
2874  const auto& catalog = session.getCatalog();
2875  const auto td = catalog.getMetadataForTable(table_id);
2876  CHECK(td);
2877  size_t rows_per_leaf = rows_number;
2878  if (td->nShards == 0) {
2879  rows_per_leaf =
2880  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2881  }
2882  auto max_number_of_rows_per_package =
2883  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
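  // Editor's note (worked example, not source): with 1,000,000 rows and 4 leaves
  // on an unsharded table, rows_per_leaf = ceil(1e6 / 4) = 250,000, which the
  // line above clamps to the 64 * 1024 = 65,536-row package cap; a sharded table
  // (nShards > 0) skips the per-leaf division and is clamped from rows_number.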
2884 
2885  std::vector<const ColumnDescriptor*> col_descriptors;
2886  std::vector<int> col_ids;
2887  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2888  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2889  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2890  std::unordered_map<int, int> sequential_ids;
2891 
2892  for (const int col_id : col_id_list) {
2893  const auto cd = get_column_descriptor({catalog.getDatabaseId(), table_id, col_id});
2894  const auto col_enc = cd->columnType.get_compression();
2895  if (cd->columnType.is_string()) {
2896  switch (col_enc) {
2897  case kENCODING_NONE: {
2898  auto it_ok =
2899  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2900  CHECK(it_ok.second);
2901  break;
2902  }
2903  case kENCODING_DICT: {
2904  const auto dd = catalog.getMetadataForDict(cd->columnType.get_comp_param());
2905  CHECK(dd);
2906  const auto it_ok = col_buffers.emplace(
2907  col_id,
2908  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2909  max_number_of_rows_per_package));
2910  CHECK(it_ok.second);
2911  break;
2912  }
2913  default:
2914  CHECK(false);
2915  }
2916  } else if (cd->columnType.is_geometry()) {
2917  auto it_ok =
2918  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2919  CHECK(it_ok.second);
2920  } else if (cd->columnType.is_array()) {
2921  auto it_ok =
2922  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2923  CHECK(it_ok.second);
2924  } else {
2925  const auto it_ok = col_buffers.emplace(
2926  col_id,
2927  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2928  max_number_of_rows_per_package]()));
2929  CHECK(it_ok.second);
2930  }
2931  col_descriptors.push_back(cd);
2932  sequential_ids[col_id] = col_ids.size();
2933  col_ids.push_back(col_id);
2934  }
2935 
2936  Executor::clearExternalCaches(true, td, catalog.getDatabaseId());
2937 
2938  size_t start_row = 0;
2939  size_t rows_left = rows_number;
2940  while (rows_left != 0) {
2941  // clear the buffers
2942  for (const auto& kv : col_buffers) {
2943  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2944  }
2945  for (auto& kv : str_col_buffers) {
2946  kv.second.clear();
2947  }
2948  for (auto& kv : arr_col_buffers) {
2949  kv.second.clear();
2950  }
2951 
2952  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2953  // Note: if use cases arise with batch inserts of lots of rows, it might be
2954  // more efficient to do the loops below column by column instead of row by row.
2955  // But for now I consider such a refactoring not worth investigating, as we have more
2956  // efficient ways to insert many rows anyway.
2957  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2958  const auto& values_list = values_lists[row_idx + start_row];
2959  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
2960  CHECK(values_list.size() == col_descriptors.size());
2961  auto col_cv =
2962  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
2963  if (!col_cv) {
2964  auto col_cast =
2965  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
2966  CHECK(col_cast);
2967  CHECK_EQ(kCAST, col_cast->get_optype());
2968  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2969  }
2970  CHECK(col_cv);
2971  const auto cd = col_descriptors[col_idx];
2972  auto col_datum = col_cv->get_constval();
2973  auto col_type = cd->columnType.get_type();
2974  uint8_t* col_data_bytes{nullptr};
2975  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2976  (!cd->columnType.is_string() ||
2977  cd->columnType.get_compression() == kENCODING_DICT)) {
2978  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2979  CHECK(col_data_bytes_it != col_buffers.end());
2980  col_data_bytes = col_data_bytes_it->second.get();
2981  }
2982  switch (col_type) {
2983  case kBOOLEAN: {
2984  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2985  auto null_bool_val =
2986  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2987  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2988  ? inline_fixed_encoding_null_val(cd->columnType)
2989  : (col_datum.boolval ? 1 : 0);
2990  break;
2991  }
2992  case kTINYINT: {
2993  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2994  col_data[row_idx] = col_cv->get_is_null()
2995  ? inline_fixed_encoding_null_val(cd->columnType)
2996  : col_datum.tinyintval;
2997  break;
2998  }
2999  case kSMALLINT: {
3000  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
3001  col_data[row_idx] = col_cv->get_is_null()
3002  ? inline_fixed_encoding_null_val(cd->columnType)
3003  : col_datum.smallintval;
3004  break;
3005  }
3006  case kINT: {
3007  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
3008  col_data[row_idx] = col_cv->get_is_null()
3009  ? inline_fixed_encoding_null_val(cd->columnType)
3010  : col_datum.intval;
3011  break;
3012  }
3013  case kBIGINT:
3014  case kDECIMAL:
3015  case kNUMERIC: {
3016  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3017  col_data[row_idx] = col_cv->get_is_null()
3018  ? inline_fixed_encoding_null_val(cd->columnType)
3019  : col_datum.bigintval;
3020  break;
3021  }
3022  case kFLOAT: {
3023  auto col_data = reinterpret_cast<float*>(col_data_bytes);
3024  col_data[row_idx] = col_datum.floatval;
3025  break;
3026  }
3027  case kDOUBLE: {
3028  auto col_data = reinterpret_cast<double*>(col_data_bytes);
3029  col_data[row_idx] = col_datum.doubleval;
3030  break;
3031  }
3032  case kTEXT:
3033  case kVARCHAR:
3034  case kCHAR: {
3035  switch (cd->columnType.get_compression()) {
3036  case kENCODING_NONE:
3037  str_col_buffers[col_ids[col_idx]].push_back(
3038  col_datum.stringval ? *col_datum.stringval : "");
3039  break;
3040  case kENCODING_DICT: {
3041  switch (cd->columnType.get_size()) {
3042  case 1:
3043  insert_one_dict_str(
3044  &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3045  cd,
3046  col_cv,
3047  catalog);
3048  break;
3049  case 2:
3050  insert_one_dict_str(
3051  &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3052  cd,
3053  col_cv,
3054  catalog);
3055  break;
3056  case 4:
3057  insert_one_dict_str(
3058  &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3059  cd,
3060  col_cv,
3061  catalog);
3062  break;
3063  default:
3064  CHECK(false);
3065  }
3066  break;
3067  }
3068  default:
3069  CHECK(false);
3070  }
3071  break;
3072  }
3073  case kTIME:
3074  case kTIMESTAMP:
3075  case kDATE: {
3076  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3077  col_data[row_idx] = col_cv->get_is_null()
3078  ? inline_fixed_encoding_null_val(cd->columnType)
3079  : col_datum.bigintval;
3080  break;
3081  }
3082  case kARRAY: {
3083  const auto is_null = col_cv->get_is_null();
3084  const auto size = cd->columnType.get_size();
3085  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
3086  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
3087  const auto is_point_coords =
3088  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
3089  if (is_null && !is_point_coords) {
3090  if (size > 0) {
3091  int8_t* buf = (int8_t*)checked_malloc(size);
3092  put_null_array(static_cast<void*>(buf), elem_ti, "");
3093  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
3094  p += elem_ti.get_size()) {
3095  put_null(static_cast<void*>(p), elem_ti, "");
3096  }
3097  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
3098  } else {
3099  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
3100  }
3101  break;
3102  }
3103  const auto l = col_cv->get_value_list();
3104  size_t len = l.size() * elem_ti.get_size();
3105  if (size > 0 && static_cast<size_t>(size) != len) {
3106  throw std::runtime_error("Array column " + cd->columnName + " expects " +
3107  std::to_string(size / elem_ti.get_size()) +
3108  " values, " + "received " +
3109  std::to_string(l.size()));
3110  }
3111  if (elem_ti.is_string()) {
3112  CHECK(kENCODING_DICT == elem_ti.get_compression());
3113  CHECK(4 == elem_ti.get_size());
3114 
3115  int8_t* buf = (int8_t*)checked_malloc(len);
3116  int32_t* p = reinterpret_cast<int32_t*>(buf);
3117 
3118  int elemIndex = 0;
3119  for (auto& e : l) {
3120  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3121  CHECK(c);
3122  insert_one_dict_str(
3123  &p[elemIndex], cd->columnName, elem_ti, c.get(), catalog);
3124  elemIndex++;
3125  }
3126  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3127 
3128  } else {
3129  int8_t* buf = (int8_t*)checked_malloc(len);
3130  int8_t* p = buf;
3131  for (auto& e : l) {
3132  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3133  CHECK(c);
3134  p = append_datum(p, c->get_constval(), elem_ti);
3135  CHECK(p);
3136  }
3137  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3138  }
3139  break;
3140  }
3141  case kPOINT:
3142  case kMULTIPOINT:
3143  case kLINESTRING:
3144  case kMULTILINESTRING:
3145  case kPOLYGON:
3146  case kMULTIPOLYGON:
3147  if (col_datum.stringval && col_datum.stringval->empty()) {
3148  throw std::runtime_error(
3149  "Empty values are not allowed for geospatial column \"" +
3150  cd->columnName + "\"");
3151  } else {
3152  str_col_buffers[col_ids[col_idx]].push_back(
3153  col_datum.stringval ? *col_datum.stringval : "");
3154  }
3155  break;
3156  default:
3157  CHECK(false);
3158  }
3159  }
3160  }
3161  start_row += package_size;
3162  rows_left -= package_size;
3163 
3164  Fragmenter_Namespace::InsertData insert_data;
3165  insert_data.databaseId = catalog.getCurrentDB().dbId;
3166  insert_data.tableId = table_id;
3167  insert_data.data.resize(col_ids.size());
3168  insert_data.columnIds = col_ids;
3169  for (const auto& kv : col_buffers) {
3170  DataBlockPtr p;
3171  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
3172  insert_data.data[sequential_ids[kv.first]] = p;
3173  }
3174  for (auto& kv : str_col_buffers) {
3175  DataBlockPtr p;
3176  p.stringsPtr = &kv.second;
3177  insert_data.data[sequential_ids[kv.first]] = p;
3178  }
3179  for (auto& kv : arr_col_buffers) {
3180  DataBlockPtr p;
3181  p.arraysPtr = &kv.second;
3182  insert_data.data[sequential_ids[kv.first]] = p;
3183  }
3184  insert_data.numRows = package_size;
3185  auto data_memory_holder = import_export::fill_missing_columns(&catalog, insert_data);
3186  inserter.insertData(session, insert_data);
3187  }
3188 
3189  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3190  ExecutorDeviceType::CPU,
3191  QueryMemoryDescriptor(),
3192  executor_->getRowSetMemoryOwner(),
3193  0,
3194  0);
3195  std::vector<TargetMetaInfo> empty_targets;
3196  return {rs, empty_targets};
3197 }
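// ---------------------------------------------------------------------------
// [Editor's illustrative sketch, not part of RelAlgExecutor.cpp] The insert
// path above packages VALUES rows into per-column buffers of at most
// max_number_of_rows_per_package rows and flushes one InsertData per package.
// The stand-alone program below mirrors that batching structure with a single
// int column; the buffer size and the "flush" step are simplified assumptions.
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <iostream>
#include <vector>

int main() {
  const size_t max_rows_per_package = 4;  // stand-in for the real constant
  const std::vector<int> values = {1, 2, 3, 4, 5, 6, 7, 8, 9};
  std::vector<int> col_buffer(max_rows_per_package);

  size_t start_row = 0;
  size_t rows_left = values.size();
  while (rows_left != 0) {
    // clear the package buffer, as the real code does with memset
    std::memset(col_buffer.data(), 0, col_buffer.size() * sizeof(int));
    const size_t package_size = std::min(rows_left, max_rows_per_package);
    for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
      col_buffer[row_idx] = values[start_row + row_idx];  // row -> column slot
    }
    std::cout << "flushed a package of " << package_size << " rows\n";
    start_row += package_size;
    rows_left -= package_size;
  }
  return 0;
}
// ---------------------------------------------------------------------------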
3198 
3199 namespace {
3200 
3201 size_t get_limit_value(std::optional<size_t> limit) {
3202  return limit ? *limit : 0;
3203 }
3204 
3205 size_t get_scan_limit(const RelAlgNode* ra, std::optional<size_t> limit) {
3206  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
3207  if (aggregate) {
3208  return 0;
3209  }
3210  const auto compound = dynamic_cast<const RelCompound*>(ra);
3211  return (compound && compound->isAggregate()) ? 0 : get_limit_value(limit);
3212 }
3213 
3214 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
3215  return !order_entries.empty() && order_entries.front().is_desc;
3216 }
3217 
3218 bool is_none_encoded_text(TargetMetaInfo const& target_meta_info) {
3219  return target_meta_info.get_type_info().is_none_encoded_string();
3220 }
3221 
3222 } // namespace
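// ---------------------------------------------------------------------------
// [Editor's illustrative sketch] The helpers above encode two conventions used
// by the sort path: a missing LIMIT maps to 0, which downstream means
// "no limit", and aggregating sources ignore the scan limit because the result
// size is unknown before grouping. A minimal check of the optional-to-size_t
// mapping:
#include <cassert>
#include <cstddef>
#include <optional>

int main() {
  auto limit_value = [](std::optional<std::size_t> limit) {
    return limit ? *limit : std::size_t(0);
  };
  assert(limit_value(std::nullopt) == 0);  // no LIMIT clause
  assert(limit_value(10) == 10);           // LIMIT 10
  return 0;
}
// ---------------------------------------------------------------------------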
3223 
3224 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
3225  const CompilationOptions& co,
3226  const ExecutionOptions& eo,
3227  RenderInfo* render_info,
3228  const int64_t queue_time_ms) {
3229  auto timer = DEBUG_TIMER(__func__);
3230  check_sort_node_source_constraint(sort);
3231  const auto source = sort->getInput(0);
3232  const bool is_aggregate = node_is_aggregate(source);
3233  auto it = leaf_results_.find(sort->getId());
3234  auto order_entries = sort->getOrderEntries();
3235  if (it != leaf_results_.end()) {
3236  // Add any transient string literals to the string dictionary proxy on the aggregator
3237  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3238  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3239  executor_->row_set_mem_owner_);
3240  // Handle push-down for LIMIT for multi-node
3241  auto& aggregated_result = it->second;
3242  auto& result_rows = aggregated_result.rs;
3243  auto limit = sort->getLimit();
3244  const size_t offset = sort->getOffset();
3245  if (limit || offset) {
3246  if (!order_entries.empty()) {
3247  result_rows->sort(
3248  order_entries, get_limit_value(limit) + offset, co.device_type, executor_);
3249  }
3250  result_rows->dropFirstN(offset);
3251  if (limit) {
3252  result_rows->keepFirstN(get_limit_value(limit));
3253  }
3254  }
3255 
3256  if (render_info) {
3257  // We've hit a sort step that is the very last step
3258  // in a distributed render query. We'll fill in the render targets
3259  // since we have all the data needed to do so. This is normally
3260  // done in executeWorkUnit, but that is bypassed in this case.
3261  build_render_targets(*render_info,
3262  source_work_unit.exe_unit.target_exprs,
3263  aggregated_result.targets_meta);
3264  }
3265 
3266  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3267  sort->setOutputMetainfo(aggregated_result.targets_meta);
3268 
3269  return result;
3270  }
3271 
3272  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3273  bool is_desc{false};
3274  bool use_speculative_top_n_sort{false};
3275 
3276  auto execute_sort_query = [this,
3277  sort,
3278  &source,
3279  &is_aggregate,
3280  &eo,
3281  &co,
3282  render_info,
3283  queue_time_ms,
3284  &groupby_exprs,
3285  &is_desc,
3286  &order_entries,
3287  &use_speculative_top_n_sort]() -> ExecutionResult {
3288  std::optional<size_t> limit = sort->getLimit();
3289  const size_t offset = sort->getOffset();
3290  // check whether sort's input is cached
3291  auto source_node = sort->getInput(0);
3292  CHECK(source_node);
3293  ExecutionResult source_result{nullptr, {}};
3294  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3295  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
3296  auto source_query_plan_dag = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
3297  source_node->getQueryPlanDagHash(), sort_info);
3298  bool enable_resultset_recycler = canUseResultsetCache(eo, render_info);
3299  if (enable_resultset_recycler && has_valid_query_plan_dag(source_node) &&
3300  !sort->isEmptyResult()) {
3301  if (auto cached_resultset =
3302  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
3303  source_query_plan_dag)) {
3304  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3305  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3306  << " from resultset cache";
3307  source_result =
3308  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3309  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3310  addTemporaryTable(-source_node->getId(), cached_resultset);
3311  }
3312  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3313  co.device_type == ExecutorDeviceType::GPU;
3314  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3315  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3316  }
3317  }
3318  if (!source_result.getDataPtr()) {
3319  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3320  is_desc = first_oe_is_desc(order_entries);
3321  ExecutionOptions eo_copy = eo;
3322  CompilationOptions co_copy = co;
3323  eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
3324  if (hasStepForUnion() &&
3325  boost::algorithm::any_of(source->getOutputMetainfo(), is_none_encoded_text)) {
3326  co_copy.device_type = ExecutorDeviceType::CPU;
3327  VLOG(1) << "Punt sort's input query to CPU: detect union(-all) of none-encoded "
3328  "text column";
3329  }
3330 
3331  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3332  source_result = executeWorkUnit(source_work_unit,
3333  source->getOutputMetainfo(),
3334  is_aggregate,
3335  co_copy,
3336  eo_copy,
3337  render_info,
3338  queue_time_ms);
3339  use_speculative_top_n_sort =
3340  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3341  use_speculative_top_n(source_work_unit.exe_unit,
3342  source_result.getRows()->getQueryMemDesc());
3343  }
3344  if (render_info && render_info->isInSitu()) {
3345  return source_result;
3346  }
3347  if (source_result.isFilterPushDownEnabled()) {
3348  return source_result;
3349  }
3350  auto rows_to_sort = source_result.getRows();
3351  if (eo.just_explain) {
3352  return {rows_to_sort, {}};
3353  }
3354  auto const limit_val = get_limit_value(limit);
3355  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3356  !use_speculative_top_n_sort) {
3357  const size_t top_n = limit_val + offset;
3358  rows_to_sort->sort(order_entries, top_n, co.device_type, executor_);
3359  }
3360  if (limit || offset) {
3361  if (g_cluster && sort->collationCount() == 0) {
3362  if (offset >= rows_to_sort->rowCount()) {
3363  rows_to_sort->dropFirstN(offset);
3364  } else {
3365  rows_to_sort->keepFirstN(limit_val + offset);
3366  }
3367  } else {
3368  rows_to_sort->dropFirstN(offset);
3369  if (limit) {
3370  rows_to_sort->keepFirstN(limit_val);
3371  }
3372  }
3373  }
3374  return {rows_to_sort, source_result.getTargetsMeta()};
3375  };
3376 
3377  try {
3378  return execute_sort_query();
3379  } catch (const SpeculativeTopNFailed& e) {
3380  CHECK_EQ(size_t(1), groupby_exprs.size());
3381  CHECK(groupby_exprs.front());
3382  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3383  return execute_sort_query();
3384  }
3385 }
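// ---------------------------------------------------------------------------
// [Editor's illustrative sketch] execute_sort_query applies the clauses in a
// fixed order -- sort only the first limit+offset entries, then
// dropFirstN(offset), then keepFirstN(limit). The toy below reproduces that
// order on a vector, approximating the top-n sort with std::partial_sort.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  std::vector<int> rows = {5, 1, 4, 2, 3};
  const std::size_t limit = 2, offset = 1;
  const std::size_t top_n = limit + offset;  // sort only what can be returned
  std::partial_sort(rows.begin(), rows.begin() + top_n, rows.end());
  rows.erase(rows.begin(), rows.begin() + offset);  // dropFirstN(offset)
  rows.resize(std::min(rows.size(), limit));        // keepFirstN(limit)
  for (int v : rows) {
    std::cout << v << ' ';  // prints: 2 3
  }
  return 0;
}
// ---------------------------------------------------------------------------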
3386 
3387 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
3388  const RelSort* sort,
3389  std::list<Analyzer::OrderEntry>& order_entries,
3390  const ExecutionOptions& eo) {
3391  const auto source = sort->getInput(0);
3392  auto limit = sort->getLimit();
3393  const size_t offset = sort->getOffset();
3394  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3395  const size_t scan_total_limit =
3396  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
3397  size_t max_groups_buffer_entry_guess{
3398  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3399  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3400  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
3401  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3402  const auto& source_exe_unit = source_work_unit.exe_unit;
3403 
3404  // we do not allow sorting geometry or array types
3405  for (auto order_entry : order_entries) {
3406  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
3407  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3408  const auto& ti = get_target_info(te, false);
3409  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3410  throw std::runtime_error(
3411  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3412  }
3413  }
3414 
3415  if (source_exe_unit.groupby_exprs.size() == 1) {
3416  if (!source_exe_unit.groupby_exprs.front()) {
3417  sort_algorithm = SortAlgorithm::StreamingTopN;
3418  } else {
3419  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3420  first_oe_is_desc(order_entries))) {
3421  sort_algorithm = SortAlgorithm::Default;
3422  }
3423  }
3424  }
3425 
3426  sort->setOutputMetainfo(source->getOutputMetainfo());
3427  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3428  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
3429  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3430  std::move(source_exe_unit.input_col_descs),
3431  source_exe_unit.simple_quals,
3432  source_exe_unit.quals,
3433  source_exe_unit.join_quals,
3434  source_exe_unit.groupby_exprs,
3435  source_exe_unit.target_exprs,
3436  source_exe_unit.target_exprs_original_type_infos,
3437  nullptr,
3438  {sort_info.order_entries, sort_algorithm, limit, offset},
3439  scan_total_limit,
3440  source_exe_unit.query_hint,
3441  source_exe_unit.query_plan_dag_hash,
3442  source_exe_unit.hash_table_build_plan_dag,
3443  source_exe_unit.table_id_to_node_map,
3444  source_exe_unit.use_bump_allocator,
3445  source_exe_unit.union_all,
3446  source_exe_unit.query_state},
3447  source,
3448  max_groups_buffer_entry_guess,
3449  std::move(source_work_unit.query_rewriter),
3450  source_work_unit.input_permutation,
3451  source_work_unit.left_deep_join_input_sizes};
3452 }
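// ---------------------------------------------------------------------------
// [Editor's illustrative sketch] The algorithm selection above: a single null
// group-by slot means a plain projection sort, which can stream the top-n; a
// real group-by key keeps the speculative default unless that
// (key, direction) pair was blacklisted by an earlier SpeculativeTopNFailed.
// The enum mirrors the SortAlgorithm names; the helper itself is hypothetical.
#include <iostream>

enum class Algo { Default, SpeculativeTopN, StreamingTopN };

Algo choose_sort_algorithm(bool single_slot, bool slot_is_null, bool blacklisted) {
  if (single_slot && slot_is_null) {
    return Algo::StreamingTopN;  // projection: stream the top-n directly
  }
  if (single_slot && blacklisted) {
    return Algo::Default;  // speculation failed before for this key
  }
  return Algo::SpeculativeTopN;  // the optimistic default
}

int main() {
  std::cout << (choose_sort_algorithm(true, true, false) == Algo::StreamingTopN)
            << '\n';  // prints 1
  return 0;
}
// ---------------------------------------------------------------------------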
3453 
3454 namespace {
3455 
3456 /**
3457  * Upper bound estimation for the number of groups. Not strictly correct and not
3458  * tight, but if the tables involved are really small we shouldn't waste time doing
3459  * the NDV estimation. We don't account for cross-joins and / or group by unnested
3460  * array, which is the reason this estimation isn't entirely reliable.
3461  */
3462 std::pair<size_t, shared::TableKey> groups_approx_upper_bound(
3463  const std::vector<InputTableInfo>& table_infos) {
3464  CHECK(!table_infos.empty());
3465  const auto& first_table = table_infos.front();
3466  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3467  auto table_key = first_table.table_key;
3468  for (const auto& table_info : table_infos) {
3469  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3470  max_num_groups = table_info.info.getNumTuplesUpperBound();
3471  table_key = table_info.table_key;
3472  }
3473  }
3474  return std::make_pair(std::max(max_num_groups, size_t(1)), table_key);
3475 }
3476 
3477 bool is_projection(const RelAlgExecutionUnit& ra_exe_unit) {
3478  return ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front();
3479 }
3480 
3481 bool can_output_columnar(const RelAlgExecutionUnit& ra_exe_unit,
3482  const RenderInfo* render_info,
3483  const RelAlgNode* body) {
3484  if (!is_projection(ra_exe_unit)) {
3485  return false;
3486  }
3487  if (render_info && render_info->isInSitu()) {
3488  return false;
3489  }
3490  if (!ra_exe_unit.sort_info.order_entries.empty()) {
3491  // disable output columnar when we have top-sort node query
3492  return false;
3493  }
3494  bool flatbuffer_is_used = false;
3495  for (const auto& target_expr : ra_exe_unit.target_exprs) {
3496  const auto ti = target_expr->get_type_info();
3497  // Usage of FlatBuffer memory layout implies columnar-only support.
3498  if (ti.usesFlatBuffer()) {
3499  flatbuffer_is_used = true;
3500  continue;
3501  }
3502  // We don't currently support varlen columnar projections, so
3503  // return false if we find one
3504  // TODO: notice that currently ti.supportsFlatBuffer() == ti.is_varlen()
3505  if (ti.is_varlen()) {
3506  return false;
3507  }
3508  }
3509  if (auto top_project = dynamic_cast<const RelProject*>(body)) {
3510  if (top_project->isRowwiseOutputForced()) {
3511  if (flatbuffer_is_used) {
3512  throw std::runtime_error(
3513  "Cannot force rowwise output when FlatBuffer layout is used.");
3514  }
3515  return false;
3516  }
3517  }
3518  return true;
3519 }
3520 
3521 bool should_output_columnar(const RelAlgExecutionUnit& ra_exe_unit) {
3522  for (const auto& target_expr : ra_exe_unit.target_exprs) {
3523  if (target_expr->get_type_info().usesFlatBuffer()) {
3524  // using FlatBuffer memory layout implies columnar-only output
3525  return true;
3526  }
3527  }
3528  return g_columnar_large_projections &&
3529  ra_exe_unit.scan_limit >= g_columnar_large_projections_threshold;
3530 }
3531 
3532 /**
3533  * Determines whether a query needs to compute the size of its output buffer. Returns
3534  * true for projection queries with no LIMIT or a LIMIT that exceeds the high scan
3535  * limit threshold (meaning it would be cheaper to compute the number of rows passing
3536  * or use the bump allocator than allocate the current scan limit per GPU)
3537  */
3538 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
3539  for (const auto target_expr : ra_exe_unit.target_exprs) {
3540  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3541  return false;
3542  }
3543  }
3544  size_t preflight_count_query_threshold = g_preflight_count_query_threshold;
3545  if (ra_exe_unit.query_hint.isHintRegistered(QueryHint::kPreflightCountQueryThreshold)) {
3546  preflight_count_query_threshold =
3547  ra_exe_unit.query_hint.preflight_count_query_threshold;
3548  VLOG(1) << "Set the pre-flight count query's threshold as "
3549  << preflight_count_query_threshold << " by a query hint";
3550  }
3551  if (is_projection(ra_exe_unit) &&
3552  (!ra_exe_unit.scan_limit ||
3553  ra_exe_unit.scan_limit > preflight_count_query_threshold)) {
3554  return true;
3555  }
3556  return false;
3557 }
3558 
3559 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
3560  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
3561  ra_exe_unit.simple_quals.empty());
3562 }
3563 
3564 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
3565  const RelAlgExecutionUnit& ra_exe_unit_in,
3566  const std::vector<InputTableInfo>& table_infos,
3567  const Executor* executor,
3568  const ExecutorDeviceType device_type_in,
3569  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3570  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
3571  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
3572  const auto target_expr = ra_exe_unit.target_exprs[i];
3573  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3574  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
3575  continue;
3576  }
3577  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3578  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
3579  CHECK(arg);
3580  const auto& arg_ti = arg->get_type_info();
3581  // Avoid calling getExpressionRange for variable length types (string and array),
3582  // it'd trigger an assertion since that API expects to be called only for types
3583  // for which the notion of range is well-defined. A bit of a kludge, but the
3584  // logic to reject these types anyway is at lower levels in the stack and not
3585  // really worth pulling into a separate function for now.
3586  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3587  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
3588  continue;
3589  }
3590  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
3591  if (arg_range.getType() != ExpressionRangeType::Integer) {
3592  continue;
3593  }
3594  // When running distributed, the threshold for using the precise implementation
3595  // must be consistent across all leaves, otherwise we could have a mix of precise
3596  // and approximate bitmaps and we cannot aggregate them.
3597  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
3598  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3599  const auto sub_bitmap_count =
3600  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
3601  int64_t approx_bitmap_sz_bits{0};
3602  const auto error_rate_expr = static_cast<Analyzer::AggExpr*>(target_expr)->get_arg1();
3603  if (error_rate_expr) {
3604  CHECK(error_rate_expr->get_type_info().get_type() == kINT);
3605  auto const error_rate =
3606  dynamic_cast<Analyzer::Constant const*>(error_rate_expr.get());
3607  CHECK(error_rate);
3608  CHECK_GE(error_rate->get_constval().intval, 1);
3609  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
3610  } else {
3611  approx_bitmap_sz_bits = g_hll_precision_bits;
3612  }
3613  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
3614  arg_range.getIntMin(),
3615  0,
3616  approx_bitmap_sz_bits,
3617  true,
3618  device_type,
3619  sub_bitmap_count};
3620  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
3621  arg_range.getIntMin(),
3622  0,
3623  bitmap_sz_bits,
3624  false,
3625  device_type,
3626  sub_bitmap_count};
3627  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3628  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3629  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3630  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
3631  target_exprs_owned.push_back(precise_count_distinct);
3632  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
3633  }
3634  }
3635  return ra_exe_unit;
3636 }
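// ---------------------------------------------------------------------------
// [Editor's illustrative sketch] The function above downgrades
// APPROX_COUNT_DISTINCT to an exact bitmap COUNT(DISTINCT) whenever the
// approximate (HLL-style) bitmap would be at least as large as the exact one
// over the integer range. The size model below is a deliberate simplification
// (bit counts only, no padding or sub-bitmaps); 11 mirrors the usual
// g_hll_precision_bits default as an assumption.
#include <cstdint>
#include <iostream>

int main() {
  const int64_t range_min = 0, range_max = 100000;
  const int64_t precise_bits = range_max - range_min + 1;  // one bit per value
  const int hll_precision_bits = 11;                       // assumed default
  const int64_t approx_bits = (int64_t{1} << hll_precision_bits) * 8;
  std::cout << (approx_bits >= precise_bits
                    ? "switch to precise bitmap"
                    : "keep approximate implementation")
            << '\n';  // small value ranges favor the precise bitmap
  return 0;
}
// ---------------------------------------------------------------------------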
3637 
3638 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
3639  const CompilationOptions& co,
3640  const ExecutionOptions& eo) {
3641  return g_enable_bump_allocator && (co.device_type == ExecutorDeviceType::GPU) &&
3642  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
3643 }
3644 
3645 } // namespace
3646 
3647 ExecutionResult RelAlgExecutor::executeWorkUnit(
3648  const RelAlgExecutor::WorkUnit& work_unit,
3649  const std::vector<TargetMetaInfo>& targets_meta,
3650  const bool is_agg,
3651  const CompilationOptions& co_in,
3652  const ExecutionOptions& eo_in,
3653  RenderInfo* render_info,
3654  const int64_t queue_time_ms,
3655  const std::optional<size_t> previous_count) {
3656  INJECT_TIMER(executeWorkUnit);
3657  auto timer = DEBUG_TIMER(__func__);
3658  auto query_exec_time_begin = timer_start();
3659 
3660  const auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
3661  check_none_encoded_string_cast_tuple_limit(query_infos, work_unit.exe_unit);
3662 
3663  auto co = co_in;
3664  auto eo = eo_in;
3665  ColumnCacheMap column_cache;
3666  ScopeGuard clearWindowContextIfNecessary = [&]() {
3667  if (is_window_execution_unit(work_unit.exe_unit)) {
3668  WindowProjectNodeContext::reset(executor_);
3669  }
3670  };
3671  if (is_window_execution_unit(work_unit.exe_unit)) {
3672  if (!g_enable_window_functions) {
3673  throw std::runtime_error("Window functions support is disabled");
3674  }
3675  co.device_type = ExecutorDeviceType::CPU;
3676  co.allow_lazy_fetch = false;
3677  computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3678  }
3679  if (!eo.just_explain && eo.find_push_down_candidates) {
3680  // find potential candidates:
3681  VLOG(1) << "Try to find filter predicate push-down candidate.";
3682  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3683  if (!selected_filters.empty() || eo.just_calcite_explain) {
3684  VLOG(1) << "Found " << selected_filters.size()
3685  << " filter(s) to be pushed down. Re-create a query plan based on pushed "
3686  "filter predicate(s).";
3687  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3688  }
3689  VLOG(1) << "Continue with the current query plan";
3690  }
3691  if (render_info && render_info->isInSitu()) {
3692  co.allow_lazy_fetch = false;
3693  }
3694  const auto body = work_unit.body;
3695  CHECK(body);
3696  auto it = leaf_results_.find(body->getId());
3697  VLOG(3) << "body->getId()=" << body->getId()
3698  << " body->toString()=" << body->toString(RelRexToStringConfig::defaults())
3699  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3700  if (it != leaf_results_.end()) {
3701  executor_->addTransientStringLiterals(work_unit.exe_unit,
3702  executor_->row_set_mem_owner_);
3703  auto& aggregated_result = it->second;
3704  auto& result_rows = aggregated_result.rs;
3705  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3706  body->setOutputMetainfo(aggregated_result.targets_meta);
3707  if (render_info) {
3708  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3709  }
3710  return result;
3711  }
3712  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3713 
3714  auto ra_exe_unit = decide_approx_count_distinct_implementation(
3715  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3716 
3717  // register query hint if query_dag_ is valid
3718  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3719  if (query_dag_) {
3720  auto candidate = query_dag_->getQueryHint(body);
3721  if (candidate) {
3722  ra_exe_unit.query_hint = *candidate;
3723  }
3724  }
3725 
3726  const auto& query_hints = ra_exe_unit.query_hint;
3727  ScopeGuard reset_cuda_block_grid_sizes = [&,
3728  orig_block_size = executor_->blockSize(),
3729  orig_grid_size = executor_->gridSize()]() {
3730  if (executor_->getDataMgr()->getCudaMgr()) {
3731  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3732  if (orig_block_size) {
3733  executor_->setBlockSize(orig_block_size);
3734  } else {
3735  executor_->resetBlockSize();
3736  }
3737  }
3738  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3739  if (orig_grid_size) {
3740  executor_->setGridSize(orig_grid_size);
3741  } else {
3742  executor_->resetGridSize();
3743  }
3744  }
3745  }
3746  };
3747 
3748  if (co.device_type == ExecutorDeviceType::GPU) {
3749  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3750  if (!executor_->getDataMgr()->getCudaMgr()) {
3751  VLOG(1) << "Skip CUDA grid size query hint: cannot detect CUDA device";
3752  } else {
3753  const auto num_sms = executor_->cudaMgr()->getMinNumMPsForAllDevices();
3754  const auto new_grid_size = static_cast<unsigned>(
3755  std::max(1.0, std::round(num_sms * query_hints.cuda_grid_size_multiplier)));
3756  const auto default_grid_size = executor_->gridSize();
3757  if (new_grid_size != default_grid_size) {
3758  VLOG(1) << "Change CUDA grid size: " << default_grid_size
3759  << " (default_grid_size) -> " << new_grid_size << " (# SMs * "
3760  << query_hints.cuda_grid_size_multiplier << ")";
3761  // todo (yoonmin): do we need to check a hard limit?
3762  executor_->setGridSize(new_grid_size);
3763  } else {
3764  VLOG(1) << "Skip CUDA grid size query hint: invalid grid size";
3765  }
3766  }
3767  }
3768  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3769  if (!executor_->getDataMgr()->getCudaMgr()) {
3770  VLOG(1) << "Skip CUDA block size query hint: cannot detect CUDA device";
3771  } else {
3772  int cuda_block_size = query_hints.cuda_block_size;
3773  int warp_size = executor_->warpSize();
3774  if (cuda_block_size >= warp_size) {
3775  cuda_block_size = (cuda_block_size + warp_size - 1) / warp_size * warp_size;
3776  VLOG(1) << "Change CUDA block size w.r.t warp size (" << warp_size
3777  << "): " << executor_->blockSize() << " -> " << cuda_block_size;
3778  } else {
3779  VLOG(1) << "Change CUDA block size: " << executor_->blockSize() << " -> "
3780  << cuda_block_size;
3781  }
3782  executor_->setBlockSize(cuda_block_size);
3783  }
3784  }
3785  }
3786 
3787  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3788  if (is_window_execution_unit(ra_exe_unit)) {
3789  CHECK_EQ(table_infos.size(), size_t(1));
3790  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3791  max_groups_buffer_entry_guess =
3792  table_infos.front().info.fragments.front().getNumTuples();
3793  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3794  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3795  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3796  ra_exe_unit.scan_limit = *previous_count;
3797  } else {
3798  // TODO(adb): enable bump allocator path for render queries
3799  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3800  ra_exe_unit.scan_limit = 0;
3801  ra_exe_unit.use_bump_allocator = true;
3802  } else if (eo.executor_type == ::ExecutorType::Extern) {
3803  ra_exe_unit.scan_limit = 0;
3804  } else if (!eo.just_explain) {
3805  const auto filter_count_all = getFilteredCountAll(ra_exe_unit, true, co, eo);
3806  if (filter_count_all) {
3807  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3808  VLOG(1) << "Set a new scan limit from filtered_count_all: "
3809  << ra_exe_unit.scan_limit;
3810  auto const has_limit_value = ra_exe_unit.sort_info.limit.has_value();
3811  auto const top_k_sort_query =
3812  has_limit_value && !ra_exe_unit.sort_info.order_entries.empty();
3813  // top-k sort query needs to get a global result before sorting, so we cannot
3814  // apply LIMIT value at this point
3815  if (has_limit_value && !top_k_sort_query &&
3816  ra_exe_unit.scan_limit > ra_exe_unit.sort_info.limit.value()) {
3817  ra_exe_unit.scan_limit = ra_exe_unit.sort_info.limit.value();
3818  VLOG(1) << "Override scan limit to LIMIT value: " << ra_exe_unit.scan_limit;
3819  }
3820  }
3821  }
3822  }
3823  }
3824 
3825  // when output_columnar_hint is true here, it means either 1) the columnar output
3826  // configuration is enabled or 2) a user hint requested it; in either case it must
3827  // be disabled if the requirements checked below are not satisfied
3828  if (can_output_columnar(ra_exe_unit, render_info, body)) {
3829  if (!eo.output_columnar_hint && should_output_columnar(ra_exe_unit)) {
3830  VLOG(1) << "Using columnar layout for projection as output size of "
3831  << ra_exe_unit.scan_limit << " rows exceeds threshold of "
3832  << g_columnar_large_projections_threshold
3833  << " or some target uses FlatBuffer memory layout.";
3834  eo.output_columnar_hint = true;
3835  }
3836  } else {
3837  eo.output_columnar_hint = false;
3838  }
3839 
3840  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3841  co.device_type,
3842  QueryMemoryDescriptor(),
3843  nullptr,
3844  executor_->blockSize(),
3845  executor_->gridSize()),
3846  {}};
3847 
3848  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
3849  const bool has_cardinality_estimation,
3850  const bool has_ndv_estimation) -> ExecutionResult {
3851  // Note that the groups buffer entry guess may be modified during query execution.
3852  // Create a local copy so we can track those changes if we need to attempt a retry
3853  // due to OOM
3854  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3855  try {
3856  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3857  is_agg,
3858  table_infos,
3859  ra_exe_unit,
3860  co,
3861  eo,
3862  render_info,
3863  has_cardinality_estimation,
3864  column_cache),
3865  targets_meta};
3866  } catch (const QueryExecutionError& e) {
3867  if (!has_ndv_estimation && e.getErrorCode() < 0) {
3868  throw CardinalityEstimationRequired(/*range=*/0);
3869  }
3870  handlePersistentError(e.getErrorCode());
3871  return handleOutOfMemoryRetry(
3872  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
3873  targets_meta,
3874  is_agg,
3875  co,
3876  eo,
3877  render_info,
3878  e.wasMultifragKernelLaunch(),
3879  queue_time_ms);
3880  }
3881  };
3882 
3883  auto use_resultset_cache = canUseResultsetCache(eo, render_info);
3884  for (const auto& table_info : table_infos) {
3885  const auto db_id = table_info.table_key.db_id;
3886  if (db_id > 0) {
3887  const auto td = Catalog_Namespace::get_metadata_for_table(table_info.table_key);
3888  if (td && (td->isTemporaryTable() || td->isView)) {
3889  use_resultset_cache = false;
3890  if (eo.keep_result) {
3891  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has either "
3892  "temporary table or view";
3893  }
3894  }
3895  }
3896  }
3897 
3898  CardinalityCacheKey cache_key{ra_exe_unit};
3899  try {
3900  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3901  auto card = cached_cardinality.second;
3902  if (cached_cardinality.first && card >= 0) {
3903  VLOG(1) << "Use cached cardinality for max_groups_buffer_entry_guess: " << card;
3904  result = execute_and_handle_errors(
3905  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3906  } else {
3907  VLOG(1) << "Use default cardinality for max_groups_buffer_entry_guess: "
3908  << max_groups_buffer_entry_guess;
3909  result = execute_and_handle_errors(
3910  max_groups_buffer_entry_guess,
3911  groups_approx_upper_bound(table_infos).first <= g_big_group_threshold,
3912  /*has_ndv_estimation=*/false);
3913  }
3914  } catch (const CardinalityEstimationRequired& e) {
3915  // check the cardinality cache
3916  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3917  auto card = cached_cardinality.second;
3918  if (cached_cardinality.first && card >= 0) {
3919  VLOG(1) << "CardinalityEstimationRequired, Use cached cardinality for "
3920  "max_groups_buffer_entry_guess: "
3921  << card;
3922  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3923  } else {
3924  const auto ndv_groups_estimation =
3925  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3926  const auto estimated_groups_buffer_entry_guess =
3927  ndv_groups_estimation > 0
3928  ? 2 * ndv_groups_estimation
3929  : std::min(groups_approx_upper_bound(table_infos).first,
3930  g_estimator_failure_max_groupby_size);
3931  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3932  VLOG(1) << "CardinalityEstimationRequired, Use ndv_estimation: "
3933  << ndv_groups_estimation
3934  << ", cardinality for estimated_groups_buffer_entry_guess: "
3935  << estimated_groups_buffer_entry_guess;
3936  result = execute_and_handle_errors(
3937  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3938  if (!(eo.just_validate || eo.just_explain)) {
3939  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3940  }
3941  }
3942  }
3943 
3944  result.setQueueTime(queue_time_ms);
3945  if (render_info) {
3946  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3947  if (render_info->isInSitu()) {
3948  // return an empty result (with the same queue time, and zero render time)
3949  return {std::make_shared<ResultSet>(
3950  queue_time_ms,
3951  0,
3952  executor_->row_set_mem_owner_
3953  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3954  : nullptr),
3955  {}};
3956  }
3957  }
3958 
3959  for (auto& target_info : result.getTargetsMeta()) {
3960  if (target_info.get_type_info().is_string() &&
3961  !target_info.get_type_info().is_dict_encoded_string()) {
3962  // currently, we do not support resultset caching if non-encoded string is projected
3963  use_resultset_cache = false;
3964  if (eo.keep_result) {
3965  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has non-encoded "
3966  "string column projection";
3967  }
3968  }
3969  }
3970 
3971  const auto res = result.getDataPtr();
3972  auto allow_auto_caching_resultset =
3973  res && res->hasValidBuffer() && g_allow_auto_resultset_caching &&
3974  res->getBufferSizeBytes(co.device_type) <= g_auto_resultset_caching_threshold;
3975  if (use_resultset_cache && (eo.keep_result || allow_auto_caching_resultset)) {
3976  auto query_exec_time = timer_stop(query_exec_time_begin);
3977  res->setExecTime(query_exec_time);
3978  res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3979  res->setTargetMetaInfo(body->getOutputMetainfo());
3980  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
3981  res->setInputTableKeys(std::move(input_table_keys));
3982  if (allow_auto_caching_resultset) {
3983  VLOG(1) << "Automatically keep query resultset to recycler";
3984  }
3985  res->setUseSpeculativeTopNSort(
3986  use_speculative_top_n(ra_exe_unit, res->getQueryMemDesc()));
3987  executor_->getResultSetRecyclerHolder().putQueryResultSetToCache(
3988  ra_exe_unit.query_plan_dag_hash,
3989  res->getInputTableKeys(),
3990  res,
3991  res->getBufferSizeBytes(co.device_type),
3992  target_exprs_owned_);
3993  } else {
3994  if (eo.keep_result) {
3995  if (g_cluster) {
3996  VLOG(1) << "Query hint \'keep_result\' is ignored since we do not support "
3997  "resultset recycling on distributed mode";
3998  } else if (hasStepForUnion()) {
3999  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has union-(all) "
4000  "operator";
4001  } else if (render_info && render_info->isInSitu()) {
4002  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is classified as "
4003  "a in-situ rendering query";
4004  } else if (is_validate_or_explain_query(eo)) {
4005  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is either "
4006  "validate or explain query";
4007  } else {
4008  VLOG(1) << "Query hint \'keep_result\' is ignored";
4009  }
4010  }
4011  }
4012 
4013  return result;
4014 }
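// ---------------------------------------------------------------------------
// [Editor's illustrative sketch] The cardinality handling above in isolation:
// a cached cardinality wins; otherwise an NDV estimation (doubled, for
// headroom) is used, falling back to a capped upper bound when the estimator
// returns 0. All numbers below are toy stand-ins.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <optional>

int main() {
  const std::optional<std::size_t> cached_card = std::nullopt;  // cache miss
  const std::size_t ndv_estimation = 1000;  // estimator result (0 = failed)
  const std::size_t upper_bound = 50000;    // groups_approx_upper_bound
  const std::size_t estimator_failure_cap = 100000;
  const std::size_t entry_guess =
      cached_card ? *cached_card
                  : (ndv_estimation > 0
                         ? 2 * ndv_estimation
                         : std::min(upper_bound, estimator_failure_cap));
  std::cout << "groups buffer entry guess: " << entry_guess << '\n';  // 2000
  return 0;
}
// ---------------------------------------------------------------------------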
4015 
4016 bool RelAlgExecutor::hasDeletedRowInQuery(
4017  std::vector<InputTableInfo> const& input_tables_info) const {
4018  return std::any_of(
4019  input_tables_info.begin(), input_tables_info.end(), [](InputTableInfo const& info) {
4020  auto const& table_key = info.table_key;
4021  if (table_key.db_id > 0) {
4022  auto catalog =
4023  Catalog_Namespace::SysCatalog::instance().getCatalog(table_key.db_id);
4024  CHECK(catalog);
4025  auto td = catalog->getMetadataForTable(table_key.table_id);
4026  CHECK(td);
4027  if (catalog->getDeletedColumnIfRowsDeleted(td)) {
4028  return true;
4029  }
4030  }
4031  return false;
4032  });
4033 }
4034 
4035 namespace {
4036 std::string get_table_name_from_table_key(shared::TableKey const& table_key) {
4037  std::string table_name{""};
4038  if (table_key.db_id > 0) {
4039  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(table_key.db_id);
4040  CHECK(catalog);
4041  auto td = catalog->getMetadataForTable(table_key.table_id);
4042  CHECK(td);
4043  table_name = td->tableName;
4044  }
4045  return table_name;
4046 }
4047 } // namespace
4048 
4049 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(
4050  const RelAlgExecutionUnit& ra_exe_unit,
4051  const bool is_agg,
4052  const CompilationOptions& co,
4053  const ExecutionOptions& eo) {
4054  auto const input_tables_info = get_table_infos(ra_exe_unit, executor_);
4055  if (is_projection(ra_exe_unit) && ra_exe_unit.simple_quals.empty() &&
4056  ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
4057  !hasDeletedRowInQuery(input_tables_info)) {
4058  auto const max_row_info = groups_approx_upper_bound(input_tables_info);
4059  auto const num_rows = max_row_info.first;
4060  auto const table_name = get_table_name_from_table_key(max_row_info.second);
4061  VLOG(1) << "Short-circuiting filtered count query for the projection query "
4062  "containing input table "
4063  << table_name << ": return its table cardinality " << num_rows << " instead";
4064  return std::make_optional<size_t>(num_rows);
4065  }
4066  const auto count =
4067  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
4068  kCOUNT,
4069  nullptr,
4070  false,
4071  nullptr);
4072  const auto count_all_exe_unit = ra_exe_unit.createCountAllExecutionUnit(count.get());
4073  size_t one{1};
4074  ResultSetPtr count_all_result;
4075  try {
4076  VLOG(1) << "Try to execute pre-flight counts query";
4077  ColumnCacheMap column_cache;
4078  ExecutionOptions copied_eo = eo;
4079  copied_eo.estimate_output_cardinality = true;
4080  count_all_result = executor_->executeWorkUnit(one,
4081  is_agg,
4082  input_tables_info,
4083  count_all_exe_unit,
4084  co,
4085  copied_eo,
4086  nullptr,
4087  false,
4088  column_cache);
4089  ra_exe_unit.per_device_cardinality = count_all_exe_unit.per_device_cardinality;
4090  } catch (const foreign_storage::ForeignStorageException& error) {
4091  throw;
4092  } catch (const QueryMustRunOnCpu&) {
4093  // force a retry of the top level query on CPU
4094  throw;
4095  } catch (const std::exception& e) {
4096  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
4097  return std::nullopt;
4098  }
4099  const auto count_row = count_all_result->getNextRow(false, false);
4100  CHECK_EQ(size_t(1), count_row.size());
4101  const auto& count_tv = count_row.front();
4102  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
4103  CHECK(count_scalar_tv);
4104  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
4105  CHECK(count_ptr);
4106  CHECK_GE(*count_ptr, 0);
4107  auto count_upper_bound = static_cast<size_t>(*count_ptr);
4108  return std::max(count_upper_bound, size_t(1));
4109 }
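// ---------------------------------------------------------------------------
// [Editor's illustrative sketch] The short-circuit at the top of
// getFilteredCountAll: a pure projection with no quals and no deleted rows can
// answer the pre-flight count from table metadata alone and skip the COUNT(*)
// kernel entirely.
#include <cstddef>
#include <iostream>

int main() {
  const bool is_projection = true;
  const bool has_quals = false;         // simple_quals, quals, join_quals empty
  const bool has_deleted_rows = false;  // no delete column in play
  const std::size_t table_cardinality = 42;  // metadata upper bound
  if (is_projection && !has_quals && !has_deleted_rows) {
    std::cout << "use metadata cardinality: " << table_cardinality << '\n';
  } else {
    std::cout << "run the pre-flight COUNT(*) kernel\n";
  }
  return 0;
}
// ---------------------------------------------------------------------------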
4110 
4111 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
4112  const auto& ra_exe_unit = work_unit.exe_unit;
4113  if (ra_exe_unit.input_descs.size() != 1) {
4114  return false;
4115  }
4116  const auto& table_desc = ra_exe_unit.input_descs.front();
4117  if (table_desc.getSourceType() != InputSourceType::TABLE) {
4118  return false;
4119  }
4120  const auto& table_key = table_desc.getTableKey();
4121  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
4122  const auto comp_expr =
4123  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4124  if (!comp_expr || comp_expr->get_optype() != kEQ) {
4125  return false;
4126  }
4127  const auto lhs = comp_expr->get_left_operand();
4128  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4129  if (!lhs_col || !lhs_col->getTableKey().table_id || lhs_col->get_rte_idx()) {
4130  return false;
4131  }
4132  const auto rhs = comp_expr->get_right_operand();
4133  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4134  if (!rhs_const) {
4135  return false;
4136  }
4137  const auto cd = get_column_descriptor(
4138  {table_key.db_id, table_key.table_id, lhs_col->getColumnKey().column_id});
4139  if (cd->isVirtualCol) {
4140  CHECK_EQ("rowid", cd->columnName);
4141  return true;
4142  }
4143  }
4144  return false;
4145 }
4146 
4147 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
4148  const RelAlgExecutor::WorkUnit& work_unit,
4149  const std::vector<TargetMetaInfo>& targets_meta,
4150  const bool is_agg,
4151  const CompilationOptions& co,
4152  const ExecutionOptions& eo,
4153  RenderInfo* render_info,
4154  const bool was_multifrag_kernel_launch,
4155  const int64_t queue_time_ms) {
4156  // Disable the bump allocator
4157  // Note that this will have basically the same effect as using the bump allocator for
4158  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
4159  // path and the bump allocator path for kernel per fragment execution.
4160  auto ra_exe_unit_in = work_unit.exe_unit;
4161  ra_exe_unit_in.use_bump_allocator = false;
4162 
4163  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4164  co.device_type,
4165  QueryMemoryDescriptor(),
4166  nullptr,
4167  executor_->blockSize(),
4168  executor_->gridSize()),
4169  {}};
4170 
4171  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
4172  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
4173  auto eo_no_multifrag = eo;
4174  eo_no_multifrag.setNoExplainExecutionOptions(true);
4175  eo_no_multifrag.allow_multifrag = false;
4176  eo_no_multifrag.find_push_down_candidates = false;
4177  if (was_multifrag_kernel_launch) {
4178  try {
4179  // Attempt to retry using the kernel per fragment path. The smaller input size
4180  // required may allow the entire kernel to execute in GPU memory.
4181  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
4182  "kernels disabled.";
4183  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
4184  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
4185  ColumnCacheMap column_cache;
4186  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4187  is_agg,
4188  table_infos,
4189  ra_exe_unit,
4190  co,
4191  eo_no_multifrag,
4192  nullptr,
4193  true,
4194  column_cache),
4195  targets_meta};
4196  result.setQueueTime(queue_time_ms);
4197  } catch (const QueryExecutionError& e) {
4198  handlePersistentError(e.getErrorCode());
4199  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
4200  }
4201  }
4202 
4203  if (render_info) {
4204  render_info->forceNonInSitu();
4205  }
4206 
4207  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
4208  // Only reset the group buffer entry guess if we ran out of slots, which suggests
4209  // a highly pathological input which prevented a good estimation of distinct
4210  // tuple count. For projection queries, this will force a per-fragment scan limit,
4211  // which is compatible with the CPU path.
4212 
4213  VLOG(1) << "Resetting max groups buffer entry guess.";
4214  max_groups_buffer_entry_guess = 0;
4215 
4216  int iteration_ctr = -1;
4217  while (true) {
4218  iteration_ctr++;
4219  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
4220  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
4221  ColumnCacheMap column_cache;
4222  try {
4223  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4224  is_agg,
4225  table_infos,
4226  ra_exe_unit,
4227  co_cpu,
4228  eo_no_multifrag,
4229  nullptr,
4230  true,
4231  column_cache),
4232  targets_meta};
4233  } catch (const QueryExecutionError& e) {
4234  // Ran out of slots
4235  if (e.getErrorCode() < 0) {
4236  // Even the conservative guess failed; it should only happen when we group
4237  // by a huge cardinality array. Maybe we should throw an exception instead?
4238  // Such a heavy query is entirely capable of exhausting all the host memory.
4239  CHECK(max_groups_buffer_entry_guess);
4240  // Only allow two iterations of increasingly large entry guesses up to a maximum
4241  // of 512MB per column per kernel
4242  if (g_enable_watchdog || iteration_ctr > 1) {
4243  throw std::runtime_error("Query ran out of output slots in the result");
4244  }
4245  max_groups_buffer_entry_guess *= 2;
4246  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
4247  "groups buffer entry "
4248  "guess equal to "
4249  << max_groups_buffer_entry_guess;
4250  } else {
4251  handlePersistentError(e.getErrorCode());
4252  }
4253  continue;
4254  }
4255  result.setQueueTime(queue_time_ms);
4256  return result;
4257  }
4258  return result;
4259 }
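// ---------------------------------------------------------------------------
// [Editor's illustrative sketch] The retry policy above: when the output
// buffer runs out of slots, the entry guess doubles, but only two enlarged
// attempts are allowed (none under the watchdog). Stubbed "execution" that
// always fails, to show the control flow:
#include <cstddef>
#include <iostream>

int main() {
  std::size_t entry_guess = 1024;
  const bool watchdog_enabled = false;
  int iteration_ctr = -1;
  while (true) {
    ++iteration_ctr;
    const bool ran_out_of_slots = true;  // stand-in for QueryExecutionError
    if (!ran_out_of_slots) {
      break;  // success
    }
    if (watchdog_enabled || iteration_ctr > 1) {
      std::cout << "giving up after " << iteration_ctr + 1 << " attempts\n";
      return 1;
    }
    entry_guess *= 2;
    std::cout << "retrying with entry guess " << entry_guess << '\n';
  }
  return 0;
}
// ---------------------------------------------------------------------------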
4260 
4261 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
4262  LOG(ERROR) << "Query execution failed with error "
4263  << getErrorMessageFromCode(error_code);
4264  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
4265  // We ran out of GPU memory, this doesn't count as an error if the query is
4266  // allowed to continue on CPU because retry on CPU is explicitly allowed through
4267  // --allow-cpu-retry.
4268  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
4269  if (!g_allow_cpu_retry) {
4270  throw std::runtime_error(
4271  "Query ran out of GPU memory, unable to automatically retry on CPU");
4272  }
4273  return;
4274  }
4275  throw std::runtime_error(getErrorMessageFromCode(error_code));
4276 }
4277 
4278 namespace {
4279 struct ErrorInfo {
4280  const char* code{nullptr};
4281  const char* description{nullptr};
4282 };
4283 ErrorInfo getErrorDescription(const int32_t error_code) {
4284  // 'designated initializers' don't compile on Windows under C++17;
4285  // they require /std:c++20, so they have been removed for the Windows port.
4286  switch (error_code) {
4287  case Executor::ERR_DIV_BY_ZERO:
4288  return {"ERR_DIV_BY_ZERO", "Division by zero"};
4289  case Executor::ERR_OUT_OF_GPU_MEM:
4290  return {"ERR_OUT_OF_GPU_MEM",
4291 
4292  "Query couldn't keep the entire working set of columns in GPU memory"};
4293  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
4294  return {"ERR_UNSUPPORTED_SELF_JOIN", "Self joins not supported yet"};
4295  case Executor::ERR_OUT_OF_CPU_MEM:
4296  return {"ERR_OUT_OF_CPU_MEM", "Not enough host memory to execute the query"};
4297  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
4298  return {"ERR_OVERFLOW_OR_UNDERFLOW", "Overflow or underflow"};
4299  case Executor::ERR_OUT_OF_TIME:
4300  return {"ERR_OUT_OF_TIME", "Query execution has exceeded the time limit"};
4301  case Executor::ERR_INTERRUPTED:
4302  return {"ERR_INTERRUPTED", "Query execution has been interrupted"};
4303  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
4304  return {"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
4305  "Columnar conversion not supported for variable length types"};
4306  case Executor::ERR_TOO_MANY_LITERALS:
4307  return {"ERR_TOO_MANY_LITERALS", "Too many literals in the query"};
4308  case Executor::ERR_STRING_CONST_IN_RESULTSET:
4309  return {"ERR_STRING_CONST_IN_RESULTSET",
4310 
4311  "NONE ENCODED String types are not supported as input result set."};
4312  case Executor::ERR_OUT_OF_RENDER_MEM:
4313  return {"ERR_OUT_OF_RENDER_MEM",
4314 
4315  "Insufficient GPU memory for query results in render output buffer "
4316  "sized by render-mem-bytes"};
4317  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
4318  return {"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
4319  "Streaming-Top-N not supported in Render Query"};
4320  case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
4321  return {"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
4322  "Multiple distinct values encountered"};
4323  case Executor::ERR_GEOS:
4324  return {"ERR_GEOS", "ERR_GEOS"};
4325  case Executor::ERR_WIDTH_BUCKET_INVALID_ARGUMENT:
4326  return {"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
4327 
4328  "Arguments of WIDTH_BUCKET function does not satisfy the condition"};
4329  default:
4330  return {nullptr, nullptr};
4331  }
4332 }
4333 
4334 } // namespace
4335 
4336 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
4337  if (error_code < 0) {
4338  return "Ran out of slots in the query output buffer";
4339  }
4340  const auto errorInfo = getErrorDescription(error_code);
4341 
4342  if (errorInfo.code) {
4343  return errorInfo.code + ": "s + errorInfo.description;
4344  } else {
4345  return "Other error: code "s + std::to_string(error_code);
4346  }
4347 }
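// ---------------------------------------------------------------------------
// [Editor's illustrative sketch] The error-message contract above: negative
// codes mean the output buffer ran out of slots, known codes format as
// "NAME: description", and unknown codes fall through to a generic message.
// The single sample mapping below is hypothetical; the real table lives in
// getErrorDescription.
#include <iostream>
#include <string>

std::string message_for(const int code) {
  using namespace std::string_literals;
  if (code < 0) {
    return "Ran out of slots in the query output buffer"s;
  }
  if (code == 1) {  // assumed sample code for the division-by-zero entry
    return "ERR_DIV_BY_ZERO: Division by zero"s;
  }
  return "Other error: code "s + std::to_string(code);
}

int main() {
  std::cout << message_for(-3) << '\n'
            << message_for(1) << '\n'
            << message_for(999) << '\n';
  return 0;
}
// ---------------------------------------------------------------------------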
4348 
4349 void RelAlgExecutor::executePostExecutionCallback() {
4350  if (post_execution_callback_) {
4351  VLOG(1) << "Running post execution callback.";
4352  (*post_execution_callback_)();
4353  }
4354 }
4355 
4356 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
4357  const SortInfo& sort_info,
4358  const ExecutionOptions& eo) {
4359  const auto compound = dynamic_cast<const RelCompound*>(node);
4360  if (compound) {
4361  return createCompoundWorkUnit(compound, sort_info, eo);
4362  }
4363  const auto project = dynamic_cast<const RelProject*>(node);
4364  if (project) {
4365  return createProjectWorkUnit(project, sort_info, eo);
4366  }
4367  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4368  if (aggregate) {
4369  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4370  }
4371  const auto filter = dynamic_cast<const RelFilter*>(node);
4372  if (filter) {
4373  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4374  }
4375  LOG(FATAL) << "Unhandled node type: "
4376  << node->toString(RelRexToStringConfig::defaults());
4377  return {};
4378 }
4379 
4380 namespace {
4381 
4382 JoinType get_join_type(const RelAlgNode* ra) {
4383  auto sink = get_data_sink(ra);
4384  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
4385  return join->getJoinType();
4386  }
4387  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
4388  return JoinType::INNER;
4389  }
4390 
4391  return JoinType::INVALID;
4392 }
4393 
4394 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
4395  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4396  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
4397  return nullptr;
4398  }
4399  const auto equi_join_condition =
4400  dynamic_cast<const RexOperator*>(condition->getOperand(0));
4401  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
4402  return nullptr;
4403  }
4404  const auto both_are_null_condition =
4405  dynamic_cast<const RexOperator*>(condition->getOperand(1));
4406  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
4407  both_are_null_condition->size() != 2) {
4408  return nullptr;
4409  }
4410  const auto lhs_is_null =
4411  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
4412  const auto rhs_is_null =
4413  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
4414  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
4415  rhs_is_null->getOperator() != kISNULL) {
4416  return nullptr;
4417  }
4418  CHECK_EQ(size_t(1), lhs_is_null->size());
4419  CHECK_EQ(size_t(1), rhs_is_null->size());
4420  CHECK_EQ(size_t(2), equi_join_condition->size());
4421  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
4422  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
4423  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
4424  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
4425  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4426  return nullptr;
4427  }
4428  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
4429  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4430  RexDeepCopyVisitor deep_copy_visitor;
4431  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
4432  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
4433  eq_operands.emplace_back(lhs_op_copy.release());
4434  eq_operands.emplace_back(rhs_op_copy.release());
4435  return boost::make_unique<const RexOperator>(
4436  kBW_EQ, eq_operands, equi_join_condition->getType());
4437  }
4438  return nullptr;
4439 }
4440 
4441 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
4442  const RexScalar* scalar) {
4443  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4444  if (condition && condition->getOperator() == kAND) {
4445  CHECK_GE(condition->size(), size_t(2));
4446  auto acc = get_bitwise_equals(condition->getOperand(0));
4447  if (!acc) {
4448  return nullptr;
4449  }
4450  for (size_t i = 1; i < condition->size(); ++i) {
4451  std::vector<std::unique_ptr<const RexScalar>> and_operands;
4452  and_operands.emplace_back(std::move(acc));
4453  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
4454  acc =
4455  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
4456  }
4457  return acc;
4458  }
4459  return get_bitwise_equals(scalar);
4460 }
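// ---------------------------------------------------------------------------
// [Editor's illustrative sketch] The predicate shape get_bitwise_equals
// recognizes is the null-safe equality "a = b OR (a IS NULL AND b IS NULL)",
// folded into a single BW_EQ operator so it can serve as a hash join key. Its
// semantics over nullable values, modeled with std::optional:
#include <iostream>
#include <optional>

bool null_safe_eq(std::optional<int> a, std::optional<int> b) {
  const bool both_null = !a && !b;        // a IS NULL AND b IS NULL
  const bool equal = a && b && *a == *b;  // a = b is false if either is NULL
  return equal || both_null;              // exactly what BW_EQ computes
}

int main() {
  std::cout << null_safe_eq(1, 1) << null_safe_eq(1, 2)
            << null_safe_eq(std::nullopt, std::nullopt) << '\n';  // prints 101
  return 0;
}
// ---------------------------------------------------------------------------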
4461 
4462 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
4463  CHECK_GE(left_deep_join->inputCount(), size_t(2));
4464  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
4465  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
4466  ++nesting_level) {
4467  if (left_deep_join->getOuterCondition(nesting_level)) {
4468  join_types[nesting_level - 1] = JoinType::LEFT;
4469  }
4470  auto cur_level_join_type = left_deep_join->getJoinType(nesting_level);
4471  if (cur_level_join_type == JoinType::SEMI || cur_level_join_type == JoinType::ANTI) {
4472  join_types[nesting_level - 1] = cur_level_join_type;
4473  }
4474  }
4475  return join_types;
4476 }
4477 
4478 template <class RA>
4479 std::vector<size_t> do_table_reordering(
4480  std::vector<InputDescriptor>& input_descs,
4481  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4482  const JoinQualsPerNestingLevel& left_deep_join_quals,
4483  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4484  const RA* node,
4485  const std::vector<InputTableInfo>& query_infos,
4486  const Executor* executor) {
4487  if (g_cluster) {
4488  // Disable table reordering in distributed mode. The aggregator does not have enough
4489  // information to break ties
4490  return {};
4491  }
4492  if (node->isUpdateViaSelect() || node->isDeleteViaSelect()) {
4493  // Do not reorder tables for UPDATE and DELETE queries, since the outer table always
4494  // has to be the physical table.
4495  return {};
4496  }
4497  for (const auto& table_info : query_infos) {
4498  if (table_info.table_key.table_id < 0) {
4499  continue;
4500  }
4501  const auto td = Catalog_Namespace::get_metadata_for_table(table_info.table_key);
4502  CHECK(td);
4503  if (table_is_replicated(td)) {
4504  return {};
4505  }
4506  }
4507  const auto input_permutation =
4508  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
4509  input_to_nest_level = get_input_nest_levels(node, input_permutation);
4510  std::tie(input_descs, input_col_descs, std::ignore) =
4511  get_input_desc(node, input_to_nest_level, input_permutation);
4512  return input_permutation;
4513 }
4514 
4515 std::vector<size_t> get_left_deep_join_input_sizes(
4516  const RelLeftDeepInnerJoin* left_deep_join) {
4517  std::vector<size_t> input_sizes;
4518  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
4519  const auto inputs = get_node_output(left_deep_join->getInput(i));
4520  input_sizes.push_back(inputs.size());
4521  }
4522  return input_sizes;
4523 }
4524 
4525 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
4526  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4527  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4528  for (const auto& qual : quals) {
4529  const auto rewritten_qual = rewrite_expr(qual.get());
4530  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4531  }
4532  return rewritten_quals;
4533 }
4534 
4535 } // namespace
4536 
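// Builds the execution unit for a RelCompound node: resolves the input
// descriptors, optionally reorders the join inputs, translates the filter,
// group-by, and target expressions, and runs the result through the query
// rewriter.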
4537 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
4538  const RelCompound* compound,
4539  const SortInfo& sort_info,
4540  const ExecutionOptions& eo) {
4541  std::vector<InputDescriptor> input_descs;
4542  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4543  auto input_to_nest_level = get_input_nest_levels(compound, {});
4544  std::tie(input_descs, input_col_descs, std::ignore) =
4545  get_input_desc(compound, input_to_nest_level, {});
4546  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4547  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4548  auto query_infos = get_table_infos(input_descs, executor_);
4549  CHECK_EQ(size_t(1), compound->inputCount());
4550  const auto left_deep_join =
4551  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4552  JoinQualsPerNestingLevel left_deep_join_quals;
4553  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4554  : std::vector<JoinType>{get_join_type(compound)};
4555  std::vector<size_t> input_permutation;
4556  std::vector<size_t> left_deep_join_input_sizes;
4557  std::optional<unsigned> left_deep_tree_id;
4558  if (left_deep_join) {
4559  left_deep_tree_id = left_deep_join->getId();
4560  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4561  left_deep_join_quals = translateLeftDeepJoinFilter(
4562  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4563  if (g_from_table_reordering &&
4564  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4565  join_types.end()) {
4566  input_permutation = do_table_reordering(input_descs,
4567  input_col_descs,
4568  left_deep_join_quals,
4569  input_to_nest_level,
4570  compound,
4571  query_infos,
4572  executor_);
4573  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4574  std::tie(input_descs, input_col_descs, std::ignore) =
4575  get_input_desc(compound, input_to_nest_level, input_permutation);
4576  left_deep_join_quals = translateLeftDeepJoinFilter(
4577  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4578  }
4579  }
4580  auto const bbox_intersect_qual_info = convert_bbox_intersect_join(left_deep_join_quals,
4581  input_descs,
4582  input_to_nest_level,
4583  input_permutation,
4584  input_col_descs,
4585  executor_);
4586  if (bbox_intersect_qual_info.is_reordered) {
4587  query_infos = get_table_infos(input_descs, executor_);
4588  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4589  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4590  }
4591  if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4592  left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4593  }
4594  RelAlgTranslator translator(
4595  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4596  const auto quals_cf = translate_quals(compound, translator);
4597  const auto quals = rewrite_quals(quals_cf.quals);
4598  const auto scalar_sources =
4599  translate_scalar_sources(compound, translator, eo.executor_type);
4600  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4601  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4602  const auto target_exprs = translate_targets(target_exprs_owned_,
4603  target_exprs_type_infos,
4604  scalar_sources,
4605  groupby_exprs,
4606  compound,
4607  translator,
4608  eo.executor_type);
4609 
4610  auto query_hint = RegisteredQueryHint::defaults();
4611  if (query_dag_) {
4612  auto candidate = query_dag_->getQueryHint(compound);
4613  if (candidate) {
4614  query_hint = *candidate;
4615  }
4616  }
4617  CHECK_EQ(compound->size(), target_exprs.size());
4618  const RelAlgExecutionUnit exe_unit = {input_descs,
4619  input_col_descs,
4620  quals_cf.simple_quals,
4621  quals,
4622  left_deep_join_quals,
4623  groupby_exprs,
4624  target_exprs,
4625  target_exprs_type_infos,
4626  nullptr,
4627  sort_info,
4628  0,
4629  query_hint,
4630  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
4631  compound->getQueryPlanDagHash(), sort_info),
4632  {},
4633  {},
4634  false,
4635  std::nullopt,
4636  query_state_};
4637  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4638  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4639  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4640  compound->setOutputMetainfo(targets_meta);
4641  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4642  if (left_deep_tree_id.has_value()) {
4643  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4644  rewritten_exe_unit.join_quals);
4645  }
4646  if (has_valid_query_plan_dag(compound)) {
4647  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4648  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4649  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4650  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4651  }
4652  return {rewritten_exe_unit,
4653  compound,
4654  max_groups_buffer_entry_default_guess,
4655  std::move(query_rewriter),
4656  input_permutation,
4657  left_deep_join_input_sizes};
4658 }
4659 
4660 std::shared_ptr<RelAlgTranslator> RelAlgExecutor::getRelAlgTranslator(
4661  const RelAlgNode* node) {
4662  auto input_to_nest_level = get_input_nest_levels(node, {});
4663  const auto left_deep_join =
4664  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
4665  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4666  : std::vector<JoinType>{get_join_type(node)};
4667  return std::make_shared<RelAlgTranslator>(
4668  query_state_, executor_, input_to_nest_level, join_types, now_, false);
4669 }
4670 
4671 namespace {
4672 
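// Flattens a tree of AND operators into a flat list of conjuncts.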
4673 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
4674  CHECK(qual_expr);
4675  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
4676  if (!bin_oper || bin_oper->getOperator() != kAND) {
4677  return {qual_expr};
4678  }
4679  CHECK_GE(bin_oper->size(), size_t(2));
4680  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
4681  for (size_t i = 1; i < bin_oper->size(); ++i) {
4682  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
4683  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4684  }
4685  return lhs_cf;
4686 }
4687 
4688 std::shared_ptr<Analyzer::Expr> build_logical_expression(
4689  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4690  const SQLOps sql_op) {
4691  CHECK(!factors.empty());
4692  auto acc = factors.front();
4693  for (size_t i = 1; i < factors.size(); ++i) {
4694  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
4695  }
4696  return acc;
4697 }
4698 
4699 template <class QualsList>
4700 bool list_contains_expression(const QualsList& haystack,
4701  const std::shared_ptr<Analyzer::Expr>& needle) {
4702  for (const auto& qual : haystack) {
4703  if (*qual == *needle) {
4704  return true;
4705  }
4706  }
4707  return false;
4708 }
4709 
4710 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
4711 // evaluations of `p` and allows use of the original form in joins if `p`
4712 // can be used for hash joins.
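// For example, `(a.x = b.x AND a.y > 5) OR (a.x = b.x AND a.z < 3)` becomes
// `a.x = b.x AND (a.y > 5 OR a.z < 3)`, exposing `a.x = b.x` as a
// standalone equi-join qualifier.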
4713 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
4714  const std::shared_ptr<Analyzer::Expr>& expr) {
4715  const auto expr_terms = qual_to_disjunctive_form(expr);
4716  CHECK_GE(expr_terms.size(), size_t(1));
4717  const auto& first_term = expr_terms.front();
4718  const auto first_term_factors = qual_to_conjunctive_form(first_term);
4719  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4720  // First, collect the conjunctive components common to all the disjunctive components.
4721  // Don't do it for simple qualifiers; we only care about expensive or join qualifiers.
4722  for (const auto& first_term_factor : first_term_factors.quals) {
4723  bool is_common =
4724  expr_terms.size() > 1; // Only report common factors for disjunction.
4725  for (size_t i = 1; i < expr_terms.size(); ++i) {
4726  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
4727  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
4728  is_common = false;
4729  break;
4730  }
4731  }
4732  if (is_common) {
4733  common_factors.push_back(first_term_factor);
4734  }
4735  }
4736  if (common_factors.empty()) {
4737  return expr;
4738  }
4739  // Now that the common expressions are known, collect the remaining expressions.
4740  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4741  for (const auto& term : expr_terms) {
4742  const auto term_cf = qual_to_conjunctive_form(term);
4743  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4744  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4745  for (const auto& qual : term_cf.quals) {
4746  if (!list_contains_expression(common_factors, qual)) {
4747  remaining_quals.push_back(qual);
4748  }
4749  }
4750  if (!remaining_quals.empty()) {
4751  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
4752  }
4753  }
4754  // Reconstruct the expression with the transformation applied.
4755  const auto common_expr = build_logical_expression(common_factors, kAND);
4756  if (remaining_terms.empty()) {
4757  return common_expr;
4758  }
4759  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
4760  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
4761 }
4762 
4763 } // namespace
4764 
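// Translates a join condition into analyzer expressions in conjunctive
// form: each conjunct is checked for the null-aware equality pattern,
// common factors are hoisted out of disjunctions via
// reverse_logical_distribution(), and constant folding is applied before
// equi-join conditions are combined.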
4765 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
4766  const RexScalar* join_condition,
4767  const std::vector<JoinType>& join_types,
4768  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4769  const bool just_explain) const {
4770  RelAlgTranslator translator(
4771  query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
4772  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
4773  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4774  for (const auto rex_condition_component : rex_condition_cf) {
4775  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
4776  const auto join_condition = reverse_logical_distribution(
4777  translator.translate(bw_equals ? bw_equals.get() : rex_condition_component));
4778  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
4779 
4780  auto append_folded_cf_quals = [&join_condition_quals](const auto& cf_quals) {
4781  for (const auto& cf_qual : cf_quals) {
4782  join_condition_quals.emplace_back(fold_expr(cf_qual.get()));
4783  }
4784  };
4785 
4786  append_folded_cf_quals(join_condition_cf.quals);
4787  append_folded_cf_quals(join_condition_cf.simple_quals);
4788  }
4789  return combine_equi_join_conditions(join_condition_quals);
4790 }
4791 
4792 // Translate left deep join filter and separate the conjunctive form qualifiers
4793 // per nesting level. The code generated for hash table lookups on each level
4794 // must dominate its uses in deeper nesting levels.
4795 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
4796  const RelLeftDeepInnerJoin* join,
4797  const std::vector<InputDescriptor>& input_descs,
4798  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4799  const bool just_explain) {
4800  const auto join_types = left_deep_join_types(join);
4801  const auto join_condition_quals = makeJoinQuals(
4802  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
4803  MaxRangeTableIndexVisitor rte_idx_visitor;
4804  JoinQualsPerNestingLevel result(input_descs.size() - 1);
4805  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4806  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4807  const auto outer_condition = join->getOuterCondition(rte_idx);
4808  if (outer_condition) {
4809  result[rte_idx - 1].quals =
4810  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4811  CHECK_LE(rte_idx, join_types.size());
4812  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
4813  result[rte_idx - 1].type = JoinType::LEFT;
4814  continue;
4815  }
4816  for (const auto& qual : join_condition_quals) {
4817  if (visited_quals.count(qual)) {
4818  continue;
4819  }
4820  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4821  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4822  const auto it_ok = visited_quals.emplace(qual);
4823  CHECK(it_ok.second);
4824  result[rte_idx - 1].quals.push_back(qual);
4825  }
4826  }
4827  CHECK_LE(rte_idx, join_types.size());
4828  CHECK(join_types[rte_idx - 1] == JoinType::INNER ||
4829  join_types[rte_idx - 1] == JoinType::SEMI ||
4830  join_types[rte_idx - 1] == JoinType::ANTI);
4831  result[rte_idx - 1].type = join_types[rte_idx - 1];
4832  }
4833  return result;
4834 }
4835 
4836 namespace {
4837 
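// Synthesizes one ColumnVar per input column of `ra_node`'s input at
// `nest_level`. Column ids are 1-based for scans and 0-based for
// intermediate results.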
4838 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
4839  const RelAlgNode* ra_node,
4840  const size_t nest_level,
4841  const std::vector<TargetMetaInfo>& in_metainfo,
4842  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4843  CHECK_LE(size_t(1), ra_node->inputCount());
4844  CHECK_GE(size_t(2), ra_node->inputCount());
4845  const auto input = ra_node->getInput(nest_level);
4846  const auto it_rte_idx = input_to_nest_level.find(input);
4847  CHECK(it_rte_idx != input_to_nest_level.end());
4848  const int rte_idx = it_rte_idx->second;
4849  const auto table_key = table_key_from_ra(input);
4850  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4851  const auto scan_ra = dynamic_cast<const RelScan*>(input);
4852  int input_idx = 0;
4853  for (const auto& input_meta : in_metainfo) {
4854  inputs.push_back(std::make_shared<Analyzer::ColumnVar>(
4855  input_meta.get_type_info(),
4856  shared::ColumnKey{table_key, scan_ra ? input_idx + 1 : input_idx},
4857  rte_idx));
4858  ++input_idx;
4859  }
4860  return inputs;
4861 }
4862 
4863 std::vector<Analyzer::Expr*> get_raw_pointers(
4864  std::vector<std::shared_ptr<Analyzer::Expr>> const& input) {
4865  std::vector<Analyzer::Expr*> output(input.size());
4866  auto const raw_ptr = [](auto& shared_ptr) { return shared_ptr.get(); };
4867  std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4868  return output;
4869 }
4870 
4871 } // namespace
4872 
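// Builds the execution unit for a standalone RelAggregate node, using the
// input's output metainfo to synthesize the scalar sources.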
4873 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
4874  const RelAggregate* aggregate,
4875  const SortInfo& sort_info,
4876  const bool just_explain) {
4877  std::vector<InputDescriptor> input_descs;
4878  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4879  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4880  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4881  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4882  get_input_desc(aggregate, input_to_nest_level, {});
4883  const auto join_type = get_join_type(aggregate);
4884 
4885  RelAlgTranslator translator(
4886  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
4887  CHECK_EQ(size_t(1), aggregate->inputCount());
4888  const auto source = aggregate->getInput(0);
4889  const auto& in_metainfo = source->getOutputMetainfo();
4890  const auto scalar_sources =
4891  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4892  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4893  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4894  const auto target_exprs = translate_targets(target_exprs_owned_,
4895  target_exprs_type_infos,
4896  scalar_sources,
4897  groupby_exprs,
4898  aggregate,
4899  translator);
4900 
4901  const auto query_infos = get_table_infos(input_descs, executor_);
4902 
4903  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4904  aggregate->setOutputMetainfo(targets_meta);
4905  auto query_hint = RegisteredQueryHint::defaults();
4906  if (query_dag_) {
4907  auto candidate = query_dag_->getQueryHint(aggregate);
4908  if (candidate) {
4909  query_hint = *candidate;
4910  }
4911  }
4912  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4913  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4914  return {RelAlgExecutionUnit{input_descs,
4915  input_col_descs,
4916  {},
4917  {},
4918  {},
4919  groupby_exprs,
4920  target_exprs,
4921  target_exprs_type_infos,
4922  nullptr,
4923  sort_info,
4924  0,
4925  query_hint,
4926  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
4927  aggregate->getQueryPlanDagHash(), sort_info),
4928  join_info.hash_table_plan_dag,
4929  join_info.table_id_to_node_map,
4930  false,
4931  std::nullopt,
4932  query_state_},
4933  aggregate,
4934  max_groups_buffer_entry_default_guess,
4935  nullptr};
4936 }
4937 
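// Builds the execution unit for a RelProject node. Mirrors
// createCompoundWorkUnit(), except that the projected expressions are the
// only targets and there are no filter or group-by expressions.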
4938 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
4939  const RelProject* project,
4940  const SortInfo& sort_info,
4941  const ExecutionOptions& eo) {
4942  std::vector<InputDescriptor> input_descs;
4943  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4944  auto input_to_nest_level = get_input_nest_levels(project, {});
4945  std::tie(input_descs, input_col_descs, std::ignore) =
4946  get_input_desc(project, input_to_nest_level, {});
4947  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4948  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4949  auto query_infos = get_table_infos(input_descs, executor_);
4950  const auto left_deep_join =
4951  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4952  JoinQualsPerNestingLevel left_deep_join_quals;
4953  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4954  : std::vector<JoinType>{get_join_type(project)};
4955  std::vector<size_t> input_permutation;
4956  std::vector<size_t> left_deep_join_input_sizes;
4957  std::optional<unsigned> left_deep_tree_id;
4958  if (left_deep_join) {
4959  left_deep_tree_id = left_deep_join->getId();
4960  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4961  left_deep_join_quals = translateLeftDeepJoinFilter(
4962  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4963  if (g_from_table_reordering) {
4964  input_permutation = do_table_reordering(input_descs,
4965  input_col_descs,
4966  left_deep_join_quals,
4967  input_to_nest_level,
4968  project,
4969  query_infos,
4970  executor_);
4971  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4972  std::tie(input_descs, input_col_descs, std::ignore) =
4973  get_input_desc(project, input_to_nest_level, input_permutation);
4974  left_deep_join_quals = translateLeftDeepJoinFilter(
4975  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4976  }
4977  }
4978  auto const bbox_intersect_qual_info = convert_bbox_intersect_join(left_deep_join_quals,
4979  input_descs,
4980  input_to_nest_level,
4981  input_permutation,
4982  input_col_descs,
4983  executor_);
4984  if (bbox_intersect_qual_info.is_reordered) {
4985  query_infos = get_table_infos(input_descs, executor_);
4986  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4987  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4988  }
4989  if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4990  left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4991  }
4992  RelAlgTranslator translator(
4993  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4994  const auto target_exprs_owned =
4995  translate_scalar_sources(project, translator, eo.executor_type);
4996 
4997  target_exprs_owned_.insert(
4998  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4999  const auto target_exprs = get_raw_pointers(target_exprs_owned);
5000  auto query_hint = RegisteredQueryHint::defaults();
5001  if (query_dag_) {
5002  auto candidate = query_dag_->getQueryHint(project);
5003  if (candidate) {
5004  query_hint = *candidate;
5005  }
5006  }
5007  const RelAlgExecutionUnit exe_unit = {input_descs,
5008  input_col_descs,
5009  {},
5010  {},
5011  left_deep_join_quals,
5012  {nullptr},
5013  target_exprs,
5014  {},
5015  nullptr,
5016  sort_info,
5017  0,
5018  query_hint,
5019  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
5020  project->getQueryPlanDagHash(), sort_info),
5021  {},
5022  {},
5023  false,
5024  std::nullopt,
5025  query_state_};
5026  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5027  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5028  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
5029  project->setOutputMetainfo(targets_meta);
5030  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
5031  if (left_deep_tree_id.has_value()) {
5032  left_deep_trees_info.emplace(left_deep_tree_id.value(),
5033  rewritten_exe_unit.join_quals);
5034  }
5035  if (has_valid_query_plan_dag(project)) {
5036  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5037  project, left_deep_tree_id, left_deep_trees_info, executor_);
5038  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
5039  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
5040  }
5041  return {rewritten_exe_unit,
5042  project,
5043  max_groups_buffer_entry_default_guess,
5044  std::move(query_rewriter),
5045  input_permutation,
5046  left_deep_join_input_sizes};
5047 }
5048 
5049 namespace {
5050 
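// Synthesizes the target expressions for one input of a UNION as
// ColumnVars keyed by the negated node id, so columns from the two inputs
// can be told apart downstream.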
5051 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
5052  RelAlgNode const* input_node) {
5053  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
5054  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
5055  const int negative_node_id = -input_node->getId();
5056  int32_t db_id{0};
5057  if (auto rel_scan = dynamic_cast<const RelScan*>(input_node)) {
5058  db_id = rel_scan->getCatalog().getDatabaseId();
5059  }
5060  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
5061  target_exprs.reserve(tmis.size());
5062  for (size_t i = 0; i < tmis.size(); ++i) {
5063  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
5064  tmis[i].get_type_info(),
5065  shared::ColumnKey{db_id, negative_node_id, int32_t(i)},
5066  0));
5067  }
5068  return target_exprs;
5069 }
5070 
5071 } // namespace
5072 
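// Builds a UNION [ALL] work unit. Both inputs contribute their own target
// expressions because the two sides may use different string dictionaries.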
5073 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
5074  const RelLogicalUnion* logical_union,
5075  const SortInfo& sort_info,
5076  const ExecutionOptions& eo) {
5077  std::vector<InputDescriptor> input_descs;
5078  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5079  // Map ra input ptr to index (0, 1).
5080  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
5081  std::tie(input_descs, input_col_descs, std::ignore) =
5082  get_input_desc(logical_union, input_to_nest_level, {});
5083  const auto query_infos = get_table_infos(input_descs, executor_);
5084  auto const max_num_tuples =
5085  std::accumulate(query_infos.cbegin(),
5086  query_infos.cend(),
5087  size_t(0),
5088  [](auto max, auto const& query_info) {
5089  return std::max(max, query_info.info.getNumTuples());
5090  });
5091 
5092  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
5093  for (auto& pair : input_to_nest_level) {
5094  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
5095  << pair.second << ')';
5096  }
5097 
5098  // For UNION queries, we need to keep the target_exprs from both subqueries
5099  // since they may differ in their string dictionaries.
5100  std::vector<Analyzer::Expr*> target_exprs_pair[2];
5101  for (unsigned i = 0; i < 2; ++i) {
5102  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
5103  CHECK(!input_exprs_owned.empty())
5104  << "No metainfo found for input node(" << i << ") "
5105  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
5106  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
5107  for (auto& input_expr : input_exprs_owned) {
5108  VLOG(3) << " " << input_expr->toString();
5109  }
5110  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
5111  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
5112  }
5113 
5114  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
5115  << " input_col_descs=" << shared::printContainer(input_col_descs)
5116  << " target_exprs.size()=" << target_exprs_pair[0].size()
5117  << " max_num_tuples=" << max_num_tuples;
5118 
5119  const RelAlgExecutionUnit exe_unit = {input_descs,
5120  input_col_descs,
5121  {}, // quals_cf.simple_quals,
5122  {}, // rewrite_quals(quals_cf.quals),
5123  {},
5124  {nullptr},
5125  target_exprs_pair[0],
5126  {},
5127  nullptr,
5128  sort_info,
5129  max_num_tuples,
5130  RegisteredQueryHint::defaults(),
5131  EMPTY_HASHED_PLAN_DAG_KEY,
5132  {},
5133  {},
5134  false,
5135  logical_union->isAll(),
5136  query_state_,
5137  target_exprs_pair[1]};
5138  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5139  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5140 
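  // Propagate the output metainfo from the first input; each supported input
  // node type is handled explicitly so that unsupported inputs fail loudly.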
5141  RelAlgNode const* input0 = logical_union->getInput(0);
5142  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
5143  logical_union->setOutputMetainfo(
5144  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5145  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
5146  logical_union->setOutputMetainfo(
5147  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5148  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
5149  logical_union->setOutputMetainfo(
5150  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5151  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
5152  logical_union->setOutputMetainfo(
5153  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5154  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
5155  logical_union->setOutputMetainfo(
5156  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5157  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
5158  logical_union->setOutputMetainfo(
5159  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5160  } else if (auto const* node = dynamic_cast<const RelLogicalValues*>(input0)) {
5161  logical_union->setOutputMetainfo(
5162  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5163  } else if (dynamic_cast<const RelSort*>(input0)) {
5164  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
5165  } else {
5166  throw QueryNotSupported("Unsupported input type: " +
5167  input0->toString(RelRexToStringConfig::defaults()));
5168  }
5169  VLOG(3) << "logical_union->getOutputMetainfo()="
5170  << shared::printContainer(logical_union->getOutputMetainfo())
5171  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey()="
5172  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey();
5173 
5174  return {rewritten_exe_unit,
5175  logical_union,
5176  max_groups_buffer_entry_default_guess,
5177  std::move(query_rewriter)};
5178 }
5179 
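// Binds the table function implementation for the requested device, resolves
// the output-buffer sizing parameter, fixes up column and column-list
// argument types, and synthesizes the output ColumnVars.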
5180 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
5181  const RelTableFunction* rel_table_func,
5182  const bool just_explain,
5183  const bool is_gpu) {
5184  std::vector<InputDescriptor> input_descs;
5185  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5186  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
5187  std::tie(input_descs, input_col_descs, std::ignore) =
5188  get_input_desc(rel_table_func, input_to_nest_level, {});
5189  const auto query_infos = get_table_infos(input_descs, executor_);
5190  RelAlgTranslator translator(
5191  query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
5192  auto input_exprs_owned = translate_scalar_sources(
5193  rel_table_func, translator, ::ExecutorType::TableFunctions);
5194  target_exprs_owned_.insert(
5195  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
5196 
5197  const auto table_function_impl_and_type_infos = [=]() {
5198  if (is_gpu) {
5199  try {
5200  return bind_table_function(
5201  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5202  } catch (ExtensionFunctionBindingError& e) {
5203  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
5204  << " Redirecting " << rel_table_func->getFunctionName()
5205  << " step to run on CPU.";
5206  throw QueryMustRunOnCpu();
5207  }
5208  } else {
5209  try {
5210  return bind_table_function(
5211  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5212  } catch (ExtensionFunctionBindingError& e) {
5213  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
5214  throw;
5215  }
5216  }
5217  }();
5218  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
5219  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
5220  size_t output_row_sizing_param = 0;
5221  if (table_function_impl
5222  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
5223  const auto parameter_index =
5224  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
5225  CHECK_GT(parameter_index, size_t(0));
5226  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
5227  const auto parameter_expr =
5228  rel_table_func->getTableFuncInputAt(parameter_index - 1);
5229  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
5230  if (!parameter_expr_literal) {
5231  throw std::runtime_error(
5232  "Provided output buffer sizing parameter is not a literal. Only literal "
5233  "values are supported with output buffer sizing configured table "
5234  "functions.");
5235  }
5236  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
5237  if (literal_val < 0) {
5238  throw std::runtime_error("Provided output sizing parameter " +
5239  std::to_string(literal_val) +
5240  " must be positive integer.");
5241  }
5242  output_row_sizing_param = static_cast<size_t>(literal_val);
5243  } else {
5244  // RowMultiplier not specified in the SQL query. Set it to 1
5245  output_row_sizing_param = 1; // default value for RowMultiplier
5246  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
5247  static Analyzer::ExpressionPtr DEFAULT_ROW_MULTIPLIER_EXPR =
5248  makeExpr<Analyzer::Constant>(kINT, false, d);
5249  // Push the constant 1 to input_exprs
5250  input_exprs_owned.insert(input_exprs_owned.begin() + parameter_index - 1,
5251  DEFAULT_ROW_MULTIPLIER_EXPR);
5252  }
5253  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5254  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
5255  } else {
5256  UNREACHABLE();
5257  }
5258 
5259  std::vector<Analyzer::ColumnVar*> input_col_exprs;
5260  size_t input_index = 0;
5261  size_t arg_index = 0;
5262  const auto table_func_args = table_function_impl.getInputArgs();
5263  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
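  // Fix up the type info of column and column-list arguments: the bound
  // signature's kind (COLUMN / COLUMN_LIST) becomes the outer type and the
  // column's original scalar type becomes the subtype.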
5264  for (const auto& ti : table_function_type_infos) {
5265  if (ti.is_column_list()) {
5266  for (int i = 0; i < ti.get_dimension(); i++) {
5267  auto& input_expr = input_exprs_owned[input_index];
5268  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5269  CHECK(col_var);
5270 
5271  // Avoid assigning ti directly here since it doesn't have all the
5272  // properties correctly set.
5273  auto type_info = input_expr->get_type_info();
5274  if (ti.is_column_array()) {
5275  type_info.set_compression(kENCODING_ARRAY);
5276  type_info.set_subtype(type_info.get_subtype()); // keep the array's element subtype
5277  } else {
5278  type_info.set_subtype(type_info.get_type()); // demote the scalar type to the subtype
5279  }
5280  type_info.set_type(ti.get_type()); // set type to column list
5281  type_info.set_dimension(ti.get_dimension());
5282  type_info.setUsesFlatBuffer(type_info.get_elem_type().supportsFlatBuffer());
5283  input_expr->set_type_info(type_info);
5284 
5285  input_col_exprs.push_back(col_var);
5286  input_index++;
5287  }
5288  } else if (ti.is_column()) {
5289  auto& input_expr = input_exprs_owned[input_index];
5290  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5291  CHECK(col_var);
5292  // Same as above: avoid assigning ti directly since it doesn't have all the
5293  // properties correctly set.
5294  auto type_info = input_expr->get_type_info();
5295  if (ti.is_column_array()) {
5296  type_info.set_compression(kENCODING_ARRAY);
5297  type_info.set_subtype(type_info.get_subtype()); // keep the array's element subtype
5298  } else {
5299  type_info.set_subtype(type_info.get_type()); // demote the scalar type to the subtype
5300  }
5301  type_info.set_type(ti.get_type()); // set type to column
5302  type_info.setUsesFlatBuffer(type_info.get_elem_type().supportsFlatBuffer());
5303  input_expr->set_type_info(type_info);
5304  input_col_exprs.push_back(col_var);
5305  input_index++;
5306  } else {
5307  auto input_expr = input_exprs_owned[input_index];
5308  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
5309  if (ext_func_arg_ti != input_expr->get_type_info()) {
5310  input_exprs_owned[input_index] = input_expr->add_cast(ext_func_arg_ti);
5311  }
5312  input_index++;
5313  }
5314  arg_index++;
5315  }
5316  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
5317  std::vector<Analyzer::Expr*> table_func_outputs;
5318  constexpr int32_t transient_pos{-1};
5319  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5320  auto ti = table_function_impl.getOutputSQLType(i);
5321  ti.setUsesFlatBuffer(ti.supportsFlatBuffer());
5322  if (ti.is_geometry()) {
5323  auto p = table_function_impl.getInputID(i);
5324  int32_t input_pos = p.first;
5325  if (input_pos != transient_pos) {
5326  CHECK(!ti.is_column_list()); // see QE-472
5327  CHECK_LT(input_pos, input_exprs_owned.size());
5328  const auto& reference_ti = input_exprs_owned[input_pos]->get_type_info();
5329  CHECK(IS_GEO(reference_ti.get_type()) || IS_GEO(reference_ti.get_subtype()));
5330  ti.set_input_srid(reference_ti.get_input_srid());
5331  ti.set_output_srid(reference_ti.get_output_srid());
5332  ti.set_compression(reference_ti.get_compression());
5333  ti.set_comp_param(reference_ti.get_comp_param());
5334  } else {
5335  ti.set_input_srid(0);
5336  ti.set_output_srid(0);
5337  ti.set_compression(kENCODING_NONE);
5338  ti.set_comp_param(0);
5339  }
5340  } else if (ti.is_dict_encoded_string() || ti.is_text_encoding_dict_array()) {
5341  auto p = table_function_impl.getInputID(i);
5342 
5343  int32_t input_pos = p.first;
5344  if (input_pos == transient_pos) {
5345  ti.set_comp_param(TRANSIENT_DICT_ID);
5346  ti.setStringDictKey(shared::StringDictKey::kTransientDictKey);
5347  } else {
5348  // Iterate over the list of arguments to compute the offset. Use this offset
5349  // to get the corresponding input.
5350  int32_t offset = 0;
5351  for (int j = 0; j < input_pos; j++) {
5352  const auto& arg_ti = table_function_type_infos[j];
5353  offset += arg_ti.is_column_list() ? arg_ti.get_dimension() : 1;
5354  }
5355  input_pos = offset + p.second;
5356 
5357  CHECK_LT(input_pos, input_exprs_owned.size());
5358  const auto& dict_key =
5359  input_exprs_owned[input_pos]->get_type_info().getStringDictKey();
5360  ti.set_comp_param(dict_key.dict_id);
5361  ti.setStringDictKey(dict_key);
5362  }
5363  }
5364  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(
5365  ti, shared::ColumnKey{0, 0, int32_t(i)}, -1));
5366  table_func_outputs.push_back(target_exprs_owned_.back().get());
5367  }
5368  auto input_exprs = get_raw_pointers(input_exprs_owned);
5369  const TableFunctionExecutionUnit exe_unit = {
5370  input_descs,
5371  input_col_descs,
5372  input_exprs, // table function inputs
5373  input_col_exprs, // table function column inputs (duplicates w/ above)
5374  table_func_outputs, // table function projected exprs
5375  output_row_sizing_param, // output buffer sizing param
5376  table_function_impl};
5377  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
5378  rel_table_func->setOutputMetainfo(targets_meta);
5379  return {exe_unit, rel_table_func};
5380 }
5381 
5382 namespace {
5383 
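// Collects the output metainfo and input expressions for every source of the
// filter's data sink: scan sources are translated from the used RexInputs,
// while intermediate sources reuse their already-computed metainfo.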
5384 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
5385 get_inputs_meta(const RelFilter* filter,
5386  const RelAlgTranslator& translator,
5387  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
5388  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
5389  std::vector<TargetMetaInfo> in_metainfo;
5390  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
5391  const auto data_sink_node = get_data_sink(filter);
5392  auto input_it = inputs_owned.begin();
5393  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
5394  const auto source = data_sink_node->getInput(nest_level);
5395  const auto scan_source = dynamic_cast<const RelScan*>(source);
5396  if (scan_source) {
5397  CHECK(source->getOutputMetainfo().empty());
5398  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
5399  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
5400  scalar_sources_owned.push_back(translator.translate(input_it->get()));
5401  }
5402  const auto source_metadata =
5403  get_targets_meta(scan_source, get_raw_pointers(scalar_sources_owned));
5404  in_metainfo.insert(
5405  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5406  exprs_owned.insert(
5407  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5408  } else {
5409  const auto& source_metadata = source->getOutputMetainfo();
5410  input_it += source_metadata.size();
5411  in_metainfo.insert(
5412  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5413  const auto scalar_sources_owned = synthesize_inputs(
5414  data_sink_node, nest_level, source_metadata, input_to_nest_level);
5415  exprs_owned.insert(
5416  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5417  }
5418  }
5419  return std::make_pair(in_metainfo, exprs_owned);
5420 }
5421 
5422 } // namespace
5423 
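// Builds a work unit that evaluates the filter condition on top of its
// input, forwarding the input columns unchanged as the target list.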
5424 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
5425  const SortInfo& sort_info,
5426  const bool just_explain) {
5427  CHECK_EQ(size_t(1), filter->inputCount());
5428  std::vector<InputDescriptor> input_descs;
5429  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5430  std::vector<TargetMetaInfo> in_metainfo;
5431  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5432  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5433 
5434  const auto input_to_nest_level = get_input_nest_levels(filter, {});
5435  std::tie(input_descs, input_col_descs, used_inputs_owned) =
5436  get_input_desc(filter, input_to_nest_level, {});
5437  const auto join_type = get_join_type(filter);
5438  RelAlgTranslator translator(
5439  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
5440  std::tie(in_metainfo, target_exprs_owned) =
5441  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
5442  const auto filter_expr = translator.translate(filter->getCondition());
5443  const auto query_infos = get_table_infos(input_descs, executor_);
5444 
5445  const auto qual = fold_expr(filter_expr.get());
5446  target_exprs_owned_.insert(
5447  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
5448 
5449  const auto target_exprs = get_raw_pointers(target_exprs_owned);
5450  filter->setOutputMetainfo(in_metainfo);
5451  const auto rewritten_qual = rewrite_expr(qual.get());
5452  auto query_hint = RegisteredQueryHint::defaults();
5453  if (query_dag_) {
5454  auto candidate = query_dag_->getQueryHint(filter);
5455  if (candidate) {
5456  query_hint = *candidate;
5457  }
5458  }
5459  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5460  filter, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
5461  return {{input_descs,
5462  input_col_descs,
5463  {},
5464  {rewritten_qual ? rewritten_qual : qual},
5465  {},
5466  {nullptr},
5467  target_exprs,
5468  {},
5469  nullptr,
5470  sort_info,
5471  0,
5472  query_hint,
5473  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
5474  filter->getQueryPlanDagHash(), sort_info),
5475  join_info.hash_table_plan_dag,
5476  join_info.table_id_to_node_map},
5477  filter,
5478  max_groups_buffer_entry_default_guess,
5479  nullptr};
5480 }
5481 
5482 SpeculativeTopNBlacklist RelAlgExecutor::speculative_topn_blacklist_;
5483 
5484 void RelAlgExecutor::initializeParallelismHints() {
5485  if (auto foreign_storage_mgr =
5486  executor_->getDataMgr()->getPersistentStorageMgr()->getForeignStorageMgr()) {
5487  // Parallelism hints need to be reset to empty so that we don't accidentally
5488  // re-use them; stale hints can cause attempts to fetch strings that do not
5489  // shard to the correct node in distributed mode.
5490  foreign_storage_mgr->setParallelismHints({});
5491  }
5492 }
5493 
5494 void RelAlgExecutor::setupCaching(const RelAlgNode* ra) {
5495  CHECK(executor_);
5496  const auto phys_inputs = get_physical_inputs_with_spi_col_id(ra);
5497  const auto phys_table_ids = get_physical_table_inputs(ra);
5498  executor_->setupCaching(phys_inputs, phys_table_ids);
5499 }
5500 
5501 void RelAlgExecutor::prepareForeignTables() {
5502  const auto& ra = query_dag_->getRootNode();
5503  prepare_foreign_table_for_execution(ra);
5504 }
5505 
5506 std::unordered_set<shared::TableKey> RelAlgExecutor::getPhysicalTableIds() const {
5507  return get_physical_table_inputs(&getRootRelAlgNode());
5508 }
5509 
5510 void RelAlgExecutor::prepareForSystemTableExecution(
5511  const CompilationOptions& co) const {
5512 }
const size_t getGroupByCount() const
Definition: RelAlgDag.h:1508
void setGlobalQueryHints(const RegisteredQueryHint &global_hints)
Definition: RelAlgDag.h:3340
bool isAll() const
Definition: RelAlgDag.h:2770
bool is_agg(const Analyzer::Expr *expr)
Analyzer::ExpressionPtr rewrite_array_elements(Analyzer::Expr const *expr)
std::vector< Analyzer::Expr * > target_exprs
int64_t queue_time_ms_
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
size_t getOffset() const
Definition: RelAlgDag.h:2228
std::vector< int > ChunkKey
Definition: types.h:36
std::optional< std::function< void()> > post_execution_callback_
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
bool hasNoneEncodedTextArg() const
Definition: Analyzer.h:1743
TextEncodingCastCounts visitUOper(const Analyzer::UOper *u_oper) const override
RaExecutionDesc * getDescriptor(size_t idx) const
TextEncodingCastCounts visitStringOper(const Analyzer::StringOper *string_oper) const override
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
void visitBegin() const override
JoinType
Definition: sqldefs.h:174
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
int8_t * append_datum(int8_t *buf, const Datum &d, const SQLTypeInfo &ti)
Definition: Datum.cpp:580
bool g_use_query_resultset_cache
Definition: Execute.cpp:156
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
std::string cat(Ts &&...args)
bool isFrameNavigateWindowFunction() const
Definition: Analyzer.h:2848
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
const Rex * getTargetExpr(const size_t i) const
Definition: RelAlgDag.h:2102
const Expr * get_escape_expr() const
Definition: Analyzer.h:1064
bool has_valid_query_plan_dag(const RelAlgNode *node)
AggregatedColRange computeColRangesCache()
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1623
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
Definition: sqltypes.h:76
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
size_t size() const override
Definition: RelAlgDag.h:1320
int hll_size_for_rate(const int err_percent)
Definition: HyperLogLog.h:113
const std::vector< std::vector< std::shared_ptr< TargetEntry > > > & get_values_lists() const
Definition: Analyzer.h:3026
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:234
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const RexScalar * getFilterExpr() const
Definition: RelAlgDag.h:2096
size_t get_limit_value(std::optional< size_t > limit)
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:235
std::list< Analyzer::OrderEntry > getOrderEntries() const
Definition: RelAlgDag.h:2264
bool is_validate_or_explain_query(const ExecutionOptions &eo)
void forceNonInSitu()
Definition: RenderInfo.cpp:46
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
const RelAlgNode * body
ErrorInfo getErrorDescription(const int32_t error_code)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
const bool hasQueryStepForUnion() const
size_t getIndex() const
Definition: RelAlgDag.h:741
TextEncodingCastCounts get_text_cast_counts(const RelAlgExecutionUnit &ra_exe_unit)
std::pair< size_t, shared::TableKey > groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
#define SPIMAP_GEO_PHYSICAL_INPUT(c, i)
Definition: Catalog.h:84
bool g_skip_intermediate_count
std::unique_ptr< const RexOperator > get_bitwise_equals_conjunction(const RexScalar *scalar)
const RexScalar * getOuterCondition(const size_t nesting_level) const
TableGenerations computeTableGenerations()
shared::TableKey table_key_from_ra(const RelAlgNode *ra_node)
SQLTypeInfo get_nullable_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1488
#define LOG(tag)
Definition: Logger.h:285
size_t size() const override
Definition: RelAlgDag.h:2094
bool is_none_encoded_text(TargetMetaInfo const &target_meta_info)
std::vector< size_t > outer_fragment_indices
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_join_source_used_inputs(const RelAlgNode *ra_node)
static SpeculativeTopNBlacklist speculative_topn_blacklist_
bool g_allow_query_step_skipping
Definition: Execute.cpp:159
size_t get_scalar_sources_size(const RelCompound *compound)
RelAlgExecutionUnit exe_unit
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
std::vector< std::shared_ptr< Analyzer::TargetEntry > > targets
Definition: RenderInfo.h:38
const Expr * get_right_operand() const
Definition: Analyzer.h:456
SQLOps
Definition: sqldefs.h:28
const std::unordered_map< int, QueryPlanHash > getSkippedQueryStepCacheKeys() const
TemporaryTables temporary_tables_
bool is_any(T &&value)
Definition: misc.h:258
size_t getNumRows() const
Definition: RelAlgDag.h:2720
void prepare_string_dictionaries(const std::unordered_set< PhysicalInput > &phys_inputs)
Definition: Execute.cpp:217
PersistentStorageMgr * getPersistentStorageMgr() const
Definition: DataMgr.cpp:677
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void setupCaching(const RelAlgNode *ra)
const RexScalar * getCondition() const
Definition: RelAlgDag.h:1898
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:6141
static std::shared_ptr< Analyzer::Expr > normalize(const SQLOps optype, const SQLQualifier qual, std::shared_ptr< Analyzer::Expr > left_expr, std::shared_ptr< Analyzer::Expr > right_expr, const Executor *executor=nullptr)
Definition: ParserNode.cpp:379
const bool default_disregard_casts_to_none_encoding_
std::string join(T const &container, std::string const &delim)
const std::vector< TargetMetaInfo > getTupleType() const
Definition: RelAlgDag.h:2687
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources_for_update(const RA *ra_node, const RelAlgTranslator &translator, int32_t tableId, const Catalog_Namespace::Catalog &cat, const ColumnNameList &colNames, size_t starting_projection_column_idx)
bool get_is_null() const
Definition: Analyzer.h:347
Definition: sqldefs.h:37
std::vector< InputDescriptor > input_descs
std::shared_ptr< Analyzer::Var > var_ref(const Analyzer::Expr *expr, const Analyzer::Var::WhichRow which_row, const int varno)
Definition: Analyzer.h:3217
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
#define UNREACHABLE()
Definition: Logger.h:338
size_t g_preflight_count_query_threshold
Definition: Execute.cpp:84
void handle_query_hint(const std::vector< std::shared_ptr< RelAlgNode >> &nodes, RelAlgDag &rel_alg_dag) noexcept
Definition: RelAlgDag.cpp:1570
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
const TableDescriptor * get_metadata_for_table(const ::shared::TableKey &table_key, bool populate_fragmenter)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
static WindowProjectNodeContext * create(Executor *executor)
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1470
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
Definition: sqldefs.h:48
Definition: sqldefs.h:29
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
std::shared_ptr< Analyzer::Expr > ExpressionPtr
Definition: Analyzer.h:184
size_t getArity() const
Definition: Analyzer.h:1674
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
static const int32_t ERR_GEOS
Definition: Execute.h:1629
const ResultSetPtr & getDataPtr() const
SQLTypeInfo get_agg_type(const SQLAgg agg_kind, const Analyzer::Expr *arg_expr)
std::vector< TargetInfo > TargetInfoList
SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr *expr)
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:1625
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
virtual void handleQueryEngineVector(std::vector< std::shared_ptr< RelAlgNode >> const nodes)
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:81
std::shared_ptr< Analyzer::Expr > set_transient_dict(const std::shared_ptr< Analyzer::Expr > expr)
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
TextEncodingCastCounts visit(const Analyzer::Expr *expr) const
size_t get_frag_count_of_table(const shared::TableKey &table_key, Executor *executor)
void setNoExplainExecutionOptions(bool no_validation=false)
void set_parallelism_hints(const RelAlgNode &ra_node)
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
static SortInfo createFromSortNode(const RelSort *sort_node)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
int g_hll_precision_bits
const std::string kInfoSchemaDbName
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
const SQLTypeInfo & get_type_info() const
#define TRANSIENT_DICT_ID
Definition: DbObjectKeys.h:24
bool g_enable_data_recycler
Definition: Execute.cpp:154
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:2802
#define CHECK_GT(x, y)
Definition: Logger.h:305
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:2820
void insertData(const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
size_t max_join_hash_table_size
Definition: QueryHint.h:358
static std::unordered_set< size_t > getScanNodeTableKey(RelAlgNode const *rel_alg_node)
const Expr * get_arg() const
Definition: Analyzer.h:1061
std::vector< const RexScalar * > rex_to_conjunctive_form(const RexScalar *qual_expr)
bool is_count_distinct(const Analyzer::Expr *expr)
QueryStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
bool hasStepForUnion() const
TargetInfo get_target_info(const Analyzer::Expr *target_expr, const bool bigint_count)
Definition: TargetInfo.h:92
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
ExecutorDeviceType
std::string to_string(char const *&&v)
std::vector< std::shared_ptr< RelAlgNode > > & getNodes()
Definition: RelAlgDag.h:2824
foreign_storage::ForeignStorageMgr * getForeignStorageMgr() const
void handleNop(RaExecutionDesc &ed)
void set_type_info(const SQLTypeInfo &ti)
Definition: Analyzer.h:80
#define LOG_IF(severity, condition)
Definition: Logger.h:384
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
void computeWindow(const WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::shared_ptr< Analyzer::Expr > cast_dict_to_none(const std::shared_ptr< Analyzer::Expr > &input)
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
bool can_output_columnar(const RelAlgExecutionUnit &ra_exe_unit, const RenderInfo *render_info, const RelAlgNode *body)
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
bool list_contains_expression(const QualsList &haystack, const std::shared_ptr< Analyzer::Expr > &needle)
size_t get_count_distinct_sub_bitmap_count(const size_t bitmap_sz_bits, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:1626
Definition: sqldefs.h:75
size_t g_watchdog_none_encoded_string_translation_limit
Definition: Execute.cpp:82
ExecutorType
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:229
size_t getColInputsSize() const
Definition: RelAlgDag.h:2562
bool g_enable_interop
SQLOps get_optype() const
Definition: Analyzer.h:452
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:73
virtual T visit(const RexScalar *rex_scalar) const
Definition: RexVisitor.h:27
const size_t getScalarSourcesSize() const
Definition: RelAlgDag.h:2110
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:1627
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:234
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:1624
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:246
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
size_t append_move(std::vector< T > &destination, std::vector< T > &&source)
Definition: misc.h:77
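For orientation, append_move above is a small vector utility from Shared/misc.h. The following is a behavior sketch only, not the actual implementation: the move-from-empty fast path and the returned-size semantics are assumptions.

  #include <iterator>
  #include <vector>

  // Sketch: move-append source's elements onto destination and report
  // the destination's resulting size (assumed return-value semantics).
  template <typename T>
  size_t append_move(std::vector<T>& destination, std::vector<T>&& source) {
    if (destination.empty()) {
      destination = std::move(source);  // wholesale move when destination is empty
    } else {
      destination.insert(destination.end(),
                         std::make_move_iterator(source.begin()),
                         std::make_move_iterator(source.end()));
    }
    return destination.size();
  }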
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::unordered_set< shared::TableKey > getPhysicalTableIds() const
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
unsigned getIndex() const
Definition: RelAlgDag.h:174
bool key_does_not_shard_to_leaf(const ChunkKey &key)
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &partition_cache, std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, const WorkUnit &work_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1615
void check_none_encoded_string_cast_tuple_limit(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
std::vector< std::pair< std::vector< size_t >, size_t > > per_device_cardinality
static SysCatalog & instance()
Definition: SysCatalog.h:343
size_t getOuterFragmentCount(const CompilationOptions &co, const ExecutionOptions &eo)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool g_from_table_reordering
Definition: Execute.cpp:93
size_t g_window_function_aggregation_tree_fanout
CONSTEXPR DEVICE bool is_null(const T &value)
unsigned getId() const
Definition: RelAlgDag.h:869
Classes representing a parse tree.
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:2796
size_t query_time_limit
Definition: QueryHint.h:335
bool isGeometry(TargetMetaInfo const &target_meta_info)
void conditionally_change_arg_to_int_type(size_t target_expr_idx, std::shared_ptr< Analyzer::Expr > &target_expr, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos)
ExecutorType executor_type
bool g_enable_system_tables
Definition: SysCatalog.cpp:64
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::vector< unsigned > aggregateResult(const std::vector< unsigned > &aggregate, const std::vector< unsigned > &next_result) const override
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define INJECT_TIMER(DESC)
Definition: measure.h:96
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1619
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const JoinQualsPerNestingLevel join_quals
static void reset(Executor *executor)
T visit(const RelAlgNode *rel_alg) const
Definition: RelAlgVisitor.h:25
std::vector< std::shared_ptr< RexInput > > synthesized_physical_inputs_owned
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
MergeType
void populate_string_dictionary(int32_t table_id, int32_t col_id, int32_t db_id)
Definition: Execute.cpp:233
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
A container for relational algebra descriptors defining the execution order for a relational algebra ...
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
std::vector< unsigned > visitLeftDeepInnerJoin(const RelLeftDeepInnerJoin *left_deep_join_tree) const override
TableIdToNodeMap table_id_to_node_map
const ColumnDescriptor * get_column_descriptor(const shared::ColumnKey &column_key)
Definition: Execute.h:213
size_t g_big_group_threshold
Definition: Execute.cpp:115
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1621
std::list< Analyzer::OrderEntry > order_entries
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1622
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
std::optional< size_t > getFilteredCountAll(const RelAlgExecutionUnit &ra_exe_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
bool g_bigint_count
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
Definition: sqldefs.h:36
Definition: sqldefs.h:77
void executePostExecutionCallback()
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
bool g_enable_watchdog
RelAlgExecutionUnit createCountAllExecutionUnit(Analyzer::Expr *replacement_target) const
std::shared_ptr< Analyzer::Expr > build_logical_expression(const std::vector< std::shared_ptr< Analyzer::Expr >> &factors, const SQLOps sql_op)
size_t getNDVEstimation(const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
static const int32_t ERR_UNSUPPORTED_SELF_JOIN
Definition: Execute.h:1618
bool g_allow_auto_resultset_caching
Definition: Execute.cpp:158
bool isSimple() const
Definition: RelAlgDag.h:1307
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1904
specifies the in-memory content of a row in the column metadata table
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:320
bool get_is_distinct() const
Definition: Analyzer.h:1332
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1628
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
TextEncodingCastCounts(const size_t text_decoding_casts, const size_t text_encoding_casts)
JoinType get_join_type(const RelAlgNode *ra)
std::shared_ptr< Analyzer::Expr > translate(const RexScalar *rex) const
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1616
size_t getTableFuncInputsSize() const
Definition: RelAlgDag.h:2560
std::vector< TargetMetaInfo > getCompatibleMetainfoTypes() const
Definition: RelAlgDag.cpp:911
std::shared_ptr< Analyzer::Expr > reverse_logical_distribution(const std::shared_ptr< Analyzer::Expr > &expr)
ExecutionResult executeSimpleInsert(const Analyzer::Query &insert_query, Fragmenter_Namespace::InsertDataLoader &inserter, const Catalog_Namespace::SessionInfo &session)
Argument type based extension function binding.
BoundingBoxIntersectJoinTranslationInfo convert_bbox_intersect_join(JoinQualsPerNestingLevel const &join_quals, std::vector< InputDescriptor > &input_descs, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, std::vector< size_t > &input_permutation, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_desc, Executor const *executor)
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:79
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
virtual std::string toString(RelRexToStringConfig config=RelRexToStringConfig::defaults()) const =0
Executor * getExecutor() const
std::string * stringval
Definition: Datum.h:79
int get_result_table_id() const
Definition: Analyzer.h:3042
const std::vector< std::unique_ptr< const RexAgg > > & getAggExprs() const
Definition: RelAlgDag.h:1531
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
std::optional< RegisteredQueryHint > getGlobalQueryHint()
bool g_enable_window_functions
Definition: Execute.cpp:116
TextEncodingCastCounts aggregateResult(const TextEncodingCastCounts &aggregate, const TextEncodingCastCounts &next_result) const override
bool isEmptyResult() const
Definition: RelAlgDag.h:2222
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
std::unique_ptr< RelAlgDag > query_dag_
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
size_t preflight_count_query_threshold
Definition: QueryHint.h:337
const RexScalar * getProjectAt(const size_t idx) const
Definition: RelAlgDag.h:1352
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
TextEncodingCastCounts visitBinOper(const Analyzer::BinOper *bin_oper) const override
const RaExecutionDesc * getContextData() const
Definition: RelAlgDag.h:873
#define CHECK_LT(x, y)
Definition: Logger.h:303
Definition: sqltypes.h:79
Definition: sqltypes.h:80
static RegisteredQueryHint defaults()
Definition: QueryHint.h:364
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node)
int32_t countRexLiteralArgs() const
Definition: RelAlgDag.cpp:698
bool string_op_returns_string(const SqlStringOpKind kind)
Definition: sqldefs.h:468
Definition: sqldefs.h:71
Expression class for string functions The "arg" constructor parameter must be an expression that reso...
Definition: Analyzer.h:1601
std::unordered_set< shared::TableKey > get_physical_table_inputs(const RelAlgNode *ra)
size_t get_scan_limit(const RelAlgNode *ra, std::optional< size_t > limit)
#define CHECK_LE(x, y)
Definition: Logger.h:304
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool table_is_temporary(const TableDescriptor *const td)
#define DEFAULT_ROW_MULTIPLIER_VALUE
SqlStringOpKind get_kind() const
Definition: Analyzer.h:1672
std::vector< DataBlockPtr > data
the data to be inserted
Definition: Fragmenter.h:74
std::shared_ptr< const query_state::QueryState > query_state_
const Expr * get_like_expr() const
Definition: Analyzer.h:1063
bool table_is_replicated(const TableDescriptor *td)
Catalog & getCatalog() const
Definition: SessionInfo.h:75
bool is_projection(const RelAlgExecutionUnit &ra_exe_unit)
size_t g_estimator_failure_max_groupby_size
std::unordered_set< const RexInput * > visitInput(const RexInput *rex_input) const override
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
const Expr * get_operand() const
Definition: Analyzer.h:384
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit)
bool isHintRegistered(const QueryHint hint) const
Definition: QueryHint.h:383
Basic constructors and methods of the row set interface.
Datum get_constval() const
Definition: Analyzer.h:348
SQLTypes get_int_type_by_size(size_t const nbytes)
Definition: sqltypes.h:1452
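The name of get_int_type_by_size above suggests a byte-width to integer-type mapping; here is a plausible sketch under that assumption (the actual definition lives at sqltypes.h:1452 and may compute the result arithmetically rather than via a switch):

  // Assumed mapping from storage width in bytes to the matching integer type.
  SQLTypes get_int_type_by_size(size_t const nbytes) {
    switch (nbytes) {
      case 1:
        return kTINYINT;
      case 2:
        return kSMALLINT;
      case 4:
        return kINT;
      case 8:
        return kBIGINT;
      default:
        return kNULLT;  // unsupported width; the real code likely asserts
    }
  }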
Definition: sqldefs.h:31
Definition: sqldefs.h:78
const size_t getGroupByCount() const
Definition: RelAlgDag.h:2121
std::unordered_set< const RexInput * > aggregateResult(const std::unordered_set< const RexInput * > &aggregate, const std::unordered_set< const RexInput * > &next_result) const override
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
bool is_none_encoded_string() const
Definition: sqltypes.h:645
const RelAlgNode & getRootRelAlgNode() const
size_t collationCount() const
Definition: RelAlgDag.h:2211
const RelAlgNode * getSourceNode() const
Definition: RelAlgDag.h:1056
bool isAggregate() const
Definition: RelAlgDag.h:2123
void setInputSourceNode(const RelAlgNode *input_source_node)
size_t QueryPlanHash
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool isRowidLookup(const WorkUnit &work_unit)
bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit)
static const int32_t ERR_WIDTH_BUCKET_INVALID_ARGUMENT
Definition: Execute.h:1630
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
Definition: sqltypes.h:68
bool hasDeletedRowInQuery(std::vector< InputTableInfo > const &) const
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
static void handlePersistentError(const int32_t error_code)
const std::tuple< table_functions::TableFunction, std::vector< SQLTypeInfo > > bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs, const bool is_gpu)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:402
const RexScalar * getTableFuncInputAt(const size_t idx) const
Definition: RelAlgDag.h:2566
static const StringDictKey kTransientDictKey
Definition: DbObjectKeys.h:45
bool compute_output_buffer_size(const RelAlgExecutionUnit &ra_exe_unit)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:114
bool should_output_columnar(const RelAlgExecutionUnit &ra_exe_unit)
SQLAgg get_aggtype() const
Definition: Analyzer.h:1329
const JoinType getJoinType(const size_t nesting_level) const
bool optimize_cuda_block_and_grid_sizes
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
Definition: RelAlgDag.cpp:3525
const size_t max_groups_buffer_entry_guess
std::string getFunctionName() const
Definition: RelAlgDag.h:2551
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool wasMultifragKernelLaunch() const
Definition: ErrorHandling.h:57
bool g_enable_bump_allocator
Definition: Execute.cpp:128
StringDictionaryGenerations computeStringDictionaryGenerations()
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
std::vector< std::string > ColumnNameList
Definition: RelAlgDag.h:72
const RexScalar * getInnerCondition() const
def error_code
Definition: report.py:244
RegisteredQueryHint query_hint
void prepare_for_system_table_execution(const RelAlgNode &ra_node, const CompilationOptions &co)
#define CHECK(condition)
Definition: Logger.h:291
bool is_geometry() const
Definition: sqltypes.h:595
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
std::unordered_set< PhysicalInput > get_physical_inputs_with_spi_col_id(const RelAlgNode *ra)
std::vector< std::shared_ptr< Analyzer::Expr > > qual_to_disjunctive_form(const std::shared_ptr< Analyzer::Expr > &qual_expr)
Definition: sqldefs.h:30
const std::vector< std::shared_ptr< RexInput > > & get_inputs_owned() const
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
void collect_used_input_desc(std::vector< InputDescriptor > &input_descs, std::unordered_set< std::shared_ptr< const InputColDescriptor >> &input_col_descs_unique, const RelAlgNode *ra_node, const std::unordered_set< const RexInput * > &source_used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
Find out all the physical inputs (columns) a query is using.
const RelAlgNode * getBody() const
bool g_enable_union
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
TextEncodingCastCountVisitor(const bool default_disregard_casts_to_none_encoding)
std::unique_ptr< const RexOperator > get_bitwise_equals(const RexScalar *scalar)
Estimators to be used when precise cardinality isn't useful.
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:90
bool g_cluster
bool g_allow_cpu_retry
Definition: Execute.cpp:89
static std::string getErrorMessageFromCode(const int32_t error_code)
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
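Taken together, the InsertData fields indexed on this page (tableId, columnIds, numRows, data) form the payload the fragment manager consumes. A minimal single-row sketch follows; the helper name, the id arguments, and the DataBlockPtr wiring are illustrative assumptions, not code from this file:

  #include "Fragmenter/Fragmenter.h"

  // Hypothetical helper: build an InsertData payload carrying one integer value.
  Fragmenter_Namespace::InsertData make_single_row_insert(int db_id,
                                                          int table_id,
                                                          int col_id,
                                                          int64_t& value) {
    Fragmenter_Namespace::InsertData insert_data;
    insert_data.databaseId = db_id;    // target database
    insert_data.tableId = table_id;    // table into which the data is inserted
    insert_data.columnIds = {col_id};  // column ids for the row(s) being inserted
    insert_data.numRows = 1;           // the number of rows being inserted
    DataBlockPtr block;
    block.numbersPtr = reinterpret_cast<int8_t*>(&value);  // raw numeric data
    insert_data.data.push_back(block);  // one DataBlockPtr per column id
    return insert_data;
  }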
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
const Expr * get_left_operand() const
Definition: Analyzer.h:455
void cleanupPostExecution()
void initializeParallelismHints()
std::list< std::shared_ptr< Analyzer::Expr > > makeJoinQuals(const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
const RexScalar * scalar_at(const size_t i, const RelCompound *compound)
TextEncodingCastCounts visitLikeExpr(const Analyzer::LikeExpr *like) const override
Definition: sqltypes.h:72
std::vector< Analyzer::Expr * > target_exprs
SQLTypeInfo columnType
unsigned dynamic_watchdog_time_limit
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
std::optional< size_t > getLimit() const
Definition: RelAlgDag.h:2226
bool is_string() const
Definition: sqltypes.h:559
const size_t inputCount() const
Definition: RelAlgDag.h:875
size_t getRelNodeDagId() const
Definition: RelAlgDag.h:921
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
bool g_columnar_large_projections
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
TextEncodingCastCounts defaultResult() const override
size_t aggregate_tree_fanout
Definition: QueryHint.h:345
int8_t * numbersPtr
Definition: sqltypes.h:233
virtual size_t get_num_column_vars(const bool include_agg) const
Definition: Analyzer.cpp:58
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
static constexpr char const * FOREIGN_TABLE
RelAlgExecutionUnit decide_approx_count_distinct_implementation(const RelAlgExecutionUnit &ra_exe_unit_in, const std::vector< InputTableInfo > &table_infos, const Executor *executor, const ExecutorDeviceType device_type_in, std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned)
Definition: sqldefs.h:76
const std::list< int > & get_result_col_list() const
Definition: Analyzer.h:3043
bool sameTypeInfo(std::vector< TargetMetaInfo > const &lhs, std::vector< TargetMetaInfo > const &rhs)
size_t g_columnar_large_projections_threshold
int cpu_threads()
Definition: thread_count.h:25
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:2798
const RelAlgNode * get_data_sink(const RelAlgNode *ra_node)
Definition: Datum.h:69
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865
std::pair< std::vector< unsigned >, std::unordered_map< unsigned, JoinQualsPerNestingLevel > > getJoinInfo(const RelAlgNode *root_node)
static const int32_t ERR_OUT_OF_CPU_MEM
Definition: Execute.h:1620
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
static ExecutionOptions defaults()
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:975
const TableDescriptor * getTableDescriptor() const
Definition: RelAlgDag.h:1117
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:72
std::string columnName
std::shared_ptr< RelAlgTranslator > getRelAlgTranslator(const RelAlgNode *root_node)
Definition: sqldefs.h:74
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
static size_t getArenaBlockSize()
Definition: Execute.cpp:558
RelAlgDag * getRelAlgDag()
Executor * executor_
bool hasContextData() const
Definition: RelAlgDag.h:871
void prepareForSystemTableExecution(const CompilationOptions &co) const
std::list< std::shared_ptr< Analyzer::Expr > > combine_equi_join_conditions(const std::list< std::shared_ptr< Analyzer::Expr >> &join_quals)
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:310
const Expr * getArg(const size_t i) const
Definition: Analyzer.h:1688
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_used_inputs(const RelCompound *compound)
SQLOps get_optype() const
Definition: Analyzer.h:383
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)
SQLTypeInfo ext_arg_type_to_type_info(const ExtArgumentType ext_arg_type)
RANodeOutput get_node_output(const RelAlgNode *ra_node)
Definition: RelAlgDag.cpp:371
std::pair< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > > > get_input_desc_impl(const RA *ra_node, const std::unordered_set< const RexInput * > &used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
#define VLOG(n)
Definition: Logger.h:388
Type timer_start()
Definition: measure.h:42
size_t g_auto_resultset_caching_threshold
Definition: Execute.cpp:164
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
bool g_enable_table_functions
Definition: Execute.cpp:117
static std::shared_ptr< Analyzer::Expr > translateAggregateRex(const RexAgg *rex, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
void setHasStepForUnion(bool flag)
std::string get_table_name_from_table_key(shared::TableKey const &table_key)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
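first_oe_is_desc above is a small collation predicate; a plausible sketch, assuming Analyzer::OrderEntry exposes an is_desc flag as its name implies:

  // Sketch: does the first ORDER BY entry sort descending?
  bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
    return !order_entries.empty() && order_entries.front().is_desc;
  }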
void prepareLeafExecution(const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
HashTableBuildDagMap hash_table_build_plan_dag
const RexScalar * getScalarSource(const size_t i) const
Definition: RelAlgDag.h:2112
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
WorkUnit createSortInputWorkUnit(const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)
void set_transient_dict_maybe(std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::shared_ptr< Analyzer::Expr > &expr)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)