OmniSciDB 085a039ca4
RelAlgExecutor.cpp
/*
 * Copyright 2021 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "RelAlgExecutor.h"
#include "Parser/ParserNode.h"
#include "QueryEngine/RexVisitor.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/shard_key.h"

#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/reversed.hpp>

#include <algorithm>
#include <functional>
#include <numeric>

bool g_enable_interop{false};
bool g_enable_union{true};  // DEPRECATED

extern bool g_enable_watchdog;
extern bool g_enable_bump_allocator;
extern bool g_enable_system_tables;

namespace {

bool node_is_aggregate(const RelAlgNode* ra) {
  const auto compound = dynamic_cast<const RelCompound*>(ra);
  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
  return ((compound && compound->isAggregate()) || aggregate);
}

std::unordered_set<PhysicalInput> get_physical_inputs(
    const Catalog_Namespace::Catalog& cat,
    const RelAlgNode* ra) {
  auto phys_inputs = get_physical_inputs(ra);
  std::unordered_set<PhysicalInput> phys_inputs2;
  for (auto& phi : phys_inputs) {
    phys_inputs2.insert(
        PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
  }
  return phys_inputs2;
}

void set_parallelism_hints(const RelAlgNode& ra_node,
                           const Catalog_Namespace::Catalog& catalog) {
  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
      parallelism_hints_per_table;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    int table_id = physical_input.table_id;
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE &&
        !table->is_system_table) {
      int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
      const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
      auto foreign_table = catalog.getForeignTable(table_id);
      for (const auto& fragment :
           foreign_table->fragmenter->getFragmentsForQuery().fragments) {
        Chunk_NS::Chunk chunk{col_desc};
        ChunkKey chunk_key = {
            catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};

        // Parallelism hints should not include fragments that are not mapped to the
        // current node, otherwise we will try to prefetch them and run into trouble.
        // The node-mapping check itself is elided in this listing:
        // if (<fragment is not mapped to this node>) {
        //   continue;
        // }

        // do not include chunk hints that are in CPU memory
        if (!chunk.isChunkOnDevice(
                &catalog.getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
          parallelism_hints_per_table[{catalog.getDatabaseId(), table_id}].insert(
              foreign_storage::ForeignStorageMgr::ParallelismHint{col_id,
                                                                  fragment.fragmentId});
        }
      }
    }
  }
  if (!parallelism_hints_per_table.empty()) {
    auto foreign_storage_mgr =
        catalog.getDataMgr().getPersistentStorageMgr()->getForeignStorageMgr();
    CHECK(foreign_storage_mgr);
    foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
  }
}
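
// Illustrative sketch, not part of the original source: the chunk keys built
// above follow the {db_id, table_id, column_id, fragment_id} convention, and
// the parallelism-hint map is keyed by the two-element {db_id, table_id}
// prefix. The helper below only demonstrates that layout.
inline ChunkKey example_chunk_key_layout(int db_id,
                                         int table_id,
                                         int col_id,
                                         int frag_id) {
  return ChunkKey{db_id, table_id, col_id, frag_id};
}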

void prepare_string_dictionaries(const RelAlgNode& ra_node,
                                 const Catalog_Namespace::Catalog& catalog) {
  for (const auto [col_id, table_id] : get_physical_inputs(&ra_node)) {
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE) {
      auto spi_col_id = catalog.getColumnIdBySpi(table_id, col_id);
      foreign_storage::populate_string_dictionary(table_id, spi_col_id, catalog);
    }
  }
}

void prepare_foreign_table_for_execution(const RelAlgNode& ra_node,
                                         const Catalog_Namespace::Catalog& catalog) {
  // Iterate through ra_node inputs for types that need to be loaded pre-execution.
  // If they do not have valid metadata, load them into CPU memory to generate
  // the metadata and leave them ready to be used by the query.
  set_parallelism_hints(ra_node, catalog);
  prepare_string_dictionaries(ra_node, catalog);
}

void prepare_for_system_table_execution(const RelAlgNode& ra_node,
                                        const Catalog_Namespace::Catalog& catalog,
                                        const CompilationOptions& co) {
  std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    int table_id = physical_input.table_id;
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->is_system_table) {
      auto column_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
      system_table_columns_by_table_id[table_id].emplace_back(column_id);
    }
  }
  // Execute on CPU for queries involving system tables
  if (!system_table_columns_by_table_id.empty() &&
      co.device_type != ExecutorDeviceType::CPU) {
    throw QueryMustRunOnCpu();
  }

  for (const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
    // Clear any previously cached data, since system tables depend on point in
    // time data snapshots.
    catalog.getDataMgr().deleteChunksWithPrefix(
        ChunkKey{catalog.getDatabaseId(), table_id}, Data_Namespace::CPU_LEVEL);
    auto td = catalog.getMetadataForTable(table_id);
    CHECK(td);
    CHECK(td->fragmenter);
    auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
    CHECK_LE(fragment_count, static_cast<size_t>(1))
        << "In-memory system tables are expected to have a single fragment.";
    if (fragment_count > 0) {
      for (auto column_id : column_ids) {
        // Prefetch system table chunks in order to force chunk statistics metadata
        // computation.
        auto cd = catalog.getMetadataForColumn(table_id, column_id);
        ChunkKey chunk_key{catalog.getDatabaseId(), table_id, column_id, 0};
        Chunk_NS::Chunk::getChunk(
            cd, &(catalog.getDataMgr()), chunk_key, Data_Namespace::CPU_LEVEL, 0, 0, 0);
      }
    }
  }
}

bool has_valid_query_plan_dag(const RelAlgNode* node) {
  return node->getQueryPlanDagHash() != EMPTY_HASHED_PLAN_DAG_KEY;
}

// TODO(alex): Once we're fully migrated to the relational algebra model, change
// the executor interface to use the collation directly and remove this conversion.
std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
  std::list<Analyzer::OrderEntry> result;
  for (size_t i = 0; i < sort->collationCount(); ++i) {
    const auto sort_field = sort->getCollation(i);
    result.emplace_back(sort_field.getField() + 1,
                        sort_field.getSortDir() == SortDirection::Descending,
                        sort_field.getNullsPosition() == NullSortedPosition::First);
  }
  return result;
}
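
// Illustrative sketch, not part of the original source: for the first
// collation field of "ORDER BY c1 DESC NULLS FIRST", the conversion above
// produces the equivalent of the entry below; note the 0-based collation
// field index becomes the executor's 1-based column index.
inline Analyzer::OrderEntry example_order_entry_desc_nulls_first() {
  return Analyzer::OrderEntry(0 + 1, /*is_desc=*/true, /*nulls_first=*/true);
}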

void build_render_targets(RenderInfo& render_info,
                          const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
                          const std::vector<TargetMetaInfo>& targets_meta) {
  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
  render_info.targets.clear();
  for (size_t i = 0; i < targets_meta.size(); ++i) {
    render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
        targets_meta[i].get_resname(),
        work_unit_target_exprs[i]->get_shared_ptr(),
        false));
  }
}

bool is_validate_or_explain_query(const ExecutionOptions& eo) {
  return eo.just_validate || eo.just_explain || eo.just_calcite_explain;
}

class RelLeftDeepTreeIdsCollector : public RelAlgVisitor<std::vector<unsigned>> {
 public:
  std::vector<unsigned> visitLeftDeepInnerJoin(
      const RelLeftDeepInnerJoin* left_deep_join_tree) const override {
    return {left_deep_join_tree->getId()};
  }

 protected:
  std::vector<unsigned> aggregateResult(
      const std::vector<unsigned>& aggregate,
      const std::vector<unsigned>& next_result) const override {
    auto result = aggregate;
    std::copy(next_result.begin(), next_result.end(), std::back_inserter(result));
    return result;
  }
};

struct TextEncodingCastCounts {
  size_t text_decoding_casts;
  size_t text_encoding_casts;

  TextEncodingCastCounts() : text_decoding_casts(0UL), text_encoding_casts(0UL) {}
  TextEncodingCastCounts(const size_t text_decoding_casts,
                         const size_t text_encoding_casts)
      : text_decoding_casts(text_decoding_casts)
      , text_encoding_casts(text_encoding_casts) {}
};
class TextEncodingCastCountVisitor : public ScalarExprVisitor<TextEncodingCastCounts> {
 protected:
  TextEncodingCastCounts visitUOper(const Analyzer::UOper* u_oper) const override {
    TextEncodingCastCounts result = defaultResult();
    const bool disregard_cast_to_none_encoding = disregard_cast_to_none_encoding_;
    result = aggregateResult(result, visit(u_oper->get_operand()));
    if (u_oper->get_optype() != kCAST) {
      return result;
    }
    const auto& operand_ti = u_oper->get_operand()->get_type_info();
    const auto& casted_ti = u_oper->get_type_info();
    if (!operand_ti.is_string() || !casted_ti.is_string()) {
      return result;
    }
    const bool literals_only = u_oper->get_operand()->get_num_column_vars(true) == 0UL;
    if (literals_only) {
      return result;
    }
    if (operand_ti.is_none_encoded_string() && casted_ti.is_dict_encoded_string()) {
      return aggregateResult(result, TextEncodingCastCounts(0UL, 1UL));
    }
    if (operand_ti.is_dict_encoded_string() && casted_ti.is_none_encoded_string()) {
      if (!disregard_cast_to_none_encoding) {
        return aggregateResult(result, TextEncodingCastCounts(1UL, 0UL));
      } else {
        return result;
      }
    }
    return result;
  }

  TextEncodingCastCounts visitLikeExpr(const Analyzer::LikeExpr* like) const override {
    TextEncodingCastCounts result = defaultResult();
    const auto u_oper = dynamic_cast<const Analyzer::UOper*>(like->get_arg());
    if (u_oper && u_oper->get_optype() == kCAST) {
      disregard_cast_to_none_encoding_ = true;
      result = aggregateResult(result, visitUOper(u_oper));
    } else {
      result = aggregateResult(result, visit(like->get_arg()));
    }
    result = aggregateResult(result, visit(like->get_like_expr()));
    if (like->get_escape_expr()) {
      result = aggregateResult(result, visit(like->get_escape_expr()));
    }
    return result;
  }

  TextEncodingCastCounts aggregateResult(
      const TextEncodingCastCounts& aggregate,
      const TextEncodingCastCounts& next_result) const override {
    auto result = aggregate;
    result.text_decoding_casts += next_result.text_decoding_casts;
    result.text_encoding_casts += next_result.text_encoding_casts;
    return result;
  }

  void visitBegin() const override { disregard_cast_to_none_encoding_ = false; }

  TextEncodingCastCounts defaultResult() const override {
    return TextEncodingCastCounts();
  }

 private:
  mutable bool disregard_cast_to_none_encoding_ = false;
};

TextEncodingCastCounts get_text_cast_counts(const RelAlgExecutionUnit& ra_exe_unit) {
  TextEncodingCastCounts cast_counts;

  auto check_node_for_text_casts = [&cast_counts](const Analyzer::Expr* expr) {
    if (!expr) {
      return;
    }
    TextEncodingCastCountVisitor visitor;
    const auto this_node_cast_counts = visitor.visit(expr);
    cast_counts.text_encoding_casts += this_node_cast_counts.text_encoding_casts;
    cast_counts.text_decoding_casts += this_node_cast_counts.text_decoding_casts;
  };

  for (const auto& qual : ra_exe_unit.quals) {
    check_node_for_text_casts(qual.get());
  }
  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
    check_node_for_text_casts(simple_qual.get());
  }
  for (const auto& groupby_expr : ra_exe_unit.groupby_exprs) {
    check_node_for_text_casts(groupby_expr.get());
  }
  for (const auto& target_expr : ra_exe_unit.target_exprs) {
    check_node_for_text_casts(target_expr);
  }
  for (const auto& join_condition : ra_exe_unit.join_quals) {
    for (const auto& join_qual : join_condition.quals) {
      check_node_for_text_casts(join_qual.get());
    }
  }
  return cast_counts;
}
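
// Illustrative sketch, not part of the original source: aggregation across
// expression groups is a plain per-field sum, so one dict->none cast (a
// decode) found in the quals and one none->dict cast (an encode) found in the
// targets total {1, 1}.
inline TextEncodingCastCounts example_cast_count_sum() {
  TextEncodingCastCounts from_quals(1UL, 0UL);    // one decoding cast
  TextEncodingCastCounts from_targets(0UL, 1UL);  // one encoding cast
  return TextEncodingCastCounts(
      from_quals.text_decoding_casts + from_targets.text_decoding_casts,
      from_quals.text_encoding_casts + from_targets.text_encoding_casts);
}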

void check_none_encoded_string_cast_tuple_limit(
    const std::vector<InputTableInfo>& query_infos,
    const RelAlgExecutionUnit& ra_exe_unit) {
  if (!g_enable_watchdog) {
    return;
  }
  auto const tuples_upper_bound =
      std::accumulate(query_infos.cbegin(),
                      query_infos.cend(),
                      size_t(0),
                      [](auto max, auto const& query_info) {
                        return std::max(max, query_info.info.getNumTuples());
                      });
  if (tuples_upper_bound <= g_watchdog_none_encoded_string_translation_limit) {
    return;
  }

  const auto& text_cast_counts = get_text_cast_counts(ra_exe_unit);
  const bool has_text_casts =
      text_cast_counts.text_decoding_casts + text_cast_counts.text_encoding_casts > 0UL;

  if (!has_text_casts) {
    return;
  }
  std::ostringstream oss;
  oss << "Query requires one or more casts between none-encoded and dictionary-encoded "
      << "strings, and the estimated table size (" << tuples_upper_bound << " rows) "
      << "exceeds the configured watchdog none-encoded string translation limit of "
      << g_watchdog_none_encoded_string_translation_limit << " rows.";
  throw std::runtime_error(oss.str());
}
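
// Illustrative sketch, not part of the original source: the watchdog bound
// above is the maximum, not the sum, of the per-input tuple counts; inputs of
// 10 and 40 rows give an upper bound of 40.
inline size_t example_tuples_upper_bound(const std::vector<size_t>& input_sizes) {
  return std::accumulate(
      input_sizes.cbegin(), input_sizes.cend(), size_t(0), [](size_t mx, size_t n) {
        return std::max(mx, n);
      });
}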

}  // namespace

bool RelAlgExecutor::canUseResultsetCache(const ExecutionOptions& eo,
                                          RenderInfo* render_info) const {
  auto validate_or_explain_query = is_validate_or_explain_query(eo);
  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
  return g_enable_data_recycler && g_use_query_resultset_cache && eo.keep_result &&
         !validate_or_explain_query && !hasStepForUnion() &&
         !query_for_partial_outer_frag &&
         (!render_info || (render_info && !render_info->isPotentialInSituRender()));
}

size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
                                             const ExecutionOptions& eo) {
  if (eo.find_push_down_candidates) {
    return 0;
  }

  if (eo.just_explain) {
    return 0;
  }

  CHECK(query_dag_);

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  auto lock = executor_->acquireExecuteMutex();
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (!getSubqueries().empty()) {
    return 0;
  }

  CHECK(!ed_seq.empty());
  if (ed_seq.size() > 1) {
    return 0;
  }

  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  auto exec_desc_ptr = ed_seq.getDescriptor(0);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    return 0;
  }

  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);

    return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                   executor_);
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      return 0;
    } else if (compound->isUpdateViaSelect()) {
      return 0;
    } else {
      if (compound->isAggregate()) {
        return 0;
      }

      const auto work_unit =
          createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);

      return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                     executor_);
    }
  }

  return 0;
}

ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
                                                   const ExecutionOptions& eo,
                                                   const bool just_explain_plan,
                                                   RenderInfo* render_info) {
  CHECK(query_dag_);
  auto timer = DEBUG_TIMER(__func__);

  auto run_query = [&](const CompilationOptions& co_in) {
    auto execution_result =
        executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);

    constexpr bool vlog_result_set_summary{false};
    if constexpr (vlog_result_set_summary) {
      VLOG(1) << execution_result.getRows()->summaryToString();
    }

    if (post_execution_callback_) {
      VLOG(1) << "Running post execution callback.";
      (*post_execution_callback_)();
    }
    return execution_result;
  };

  try {
    return run_query(co);
  } catch (const QueryMustRunOnCpu&) {
    if (!g_allow_cpu_retry) {
      throw;
    }
  }
  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
  auto co_cpu = CompilationOptions::makeCpuOnly(co);

  if (render_info) {
    render_info->setForceNonInSituData();
  }
  return run_query(co_cpu);
}

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
                                                          const ExecutionOptions& eo,
                                                          const bool just_explain_plan,
                                                          RenderInfo* render_info) {
  auto timer = DEBUG_TIMER(__func__);
  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  std::string query_session{""};
  std::string query_str{"N/A"};
  std::string query_submitted_time{""};
  // gather the necessary query info
  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
    query_session = query_state_->getConstSessionInfo()->get_session_id();
    query_str = query_state_->getQueryStr();
    query_submitted_time = query_state_->getQuerySubmittedTime();
  }

  auto validate_or_explain_query =
      just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
  auto interruptable = !render_info && !query_session.empty() &&
                       eo.allow_runtime_query_interrupt && !validate_or_explain_query;
  if (interruptable) {
    // if we reach here, the current query, which was waiting for an idle executor
    // in the dispatch queue, is now scheduled to a specific executor
    // (not UNITARY_EXECUTOR),
    // so we update the query session's status with the executor that takes this query
    std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
        query_session, query_str, query_submitted_time);

    // now the query is going to be executed, so update the status as
    // "RUNNING_QUERY_KERNEL"
    executor_->updateQuerySessionStatus(
        query_session,
        query_submitted_time,
        QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
  }

  // the session info should be cleaned up once execution finishes
  ScopeGuard clearQuerySessionInfo =
      [this, &query_session, &interruptable, &query_submitted_time] {
        // reset the runtime query interrupt status after the end of query execution
        if (interruptable) {
          // clean up the running session's info
          executor_->clearQuerySessionStatus(query_session, query_submitted_time);
        }
      };

  auto acquire_execute_mutex = [](Executor * executor) -> auto {
    auto ret = executor->acquireExecuteMutex();
    return ret;
  };
  // acquire the executor lock here to make sure this executor holds
  // all necessary resources, and to protect them against other executors
  auto lock = acquire_execute_mutex(executor_);

  if (interruptable) {
    // check whether this query session has "already" been interrupted;
    // this case occurs when there is a very short gap between being interrupted and
    // taking the execute lock.
    // if so, we have to remove "all" queries initiated by this session, and we do
    // that here without running the query
    try {
      executor_->checkPendingQueryStatus(query_session);
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw std::runtime_error("Query execution has been interrupted (pending query)");
      }
      throw e;
    } catch (...) {
      throw std::runtime_error("Checking pending query status failed: unknown error");
    }
  }
  int64_t queue_time_ms = timer_stop(clock_begin);

  // Notify foreign tables to load prior to caching
  prepare_foreign_table_for_execution(ra, cat_);

  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  setupCaching(&ra);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra, executor_);

  if (just_explain_plan) {
    std::stringstream ss;
    std::vector<const RelAlgNode*> nodes;
    for (size_t i = 0; i < ed_seq.size(); i++) {
      nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
    }
    size_t ctr_node_id_in_plan = nodes.size();
    for (auto& body : boost::adaptors::reverse(nodes)) {
      // set each node's id in the query plan before calling the toString
      // method, so the query plan is rendered with the proper numbering
      auto node_id_in_plan_tree = ctr_node_id_in_plan--;
      body->setIdInPlanTree(node_id_in_plan_tree);
    }
    size_t ctr = nodes.size();
    size_t tab_ctr = 0;
    RelRexToStringConfig config;
    config.skip_input_nodes = true;
    for (auto& body : boost::adaptors::reverse(nodes)) {
      const auto index = ctr--;
      const auto tabs = std::string(tab_ctr++, '\t');
      CHECK(body);
      ss << tabs << std::to_string(index) << " : " << body->toString(config) << "\n";
      if (auto sort = dynamic_cast<const RelSort*>(body)) {
        ss << tabs << " : " << sort->getInput(0)->toString(config) << "\n";
      }
      if (dynamic_cast<const RelProject*>(body) ||
          dynamic_cast<const RelCompound*>(body)) {
        if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
          ss << tabs << " : " << join->toString(config) << "\n";
        }
      }
    }
    const auto& subqueries = getSubqueries();
    if (!subqueries.empty()) {
      ss << "Subqueries: "
         << "\n";
      for (const auto& subquery : subqueries) {
        const auto ra = subquery->getRelAlg();
        ss << "\t" << ra->toString(config) << "\n";
      }
    }
    auto rs = std::make_shared<ResultSet>(ss.str());
    return {rs, {}};
  }

  if (eo.find_push_down_candidates) {
    // this extra logic is mainly due to current limitations on multi-step queries
    // and/or subqueries.
    return executeRelAlgQueryWithFilterPushDown(
        ed_seq, co, eo, render_info, queue_time_ms);
  }
  timer_setup.stop();

  // Dispatch the subqueries first
  for (auto subquery : getSubqueries()) {
    const auto subquery_ra = subquery->getRelAlg();
    CHECK(subquery_ra);
    if (subquery_ra->hasContextData()) {
      continue;
    }
    // Execute the subquery and cache the result.
    RelAlgExecutor ra_executor(executor_, cat_, query_state_);
    RaExecutionSequence subquery_seq(subquery_ra, executor_);
    auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
    subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
  }
  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
}
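
// Illustrative sketch, not part of the original source: the queue-time
// bookkeeping above amounts to timing the executor-lock wait with the
// timer_start()/timer_stop() helpers from Shared/measure.h.
inline int64_t example_queue_time_ms(const std::function<void()>& wait_for_executor) {
  auto clock_begin = timer_start();
  wait_for_executor();  // e.g. acquiring the execute mutex
  return timer_stop(clock_begin);
}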

AggregatedColRange RelAlgExecutor::computeColRangesCache() {
  AggregatedColRange agg_col_range_cache;
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeColRangesCache(phys_inputs);
}

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeStringDictionaryGenerations(phys_inputs);
}

TableGenerations RelAlgExecutor::computeTableGenerations() {
  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
  return executor_->computeTableGenerations(phys_table_ids);
}

Executor* RelAlgExecutor::getExecutor() const {
  return executor_;
}

void RelAlgExecutor::cleanupPostExecution() {
  CHECK(executor_);
  executor_->row_set_mem_owner_ = nullptr;
}

std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
RelAlgExecutor::getJoinInfo(const RelAlgNode* root_node) {
  auto sort_node = dynamic_cast<const RelSort*>(root_node);
  if (sort_node) {
    // we assume that a test query needing join info does not contain any sort node
    return {};
  }
  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
  RelLeftDeepTreeIdsCollector visitor;
  auto left_deep_tree_ids = visitor.visit(root_node);
  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
}

namespace {

void check_sort_node_source_constraint(const RelSort* sort) {
  CHECK_EQ(size_t(1), sort->inputCount());
  const auto source = sort->getInput(0);
  if (dynamic_cast<const RelSort*>(source)) {
    throw std::runtime_error("Sort node not supported as input to another sort");
  }
}

}  // namespace

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
    const RaExecutionSequence& seq,
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryStep);

  auto exe_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exe_desc_ptr);
  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());

  size_t shard_count{0};
  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
    return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
  };

  if (sort) {
    check_sort_node_source_constraint(sort);
    auto order_entries = get_order_entries(sort);
    const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
    shard_count = GroupByAndAggregate::shard_count_for_top_groups(
        source_work_unit.exe_unit, *executor_->getCatalog());
    if (!shard_count) {
      // No point in sorting on the leaf, only execute the input to the sort node.
      CHECK_EQ(size_t(1), sort->inputCount());
      const auto source = sort->getInput(0);
      if (sort->collationCount() || node_is_aggregate(source)) {
        auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
        CHECK_EQ(temp_seq.size(), size_t(1));
        ExecutionOptions eo_copy = {
            eo.output_columnar_hint,
            eo.keep_result,
            eo.allow_multifrag,
            eo.just_explain,
            eo.allow_loop_joins,
            eo.with_watchdog,
            eo.jit_debug,
            eo.just_validate || sort->isEmptyResult(),
            // (several ExecutionOptions fields elided in this listing)
            eo.executor_type,
        };
        // Use subseq to avoid clearing existing temporary tables
        return {
            executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
            merge_type(source),
            source->getId(),
            false};
      }
    }
  }
  QueryStepExecutionResult result{
      executeRelAlgSubSeq(seq,
                          std::make_pair(step_idx, step_idx + 1),
                          co,
                          eo,
                          render_info,
                          queue_time_ms_),
      merge_type(exe_desc_ptr->getBody()),
      exe_desc_ptr->getBody()->getId(),
      false};
  if (post_execution_callback_) {
    VLOG(1) << "Running post execution callback.";
    (*post_execution_callback_)();
  }
  return result;
}
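
// Illustrative sketch, not part of the original source: the leaf merge policy
// used above, extracted as a standalone helper; non-sharded aggregate bodies
// are reduced across leaves, everything else is unioned.
inline MergeType example_leaf_merge_type(const RelAlgNode* body, size_t shard_count) {
  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
}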

void RelAlgExecutor::prepareLeafExecution(
    const AggregatedColRange& agg_col_range,
    const StringDictionaryGenerations& string_dictionary_generations,
    const TableGenerations& table_generations) {
  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  queue_time_ms_ = timer_stop(clock_begin);
  executor_->row_set_mem_owner_ =
      std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
  executor_->table_generations_ = table_generations;
  executor_->agg_col_range_cache_ = agg_col_range;
}

ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
                                                 const CompilationOptions& co,
                                                 const ExecutionOptions& eo,
                                                 RenderInfo* render_info,
                                                 const int64_t queue_time_ms,
                                                 const bool with_existing_temp_tables) {
  auto timer = DEBUG_TIMER(__func__);
  if (!with_existing_temp_tables) {
    decltype(temporary_tables_)().swap(temporary_tables_);
  }
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!seq.empty());

  auto get_descriptor_count = [&seq, &eo]() -> size_t {
    if (eo.just_explain) {
      if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
        // run the logical values descriptor to generate the result set, then the next
        // descriptor to generate the explain
        CHECK_GE(seq.size(), size_t(2));
        return 2;
      } else {
        return 1;
      }
    } else {
      return seq.size();
    }
  };

  const auto exec_desc_count = get_descriptor_count();
  auto eo_copied = eo;
  if (seq.hasQueryStepForUnion()) {
    // we currently do not support resultset recycling when an input query
    // contains a union (all) operation
    eo_copied.keep_result = false;
  }

  // we have to register the resultset(s) of the skipped query step(s) as temporary
  // tables before executing the remaining query steps,
  // since they may be required during query processing,
  // e.g., to get the metadata of a target expression from a skipped query step
  if (g_allow_query_step_skipping) {
    for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
      const auto cached_res =
          executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
      CHECK(cached_res);
      addTemporaryTable(kv.first, cached_res);
    }
  }

  const auto num_steps = exec_desc_count - 1;
  for (size_t i = 0; i < exec_desc_count; i++) {
    VLOG(1) << "Executing query step " << i << " / " << num_steps;
    try {
      executeRelAlgStep(
          seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == num_steps) {
        // only render on the last step
        render_info->setForceNonInSituData();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo_copied,
                        (i == num_steps) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const NativeExecutionError&) {
      if (!g_enable_interop) {
        throw;
      }
      auto eo_extern = eo_copied;
      eo_extern.executor_type = ::ExecutorType::Extern;
      auto exec_desc_ptr = seq.getDescriptor(i);
      const auto body = exec_desc_ptr->getBody();
      const auto compound = dynamic_cast<const RelCompound*>(body);
      if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
        LOG(INFO) << "Also failed to run the query using interoperability";
        throw;
      }
      executeRelAlgStep(
          seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
    }
  }

  return seq.getDescriptor(num_steps)->getResult();
}

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
    const RaExecutionSequence& seq,
    const std::pair<size_t, size_t> interval,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  for (size_t i = interval.first; i < interval.second; i++) {
    // only render on the last step
    try {
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      if (render_info && i == interval.second - 1) {
        render_info->setForceNonInSituData();
      }
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    }
  }

  return seq.getDescriptor(interval.second - 1)->getResult();
}

void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
                                       const size_t step_idx,
                                       const CompilationOptions& co,
                                       const ExecutionOptions& eo,
                                       RenderInfo* render_info,
                                       const int64_t queue_time_ms) {
  auto timer = DEBUG_TIMER(__func__);
  auto exec_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    handleNop(exec_desc);
    return;
  }

  const ExecutionOptions eo_work_unit{
      eo.output_columnar_hint,
      eo.keep_result,
      eo.allow_multifrag,
      eo.just_explain,
      eo.allow_loop_joins,
      eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
      eo.jit_debug,
      eo.just_validate,
      // (several ExecutionOptions fields elided in this listing)
      eo.executor_type,
      step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};

  auto handle_hint = [co,
                      eo_work_unit,
                      body,
                      this]() -> std::pair<CompilationOptions, ExecutionOptions> {
    ExecutionOptions eo_hint_applied = eo_work_unit;
    CompilationOptions co_hint_applied = co;
    auto target_node = body;
    if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
      target_node = sort_body->getInput(0);
    }
    auto query_hints = getParsedQueryHint(target_node);
    auto columnar_output_hint_enabled = false;
    auto rowwise_output_hint_enabled = false;
    if (query_hints) {
      if (query_hints->isHintRegistered(QueryHint::kCpuMode)) {
        VLOG(1) << "A user forces to run the query on the CPU execution mode";
        co_hint_applied.device_type = ExecutorDeviceType::CPU;
      }
      if (query_hints->isHintRegistered(QueryHint::kKeepResult)) {
        if (!g_enable_data_recycler) {
          VLOG(1) << "A user enables keeping query resultset but is skipped since data "
                     "recycler is disabled";
        } else if (!g_use_query_resultset_cache) {
          VLOG(1) << "A user enables keeping query resultset but is skipped since query "
                     "resultset recycler is disabled";
        } else {
          VLOG(1) << "A user enables keeping query resultset";
          eo_hint_applied.keep_result = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kKeepTableFuncResult)) {
        // we use this hint within the function 'executeTableFunction'
        if (!g_enable_data_recycler) {
          VLOG(1) << "A user enables keeping table function's resultset but is skipped "
                     "since data recycler is disabled";
        } else if (!g_use_query_resultset_cache) {
          VLOG(1) << "A user enables keeping table function's resultset but is skipped "
                     "since query resultset recycler is disabled";
        } else {
          VLOG(1) << "A user enables keeping table function's resultset";
          eo_hint_applied.keep_result = true;
        }
      }
      if (query_hints->isHintRegistered(QueryHint::kColumnarOutput)) {
        VLOG(1) << "A user forces the query to run with columnar output";
        columnar_output_hint_enabled = true;
      } else if (query_hints->isHintRegistered(QueryHint::kRowwiseOutput)) {
        VLOG(1) << "A user forces the query to run with rowwise output";
        rowwise_output_hint_enabled = true;
      }
    }
    auto columnar_output_enabled = eo_work_unit.output_columnar_hint
                                       ? !rowwise_output_hint_enabled
                                       : columnar_output_hint_enabled;
    if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
      LOG(INFO) << "Currently, we do not support applying query hint to change query "
                   "output layout in distributed mode.";
    }
    eo_hint_applied.output_columnar_hint = columnar_output_enabled;
    return std::make_pair(co_hint_applied, eo_hint_applied);
  };

  auto hint_applied = handle_hint();

  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
    if (auto cached_resultset =
            executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
                body->getQueryPlanDagHash())) {
      VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
              << " from resultset cache";
      body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
      if (render_info) {
        std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
            executor_->getRecultSetRecyclerHolder().getTargetExprs(
                body->getQueryPlanDagHash());
        std::vector<Analyzer::Expr*> copied_target_exprs;
        for (const auto& expr : cached_target_exprs) {
          copied_target_exprs.push_back(expr.get());
        }
        build_render_targets(
            *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
      }
      exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
      addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
      return;
    }
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (compound->isUpdateViaSelect()) {
      executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      exec_desc.setResult(executeCompound(
          compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
      VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
              << static_cast<int>(-compound->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    if (project->isDeleteViaSelect()) {
      executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (project->isUpdateViaSelect()) {
      executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      std::optional<size_t> prev_count;
      // Disabling the intermediate count optimization in distributed, as the previous
      // execution descriptor will likely not hold the aggregated result.
      if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
        // If the previous node produced a reliable count, skip the pre-flight count.
        RelAlgNode const* const prev_body = project->getInput(0);
        if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
          if (RaExecutionDesc const* const prev_exec_desc =
                  prev_body->hasContextData()
                      ? prev_body->getContextData()
                      : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
            const auto& prev_exe_result = prev_exec_desc->getResult();
            const auto prev_result = prev_exe_result.getRows();
            if (prev_result) {
              prev_count = prev_result->rowCount();
              VLOG(3) << "Setting output row count for projection node to previous node ("
                      << prev_exec_desc->getBody()->toString(
                             RelRexToStringConfig::defaults())
                      << ") to " << *prev_count;
            }
          }
        }
      }
      exec_desc.setResult(executeProject(project,
                                         hint_applied.first,
                                         hint_applied.second,
                                         render_info,
                                         queue_time_ms,
                                         prev_count));
      VLOG(3) << "Returned from executeProject(), addTemporaryTable("
              << static_cast<int>(-project->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
  if (aggregate) {
    exec_desc.setResult(executeAggregate(
        aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto filter = dynamic_cast<const RelFilter*>(body);
  if (filter) {
    exec_desc.setResult(executeFilter(
        filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto sort = dynamic_cast<const RelSort*>(body);
  if (sort) {
    exec_desc.setResult(executeSort(
        sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    if (exec_desc.getResult().isFilterPushDownEnabled()) {
      return;
    }
    addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
  if (logical_values) {
    exec_desc.setResult(executeLogicalValues(logical_values, hint_applied.second));
    addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto modify = dynamic_cast<const RelModify*>(body);
  if (modify) {
    exec_desc.setResult(executeModify(modify, hint_applied.second));
    return;
  }
  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
  if (logical_union) {
    exec_desc.setResult(executeUnion(logical_union,
                                     seq,
                                     hint_applied.first,
                                     hint_applied.second,
                                     render_info,
                                     queue_time_ms));
    addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
  if (table_func) {
    exec_desc.setResult(executeTableFunction(
        table_func, hint_applied.first, hint_applied.second, queue_time_ms));
    addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  LOG(FATAL) << "Unhandled body type: "
             << body->toString(RelRexToStringConfig::defaults());
}

void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
  // just set the result of the previous node as the result of no op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up the temp table as it could be used by the outer query or the next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}
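
// Illustrative sketch, not part of the original source: intermediate results
// are registered under the negated node id, which is how handleNop above finds
// its input's temporary table and republishes it for the next step.
inline int example_temporary_table_id(const RelAlgNode* node) {
  return -static_cast<int>(node->getId());
}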

namespace {

class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:
  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : cat_(cat) {}

  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
  const Catalog_Namespace::Catalog& cat_;
};

const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
    return table_func;
  }
  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
    CHECK_EQ(size_t(2), join->inputCount());
    return join;
  }
  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_EQ(size_t(1), ra_node->inputCount());
  }
  auto only_src = ra_node->getInput(0);
  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
                       dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
  return is_join ? only_src : ra_node;
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelTableFunction* table_func,
                const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelLogicalUnion* logical_union,
                const Catalog_Namespace::Catalog&) {
  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  used_inputs_owned.reserve(logical_union->inputCount());
  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
  auto const n_inputs = logical_union->inputCount();
  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
    auto input = logical_union->getInput(nest_level);
    for (size_t i = 0; i < input->size(); ++i) {
      used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
      used_inputs.insert(used_inputs_owned.back().get());
    }
  }
  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
}

int table_id_from_ra(const RelAlgNode* ra_node) {
  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
  if (scan_ra) {
    const auto td = scan_ra->getTableDescriptor();
    CHECK(td);
    return td->tableId;
  }
  return -ra_node->getId();
}

std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
    const RelAlgNode* ra_node,
    const std::vector<size_t>& input_permutation) {
  const auto data_sink_node = get_data_sink(ra_node);
  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    const auto input_ra = data_sink_node->getInput(input_node_idx);
    // Having a non-zero mapped value (input_idx) results in the query being interpreted
    // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
    // mapped value (input_idx) which originates here. This would be incorrect for UNION.
    size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
    const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
    CHECK(it_ok.second);
    LOG_IF(INFO, !input_permutation.empty())
        << "Assigned input " << input_ra->toString(RelRexToStringConfig::defaults())
        << " to nest level " << input_idx;
  }
  return input_to_nest_level;
}
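
// Illustrative sketch, not part of the original source: the nest-level rule
// applied above; UNION inputs all share level 0, while join inputs keep their
// (possibly permuted) position, which later becomes rte_idx during codegen.
inline size_t example_nest_level(const RelAlgNode* ra_node, size_t input_idx) {
  return dynamic_cast<const RelLogicalUnion*>(ra_node) ? size_t(0) : input_idx;
}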

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_join_source_used_inputs(const RelAlgNode* ra_node,
                            const Catalog_Namespace::Catalog& cat) {
  const auto data_sink_node = get_data_sink(ra_node);
  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
    CHECK_EQ(join->inputCount(), 2u);
    const auto condition = join->getCondition();
    RexUsedInputsVisitor visitor(cat);
    auto condition_inputs = visitor.visit(condition);
    std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
        visitor.get_inputs_owned());
    return std::make_pair(condition_inputs, condition_inputs_owned);
  }

  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
    CHECK_GE(left_deep_join->inputCount(), 2u);
    const auto condition = left_deep_join->getInnerCondition();
    RexUsedInputsVisitor visitor(cat);
    auto result = visitor.visit(condition);
    for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
         ++nesting_level) {
      const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
      if (outer_condition) {
        const auto outer_result = visitor.visit(outer_condition);
        result.insert(outer_result.begin(), outer_result.end());
      }
    }
    std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
    return std::make_pair(result, used_inputs_owned);
  }

  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_GT(ra_node->inputCount(), 1u)
        << ra_node->toString(RelRexToStringConfig::defaults());
  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
    // no-op
    CHECK_GE(ra_node->inputCount(), 0u)
        << ra_node->toString(RelRexToStringConfig::defaults());
  } else {
    CHECK_EQ(ra_node->inputCount(), 1u)
        << ra_node->toString(RelRexToStringConfig::defaults());
  }
  return std::make_pair(std::unordered_set<const RexInput*>{},
                        std::vector<std::shared_ptr<RexInput>>{});
}

void collect_used_input_desc(
    std::vector<InputDescriptor>& input_descs,
    const Catalog_Namespace::Catalog& cat,
    std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
    const RelAlgNode* ra_node,
    const std::unordered_set<const RexInput*>& source_used_inputs,
    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
  VLOG(3) << "ra_node=" << ra_node->toString(RelRexToStringConfig::defaults())
          << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
          << " source_used_inputs.size()=" << source_used_inputs.size();
  for (const auto used_input : source_used_inputs) {
    const auto input_ra = used_input->getSourceNode();
    const int table_id = table_id_from_ra(input_ra);
    const auto col_id = used_input->getIndex();
    auto it = input_to_nest_level.find(input_ra);
    if (it != input_to_nest_level.end()) {
      const int input_desc = it->second;
      input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
          dynamic_cast<const RelScan*>(input_ra)
              ? cat.getColumnIdBySpi(table_id, col_id + 1)
              : col_id,
          table_id,
          input_desc));
    } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
      throw std::runtime_error("Bushy joins not supported");
    }
  }
}

template <class RA>
std::pair<std::vector<InputDescriptor>,
          std::list<std::shared_ptr<const InputColDescriptor>>>
get_input_desc_impl(const RA* ra_node,
                    const std::unordered_set<const RexInput*>& used_inputs,
                    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
                    const std::vector<size_t>& input_permutation,
                    const Catalog_Namespace::Catalog& cat) {
  std::vector<InputDescriptor> input_descs;
  const auto data_sink_node = get_data_sink(ra_node);
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    auto input_ra = data_sink_node->getInput(input_node_idx);
    const int table_id = table_id_from_ra(input_ra);
    input_descs.emplace_back(table_id, input_idx);
  }
  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          used_inputs,
                          input_to_nest_level);
  std::unordered_set<const RexInput*> join_source_used_inputs;
  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
      get_join_source_used_inputs(ra_node, cat);
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          join_source_used_inputs,
                          input_to_nest_level);
  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
      input_col_descs_unique.begin(), input_col_descs_unique.end());

  std::sort(input_col_descs.begin(),
            input_col_descs.end(),
            [](std::shared_ptr<const InputColDescriptor> const& lhs,
               std::shared_ptr<const InputColDescriptor> const& rhs) {
              return std::make_tuple(lhs->getScanDesc().getNestLevel(),
                                     lhs->getColId(),
                                     lhs->getScanDesc().getTableId()) <
                     std::make_tuple(rhs->getScanDesc().getNestLevel(),
                                     rhs->getColId(),
                                     rhs->getScanDesc().getTableId());
            });
  return {input_descs,
          std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
                                                               input_col_descs.end())};
}

template <class RA>
std::tuple<std::vector<InputDescriptor>,
           std::list<std::shared_ptr<const InputColDescriptor>>,
           std::vector<std::shared_ptr<RexInput>>>
get_input_desc(const RA* ra_node,
               const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
               const std::vector<size_t>& input_permutation,
               const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
  auto input_desc_pair = get_input_desc_impl(
      ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
  return std::make_tuple(
      input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
}

1588 size_t get_scalar_sources_size(const RelCompound* compound) {
1589  return compound->getScalarSourcesSize();
1590 }
1591 
1592 size_t get_scalar_sources_size(const RelProject* project) {
1593  return project->size();
1594 }
1595 
1596 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1597  return table_func->getTableFuncInputsSize();
1598 }
1599 
1600 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1601  return compound->getScalarSource(i);
1602 }
1603 
1604 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1605  return project->getProjectAt(i);
1606 }
1607 
1608 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1609  return table_func->getTableFuncInputAt(i);
1610 }
1611 
1612 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1613  const std::shared_ptr<Analyzer::Expr> expr) {
1614  const auto& ti = expr->get_type_info();
1615  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1616  return expr;
1617  }
1618  auto transient_dict_ti = ti;
1619  transient_dict_ti.set_compression(kENCODING_DICT);
1620  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1621  transient_dict_ti.set_fixed_size();
1622  return expr->add_cast(transient_dict_ti);
1623 }
1624 
1625 void set_transient_dict_maybe(
1626  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1627  const std::shared_ptr<Analyzer::Expr>& expr) {
1628  try {
1629  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1630  } catch (...) {
1631  scalar_sources.push_back(fold_expr(expr.get()));
1632  }
1633 }
1634 
1635 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1636  const std::shared_ptr<Analyzer::Expr>& input) {
1637  const auto& input_ti = input->get_type_info();
1638  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1639  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1640  }
1641  return input;
1642 }
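// What the transient dictionary above buys, as a toy sketch: strings are
// mapped to dense temporary ids so later operators can compare fixed-width
// integers instead of variable-length strings. TransientDict is an
// illustrative stand-in, not the engine's StringDictionary API:

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

class TransientDict {
  std::unordered_map<std::string, int32_t> str_to_id_;
  std::vector<std::string> id_to_str_;

 public:
  int32_t getOrAdd(const std::string& s) {
    const auto [it, inserted] =
        str_to_id_.emplace(s, static_cast<int32_t>(id_to_str_.size()));
    if (inserted) {
      id_to_str_.push_back(s);
    }
    return it->second;
  }
  const std::string& getString(const int32_t id) const { return id_to_str_.at(id); }
};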
1643 
1644 template <class RA>
1645 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1646  const RA* ra_node,
1647  const RelAlgTranslator& translator,
1648  const ::ExecutorType executor_type) {
1649  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1650  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1651  VLOG(3) << "get_scalar_sources_size("
1652  << ra_node->toString(RelRexToStringConfig::defaults())
1653  << ") = " << scalar_sources_size;
1654  for (size_t i = 0; i < scalar_sources_size; ++i) {
1655  const auto scalar_rex = scalar_at(i, ra_node);
1656  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1657  // RexRefs are synthetic scalars appended at the end of the real ones
1658  // for the sake of memory ownership; no real work is needed here.
1659  continue;
1660  }
1661 
1662  const auto scalar_expr =
1663  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
1664  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1665  if (executor_type == ExecutorType::Native) {
1666  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1667  } else if (executor_type == ExecutorType::TableFunctions) {
1668  scalar_sources.push_back(fold_expr(rewritten_expr.get()));
1669  } else {
1670  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1671  }
1672  }
1673 
1674  return scalar_sources;
1675 }
1676 
1677 template <class RA>
1678 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1679  const RA* ra_node,
1680  const RelAlgTranslator& translator,
1681  int32_t tableId,
1682  const Catalog_Namespace::Catalog& cat,
1683  const ColumnNameList& colNames,
1684  size_t starting_projection_column_idx) {
1685  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1686  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1687  const auto scalar_rex = scalar_at(i, ra_node);
1688  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1689  // RexRefs are synthetic scalars appended at the end of the real ones
1690  // for the sake of memory ownership; no real work is needed here.
1691  continue;
1692  }
1693 
1694  std::shared_ptr<Analyzer::Expr> translated_expr;
1695  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1696  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
1697  tableId,
1698  cat,
1699  colNames[i - starting_projection_column_idx]);
1700  } else {
1701  translated_expr = translator.translateScalarRex(scalar_rex);
1702  }
1703  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1704  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1705  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1706  }
1707 
1708  return scalar_sources;
1709 }
1710 
1711 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1712  const RelCompound* compound,
1713  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1714  if (!compound->isAggregate()) {
1715  return {nullptr};
1716  }
1717  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1718  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1719  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1720  }
1721  return groupby_exprs;
1722 }
1723 
1724 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1725  const RelAggregate* aggregate,
1726  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1727  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1728  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1729  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1730  }
1731  return groupby_exprs;
1732 }
1733 
1734 QualsConjunctiveForm translate_quals(const RelCompound* compound,
1735  const RelAlgTranslator& translator) {
1736  const auto filter_rex = compound->getFilterExpr();
1737  const auto filter_expr =
1738  filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
1739  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1740  : QualsConjunctiveForm{};
1741 }
1742 
1743 namespace {
1744 // If an encoded type is used in the context of COUNT(DISTINCT ...) then don't
1745 // bother decoding it. This is done by changing the sql type to an integer.
1746 void conditionally_change_arg_to_int_type(std::shared_ptr<Analyzer::Expr>& target_expr) {
1747  auto* agg_expr = dynamic_cast<Analyzer::AggExpr*>(target_expr.get());
1748  CHECK(agg_expr);
1749  if (agg_expr->get_is_distinct()) {
1750  SQLTypeInfo const& ti = agg_expr->get_arg()->get_type_info();
1751  if (ti.get_type() != kARRAY && ti.get_compression() == kENCODING_DATE_IN_DAYS) {
1752  target_expr = target_expr->deep_copy();
1753  auto* arg = dynamic_cast<Analyzer::AggExpr*>(target_expr.get())->get_arg();
1754  arg->set_type_info(SQLTypeInfo(kINT, ti.get_notnull()));
1755  }
1756  }
1757 }
1758 } // namespace
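// Why skipping the decode is safe, as a standalone sketch: the days encoding
// maps each date to a distinct integer, so COUNT(DISTINCT ...) over the raw
// encoded values yields the same count as over the decoded timestamps:

#include <cstddef>
#include <cstdint>
#include <unordered_set>
#include <vector>

size_t count_distinct_encoded_days(const std::vector<int32_t>& encoded_days) {
  // distinctness is preserved by the injective encoding, so no decode is needed
  return std::unordered_set<int32_t>(encoded_days.begin(), encoded_days.end())
      .size();
}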
1759 
1760 std::vector<Analyzer::Expr*> translate_targets(
1761  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1762  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1763  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1764  const RelCompound* compound,
1765  const RelAlgTranslator& translator,
1766  const ExecutorType executor_type) {
1767  std::vector<Analyzer::Expr*> target_exprs;
1768  for (size_t i = 0; i < compound->size(); ++i) {
1769  const auto target_rex = compound->getTargetExpr(i);
1770  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1771  std::shared_ptr<Analyzer::Expr> target_expr;
1772  if (target_rex_agg) {
1773  target_expr =
1774  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1775  conditionally_change_arg_to_int_type(target_expr);
1776  } else {
1777  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1778  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1779  if (target_rex_ref) {
1780  const auto ref_idx = target_rex_ref->getIndex();
1781  CHECK_GE(ref_idx, size_t(1));
1782  CHECK_LE(ref_idx, groupby_exprs.size());
1783  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1784  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1785  } else {
1786  target_expr = translator.translateScalarRex(target_rex_scalar);
1787  auto rewritten_expr = rewrite_expr(target_expr.get());
1788  target_expr = fold_expr(rewritten_expr.get());
1789  if (executor_type == ExecutorType::Native) {
1790  try {
1791  target_expr = set_transient_dict(target_expr);
1792  } catch (...) {
1793  // noop
1794  }
1795  } else {
1796  target_expr = cast_dict_to_none(target_expr);
1797  }
1798  }
1799  }
1800  CHECK(target_expr);
1801  target_exprs_owned.push_back(target_expr);
1802  target_exprs.push_back(target_expr.get());
1803  }
1804  return target_exprs;
1805 }
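// RexRef indices into the group-by list are 1-based, hence the CHECK_GE on
// size_t(1) and the ref_idx - 1 advance above. A minimal sketch of that
// convention (std::string stands in for a group-by expression):

#include <cstddef>
#include <iterator>
#include <list>
#include <string>

const std::string& groupby_at(const std::list<std::string>& groupby_exprs,
                              const size_t ref_idx) {
  // ref_idx == 1 refers to the first group-by expression
  return *std::next(groupby_exprs.begin(), ref_idx - 1);
}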
1806 
1807 std::vector<Analyzer::Expr*> translate_targets(
1808  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1809  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1810  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1811  const RelAggregate* aggregate,
1812  const RelAlgTranslator& translator) {
1813  std::vector<Analyzer::Expr*> target_exprs;
1814  size_t group_key_idx = 1;
1815  for (const auto& groupby_expr : groupby_exprs) {
1816  auto target_expr =
1817  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1818  target_exprs_owned.push_back(target_expr);
1819  target_exprs.push_back(target_expr.get());
1820  }
1821 
1822  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1823  auto target_expr =
1824  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1825  CHECK(target_expr);
1826  target_expr = fold_expr(target_expr.get());
1827  target_exprs_owned.push_back(target_expr);
1828  target_exprs.push_back(target_expr.get());
1829  }
1830  return target_exprs;
1831 }
1832 
1833 bool is_count_distinct(const Analyzer::Expr* expr) {
1834  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1835  return agg_expr && agg_expr->get_is_distinct();
1836 }
1837 
1838 bool is_agg(const Analyzer::Expr* expr) {
1839  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1840  if (agg_expr && agg_expr->get_contains_agg()) {
1841  auto agg_type = agg_expr->get_aggtype();
1842  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1843  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1844  return true;
1845  }
1846  }
1847  return false;
1848 }
1849 
1850 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
1851  if (is_count_distinct(&expr)) {
1852  return SQLTypeInfo(kBIGINT, false);
1853  } else if (is_agg(&expr)) {
1854  return get_nullable_logical_type_info(expr.get_type_info());
1855  }
1856  return get_logical_type_info(expr.get_type_info());
1857 }
1858 
1859 template <class RA>
1860 std::vector<TargetMetaInfo> get_targets_meta(
1861  const RA* ra_node,
1862  const std::vector<Analyzer::Expr*>& target_exprs) {
1863  std::vector<TargetMetaInfo> targets_meta;
1864  CHECK_EQ(ra_node->size(), target_exprs.size());
1865  for (size_t i = 0; i < ra_node->size(); ++i) {
1866  CHECK(target_exprs[i]);
1867  // TODO(alex): remove the count distinct type fixup.
1868  targets_meta.emplace_back(ra_node->getFieldName(i),
1869  get_logical_type_for_expr(*target_exprs[i]),
1870  target_exprs[i]->get_type_info());
1871  }
1872  return targets_meta;
1873 }
1874 
1875 template <>
1876 std::vector<TargetMetaInfo> get_targets_meta(
1877  const RelFilter* filter,
1878  const std::vector<Analyzer::Expr*>& target_exprs) {
1879  RelAlgNode const* input0 = filter->getInput(0);
1880  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1881  return get_targets_meta(input, target_exprs);
1882  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1883  return get_targets_meta(input, target_exprs);
1884  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1885  return get_targets_meta(input, target_exprs);
1886  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1887  return get_targets_meta(input, target_exprs);
1888  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1889  return get_targets_meta(input, target_exprs);
1890  }
1891  UNREACHABLE() << "Unhandled node type: "
1892  << input0->toString(RelRexToStringConfig::defaults());
1893  return {};
1894 }
1895 
1896 } // namespace
1897 
1898 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
1899  const CompilationOptions& co_in,
1900  const ExecutionOptions& eo_in,
1901  const int64_t queue_time_ms) {
1902  CHECK(node);
1903  auto timer = DEBUG_TIMER(__func__);
1904 
1905  auto co = co_in;
1906  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1907  // encoded string updates
1908 
1909  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
1910  auto& work_unit,
1911  const bool is_aggregate) {
1912  auto table_descriptor = node->getModifiedTableDescriptor();
1913  CHECK(table_descriptor);
1914  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
1915  throw std::runtime_error(
1916  "UPDATE queries involving variable length columns are only supported on tables "
1917  "with the vacuum attribute set to 'delayed'");
1918  }
1919 
1920  Executor::clearExternalCaches(true, table_descriptor, cat_.getDatabaseId());
1921 
1922  auto updated_table_desc = node->getModifiedTableDescriptor();
1923  dml_transaction_parameters_ =
1924  std::make_unique<UpdateTransactionParameters>(updated_table_desc,
1925  node->getTargetColumns(),
1926  node->getOutputMetainfo(),
1927  node->isVarlenUpdateRequired());
1928 
1929  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1930 
1931  auto execute_update_ra_exe_unit =
1932  [this, &co, &eo_in, &table_infos, &updated_table_desc](
1933  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
1934  CompilationOptions co_project = CompilationOptions::makeCpuOnly(co);
1935 
1936  auto eo = eo_in;
1937  if (dml_transaction_parameters_->tableIsTemporary()) {
1938  eo.output_columnar_hint = true;
1939  co_project.allow_lazy_fetch = false;
1940  co_project.filter_on_deleted_column =
1941  false; // project the entire delete column for columnar update
1942  }
1943 
1944  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
1945  dml_transaction_parameters_.get());
1946  CHECK(update_transaction_parameters);
1947  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
1948  try {
1949  auto table_update_metadata =
1950  executor_->executeUpdate(ra_exe_unit,
1951  table_infos,
1952  updated_table_desc,
1953  co_project,
1954  eo,
1955  cat_,
1956  executor_->row_set_mem_owner_,
1957  update_callback,
1958  is_aggregate);
1959  post_execution_callback_ = [table_update_metadata, this]() {
1960  dml_transaction_parameters_->finalizeTransaction(cat_);
1961  TableOptimizer table_optimizer{
1962  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1963  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1964  };
1965  } catch (const QueryExecutionError& e) {
1966  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1967  }
1968  };
1969 
1970  if (dml_transaction_parameters_->tableIsTemporary()) {
1971  // hold owned target exprs during execution if rewriting
1972  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1973  // rewrite temp table updates to generate the full column by moving the where
1974  // clause into a case statement; if such a rewrite is not possible, bail on
1975  // the update operation, then build an expr for the update target
1976  auto update_transaction_params =
1977  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
1978  CHECK(update_transaction_params);
1979  const auto td = update_transaction_params->getTableDescriptor();
1980  CHECK(td);
1981  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
1982  if (update_column_names.size() > 1) {
1983  throw std::runtime_error(
1984  "Multi-column update is not yet supported for temporary tables.");
1985  }
1986 
1987  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1988  CHECK(cd);
1989  auto projected_column_to_update =
1990  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1991  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1992  work_unit.exe_unit, projected_column_to_update);
1993  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1994  throw std::runtime_error(
1995  "Variable length updates not yet supported on temporary tables.");
1996  }
1997  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1998  } else {
1999  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2000  }
2001  };
2002 
2003  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2004  auto work_unit =
2005  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2006 
2007  execute_update_for_node(compound, work_unit, compound->isAggregate());
2008  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2009  auto work_unit =
2010  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2011 
2012  if (project->isSimple()) {
2013  CHECK_EQ(size_t(1), project->inputCount());
2014  const auto input_ra = project->getInput(0);
2015  if (dynamic_cast<const RelSort*>(input_ra)) {
2016  const auto& input_table =
2017  get_temporary_table(&temporary_tables_, -input_ra->getId());
2018  CHECK(input_table);
2019  work_unit.exe_unit.scan_limit = input_table->rowCount();
2020  }
2021  }
2022 
2023  execute_update_for_node(project, work_unit, false);
2024  } else {
2025  throw std::runtime_error("Unsupported parent node for update: " +
2026  node->toString(RelRexToStringConfig::defaults()));
2027  }
2028 }
2029 
2030 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
2031  const CompilationOptions& co,
2032  const ExecutionOptions& eo_in,
2033  const int64_t queue_time_ms) {
2034  CHECK(node);
2035  auto timer = DEBUG_TIMER(__func__);
2036 
2037  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2038  auto& work_unit,
2039  const bool is_aggregate) {
2040  auto* table_descriptor = node->getModifiedTableDescriptor();
2041  CHECK(table_descriptor);
2042  if (!table_descriptor->hasDeletedCol) {
2043  throw std::runtime_error(
2044  "DELETE queries are only supported on tables with the vacuum attribute set to "
2045  "'delayed'");
2046  }
2047 
2048  Executor::clearExternalCaches(false, table_descriptor, cat_.getDatabaseId());
2049 
2050  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2051 
2052  auto execute_delete_ra_exe_unit =
2053  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
2054  const bool is_aggregate) {
2055  dml_transaction_parameters_ =
2056  std::make_unique<DeleteTransactionParameters>(table_descriptor);
2057  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2058  dml_transaction_parameters_.get());
2059  CHECK(delete_params);
2060  auto delete_callback = yieldDeleteCallback(*delete_params);
2061  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
2062 
2063  auto eo = eo_in;
2064  if (dml_transaction_parameters_->tableIsTemporary()) {
2065  eo.output_columnar_hint = true;
2066  co_delete.filter_on_deleted_column =
2067  false; // project the entire delete column for columnar update
2068  } else {
2069  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2070  }
2071 
2072  try {
2073  auto table_update_metadata =
2074  executor_->executeUpdate(exe_unit,
2075  table_infos,
2076  table_descriptor,
2077  co_delete,
2078  eo,
2079  cat_,
2080  executor_->row_set_mem_owner_,
2081  delete_callback,
2082  is_aggregate);
2083  post_execution_callback_ = [table_update_metadata, this]() {
2084  dml_transaction_parameters_->finalizeTransaction(cat_);
2085  TableOptimizer table_optimizer{
2086  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
2087  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2088  };
2089  } catch (const QueryExecutionError& e) {
2090  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2091  }
2092  };
2093 
2094  if (table_is_temporary(table_descriptor)) {
2095  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2096  auto cd = cat_.getDeletedColumn(table_descriptor);
2097  CHECK(cd);
2098  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2099  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
2100  const auto rewritten_exe_unit =
2101  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2102  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2103  } else {
2104  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2105  }
2106  };
2107 
2108  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2109  const auto work_unit =
2110  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2111  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2112  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2113  auto work_unit =
2114  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2115  if (project->isSimple()) {
2116  CHECK_EQ(size_t(1), project->inputCount());
2117  const auto input_ra = project->getInput(0);
2118  if (dynamic_cast<const RelSort*>(input_ra)) {
2119  const auto& input_table =
2120  get_temporary_table(&temporary_tables_, -input_ra->getId());
2121  CHECK(input_table);
2122  work_unit.exe_unit.scan_limit = input_table->rowCount();
2123  }
2124  }
2125  execute_delete_for_node(project, work_unit, false);
2126  } else {
2127  throw std::runtime_error("Unsupported parent node for delete: " +
2128  node->toString(RelRexToStringConfig::defaults()));
2129  }
2130 }
2131 
2132 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
2133  const CompilationOptions& co,
2134  const ExecutionOptions& eo,
2135  RenderInfo* render_info,
2136  const int64_t queue_time_ms) {
2137  auto timer = DEBUG_TIMER(__func__);
2138  const auto work_unit =
2139  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
2140  CompilationOptions co_compound = co;
2141  return executeWorkUnit(work_unit,
2142  compound->getOutputMetainfo(),
2143  compound->isAggregate(),
2144  co_compound,
2145  eo,
2146  render_info,
2147  queue_time_ms);
2148 }
2149 
2150 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
2151  const CompilationOptions& co,
2152  const ExecutionOptions& eo,
2153  RenderInfo* render_info,
2154  const int64_t queue_time_ms) {
2155  auto timer = DEBUG_TIMER(__func__);
2156  const auto work_unit = createAggregateWorkUnit(
2157  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2158  return executeWorkUnit(work_unit,
2159  aggregate->getOutputMetainfo(),
2160  true,
2161  co,
2162  eo,
2163  render_info,
2164  queue_time_ms);
2165 }
2166 
2167 namespace {
2168 
2169 // Returns true iff the execution unit contains window functions.
2170 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
2171  return std::any_of(ra_exe_unit.target_exprs.begin(),
2172  ra_exe_unit.target_exprs.end(),
2173  [](const Analyzer::Expr* expr) {
2174  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
2175  });
2176 }
2177 
2178 } // namespace
2179 
2180 ExecutionResult RelAlgExecutor::executeProject(
2181  const RelProject* project,
2182  const CompilationOptions& co,
2183  const ExecutionOptions& eo,
2184  RenderInfo* render_info,
2185  const int64_t queue_time_ms,
2186  const std::optional<size_t> previous_count) {
2187  auto timer = DEBUG_TIMER(__func__);
2188  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
2189  CompilationOptions co_project = co;
2190  if (project->isSimple()) {
2191  CHECK_EQ(size_t(1), project->inputCount());
2192  const auto input_ra = project->getInput(0);
2193  if (dynamic_cast<const RelSort*>(input_ra)) {
2194  co_project.device_type = ExecutorDeviceType::CPU;
2195  const auto& input_table =
2196  get_temporary_table(&temporary_tables_, -input_ra->getId());
2197  CHECK(input_table);
2198  work_unit.exe_unit.scan_limit =
2199  std::min(input_table->getLimit(), input_table->rowCount());
2200  }
2201  }
2202  return executeWorkUnit(work_unit,
2203  project->getOutputMetainfo(),
2204  false,
2205  co_project,
2206  eo,
2207  render_info,
2208  queue_time_ms,
2209  previous_count);
2210 }
2211 
2212 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
2213  const CompilationOptions& co_in,
2214  const ExecutionOptions& eo,
2215  const int64_t queue_time_ms) {
2216  INJECT_TIMER(executeTableFunction);
2217  auto timer = DEBUG_TIMER(__func__);
2218 
2219  auto co = co_in;
2220 
2221  if (g_cluster) {
2222  throw std::runtime_error("Table functions not supported in distributed mode yet");
2223  }
2224  if (!g_enable_table_functions) {
2225  throw std::runtime_error("Table function support is disabled");
2226  }
2227  auto table_func_work_unit = createTableFunctionWorkUnit(
2228  table_func,
2229  eo.just_explain,
2230  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2231  const auto body = table_func_work_unit.body;
2232  CHECK(body);
2233 
2234  const auto table_infos =
2235  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2236 
2237  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2238  co.device_type,
2239  QueryMemoryDescriptor(),
2240  nullptr,
2241  executor_->getCatalog(),
2242  executor_->blockSize(),
2243  executor_->gridSize()),
2244  {}};
2245 
2246  auto global_hint = getGlobalQueryHint();
2247  auto use_resultset_recycler = canUseResultsetCache(eo, nullptr);
2248  if (use_resultset_recycler && has_valid_query_plan_dag(table_func)) {
2249  auto cached_resultset =
2250  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
2251  table_func->getQueryPlanDagHash());
2252  if (cached_resultset) {
2253  VLOG(1) << "recycle table function's resultset of the root node "
2254  << table_func->getRelNodeDagId() << " from resultset cache";
2255  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2256  addTemporaryTable(-body->getId(), result.getDataPtr());
2257  return result;
2258  }
2259  }
2260 
2261  auto query_exec_time_begin = timer_start();
2262  try {
2263  result = {executor_->executeTableFunction(
2264  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
2265  body->getOutputMetainfo()};
2266  } catch (const QueryExecutionError& e) {
2267  handlePersistentError(e.getErrorCode());
2268  CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
2269  throw std::runtime_error("Table function ran out of memory during execution");
2270  }
2271  auto query_exec_time = timer_stop(query_exec_time_begin);
2272  result.setQueueTime(queue_time_ms);
2273  auto resultset_ptr = result.getDataPtr();
2274  auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2275  g_allow_auto_resultset_caching &&
2276  resultset_ptr->getBufferSizeBytes(co.device_type) <=
2277  g_auto_resultset_caching_threshold;
2278  bool keep_result = global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult);
2279  if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2280  !hasStepForUnion()) {
2281  resultset_ptr->setExecTime(query_exec_time);
2282  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2283  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2284  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2285  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2286  if (allow_auto_caching_resultset) {
2287  VLOG(1) << "Automatically keeping table function's query resultset in the recycler";
2288  }
2289  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
2290  table_func_work_unit.exe_unit.query_plan_dag_hash,
2291  resultset_ptr->getInputTableKeys(),
2292  resultset_ptr,
2293  resultset_ptr->getBufferSizeBytes(co.device_type),
2294  allow_auto_caching_resultset);
2295  } else {
2296  if (eo.keep_result) {
2297  if (g_cluster) {
2298  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2299  "support resultset recycling on distributed mode";
2300  } else if (hasStepForUnion()) {
2301  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2302  "has union-(all) operator";
2303  } else if (is_validate_or_explain_query(eo)) {
2304  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2305  "is either validate or explain query";
2306  } else {
2307  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2308  }
2309  }
2310  }
2311 
2312  return result;
2313 }
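// The resultset recycler consulted above behaves, in essence, like a map from
// a query plan DAG hash to a cached result set, checked before execution and
// populated afterwards. A minimal sketch under that assumption (Result is a
// placeholder for ResultSet):

#include <cstddef>
#include <memory>
#include <unordered_map>
#include <utility>

template <class Result>
class ResultCacheSketch {
  std::unordered_map<size_t, std::shared_ptr<Result>> cache_;

 public:
  std::shared_ptr<Result> get(const size_t plan_dag_hash) const {
    const auto it = cache_.find(plan_dag_hash);
    return it == cache_.end() ? nullptr : it->second;
  }
  void put(const size_t plan_dag_hash, std::shared_ptr<Result> rs) {
    cache_[plan_dag_hash] = std::move(rs);
  }
};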
2314 
2315 namespace {
2316 
2317 // Creates a new expression which has the range table index set to 1. This is needed to
2318 // reuse the hash join construction helpers to generate a hash table for the window
2319 // function partition: create an equals expression with left and right sides identical
2320 // except for the range table index.
2321 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
2322  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
2323  if (tuple) {
2324  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2325  for (const auto& element : tuple->getTuple()) {
2326  transformed_tuple.push_back(transform_to_inner(element.get()));
2327  }
2328  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2329  }
2330  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
2331  if (!col) {
2332  throw std::runtime_error("Only columns supported in the window partition for now");
2333  }
2334  return makeExpr<Analyzer::ColumnVar>(
2335  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
2336 }
2337 
2338 } // namespace
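// The one-to-many hash table built from that tautological `key = key` equality
// is conceptually a multimap from partition-key value to row ids. A standalone
// sketch of the same grouping (a single int64 partition key is assumed for
// brevity):

#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

std::unordered_map<int64_t, std::vector<size_t>> build_partitions(
    const std::vector<int64_t>& partition_key_column) {
  std::unordered_map<int64_t, std::vector<size_t>> partitions;
  for (size_t row = 0; row < partition_key_column.size(); ++row) {
    partitions[partition_key_column[row]].push_back(row);
  }
  return partitions;  // one-to-many: key -> all rows in that partition
}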
2339 
2340 void RelAlgExecutor::computeWindow(const WorkUnit& work_unit,
2341  const CompilationOptions& co,
2342  const ExecutionOptions& eo,
2343  ColumnCacheMap& column_cache_map,
2344  const int64_t queue_time_ms) {
2345  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2346  CHECK_EQ(query_infos.size(), size_t(1));
2347  if (query_infos.front().info.fragments.size() != 1) {
2348  throw std::runtime_error(
2349  "Only single fragment tables supported for window functions for now");
2350  }
2351  if (eo.executor_type == ::ExecutorType::Extern) {
2352  return;
2353  }
2354  query_infos.push_back(query_infos.front());
2355  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2356  // a query may hold multiple window functions sharing the same partition-by
2357  // condition; after building the first hash partition we can reuse it for the
2358  // rest of the window functions.
2359  // a cached partition can be shared across window function contexts as-is, but
2360  // a sorted partition should be copied for reuse, since we use it as an
2361  // (intermediate) output buffer
2362  // todo (yoonmin) : support recycler for window function computation?
2363  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2364  std::unordered_map<QueryPlanHash, std::unique_ptr<int64_t[]>> sorted_partition_cache;
2365  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2366  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2367  window_function_context_map;
2368  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2369  ++target_index) {
2370  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2371  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2372  if (!window_func) {
2373  continue;
2374  }
2375  // Always use baseline layout hash tables for now, make the expression a tuple.
2376  const auto& partition_keys = window_func->getPartitionKeys();
2377  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2378  if (partition_keys.size() >= 1) {
2379  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2380  if (partition_keys.size() > 1) {
2381  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2382  } else {
2383  CHECK_EQ(partition_keys.size(), size_t(1));
2384  partition_key_tuple = partition_keys.front();
2385  }
2386  // Creates a tautology equality with the partition expression on both sides.
2387  partition_key_cond =
2388  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2389  kBW_EQ,
2390  kONE,
2391  partition_key_tuple,
2392  transform_to_inner(partition_key_tuple.get()));
2393  }
2394  auto context =
2395  createWindowFunctionContext(window_func,
2396  partition_key_cond /*nullptr if no partition key*/,
2397  partition_cache,
2398  sorted_partition_key_ref_count_map,
2399  work_unit,
2400  query_infos,
2401  co,
2402  column_cache_map,
2403  executor_->getRowSetMemoryOwner());
2404  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2405  }
2406 
2407  for (auto& kv : window_function_context_map) {
2408  kv.second->compute(sorted_partition_key_ref_count_map, sorted_partition_cache);
2409  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2410  }
2411 }
2412 
2413 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
2414  const Analyzer::WindowFunction* window_func,
2415  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2416  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>>& partition_cache,
2417  std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
2418  const WorkUnit& work_unit,
2419  const std::vector<InputTableInfo>& query_infos,
2420  const CompilationOptions& co,
2421  ColumnCacheMap& column_cache_map,
2422  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2423  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2424  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2425  ? Data_Namespace::GPU_LEVEL
2426  : Data_Namespace::CPU_LEVEL;
2427  std::unique_ptr<WindowFunctionContext> context;
2428  auto partition_cache_key = work_unit.body->getQueryPlanDagHash();
2429  if (partition_key_cond) {
2430  auto partition_cond_str = partition_key_cond->toString();
2431  auto partition_key_hash = boost::hash_value(partition_cond_str);
2432  boost::hash_combine(partition_cache_key, partition_key_hash);
2433  std::shared_ptr<HashJoin> partition_ptr;
2434  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2435  if (cached_hash_table_it != partition_cache.end()) {
2436  partition_ptr = cached_hash_table_it->second;
2437  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2438  << partition_cache_key << ", partition condition: " << partition_cond_str
2439  << ")";
2440  } else {
2441  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2442  partition_key_cond,
2443  query_infos,
2444  memory_level,
2445  JoinType::INVALID, // for window function
2446  HashType::OneToMany,
2447  column_cache_map,
2448  executor_->getRowSetMemoryOwner(),
2449  work_unit.exe_unit.query_hint,
2450  work_unit.exe_unit.table_id_to_node_map);
2451  if (!hash_table_or_err.fail_reason.empty()) {
2452  throw std::runtime_error(hash_table_or_err.fail_reason);
2453  }
2454  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2455  partition_ptr = hash_table_or_err.hash_table;
2456  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2457  .second);
2458  VLOG(1) << "Put a generated hash table for computing window function context to "
2459  "cache (key: "
2460  << partition_cache_key << ", partition condition: " << partition_cond_str
2461  << ")";
2462  }
2463  CHECK(partition_ptr);
2464  context = std::make_unique<WindowFunctionContext>(window_func,
2465  partition_cache_key,
2466  partition_ptr,
2467  elem_count,
2468  co.device_type,
2469  row_set_mem_owner);
2470  } else {
2471  context = std::make_unique<WindowFunctionContext>(
2472  window_func, elem_count, co.device_type, row_set_mem_owner);
2473  }
2474  const auto& order_keys = window_func->getOrderKeys();
2475  if (!order_keys.empty()) {
2476  auto sorted_partition_cache_key = partition_cache_key;
2477  for (auto& order_key : order_keys) {
2478  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2479  }
2480  for (auto& collation : window_func->getCollation()) {
2481  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2482  }
2483  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2484  auto cache_key_cnt_it =
2485  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2486  if (!cache_key_cnt_it.second) {
2487  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2488  cache_key_cnt_it.first->second + 1;
2489  }
2490  }
2491  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2492  for (const auto& order_key : order_keys) {
2493  const auto order_col =
2494  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2495  if (!order_col) {
2496  throw std::runtime_error("Only order by columns supported for now");
2497  }
2498  const int8_t* column;
2499  size_t join_col_elem_count;
2500  std::tie(column, join_col_elem_count) =
2501  ColumnFetcher::getOneColumnFragment(executor_,
2502  *order_col,
2503  query_infos.front().info.fragments.front(),
2504  memory_level,
2505  0,
2506  nullptr,
2507  /*thread_idx=*/0,
2508  chunks_owner,
2509  column_cache_map);
2510 
2511  CHECK_EQ(join_col_elem_count, elem_count);
2512  context->addOrderColumn(column, order_col.get(), chunks_owner);
2513  }
2514  return context;
2515 }
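// Both cache keys above are composed incrementally with boost::hash_combine,
// which is order-sensitive: the same order keys in a different order hash to a
// different value. A sketch of that composition (plain strings stand in for
// the real toString() outputs):

#include <boost/functional/hash.hpp>

#include <cstddef>
#include <string>
#include <vector>

size_t sorted_partition_cache_key(size_t partition_cache_key,
                                  const std::vector<std::string>& order_keys) {
  for (const auto& order_key : order_keys) {
    boost::hash_combine(partition_cache_key, order_key);
  }
  return partition_cache_key;
}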
2516 
2517 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
2518  const CompilationOptions& co,
2519  const ExecutionOptions& eo,
2520  RenderInfo* render_info,
2521  const int64_t queue_time_ms) {
2522  auto timer = DEBUG_TIMER(__func__);
2523  const auto work_unit =
2524  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2525  return executeWorkUnit(
2526  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2527 }
2528 
2529 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
2530  std::vector<TargetMetaInfo> const& rhs) {
2531  if (lhs.size() == rhs.size()) {
2532  for (size_t i = 0; i < lhs.size(); ++i) {
2533  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2534  return false;
2535  }
2536  }
2537  return true;
2538  }
2539  return false;
2540 }
2541 
2542 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2543  return target_meta_info.get_type_info().is_geometry();
2544 }
2545 
2546 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
2547  const RaExecutionSequence& seq,
2548  const CompilationOptions& co,
2549  const ExecutionOptions& eo,
2550  RenderInfo* render_info,
2551  const int64_t queue_time_ms) {
2552  auto timer = DEBUG_TIMER(__func__);
2553  if (!logical_union->isAll()) {
2554  throw std::runtime_error("UNION without ALL is not supported yet.");
2555  }
2556  // Will throw a std::runtime_error if types don't match.
2557  logical_union->checkForMatchingMetaInfoTypes();
2558  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2559  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2560  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2561  }
2562  auto work_unit =
2563  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2564  return executeWorkUnit(work_unit,
2565  logical_union->getOutputMetainfo(),
2566  false,
2567  co,
2568  eo,
2569  render_info,
2570  queue_time_ms);
2571 }
2572 
2573 ExecutionResult RelAlgExecutor::executeLogicalValues(
2574  const RelLogicalValues* logical_values,
2575  const ExecutionOptions& eo) {
2576  auto timer = DEBUG_TIMER(__func__);
2577  QueryMemoryDescriptor query_mem_desc(executor_,
2578  logical_values->getNumRows(),
2579  QueryDescriptionType::Projection,
2580  /*is_table_function=*/false);
2581 
2582  auto tuple_type = logical_values->getTupleType();
2583  for (size_t i = 0; i < tuple_type.size(); ++i) {
2584  auto& target_meta_info = tuple_type[i];
2585  if (target_meta_info.get_type_info().is_varlen()) {
2586  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2587  }
2588  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2589  // replace w/ bigint
2590  tuple_type[i] =
2591  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2592  }
2593  query_mem_desc.addColSlotInfo(
2594  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2595  }
2596  logical_values->setOutputMetainfo(tuple_type);
2597 
2598  std::vector<TargetInfo> target_infos;
2599  for (const auto& tuple_type_component : tuple_type) {
2600  target_infos.emplace_back(TargetInfo{false,
2601  kCOUNT,
2602  tuple_type_component.get_type_info(),
2603  SQLTypeInfo(kNULLT, false),
2604  false,
2605  false,
2606  /*is_varlen_projection=*/false});
2607  }
2608 
2609  std::shared_ptr<ResultSet> rs{
2610  ResultSetLogicalValuesBuilder{logical_values,
2611  target_infos,
2612  ExecutorDeviceType::CPU,
2613  query_mem_desc,
2614  executor_->getRowSetMemoryOwner(),
2615  executor_}
2616  .build()};
2617 
2618  return {rs, tuple_type};
2619 }
2620 
2621 namespace {
2622 
2623 template <class T>
2624 int64_t insert_one_dict_str(T* col_data,
2625  const std::string& columnName,
2626  const SQLTypeInfo& columnType,
2627  const Analyzer::Constant* col_cv,
2628  const Catalog_Namespace::Catalog& catalog) {
2629  if (col_cv->get_is_null()) {
2630  *col_data = inline_fixed_encoding_null_val(columnType);
2631  } else {
2632  const int dict_id = columnType.get_comp_param();
2633  const auto col_datum = col_cv->get_constval();
2634  const auto& str = *col_datum.stringval;
2635  const auto dd = catalog.getMetadataForDict(dict_id);
2636  CHECK(dd && dd->stringDict);
2637  int32_t str_id = dd->stringDict->getOrAdd(str);
2638  if (!dd->dictIsTemp) {
2639  const auto checkpoint_ok = dd->stringDict->checkpoint();
2640  if (!checkpoint_ok) {
2641  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2642  columnName);
2643  }
2644  }
2645  const bool invalid = str_id > max_valid_int_value<T>();
2646  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2647  if (invalid) {
2648  LOG(ERROR) << "Could not encode string: " << str
2649  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2650  << " bits. Will store NULL instead.";
2651  }
2652  str_id = inline_fixed_encoding_null_val(columnType);
2653  }
2654  *col_data = str_id;
2655  }
2656  return *col_data;
2657 }
2658 
2659 template <class T>
2660 int64_t insert_one_dict_str(T* col_data,
2661  const ColumnDescriptor* cd,
2662  const Analyzer::Constant* col_cv,
2663  const Catalog_Namespace::Catalog& catalog) {
2664  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2665 }
2666 
2667 } // namespace
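// The width guard above protects 8- and 16-bit dictionary columns: an id too
// large for the column's physical type is stored as NULL instead. A minimal
// sketch of the guard for a narrow T such as int8_t or int16_t (INT32_MIN as
// the null sentinel is an assumption, not the engine's
// inline_fixed_encoding_null_val):

#include <cstdint>
#include <limits>

template <class T>
int32_t clamp_dict_id_to_width(const int32_t str_id) {
  if (str_id > static_cast<int32_t>(std::numeric_limits<T>::max())) {
    return std::numeric_limits<int32_t>::min();  // store NULL instead
  }
  return str_id;
}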
2668 
2669 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2670  const ExecutionOptions& eo) {
2671  auto timer = DEBUG_TIMER(__func__);
2672  if (eo.just_explain) {
2673  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2674  }
2675 
2676  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2677  ExecutorDeviceType::CPU,
2678  QueryMemoryDescriptor(),
2679  executor_->getRowSetMemoryOwner(),
2680  executor_->getCatalog(),
2681  executor_->blockSize(),
2682  executor_->gridSize());
2683 
2684  std::vector<TargetMetaInfo> empty_targets;
2685  return {rs, empty_targets};
2686 }
2687 
2688 ExecutionResult RelAlgExecutor::executeSimpleInsert(
2689  const Analyzer::Query& query,
2690  Fragmenter_Namespace::InsertDataLoader& inserter,
2691  const Catalog_Namespace::SessionInfo& session) {
2692  // Note: We currently obtain an executor for this method, but we do not need it.
2693  // Therefore, we skip the executor state setup in the regular execution path. In the
2694  // future, we will likely want to use the executor to evaluate expressions in the insert
2695  // statement.
2696 
2697  const auto& values_lists = query.get_values_lists();
2698  const int table_id = query.get_result_table_id();
2699  const auto& col_id_list = query.get_result_col_list();
2700  size_t rows_number = values_lists.size();
2701  size_t leaf_count = inserter.getLeafCount();
2702  const auto td = cat_.getMetadataForTable(table_id);
2703  CHECK(td);
2704  size_t rows_per_leaf = rows_number;
2705  if (td->nShards == 0) {
2706  rows_per_leaf =
2707  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2708  }
2709  auto max_number_of_rows_per_package =
2710  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
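 // Worked example of the sizing above: 1,000,000 rows across 4 leaves gives
 // rows_per_leaf = ceil(1000000 / 4.0) = 250,000, which the clamp then caps at
 // the 64 * 1024 = 65,536-row package ceiling; tiny inserts are floored at one
 // row per package.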
2711 
2712  std::vector<const ColumnDescriptor*> col_descriptors;
2713  std::vector<int> col_ids;
2714  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2715  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2716  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2717  std::unordered_map<int, int> sequential_ids;
2718 
2719  for (const int col_id : col_id_list) {
2720  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2721  const auto col_enc = cd->columnType.get_compression();
2722  if (cd->columnType.is_string()) {
2723  switch (col_enc) {
2724  case kENCODING_NONE: {
2725  auto it_ok =
2726  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2727  CHECK(it_ok.second);
2728  break;
2729  }
2730  case kENCODING_DICT: {
2731  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2732  CHECK(dd);
2733  const auto it_ok = col_buffers.emplace(
2734  col_id,
2735  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2736  max_number_of_rows_per_package));
2737  CHECK(it_ok.second);
2738  break;
2739  }
2740  default:
2741  CHECK(false);
2742  }
2743  } else if (cd->columnType.is_geometry()) {
2744  auto it_ok =
2745  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2746  CHECK(it_ok.second);
2747  } else if (cd->columnType.is_array()) {
2748  auto it_ok =
2749  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2750  CHECK(it_ok.second);
2751  } else {
2752  const auto it_ok = col_buffers.emplace(
2753  col_id,
2754  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2755  max_number_of_rows_per_package]()));
2756  CHECK(it_ok.second);
2757  }
2758  col_descriptors.push_back(cd);
2759  sequential_ids[col_id] = col_ids.size();
2760  col_ids.push_back(col_id);
2761  }
2762 
2763  // mark the target table's cached item as dirty
2764  std::vector<int> table_chunk_key_prefix{cat_.getCurrentDB().dbId, table_id};
2765  auto table_key = boost::hash_value(table_chunk_key_prefix);
2766  ResultSetCacheInvalidator::invalidateCachesByTable(table_key);
2767  UpdateTriggeredCacheInvalidator::invalidateCachesByTable(table_key);
2768 
2769  size_t start_row = 0;
2770  size_t rows_left = rows_number;
2771  while (rows_left != 0) {
2772  // clear the buffers
2773  for (const auto& kv : col_buffers) {
2774  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2775  }
2776  for (auto& kv : str_col_buffers) {
2777  kv.second.clear();
2778  }
2779  for (auto& kv : arr_col_buffers) {
2780  kv.second.clear();
2781  }
2782 
2783  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2784  // Note: if batch inserts with many rows become a common use case, it might be
2785  // more efficient to do the loops below column by column instead of row by row.
2786  // But for now such a refactoring is not worth investigating, as we have more
2787  // efficient ways to insert many rows anyway.
2788  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2789  const auto& values_list = values_lists[row_idx + start_row];
2790  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
2791  CHECK(values_list.size() == col_descriptors.size());
2792  auto col_cv =
2793  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
2794  if (!col_cv) {
2795  auto col_cast =
2796  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
2797  CHECK(col_cast);
2798  CHECK_EQ(kCAST, col_cast->get_optype());
2799  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2800  }
2801  CHECK(col_cv);
2802  const auto cd = col_descriptors[col_idx];
2803  auto col_datum = col_cv->get_constval();
2804  auto col_type = cd->columnType.get_type();
2805  uint8_t* col_data_bytes{nullptr};
2806  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2807  (!cd->columnType.is_string() ||
2808  cd->columnType.get_compression() == kENCODING_DICT)) {
2809  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2810  CHECK(col_data_bytes_it != col_buffers.end());
2811  col_data_bytes = col_data_bytes_it->second.get();
2812  }
2813  switch (col_type) {
2814  case kBOOLEAN: {
2815  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2816  auto null_bool_val =
2817  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2818  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2819  ? inline_fixed_encoding_null_val(cd->columnType)
2820  : (col_datum.boolval ? 1 : 0);
2821  break;
2822  }
2823  case kTINYINT: {
2824  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2825  col_data[row_idx] = col_cv->get_is_null()
2826  ? inline_fixed_encoding_null_val(cd->columnType)
2827  : col_datum.tinyintval;
2828  break;
2829  }
2830  case kSMALLINT: {
2831  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2832  col_data[row_idx] = col_cv->get_is_null()
2833  ? inline_fixed_encoding_null_val(cd->columnType)
2834  : col_datum.smallintval;
2835  break;
2836  }
2837  case kINT: {
2838  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2839  col_data[row_idx] = col_cv->get_is_null()
2840  ? inline_fixed_encoding_null_val(cd->columnType)
2841  : col_datum.intval;
2842  break;
2843  }
2844  case kBIGINT:
2845  case kDECIMAL:
2846  case kNUMERIC: {
2847  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2848  col_data[row_idx] = col_cv->get_is_null()
2849  ? inline_fixed_encoding_null_val(cd->columnType)
2850  : col_datum.bigintval;
2851  break;
2852  }
2853  case kFLOAT: {
2854  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2855  col_data[row_idx] = col_datum.floatval;
2856  break;
2857  }
2858  case kDOUBLE: {
2859  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2860  col_data[row_idx] = col_datum.doubleval;
2861  break;
2862  }
2863  case kTEXT:
2864  case kVARCHAR:
2865  case kCHAR: {
2866  switch (cd->columnType.get_compression()) {
2867  case kENCODING_NONE:
2868  str_col_buffers[col_ids[col_idx]].push_back(
2869  col_datum.stringval ? *col_datum.stringval : "");
2870  break;
2871  case kENCODING_DICT: {
2872  switch (cd->columnType.get_size()) {
2873  case 1:
2874  insert_one_dict_str(
2875  &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
2876  cd,
2877  col_cv,
2878  cat_);
2879  break;
2880  case 2:
2881  insert_one_dict_str(
2882  &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
2883  cd,
2884  col_cv,
2885  cat_);
2886  break;
2887  case 4:
2888  insert_one_dict_str(
2889  &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
2890  cd,
2891  col_cv,
2892  cat_);
2893  break;
2894  default:
2895  CHECK(false);
2896  }
2897  break;
2898  }
2899  default:
2900  CHECK(false);
2901  }
2902  break;
2903  }
2904  case kTIME:
2905  case kTIMESTAMP:
2906  case kDATE: {
2907  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2908  col_data[row_idx] = col_cv->get_is_null()
2909  ? inline_fixed_encoding_null_val(cd->columnType)
2910  : col_datum.bigintval;
2911  break;
2912  }
2913  case kARRAY: {
2914  const auto is_null = col_cv->get_is_null();
2915  const auto size = cd->columnType.get_size();
2916  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2917  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2918  const auto is_point_coords =
2919  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2920  if (is_null && !is_point_coords) {
2921  if (size > 0) {
2922  int8_t* buf = (int8_t*)checked_malloc(size);
2923  put_null_array(static_cast<void*>(buf), elem_ti, "");
2924  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2925  p += elem_ti.get_size()) {
2926  put_null(static_cast<void*>(p), elem_ti, "");
2927  }
2928  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2929  } else {
2930  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2931  }
2932  break;
2933  }
2934  const auto l = col_cv->get_value_list();
2935  size_t len = l.size() * elem_ti.get_size();
2936  if (size > 0 && static_cast<size_t>(size) != len) {
2937  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2938  std::to_string(size / elem_ti.get_size()) +
2939  " values, " + "received " +
2940  std::to_string(l.size()));
2941  }
2942  if (elem_ti.is_string()) {
2943  CHECK(kENCODING_DICT == elem_ti.get_compression());
2944  CHECK(4 == elem_ti.get_size());
2945 
2946  int8_t* buf = (int8_t*)checked_malloc(len);
2947  int32_t* p = reinterpret_cast<int32_t*>(buf);
2948 
2949  int elemIndex = 0;
2950  for (auto& e : l) {
2951  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2952  CHECK(c);
2953  insert_one_dict_str(
2954  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2955  elemIndex++;
2956  }
2957  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2958 
2959  } else {
2960  int8_t* buf = (int8_t*)checked_malloc(len);
2961  int8_t* p = buf;
2962  for (auto& e : l) {
2963  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2964  CHECK(c);
2965  p = append_datum(p, c->get_constval(), elem_ti);
2966  CHECK(p);
2967  }
2968  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2969  }
2970  break;
2971  }
2972  case kPOINT:
2973  case kLINESTRING:
2974  case kPOLYGON:
2975  case kMULTIPOLYGON:
2976  str_col_buffers[col_ids[col_idx]].push_back(
2977  col_datum.stringval ? *col_datum.stringval : "");
2978  break;
2979  default:
2980  CHECK(false);
2981  }
2982  }
2983  }
2984  start_row += package_size;
2985  rows_left -= package_size;
2986 
2987  Fragmenter_Namespace::InsertData insert_data;
2988  insert_data.databaseId = cat_.getCurrentDB().dbId;
2989  insert_data.tableId = table_id;
2990  insert_data.data.resize(col_ids.size());
2991  insert_data.columnIds = col_ids;
2992  for (const auto& kv : col_buffers) {
2993  DataBlockPtr p;
2994  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2995  insert_data.data[sequential_ids[kv.first]] = p;
2996  }
2997  for (auto& kv : str_col_buffers) {
2998  DataBlockPtr p;
2999  p.stringsPtr = &kv.second;
3000  insert_data.data[sequential_ids[kv.first]] = p;
3001  }
3002  for (auto& kv : arr_col_buffers) {
3003  DataBlockPtr p;
3004  p.arraysPtr = &kv.second;
3005  insert_data.data[sequential_ids[kv.first]] = p;
3006  }
3007  insert_data.numRows = package_size;
3008  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
3009  inserter.insertData(session, insert_data);
3010  }
3011 
3012  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3013  ExecutorDeviceType::CPU,
3014  QueryMemoryDescriptor(),
3015  executor_->getRowSetMemoryOwner(),
3016  nullptr,
3017  0,
3018  0);
3019  std::vector<TargetMetaInfo> empty_targets;
3020  return {rs, empty_targets};
3021 }
3022 
3023 namespace {
3024 
3025 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
3026  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
3027  if (aggregate) {
3028  return 0;
3029  }
3030  const auto compound = dynamic_cast<const RelCompound*>(ra);
3031  return (compound && compound->isAggregate()) ? 0 : limit;
3032 }
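 // For example, `SELECT x FROM t LIMIT 10` can stop scanning after 10 rows,
 // but an aggregation such as `SELECT COUNT(*) FROM t LIMIT 10` must scan
 // every row before the limit applies, so the scan limit is dropped (returned
 // as 0) whenever the node aggregates.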
3033 
3034 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
3035  return !order_entries.empty() && order_entries.front().is_desc;
3036 }
3037 
3038 } // namespace
3039 
3040 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
3041  const CompilationOptions& co,
3042  const ExecutionOptions& eo,
3043  RenderInfo* render_info,
3044  const int64_t queue_time_ms) {
3045  auto timer = DEBUG_TIMER(__func__);
3046  check_sort_node_source_constraint(sort);
3047  const auto source = sort->getInput(0);
3048  const bool is_aggregate = node_is_aggregate(source);
3049  auto it = leaf_results_.find(sort->getId());
3050  auto order_entries = get_order_entries(sort);
3051  if (it != leaf_results_.end()) {
3052  // Add any transient string literals to the string dictionary proxy on the aggregator
3053  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3054  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3055  executor_->row_set_mem_owner_);
3056  // Handle push-down for LIMIT for multi-node
3057  auto& aggregated_result = it->second;
3058  auto& result_rows = aggregated_result.rs;
3059  const size_t limit = sort->getLimit();
3060  const size_t offset = sort->getOffset();
3061  if (limit || offset) {
3062  if (!order_entries.empty()) {
3063  result_rows->sort(order_entries, limit + offset, executor_);
3064  }
3065  result_rows->dropFirstN(offset);
3066  if (limit) {
3067  result_rows->keepFirstN(limit);
3068  }
3069  }
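 // e.g. LIMIT 10 OFFSET 20: sort the top 30 rows into place, drop the first
 // 20, then keep the next 10.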
3070 
3071  if (render_info) {
3072  // We've hit a sort step that is the very last step
3073  // in a distributed render query. We'll fill in the render targets
3074  // since we have all the data needed to do so. This is normally
3075  // done in executeWorkUnit, but that is bypassed in this case.
3076  build_render_targets(*render_info,
3077  source_work_unit.exe_unit.target_exprs,
3078  aggregated_result.targets_meta);
3079  }
3080 
3081  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3082  sort->setOutputMetainfo(aggregated_result.targets_meta);
3083 
3084  return result;
3085  }
3086 
3087  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3088  bool is_desc{false};
3089  bool use_speculative_top_n_sort{false};
3090 
3091  auto execute_sort_query = [this,
3092  sort,
3093  &source,
3094  &is_aggregate,
3095  &eo,
3096  &co,
3097  render_info,
3098  queue_time_ms,
3099  &groupby_exprs,
3100  &is_desc,
3101  &order_entries,
3102  &use_speculative_top_n_sort]() -> ExecutionResult {
3103  const size_t limit = sort->getLimit();
3104  const size_t offset = sort->getOffset();
3105  // check whether sort's input is cached
3106  auto source_node = sort->getInput(0);
3107  CHECK(source_node);
3108  ExecutionResult source_result{nullptr, {}};
3109  auto source_query_plan_dag = source_node->getQueryPlanDagHash();
3110  bool enable_resultset_recycler = canUseResultsetCache(eo, render_info);
3111  if (enable_resultset_recycler && has_valid_query_plan_dag(source_node) &&
3112  !sort->isEmptyResult()) {
3113  if (auto cached_resultset =
3114  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
3115  source_query_plan_dag)) {
3116  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3117  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3118  << " from resultset cache";
3119  source_result =
3120  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3121  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3122  addTemporaryTable(-source_node->getId(), cached_resultset);
3123  }
3124  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3125  co.device_type == ExecutorDeviceType::GPU;
3126  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3127  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3128  }
3129  }
3130  if (!source_result.getDataPtr()) {
3131  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3132  is_desc = first_oe_is_desc(order_entries);
3133  ExecutionOptions eo_copy = {
3134  eo.output_columnar_hint,
3135  eo.keep_result,
3136  eo.allow_multifrag,
3137  eo.just_explain,
3138  eo.allow_loop_joins,
3139  eo.with_watchdog,
3140  eo.jit_debug,
3141  eo.just_validate || sort->isEmptyResult(),
3142  eo.with_dynamic_watchdog,
3143  eo.dynamic_watchdog_time_limit,
3144  eo.find_push_down_candidates,
3145  eo.just_calcite_explain,
3146  eo.gpu_input_mem_limit_percent,
3147  eo.allow_runtime_query_interrupt,
3148  eo.running_query_interrupt_freq,
3149  eo.pending_query_interrupt_freq,
3150  eo.executor_type,
3151  };
3152 
3153  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3154  source_result = executeWorkUnit(source_work_unit,
3155  source->getOutputMetainfo(),
3156  is_aggregate,
3157  co,
3158  eo_copy,
3159  render_info,
3160  queue_time_ms);
3161  use_speculative_top_n_sort =
3162  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3163  use_speculative_top_n(source_work_unit.exe_unit,
3164  source_result.getRows()->getQueryMemDesc());
3165  }
3166  if (render_info && render_info->isPotentialInSituRender()) {
3167  return source_result;
3168  }
3169  if (source_result.isFilterPushDownEnabled()) {
3170  return source_result;
3171  }
3172  auto rows_to_sort = source_result.getRows();
3173  if (eo.just_explain) {
3174  return {rows_to_sort, {}};
3175  }
3176  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3177  !use_speculative_top_n_sort) {
3178  const size_t top_n = limit == 0 ? 0 : limit + offset;
3179  rows_to_sort->sort(order_entries, top_n, executor_);
3180  }
3181  if (limit || offset) {
3182  if (g_cluster && sort->collationCount() == 0) {
3183  if (offset >= rows_to_sort->rowCount()) {
3184  rows_to_sort->dropFirstN(offset);
3185  } else {
3186  rows_to_sort->keepFirstN(limit + offset);
3187  }
3188  } else {
3189  rows_to_sort->dropFirstN(offset);
3190  if (limit) {
3191  rows_to_sort->keepFirstN(limit);
3192  }
3193  }
3194  }
3195  return {rows_to_sort, source_result.getTargetsMeta()};
3196  };
3197 
3198  try {
3199  return execute_sort_query();
3200  } catch (const SpeculativeTopNFailed& e) {
3201  CHECK_EQ(size_t(1), groupby_exprs.size());
3202  CHECK(groupby_exprs.front());
3203  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3204  return execute_sort_query();
3205  }
3206 }
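// Note on the retry above (editorial sketch): the speculative top-n path can
// guess wrong, in which case executeWorkUnit throws SpeculativeTopNFailed. The
// single group-by expression is then blacklisted together with the sort
// direction, and the query is re-run; createSortInputWorkUnit below consults
// speculative_topn_blacklist_ and downgrades to SortAlgorithm::Default, so the
// second execute_sort_query() cannot fail for the same reason:
//
//   try {
//     return execute_sort_query();               // speculative attempt
//   } catch (const SpeculativeTopNFailed& e) {
//     speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
//     return execute_sort_query();               // non-speculative retry
//   }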
3207 
3208 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
3209  const RelSort* sort,
3210  std::list<Analyzer::OrderEntry>& order_entries,
3211  const ExecutionOptions& eo) {
3212  const auto source = sort->getInput(0);
3213  const size_t limit = sort->getLimit();
3214  const size_t offset = sort->getOffset();
3215  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3216  const size_t scan_total_limit =
3217  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
3218  size_t max_groups_buffer_entry_guess{
3219  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3220  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3221  SortInfo sort_info{
3222  order_entries, sort_algorithm, limit, offset, sort->isLimitDelivered()};
3223  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3224  const auto& source_exe_unit = source_work_unit.exe_unit;
3225 
3226  // we do not allow sorting geometry or array types
3227  for (auto order_entry : order_entries) {
3228  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-based index
3229  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3230  const auto& ti = get_target_info(te, false);
3231  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3232  throw std::runtime_error(
3233  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3234  }
3235  }
3236 
3237  if (source_exe_unit.groupby_exprs.size() == 1) {
3238  if (!source_exe_unit.groupby_exprs.front()) {
3239  sort_algorithm = SortAlgorithm::StreamingTopN;
3240  } else {
3241  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3242  first_oe_is_desc(order_entries))) {
3243  sort_algorithm = SortAlgorithm::Default;
3244  }
3245  }
3246  }
3247 
3248  sort->setOutputMetainfo(source->getOutputMetainfo());
3249  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3250  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
3251  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3252  std::move(source_exe_unit.input_col_descs),
3253  source_exe_unit.simple_quals,
3254  source_exe_unit.quals,
3255  source_exe_unit.join_quals,
3256  source_exe_unit.groupby_exprs,
3257  source_exe_unit.target_exprs,
3258  nullptr,
3259  {sort_info.order_entries,
3260  sort_algorithm,
3261  limit,
3262  offset,
3263  sort_info.limit_delivered},
3264  scan_total_limit,
3265  source_exe_unit.query_hint,
3266  source_exe_unit.query_plan_dag_hash,
3267  source_exe_unit.hash_table_build_plan_dag,
3268  source_exe_unit.table_id_to_node_map,
3269  source_exe_unit.use_bump_allocator,
3270  source_exe_unit.union_all,
3271  source_exe_unit.query_state},
3272  source,
3273  max_groups_buffer_entry_guess,
3274  std::move(source_work_unit.query_rewriter),
3275  source_work_unit.input_permutation,
3276  source_work_unit.left_deep_join_input_sizes};
3277 }
3278 
3279 namespace {
3280 
3281 /**
3282  * Upper bound estimation for the number of groups. Not strictly correct and not
3283  * tight, but hopefully good enough for its purpose, which is to detect potential
3284  * overflows.
3285  */
3286 
3287 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
3288  CHECK(!table_infos.empty());
3289  const auto& first_table = table_infos.front();
3290  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
3291  for (const auto& table_info : table_infos) {
3292  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
3293  max_num_groups = table_info.info.getNumTuplesUpperBound();
3294  }
3295  }
3296  return std::max(max_num_groups, size_t(1));
3297 }
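// Worked example (illustrative, made-up cardinalities): for a join over tables
// whose tuple-count upper bounds are 1'000'000 and 250'000 rows, the estimate
// is max(1'000'000, 250'000) = 1'000'000 groups; the final std::max with
// size_t(1) guards against a degenerate zero-row estimate:
//
//   std::vector<size_t> upper_bounds{1'000'000, 250'000};
//   size_t max_num_groups = 0;
//   for (const auto ub : upper_bounds) {
//     max_num_groups = std::max(max_num_groups, ub);
//   }
//   max_num_groups = std::max(max_num_groups, size_t(1));  // == 1'000'000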
3298 
3299 bool is_projection(const RelAlgExecutionUnit& ra_exe_unit) {
3300  return ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front();
3301 }
3302 
3303 bool can_output_columnar(const RelAlgExecutionUnit& ra_exe_unit,
3304  const RenderInfo* render_info,
3305  const RelAlgNode* body) {
3306  if (!is_projection(ra_exe_unit)) {
3307  return false;
3308  }
3309  if (render_info && render_info->isPotentialInSituRender()) {
3310  return false;
3311  }
3312  if (!ra_exe_unit.sort_info.order_entries.empty()) {
3313  // disable output columnar when we have top-sort node query
3314  return false;
3315  }
3316  for (const auto& target_expr : ra_exe_unit.target_exprs) {
3317  // We don't currently support varlen columnar projections, so
3318  // return false if we find one
3319  if (target_expr->get_type_info().is_varlen()) {
3320  return false;
3321  }
3322  }
3323  if (auto top_project = dynamic_cast<const RelProject*>(body)) {
3324  if (top_project->isRowwiseOutputForced()) {
3325  return false;
3326  }
3327  }
3328  return true;
3329 }
3330 
3331 bool should_output_columnar(const RelAlgExecutionUnit& ra_exe_unit) {
3332  return g_columnar_large_projections &&
3333  ra_exe_unit.scan_limit >= g_columnar_large_projections_threshold;
3334 }
3335 
3336 /**
3337  * Determines whether a query needs to compute the size of its output buffer. Returns
3338  * true for projection queries with no LIMIT or a LIMIT that exceeds the high scan
3339  * limit threshold (meaning it would be cheaper to compute the number of rows passing
3340  * or use the bump allocator than allocate the current scan limit per GPU).
3341  */
3342 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
3343  for (const auto target_expr : ra_exe_unit.target_exprs) {
3344  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
3345  return false;
3346  }
3347  }
3348  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
3349  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
3350  return true;
3351  }
3352  return false;
3353 }
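// Illustrative usage (hypothetical unit, not from the original source): a pure
// projection with no LIMIT qualifies for output-buffer sizing, while anything
// with an aggregate target or a small LIMIT does not:
//
//   ra_exe_unit.groupby_exprs = {nullptr};     // projection, no grouping
//   ra_exe_unit.scan_limit = 0;                // no LIMIT
//   compute_output_buffer_size(ra_exe_unit);   // true: pre-size via count
//
//   ra_exe_unit.scan_limit = 100;              // small LIMIT
//   compute_output_buffer_size(ra_exe_unit);   // false: just allocate 100 slots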
3354 
3355 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
3356  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
3357  ra_exe_unit.simple_quals.empty());
3358 }
3359 
3360 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
3361  const RelAlgExecutionUnit& ra_exe_unit_in,
3362  const std::vector<InputTableInfo>& table_infos,
3363  const Executor* executor,
3364  const ExecutorDeviceType device_type_in,
3365  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
3366  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
3367  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
3368  const auto target_expr = ra_exe_unit.target_exprs[i];
3369  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3370  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
3371  continue;
3372  }
3373  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
3374  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
3375  CHECK(arg);
3376  const auto& arg_ti = arg->get_type_info();
3377  // Avoid calling getExpressionRange for variable length types (string and array),
3378  // it'd trigger an assertion since that API expects to be called only for types
3379  // for which the notion of range is well-defined. A bit of a kludge, but the
3380  // logic to reject these types anyway is at lower levels in the stack and not
3381  // really worth pulling into a separate function for now.
3382  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
3383  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
3384  continue;
3385  }
3386  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
3387  if (arg_range.getType() != ExpressionRangeType::Integer) {
3388  continue;
3389  }
3390  // When running distributed, the threshold for using the precise implementation
3391  // must be consistent across all leaves, otherwise we could have a mix of precise
3392  // and approximate bitmaps and we cannot aggregate them.
3393  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
3394  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
3395  const auto sub_bitmap_count =
3396  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
3397  int64_t approx_bitmap_sz_bits{0};
3398  const auto error_rate = static_cast<Analyzer::AggExpr*>(target_expr)->get_arg1();
3399  if (error_rate) {
3400  CHECK(error_rate->get_type_info().get_type() == kINT);
3401  CHECK_GE(error_rate->get_constval().intval, 1);
3402  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
3403  } else {
3404  approx_bitmap_sz_bits = g_hll_precision_bits;
3405  }
3406  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
3407  arg_range.getIntMin(),
3408  approx_bitmap_sz_bits,
3409  true,
3410  device_type,
3411  sub_bitmap_count};
3412  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
3413  arg_range.getIntMin(),
3414  bitmap_sz_bits,
3415  false,
3416  device_type,
3417  sub_bitmap_count};
3418  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3419  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3420  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3421  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
3422  target_exprs_owned.push_back(precise_count_distinct);
3423  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
3424  }
3425  }
3426  return ra_exe_unit;
3427 }
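// Worked example (illustrative numbers, not from the original source): for
// APPROX_COUNT_DISTINCT over an integer column with range [1, 1000], the
// precise bitmap needs 1000 - 1 + 1 = 1000 bits, while the HLL variant at a
// hypothetical precision of 11 bits uses 2^11 registers. When the padded
// precise bitmap is no larger than the approximate one, the loop above swaps
// the target for an exact COUNT(DISTINCT), which merges faster and has no
// error:
//
//   const int64_t bitmap_sz_bits = 1000 - 1 + 1;   // precise, 1 bit per value
//   const int64_t approx_bits = int64_t{1} << 11;  // HLL register count
//   const bool use_precise = approx_bits >= bitmap_sz_bits;  // true here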
3428 
3429 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
3430  const CompilationOptions& co,
3431  const ExecutionOptions& eo) {
3432  return g_enable_bump_allocator && (co.device_type == ExecutorDeviceType::GPU) &&
3433  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
3434 }
3435 
3436 } // namespace
3437 
3438 ExecutionResult RelAlgExecutor::executeWorkUnit(
3439  const RelAlgExecutor::WorkUnit& work_unit,
3440  const std::vector<TargetMetaInfo>& targets_meta,
3441  const bool is_agg,
3442  const CompilationOptions& co_in,
3443  const ExecutionOptions& eo_in,
3444  RenderInfo* render_info,
3445  const int64_t queue_time_ms,
3446  const std::optional<size_t> previous_count) {
3447  INJECT_TIMER(executeWorkUnit);
3448  auto timer = DEBUG_TIMER(__func__);
3449  auto query_exec_time_begin = timer_start();
3450 
3451  const auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
3452  check_none_encoded_string_cast_tuple_limit(query_infos, work_unit.exe_unit);
3453 
3454  auto co = co_in;
3455  auto eo = eo_in;
3456  ColumnCacheMap column_cache;
3457  if (is_window_execution_unit(work_unit.exe_unit)) {
3458  if (!g_enable_window_functions) {
3459  throw std::runtime_error("Window functions support is disabled");
3460  }
3462  co.allow_lazy_fetch = false;
3463  computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3464  }
3465  if (!eo.just_explain && eo.find_push_down_candidates) {
3466  // find potential candidates:
3467  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3468  if (!selected_filters.empty() || eo.just_calcite_explain) {
3469  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3470  }
3471  }
3472  if (render_info && render_info->isPotentialInSituRender()) {
3473  co.allow_lazy_fetch = false;
3474  }
3475  const auto body = work_unit.body;
3476  CHECK(body);
3477  auto it = leaf_results_.find(body->getId());
3478  VLOG(3) << "body->getId()=" << body->getId()
3479  << " body->toString()=" << body->toString(RelRexToStringConfig::defaults())
3480  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3481  if (it != leaf_results_.end()) {
3482  executor_->addTransientStringLiterals(work_unit.exe_unit,
3483  executor_->row_set_mem_owner_);
3484  auto& aggregated_result = it->second;
3485  auto& result_rows = aggregated_result.rs;
3486  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3487  body->setOutputMetainfo(aggregated_result.targets_meta);
3488  if (render_info) {
3489  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3490  }
3491  return result;
3492  }
3493  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3494 
3495  auto ra_exe_unit = decide_approx_count_distinct_implementation(
3496  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3497 
3498  // register query hint if query_dag_ is valid
3499  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3500  if (query_dag_) {
3501  auto candidate = query_dag_->getQueryHint(body);
3502  if (candidate) {
3503  ra_exe_unit.query_hint = *candidate;
3504  }
3505  }
3506  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3507  if (is_window_execution_unit(ra_exe_unit)) {
3508  CHECK_EQ(table_infos.size(), size_t(1));
3509  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3510  max_groups_buffer_entry_guess =
3511  table_infos.front().info.fragments.front().getNumTuples();
3512  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3513  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3514  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3515  ra_exe_unit.scan_limit = *previous_count;
3516  } else {
3517  // TODO(adb): enable bump allocator path for render queries
3518  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3519  ra_exe_unit.scan_limit = 0;
3520  ra_exe_unit.use_bump_allocator = true;
3521  } else if (eo.executor_type == ::ExecutorType::Extern) {
3522  ra_exe_unit.scan_limit = 0;
3523  } else if (!eo.just_explain) {
3524  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
3525  if (filter_count_all) {
3526  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3527  }
3528  }
3529  }
3530  }
3531 
3532  // When output_columnar_hint is true here, it means either 1) the columnar output
3533  // configuration is on or 2) a user hint requested columnar output; in either case
3534  // we have to disable it if some requirements are not satisfied.
3535  if (can_output_columnar(ra_exe_unit, render_info, body)) {
3536  if (!eo.output_columnar_hint && should_output_columnar(ra_exe_unit)) {
3537  VLOG(1) << "Using columnar layout for projection as output size of "
3538  << ra_exe_unit.scan_limit << " rows exceeds threshold of "
3539  << g_columnar_large_projections_threshold;
3540  eo.output_columnar_hint = true;
3541  }
3542  } else {
3543  eo.output_columnar_hint = false;
3544  }
3545 
3546  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3547  co.device_type,
3548  QueryMemoryDescriptor(),
3549  nullptr,
3550  executor_->getCatalog(),
3551  executor_->blockSize(),
3552  executor_->gridSize()),
3553  {}};
3554 
3555  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
3556  const bool has_cardinality_estimation,
3557  const bool has_ndv_estimation) -> ExecutionResult {
3558  // Note that the groups buffer entry guess may be modified during query execution.
3559  // Create a local copy so we can track those changes if we need to attempt a retry
3560  // due to OOM
3561  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3562  try {
3563  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3564  is_agg,
3565  table_infos,
3566  ra_exe_unit,
3567  co,
3568  eo,
3569  cat_,
3570  render_info,
3571  has_cardinality_estimation,
3572  column_cache),
3573  targets_meta};
3574  } catch (const QueryExecutionError& e) {
3575  if (!has_ndv_estimation && e.getErrorCode() < 0) {
3576  throw CardinalityEstimationRequired(/*range=*/0);
3577  }
3578  handlePersistentError(e.getErrorCode());
3579  return handleOutOfMemoryRetry(
3580  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
3581  targets_meta,
3582  is_agg,
3583  co,
3584  eo,
3585  render_info,
3586  e.wasMultifragKernelLaunch(),
3587  queue_time_ms);
3588  }
3589  };
3590 
3591  auto use_resultset_cache = canUseResultsetCache(eo, render_info);
3592  for (const auto& table_info : table_infos) {
3593  const auto td = cat_.getMetadataForTable(table_info.table_id);
3594  if (td && (td->isTemporaryTable() || td->isView)) {
3595  use_resultset_cache = false;
3596  if (eo.keep_result) {
3597  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has either "
3598  "temporary table or view";
3599  }
3600  }
3601  }
3602 
3603  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
3604  try {
3605  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3606  auto card = cached_cardinality.second;
3607  if (cached_cardinality.first && card >= 0) {
3608  result = execute_and_handle_errors(
3609  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3610  } else {
3611  result = execute_and_handle_errors(
3612  max_groups_buffer_entry_guess,
3613  groups_approx_upper_bound(table_infos) <= g_big_group_threshold,
3614  /*has_ndv_estimation=*/false);
3615  }
3616  } catch (const CardinalityEstimationRequired& e) {
3617  // check the cardinality cache
3618  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3619  auto card = cached_cardinality.second;
3620  if (cached_cardinality.first && card >= 0) {
3621  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3622  } else {
3623  const auto ndv_groups_estimation =
3624  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3625  const auto estimated_groups_buffer_entry_guess =
3626  ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3627  : std::min(groups_approx_upper_bound(table_infos),
3628  g_estimator_failure_max_groupby_size);
3629  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3630  result = execute_and_handle_errors(
3631  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3632  if (!(eo.just_validate || eo.just_explain)) {
3633  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3634  }
3635  }
3636  }
3637 
3638  result.setQueueTime(queue_time_ms);
3639  if (render_info) {
3640  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3641  if (render_info->isPotentialInSituRender()) {
3642  // return an empty result (with the same queue time, and zero render time)
3643  return {std::make_shared<ResultSet>(
3644  queue_time_ms,
3645  0,
3646  executor_->row_set_mem_owner_
3647  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3648  : nullptr),
3649  {}};
3650  }
3651  }
3652 
3653  for (auto& target_info : result.getTargetsMeta()) {
3654  if (target_info.get_type_info().is_string() &&
3655  !target_info.get_type_info().is_dict_encoded_string()) {
3656  // currently, we do not support resultset caching if non-encoded string is projected
3657  use_resultset_cache = false;
3658  if (eo.keep_result) {
3659  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has non-encoded "
3660  "string column projection";
3661  }
3662  }
3663  }
3664 
3665  const auto res = result.getDataPtr();
3666  auto allow_auto_caching_resultset =
3667  res && res->hasValidBuffer() && g_allow_auto_resultset_caching &&
3668  res->getBufferSizeBytes(co.device_type) <= g_auto_resultset_caching_threshold;
3669  if (use_resultset_cache && (eo.keep_result || allow_auto_caching_resultset) &&
3670  !work_unit.exe_unit.sort_info.limit_delivered) {
3671  auto query_exec_time = timer_stop(query_exec_time_begin);
3672  res->setExecTime(query_exec_time);
3673  res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3674  res->setTargetMetaInfo(body->getOutputMetainfo());
3675  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
3676  res->setInputTableKeys(std::move(input_table_keys));
3677  if (allow_auto_caching_resultset) {
3678  VLOG(1) << "Automatically keep query resultset to recycler";
3679  }
3680  res->setUseSpeculativeTopNSort(
3681  use_speculative_top_n(ra_exe_unit, res->getQueryMemDesc()));
3682  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
3683  ra_exe_unit.query_plan_dag_hash,
3684  res->getInputTableKeys(),
3685  res,
3686  res->getBufferSizeBytes(co.device_type),
3687  target_exprs_owned_);
3688  } else {
3689  if (eo.keep_result) {
3690  if (g_cluster) {
3691  VLOG(1) << "Query hint \'keep_result\' is ignored since we do not support "
3692  "resultset recycling on distributed mode";
3693  } else if (hasStepForUnion()) {
3694  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has union-(all) "
3695  "operator";
3696  } else if (render_info && render_info->isPotentialInSituRender()) {
3697  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is classified as "
3698  "a in-situ rendering query";
3699  } else if (is_validate_or_explain_query(eo)) {
3700  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is either "
3701  "validate or explain query";
3702  } else {
3703  VLOG(1) << "Query hint \'keep_result\' is ignored";
3704  }
3705  }
3706  }
3707 
3708  return result;
3709 }
3710 
3711 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
3712  const bool is_agg,
3713  const CompilationOptions& co,
3714  const ExecutionOptions& eo) {
3715  const auto count =
3716  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
3717  kCOUNT,
3718  nullptr,
3719  false,
3720  nullptr);
3721  const auto count_all_exe_unit =
3722  work_unit.exe_unit.createCountAllExecutionUnit(count.get());
3723  size_t one{1};
3724  ResultSetPtr count_all_result;
3725  try {
3726  ColumnCacheMap column_cache;
3727  count_all_result =
3728  executor_->executeWorkUnit(one,
3729  is_agg,
3730  get_table_infos(work_unit.exe_unit, executor_),
3731  count_all_exe_unit,
3732  co,
3733  eo,
3734  cat_,
3735  nullptr,
3736  false,
3737  column_cache);
3738  } catch (const foreign_storage::ForeignStorageException& error) {
3739  throw error;
3740  } catch (const QueryMustRunOnCpu&) {
3741  // force a retry of the top level query on CPU
3742  throw;
3743  } catch (const std::exception& e) {
3744  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
3745  return std::nullopt;
3746  }
3747  const auto count_row = count_all_result->getNextRow(false, false);
3748  CHECK_EQ(size_t(1), count_row.size());
3749  const auto& count_tv = count_row.front();
3750  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
3751  CHECK(count_scalar_tv);
3752  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
3753  CHECK(count_ptr);
3754  CHECK_GE(*count_ptr, 0);
3755  auto count_upper_bound = static_cast<size_t>(*count_ptr);
3756  return std::max(count_upper_bound, size_t(1));
3757 }
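// Usage note (editorial): executeWorkUnit above uses this pre-flight COUNT(*)
// to pre-size projection output buffers, conceptually:
//
//   if (auto filtered_count = getFilteredCountAll(work_unit, true, co, eo)) {
//     ra_exe_unit.scan_limit = std::max(*filtered_count, size_t(1));
//   }
//
// Foreign-storage and must-run-on-CPU errors propagate, but any other failure
// is logged and reported as std::nullopt so callers fall back to other
// buffer-sizing strategies instead of failing the query.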
3758 
3759 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
3760  const auto& ra_exe_unit = work_unit.exe_unit;
3761  if (ra_exe_unit.input_descs.size() != 1) {
3762  return false;
3763  }
3764  const auto& table_desc = ra_exe_unit.input_descs.front();
3765  if (table_desc.getSourceType() != InputSourceType::TABLE) {
3766  return false;
3767  }
3768  const int table_id = table_desc.getTableId();
3769  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
3770  const auto comp_expr =
3771  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3772  if (!comp_expr || comp_expr->get_optype() != kEQ) {
3773  return false;
3774  }
3775  const auto lhs = comp_expr->get_left_operand();
3776  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3777  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3778  return false;
3779  }
3780  const auto rhs = comp_expr->get_right_operand();
3781  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3782  if (!rhs_const) {
3783  return false;
3784  }
3785  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
3786  if (cd->isVirtualCol) {
3787  CHECK_EQ("rowid", cd->columnName);
3788  return true;
3789  }
3790  }
3791  return false;
3792 }
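// Illustrative example (not from the original source): a point lookup such as
//
//   SELECT * FROM t WHERE rowid = 42;
//
// yields a single simple qual whose left operand is the virtual `rowid`
// column, so isRowidLookup returns true and the output buffer can be sized for
// a handful of rows. A range predicate like `rowid > 42` fails the kEQ check
// above and returns false.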
3793 
3795  const RelAlgExecutor::WorkUnit& work_unit,
3796  const std::vector<TargetMetaInfo>& targets_meta,
3797  const bool is_agg,
3798  const CompilationOptions& co,
3799  const ExecutionOptions& eo,
3800  RenderInfo* render_info,
3801  const bool was_multifrag_kernel_launch,
3802  const int64_t queue_time_ms) {
3803  // Disable the bump allocator.
3804  // Note that this will have basically the same effect as using the bump allocator
3805  // for the kernel per fragment path. We need to unify the max_groups_buffer_entry_guess
3806  // = 0 path and the bump allocator path for kernel per fragment execution.
3807  auto ra_exe_unit_in = work_unit.exe_unit;
3808  ra_exe_unit_in.use_bump_allocator = false;
3809 
3810  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3811  co.device_type,
3812  QueryMemoryDescriptor(),
3813  nullptr,
3814  executor_->getCatalog(),
3815  executor_->blockSize(),
3816  executor_->gridSize()),
3817  {}};
3818 
3819  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
3820  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3821  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
3822  eo.keep_result,
3823  false,
3824  false,
3825  eo.allow_loop_joins,
3826  eo.with_watchdog,
3827  eo.jit_debug,
3828  false,
3829  eo.with_dynamic_watchdog,
3830  eo.dynamic_watchdog_time_limit,
3831  false,
3832  false,
3833  eo.gpu_input_mem_limit_percent,
3834  eo.allow_runtime_query_interrupt,
3835  eo.running_query_interrupt_freq,
3836  eo.pending_query_interrupt_freq,
3837  eo.executor_type,
3838  eo.outer_fragment_indices};
3839 
3840  if (was_multifrag_kernel_launch) {
3841  try {
3842  // Attempt to retry using the kernel per fragment path. The smaller input size
3843  // required may allow the entire kernel to execute in GPU memory.
3844  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
3845  "kernels disabled.";
3846  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3847  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
3848  ColumnCacheMap column_cache;
3849  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3850  is_agg,
3851  table_infos,
3852  ra_exe_unit,
3853  co,
3854  eo_no_multifrag,
3855  cat_,
3856  nullptr,
3857  true,
3858  column_cache),
3859  targets_meta};
3860  result.setQueueTime(queue_time_ms);
3861  } catch (const QueryExecutionError& e) {
3862  handlePersistentError(e.getErrorCode());
3863  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
3864  }
3865  }
3866 
3867  if (render_info) {
3868  render_info->setForceNonInSituData();
3869  }
3870 
3871  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
3872  // Only reset the group buffer entry guess if we ran out of slots, which suggests
3873  // a highly pathological input which prevented a good estimation of distinct tuple
3874  // count. For projection queries, this will force a per-fragment scan limit, which
3875  // is compatible with the CPU path.
3877  VLOG(1) << "Resetting max groups buffer entry guess.";
3878  max_groups_buffer_entry_guess = 0;
3879 
3880  int iteration_ctr = -1;
3881  while (true) {
3882  iteration_ctr++;
3883  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3884  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
3885  ColumnCacheMap column_cache;
3886  try {
3887  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3888  is_agg,
3889  table_infos,
3890  ra_exe_unit,
3891  co_cpu,
3892  eo_no_multifrag,
3893  cat_,
3894  nullptr,
3895  true,
3896  column_cache),
3897  targets_meta};
3898  } catch (const QueryExecutionError& e) {
3899  // Ran out of slots
3900  if (e.getErrorCode() < 0) {
3901  // Even the conservative guess failed; it should only happen when we group
3902  // by a huge cardinality array. Maybe we should throw an exception instead?
3903  // Such a heavy query is entirely capable of exhausting all the host memory.
3904  CHECK(max_groups_buffer_entry_guess);
3905  // Only allow two iterations of increasingly large entry guesses up to a maximum
3906  // of 512MB per column per kernel
3907  if (g_enable_watchdog || iteration_ctr > 1) {
3908  throw std::runtime_error("Query ran out of output slots in the result");
3909  }
3910  max_groups_buffer_entry_guess *= 2;
3911  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
3912  "groups buffer entry "
3913  "guess equal to "
3914  << max_groups_buffer_entry_guess;
3915  } else {
3916  handlePersistentError(e.getErrorCode());
3917  }
3918  continue;
3919  }
3920  result.setQueueTime(queue_time_ms);
3921  return result;
3922  }
3923  return result;
3924 }
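// Editorial sketch of the retry policy above (try_execute is a hypothetical
// stand-in for the executor_->executeWorkUnit call): a negative error code
// means the output buffer ran out of slots, so the entry guess doubles, and
// after two growth iterations (or immediately under the watchdog) the query
// fails rather than risking host-memory exhaustion:
//
//   size_t guess = initial_guess;  // hypothetical starting value
//   for (int iteration = 0;; ++iteration) {
//     if (try_execute(guess)) {
//       break;
//     }
//     if (g_enable_watchdog || iteration > 1) {
//       throw std::runtime_error("Query ran out of output slots in the result");
//     }
//     guess *= 2;
//   }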
3925 
3926 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
3927  LOG(ERROR) << "Query execution failed with error "
3928  << getErrorMessageFromCode(error_code);
3929  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
3930  // We ran out of GPU memory, this doesn't count as an error if the query is
3931  // allowed to continue on CPU because retry on CPU is explicitly allowed through
3932  // --allow-cpu-retry.
3933  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
3934  if (!g_allow_cpu_retry) {
3935  throw std::runtime_error(
3936  "Query ran out of GPU memory, unable to automatically retry on CPU");
3937  }
3938  return;
3939  }
3940  throw std::runtime_error(getErrorMessageFromCode(error_code));
3941 }
3942 
3943 namespace {
3944 struct ErrorInfo {
3945  const char* code{nullptr};
3946  const char* description{nullptr};
3947 };
3948 ErrorInfo getErrorDescription(const int32_t error_code) {
3949  // 'Designated initializers' don't compile on Windows under /std:c++17;
3950  // they require /std:c++20, so they have been removed for the Windows port.
3951  switch (error_code) {
3952  case Executor::ERR_DIV_BY_ZERO:
3953  return {"ERR_DIV_BY_ZERO", "Division by zero"};
3954  case Executor::ERR_OUT_OF_GPU_MEM:
3955  return {"ERR_OUT_OF_GPU_MEM",
3957  "Query couldn't keep the entire working set of columns in GPU memory"};
3958  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
3959  return {"ERR_UNSUPPORTED_SELF_JOIN", "Self joins not supported yet"};
3960  case Executor::ERR_OUT_OF_CPU_MEM:
3961  return {"ERR_OUT_OF_CPU_MEM", "Not enough host memory to execute the query"};
3962  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
3963  return {"ERR_OVERFLOW_OR_UNDERFLOW", "Overflow or underflow"};
3964  case Executor::ERR_OUT_OF_TIME:
3965  return {"ERR_OUT_OF_TIME", "Query execution has exceeded the time limit"};
3966  case Executor::ERR_INTERRUPTED:
3967  return {"ERR_INTERRUPTED", "Query execution has been interrupted"};
3968  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
3969  return {"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
3970  "Columnar conversion not supported for variable length types"};
3971  case Executor::ERR_TOO_MANY_LITERALS:
3972  return {"ERR_TOO_MANY_LITERALS", "Too many literals in the query"};
3973  case Executor::ERR_STRING_CONST_IN_RESULTSET:
3974  return {"ERR_STRING_CONST_IN_RESULTSET",
3976  "NONE ENCODED String types are not supported as input result set."};
3977  case Executor::ERR_OUT_OF_RENDER_MEM:
3978  return {"ERR_OUT_OF_RENDER_MEM",
3980  "Insufficient GPU memory for query results in render output buffer "
3981  "sized by render-mem-bytes"};
3982  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
3983  return {"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
3984  "Streaming-Top-N not supported in Render Query"};
3985  case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
3986  return {"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
3987  "Multiple distinct values encountered"};
3988  case Executor::ERR_GEOS:
3989  return {"ERR_GEOS", "ERR_GEOS"};
3990  case Executor::ERR_WIDTH_BUCKET_INVALID_ARGUMENT:
3991  return {"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
3993  "Arguments of WIDTH_BUCKET function does not satisfy the condition"};
3994  default:
3995  return {nullptr, nullptr};
3996  }
3997 }
3998 
3999 } // namespace
4000 
4001 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
4002  if (error_code < 0) {
4003  return "Ran out of slots in the query output buffer";
4004  }
4005  const auto errorInfo = getErrorDescription(error_code);
4006 
4007  if (errorInfo.code) {
4008  return errorInfo.code + ": "s + errorInfo.description;
4009  } else {
4010  return "Other error: code "s + std::to_string(error_code);
4011  }
4012 }
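// Usage example (illustrative): negative codes are reserved for the
// out-of-slots condition, known positive codes map through the table above,
// and everything else gets a generic message:
//
//   getErrorMessageFromCode(-3);
//   // -> "Ran out of slots in the query output buffer"
//   getErrorMessageFromCode(Executor::ERR_DIV_BY_ZERO);
//   // -> "ERR_DIV_BY_ZERO: Division by zero"
//   getErrorMessageFromCode(12345);  // assuming 12345 is not a known code
//   // -> "Other error: code 12345"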
4013 
4014 void RelAlgExecutor::executePostExecutionCallback() {
4015  if (post_execution_callback_) {
4016  VLOG(1) << "Running post execution callback.";
4017  (*post_execution_callback_)();
4018  }
4019 }
4020 
4021 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
4022  const SortInfo& sort_info,
4023  const ExecutionOptions& eo) {
4024  const auto compound = dynamic_cast<const RelCompound*>(node);
4025  if (compound) {
4026  return createCompoundWorkUnit(compound, sort_info, eo);
4027  }
4028  const auto project = dynamic_cast<const RelProject*>(node);
4029  if (project) {
4030  return createProjectWorkUnit(project, sort_info, eo);
4031  }
4032  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4033  if (aggregate) {
4034  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4035  }
4036  const auto filter = dynamic_cast<const RelFilter*>(node);
4037  if (filter) {
4038  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4039  }
4040  LOG(FATAL) << "Unhandled node type: "
4041  << node->toString(RelRexToStringConfig::defaults());
4042  return {};
4043 }
4044 
4045 namespace {
4046 
4047 JoinType get_join_type(const RelAlgNode* ra) {
4048  auto sink = get_data_sink(ra);
4049  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
4050  return join->getJoinType();
4051  }
4052  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
4053  return JoinType::INNER;
4054  }
4055 
4056  return JoinType::INVALID;
4057 }
4058 
4059 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
4060  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4061  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
4062  return nullptr;
4063  }
4064  const auto equi_join_condition =
4065  dynamic_cast<const RexOperator*>(condition->getOperand(0));
4066  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
4067  return nullptr;
4068  }
4069  const auto both_are_null_condition =
4070  dynamic_cast<const RexOperator*>(condition->getOperand(1));
4071  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
4072  both_are_null_condition->size() != 2) {
4073  return nullptr;
4074  }
4075  const auto lhs_is_null =
4076  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
4077  const auto rhs_is_null =
4078  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
4079  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
4080  rhs_is_null->getOperator() != kISNULL) {
4081  return nullptr;
4082  }
4083  CHECK_EQ(size_t(1), lhs_is_null->size());
4084  CHECK_EQ(size_t(1), rhs_is_null->size());
4085  CHECK_EQ(size_t(2), equi_join_condition->size());
4086  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
4087  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
4088  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
4089  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
4090  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
4091  return nullptr;
4092  }
4093  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
4094  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
4095  RexDeepCopyVisitor deep_copy_visitor;
4096  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
4097  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
4098  eq_operands.emplace_back(lhs_op_copy.release());
4099  eq_operands.emplace_back(rhs_op_copy.release());
4100  return boost::make_unique<const RexOperator>(
4101  kBW_EQ, eq_operands, equi_join_condition->getType());
4102  }
4103  return nullptr;
4104 }
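// Illustrative example (not from the original source): this rewrite targets
// the NULL-safe equality pattern Calcite emits for conditions like
// `t1.x IS NOT DISTINCT FROM t2.x`:
//
//   t1.x = t2.x OR (t1.x IS NULL AND t2.x IS NULL)
//
// which collapses into a single `t1.x BW_EQ t2.x` operator, letting the join
// be treated as an equi-join (and therefore hash-join eligible) despite NULLs.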
4105 
4106 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
4107  const RexScalar* scalar) {
4108  const auto condition = dynamic_cast<const RexOperator*>(scalar);
4109  if (condition && condition->getOperator() == kAND) {
4110  CHECK_GE(condition->size(), size_t(2));
4111  auto acc = get_bitwise_equals(condition->getOperand(0));
4112  if (!acc) {
4113  return nullptr;
4114  }
4115  for (size_t i = 1; i < condition->size(); ++i) {
4116  std::vector<std::unique_ptr<const RexScalar>> and_operands;
4117  and_operands.emplace_back(std::move(acc));
4118  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
4119  acc =
4120  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
4121  }
4122  return acc;
4123  }
4124  return get_bitwise_equals(scalar);
4125 }
4126 
4127 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
4128  CHECK_GE(left_deep_join->inputCount(), size_t(2));
4129  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
4130  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
4131  ++nesting_level) {
4132  if (left_deep_join->getOuterCondition(nesting_level)) {
4133  join_types[nesting_level - 1] = JoinType::LEFT;
4134  }
4135  auto cur_level_join_type = left_deep_join->getJoinType(nesting_level);
4136  if (cur_level_join_type == JoinType::SEMI || cur_level_join_type == JoinType::ANTI) {
4137  join_types[nesting_level - 1] = cur_level_join_type;
4138  }
4139  }
4140  return join_types;
4141 }
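// Illustrative example (made-up plan): for a left-deep tree over three inputs,
// say `A JOIN B ON ... LEFT JOIN C ON ...`, inputCount() == 3 and one JoinType
// is produced per nesting level:
//
//   join_types[0] == JoinType::INNER;  // A-B: no outer condition at level 1
//   join_types[1] == JoinType::LEFT;   // B-C: outer condition at level 2
//
// SEMI and ANTI levels (from IN / NOT EXISTS rewrites) override the INNER
// default the same way.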
4142 
4143 template <class RA>
4144 std::vector<size_t> do_table_reordering(
4145  std::vector<InputDescriptor>& input_descs,
4146  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
4147  const JoinQualsPerNestingLevel& left_deep_join_quals,
4148  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4149  const RA* node,
4150  const std::vector<InputTableInfo>& query_infos,
4151  const Executor* executor) {
4152  if (g_cluster) {
4153  // Disable table reordering in distributed mode. The aggregator does not have enough
4154  // information to break ties
4155  return {};
4156  }
4157  const auto& cat = *executor->getCatalog();
4158  for (const auto& table_info : query_infos) {
4159  if (table_info.table_id < 0) {
4160  continue;
4161  }
4162  const auto td = cat.getMetadataForTable(table_info.table_id);
4163  CHECK(td);
4164  if (table_is_replicated(td)) {
4165  return {};
4166  }
4167  }
4168  const auto input_permutation =
4169  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
4170  input_to_nest_level = get_input_nest_levels(node, input_permutation);
4171  std::tie(input_descs, input_col_descs, std::ignore) =
4172  get_input_desc(node, input_to_nest_level, input_permutation, cat);
4173  return input_permutation;
4174 }
4175 
4176 std::vector<size_t> get_left_deep_join_input_sizes(
4177  const RelLeftDeepInnerJoin* left_deep_join) {
4178  std::vector<size_t> input_sizes;
4179  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
4180  const auto inputs = get_node_output(left_deep_join->getInput(i));
4181  input_sizes.push_back(inputs.size());
4182  }
4183  return input_sizes;
4184 }
4185 
4186 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
4187  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
4188  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
4189  for (const auto& qual : quals) {
4190  const auto rewritten_qual = rewrite_expr(qual.get());
4191  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
4192  }
4193  return rewritten_quals;
4194 }
4195 
4196 } // namespace
4197 
4198 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
4199  const RelCompound* compound,
4200  const SortInfo& sort_info,
4201  const ExecutionOptions& eo) {
4202  std::vector<InputDescriptor> input_descs;
4203  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4204  auto input_to_nest_level = get_input_nest_levels(compound, {});
4205  std::tie(input_descs, input_col_descs, std::ignore) =
4206  get_input_desc(compound, input_to_nest_level, {}, cat_);
4207  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4208  const auto query_infos = get_table_infos(input_descs, executor_);
4209  CHECK_EQ(size_t(1), compound->inputCount());
4210  const auto left_deep_join =
4211  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4212  JoinQualsPerNestingLevel left_deep_join_quals;
4213  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4214  : std::vector<JoinType>{get_join_type(compound)};
4215  std::vector<size_t> input_permutation;
4216  std::vector<size_t> left_deep_join_input_sizes;
4217  std::optional<unsigned> left_deep_tree_id;
4218  if (left_deep_join) {
4219  left_deep_tree_id = left_deep_join->getId();
4220  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4221  left_deep_join_quals = translateLeftDeepJoinFilter(
4222  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4224  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4225  join_types.end()) {
4226  input_permutation = do_table_reordering(input_descs,
4227  input_col_descs,
4228  left_deep_join_quals,
4229  input_to_nest_level,
4230  compound,
4231  query_infos,
4232  executor_);
4233  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4234  std::tie(input_descs, input_col_descs, std::ignore) =
4235  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
4236  left_deep_join_quals = translateLeftDeepJoinFilter(
4237  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4238  }
4239  }
4240  RelAlgTranslator translator(cat_,
4241  query_state_,
4242  executor_,
4243  input_to_nest_level,
4244  join_types,
4245  now_,
4246  eo.just_explain);
4247  const auto scalar_sources =
4248  translate_scalar_sources(compound, translator, eo.executor_type);
4249  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4250  const auto quals_cf = translate_quals(compound, translator);
4251  const auto target_exprs = translate_targets(target_exprs_owned_,
4252  scalar_sources,
4253  groupby_exprs,
4254  compound,
4255  translator,
4256  eo.executor_type);
4257 
4258  auto query_hint = RegisteredQueryHint::defaults();
4259  if (query_dag_) {
4260  auto candidate = query_dag_->getQueryHint(compound);
4261  if (candidate) {
4262  query_hint = *candidate;
4263  }
4264  }
4265  CHECK_EQ(compound->size(), target_exprs.size());
4266  const RelAlgExecutionUnit exe_unit = {input_descs,
4267  input_col_descs,
4268  quals_cf.simple_quals,
4269  rewrite_quals(quals_cf.quals),
4270  left_deep_join_quals,
4271  groupby_exprs,
4272  target_exprs,
4273  nullptr,
4274  sort_info,
4275  0,
4276  query_hint,
4277  compound->getQueryPlanDagHash(),
4278  {},
4279  {},
4280  false,
4281  std::nullopt,
4282  query_state_};
4283  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4284  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4285  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4286  compound->setOutputMetainfo(targets_meta);
4287  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4288  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4289  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4290  rewritten_exe_unit.join_quals);
4291  }
4292  if (has_valid_query_plan_dag(compound)) {
4293  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4294  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4295  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4296  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4297  }
4298  return {rewritten_exe_unit,
4299  compound,
4300  g_default_max_groups_buffer_entry_guess,
4301  std::move(query_rewriter),
4302  input_permutation,
4303  left_deep_join_input_sizes};
4304 }
4305 
4306 std::shared_ptr<RelAlgTranslator> RelAlgExecutor::getRelAlgTranslator(
4307  const RelAlgNode* node) {
4308  auto input_to_nest_level = get_input_nest_levels(node, {});
4309  const auto left_deep_join =
4310  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
4311  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4312  : std::vector<JoinType>{get_join_type(node)};
4313  return std::make_shared<RelAlgTranslator>(
4314  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, false);
4315 }
4316 
4317 namespace {
4318 
4319 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
4320  CHECK(qual_expr);
4321  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
4322  if (!bin_oper || bin_oper->getOperator() != kAND) {
4323  return {qual_expr};
4324  }
4325  CHECK_GE(bin_oper->size(), size_t(2));
4326  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
4327  for (size_t i = 1; i < bin_oper->size(); ++i) {
4328  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
4329  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
4330  }
4331  return lhs_cf;
4332 }
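// Illustrative example: conjunctions are flattened recursively while anything
// else becomes a single-element list. A condition tree for `a AND (b AND c)`
// yields {a, b, c}; a bare `a OR b` yields just {a OR b}, leaving the
// disjunction intact for later analysis.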
4333 
4334 std::shared_ptr<Analyzer::Expr> build_logical_expression(
4335  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
4336  const SQLOps sql_op) {
4337  CHECK(!factors.empty());
4338  auto acc = factors.front();
4339  for (size_t i = 1; i < factors.size(); ++i) {
4340  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
4341  }
4342  return acc;
4343 }
4344 
4345 template <class QualsList>
4346 bool list_contains_expression(const QualsList& haystack,
4347  const std::shared_ptr<Analyzer::Expr>& needle) {
4348  for (const auto& qual : haystack) {
4349  if (*qual == *needle) {
4350  return true;
4351  }
4352  }
4353  return false;
4354 }
4355 
4356 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
4357 // evaluations of `p` and allows use of the original form in joins if `p`
4358 // can be used for hash joins.
4359 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
4360  const std::shared_ptr<Analyzer::Expr>& expr) {
4361  const auto expr_terms = qual_to_disjunctive_form(expr);
4362  CHECK_GE(expr_terms.size(), size_t(1));
4363  const auto& first_term = expr_terms.front();
4364  const auto first_term_factors = qual_to_conjunctive_form(first_term);
4365  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
4366  // First, collect the conjunctive components common to all the disjunctive components.
4367  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
4368  for (const auto& first_term_factor : first_term_factors.quals) {
4369  bool is_common =
4370  expr_terms.size() > 1; // Only report common factors for disjunction.
4371  for (size_t i = 1; i < expr_terms.size(); ++i) {
4372  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
4373  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
4374  is_common = false;
4375  break;
4376  }
4377  }
4378  if (is_common) {
4379  common_factors.push_back(first_term_factor);
4380  }
4381  }
4382  if (common_factors.empty()) {
4383  return expr;
4384  }
4385  // Now that the common expressions are known, collect the remaining expressions.
4386  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
4387  for (const auto& term : expr_terms) {
4388  const auto term_cf = qual_to_conjunctive_form(term);
4389  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
4390  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
4391  for (const auto& qual : term_cf.quals) {
4392  if (!list_contains_expression(common_factors, qual)) {
4393  remaining_quals.push_back(qual);
4394  }
4395  }
4396  if (!remaining_quals.empty()) {
4397  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
4398  }
4399  }
4400  // Reconstruct the expression with the transformation applied.
4401  const auto common_expr = build_logical_expression(common_factors, kAND);
4402  if (remaining_terms.empty()) {
4403  return common_expr;
4404  }
4405  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
4406  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
4407 }
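// Worked example (editorial): for `(p AND q) OR (p AND r)`,
// qual_to_disjunctive_form produces the terms {p AND q, p AND r}; `p` is the
// only factor present in every term, so common_factors == {p} and the
// remaining terms are {q, r}. The reconstruction returns `p AND (q OR r)`,
// which evaluates `p` once and keeps it visible to the hash-join machinery.
// If the terms share no factor, e.g. `(p AND q) OR (s AND r)`, the original
// expression is returned unchanged.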
4408 
4409 } // namespace
4410 
4411 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
4412  const RexScalar* join_condition,
4413  const std::vector<JoinType>& join_types,
4414  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4415  const bool just_explain) const {
4416  RelAlgTranslator translator(
4417  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
4418  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
4419  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4420  for (const auto rex_condition_component : rex_condition_cf) {
4421  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
4422  const auto join_condition =
4423  reverse_logical_distribution(translator.translateScalarRex(
4424  bw_equals ? bw_equals.get() : rex_condition_component));
4425  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
4426 
4427  auto append_folded_cf_quals = [&join_condition_quals](const auto& cf_quals) {
4428  for (const auto& cf_qual : cf_quals) {
4429  join_condition_quals.emplace_back(fold_expr(cf_qual.get()));
4430  }
4431  };
4432 
4433  append_folded_cf_quals(join_condition_cf.quals);
4434  append_folded_cf_quals(join_condition_cf.simple_quals);
4435  }
4436  return combine_equi_join_conditions(join_condition_quals);
4437 }
4438 
4439 // Translate left deep join filter and separate the conjunctive form qualifiers
4440 // per nesting level. The code generated for hash table lookups on each level
4441 // must dominate its uses in deeper nesting levels.
4442 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
4443  const RelLeftDeepInnerJoin* join,
4444  const std::vector<InputDescriptor>& input_descs,
4445  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
4446  const bool just_explain) {
4447  const auto join_types = left_deep_join_types(join);
4448  const auto join_condition_quals = makeJoinQuals(
4449  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
4450  MaxRangeTableIndexVisitor rte_idx_visitor;
4451  JoinQualsPerNestingLevel result(input_descs.size() - 1);
4452  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4453  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4454  const auto outer_condition = join->getOuterCondition(rte_idx);
4455  if (outer_condition) {
4456  result[rte_idx - 1].quals =
4457  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4458  CHECK_LE(rte_idx, join_types.size());
4459  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
4460  result[rte_idx - 1].type = JoinType::LEFT;
4461  continue;
4462  }
4463  for (const auto& qual : join_condition_quals) {
4464  if (visited_quals.count(qual)) {
4465  continue;
4466  }
4467  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4468  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4469  const auto it_ok = visited_quals.emplace(qual);
4470  CHECK(it_ok.second);
4471  result[rte_idx - 1].quals.push_back(qual);
4472  }
4473  }
4474  CHECK_LE(rte_idx, join_types.size());
4475  CHECK(join_types[rte_idx - 1] == JoinType::INNER ||
4476  join_types[rte_idx - 1] == JoinType::SEMI ||
4477  join_types[rte_idx - 1] == JoinType::ANTI);
4478  result[rte_idx - 1].type = join_types[rte_idx - 1];
4479  }
4480  return result;
4481 }
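// Illustrative example (made-up quals): with inputs {t0, t1, t2}, a qual
// `t0.a = t1.b` has maximum range-table index 1 and is attached to nesting
// level 1 (result[0]), while `t1.b = t2.c` has maximum index 2 and waits for
// level 2 (result[1]). This placement is what guarantees that the code
// generated for each hash-table lookup dominates its uses in deeper levels.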
4482 
4483 namespace {
4484 
4485 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
4486  const RelAlgNode* ra_node,
4487  const size_t nest_level,
4488  const std::vector<TargetMetaInfo>& in_metainfo,
4489  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4490  CHECK_LE(size_t(1), ra_node->inputCount());
4491  CHECK_GE(size_t(2), ra_node->inputCount());
4492  const auto input = ra_node->getInput(nest_level);
4493  const auto it_rte_idx = input_to_nest_level.find(input);
4494  CHECK(it_rte_idx != input_to_nest_level.end());
4495  const int rte_idx = it_rte_idx->second;
4496  const int table_id = table_id_from_ra(input);
4497  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4498  const auto scan_ra = dynamic_cast<const RelScan*>(input);
4499  int input_idx = 0;
4500  for (const auto& input_meta : in_metainfo) {
4501  inputs.push_back(
4502  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
4503  table_id,
4504  scan_ra ? input_idx + 1 : input_idx,
4505  rte_idx));
4506  ++input_idx;
4507  }
4508  return inputs;
4509 }
4510 
4511 std::vector<Analyzer::Expr*> get_raw_pointers(
4512  std::vector<std::shared_ptr<Analyzer::Expr>> const& input) {
4513  std::vector<Analyzer::Expr*> output(input.size());
4514  auto const raw_ptr = [](auto& shared_ptr) { return shared_ptr.get(); };
4515  std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4516  return output;
4517 }
4518 
4519 } // namespace
4520 
4521 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
4522  const RelAggregate* aggregate,
4523  const SortInfo& sort_info,
4524  const bool just_explain) {
4525  std::vector<InputDescriptor> input_descs;
4526  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4527  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4528  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4529  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4530  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
4531  const auto join_type = get_join_type(aggregate);
4532 
4533  RelAlgTranslator translator(cat_,
4534  query_state_,
4535  executor_,
4536  input_to_nest_level,
4537  {join_type},
4538  now_,
4539  just_explain);
4540  CHECK_EQ(size_t(1), aggregate->inputCount());
4541  const auto source = aggregate->getInput(0);
4542  const auto& in_metainfo = source->getOutputMetainfo();
4543  const auto scalar_sources =
4544  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4545  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4546  const auto target_exprs = translate_targets(
4547  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
4548 
4549  const auto query_infos = get_table_infos(input_descs, executor_);
4550 
4551  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4552  aggregate->setOutputMetainfo(targets_meta);
4553  auto query_hint = RegisteredQueryHint::defaults();
4554  if (query_dag_) {
4555  auto candidate = query_dag_->getQueryHint(aggregate);
4556  if (candidate) {
4557  query_hint = *candidate;
4558  }
4559  }
4560  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4561  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4562  return {RelAlgExecutionUnit{input_descs,
4563  input_col_descs,
4564  {},
4565  {},
4566  {},
4567  groupby_exprs,
4568  target_exprs,
4569  nullptr,
4570  sort_info,
4571  0,
4572  query_hint,
4573  aggregate->getQueryPlanDagHash(),
4574  join_info.hash_table_plan_dag,
4575  join_info.table_id_to_node_map,
4576  false,
4577  std::nullopt,
4578  query_state_},
4579  aggregate,
4580  max_groups_buffer_entry_default_guess,
4581  nullptr};
4582 }
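// A minimal sketch of the flow above, assuming a hypothetical schema
// emp(dept TEXT, pay INT):
//
//   SELECT dept, SUM(pay) FROM emp GROUP BY dept;
//
// arrives as a RelAggregate whose source metainfo describes {dept, pay}.
// synthesize_inputs() rebuilds a ColumnVar per source column,
// translate_groupby_exprs() takes the leading group-by key(s) (dept), and
// translate_targets() appends the SUM(pay) aggregate; together these form the
// groupby_exprs and target_exprs packed into the execution unit.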
4583 
4584 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
4585  const RelProject* project,
4586  const SortInfo& sort_info,
4587  const ExecutionOptions& eo) {
4588  std::vector<InputDescriptor> input_descs;
4589  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4590  auto input_to_nest_level = get_input_nest_levels(project, {});
4591  std::tie(input_descs, input_col_descs, std::ignore) =
4592  get_input_desc(project, input_to_nest_level, {}, cat_);
4593  const auto query_infos = get_table_infos(input_descs, executor_);
4594 
4595  const auto left_deep_join =
4596  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4597  JoinQualsPerNestingLevel left_deep_join_quals;
4598  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4599  : std::vector<JoinType>{get_join_type(project)};
4600  std::vector<size_t> input_permutation;
4601  std::vector<size_t> left_deep_join_input_sizes;
4602  std::optional<unsigned> left_deep_tree_id;
4603  if (left_deep_join) {
4604  left_deep_tree_id = left_deep_join->getId();
4605  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4606  const auto query_infos = get_table_infos(input_descs, executor_);
4607  left_deep_join_quals = translateLeftDeepJoinFilter(
4608  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4609  if (g_from_table_reordering && std::find(join_types.begin(), join_types.end(), JoinType::LEFT) == join_types.end()) {
4610  input_permutation = do_table_reordering(input_descs,
4611  input_col_descs,
4612  left_deep_join_quals,
4613  input_to_nest_level,
4614  project,
4615  query_infos,
4616  executor_);
4617  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4618  std::tie(input_descs, input_col_descs, std::ignore) =
4619  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
4620  left_deep_join_quals = translateLeftDeepJoinFilter(
4621  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4622  }
4623  }
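  // Sketch of the reordering round trip above, assuming hypothetical tables
  // small (10 rows) and big (10M rows) written as "small JOIN big": if the
  // cardinality heuristic in do_table_reordering() prefers the opposite input
  // order, it may return input_permutation == {1, 0}; nest levels, input
  // descriptors, and the left-deep join quals are then recomputed so that all
  // of them agree with the permuted order.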
4624 
4625  RelAlgTranslator translator(cat_,
4626  query_state_,
4627  executor_,
4628  input_to_nest_level,
4629  join_types,
4630  now_,
4631  eo.just_explain);
4632  const auto target_exprs_owned =
4633  translate_scalar_sources(project, translator, eo.executor_type);
4634 
4635  target_exprs_owned_.insert(
4636  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4637  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4638  auto query_hint = RegisteredQueryHint::defaults();
4639  if (query_dag_) {
4640  auto candidate = query_dag_->getQueryHint(project);
4641  if (candidate) {
4642  query_hint = *candidate;
4643  }
4644  }
4645  const RelAlgExecutionUnit exe_unit = {input_descs,
4646  input_col_descs,
4647  {},
4648  {},
4649  left_deep_join_quals,
4650  {nullptr},
4651  target_exprs,
4652  nullptr,
4653  sort_info,
4654  0,
4655  query_hint,
4656  project->getQueryPlanDagHash(),
4657  {},
4658  {},
4659  false,
4660  std::nullopt,
4661  query_state_};
4662  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4663  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4664  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
4665  project->setOutputMetainfo(targets_meta);
4666  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4667  if (left_deep_tree_id.has_value()) {
4668  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4669  rewritten_exe_unit.join_quals);
4670  }
4671  if (has_valid_query_plan_dag(project)) {
4672  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4673  project, left_deep_tree_id, left_deep_trees_info, executor_);
4674  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4675  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4676  }
4677  return {rewritten_exe_unit,
4678  project,
4679  max_groups_buffer_entry_default_guess,
4680  std::move(query_rewriter),
4681  input_permutation,
4682  left_deep_join_input_sizes};
4683 }
4684 
4685 namespace {
4686 
4687 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
4688  RelAlgNode const* input_node) {
4689  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
4690  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
4691  const int negative_node_id = -input_node->getId();
4692  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4693  target_exprs.reserve(tmis.size());
4694  for (size_t i = 0; i < tmis.size(); ++i) {
4695  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4696  tmis[i].get_type_info(), negative_node_id, i, 0));
4697  }
4698  return target_exprs;
4699 }
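// Sketch: for a union input node with getId() == 3 and output columns
// (INT, TEXT), this yields ColumnVar(INT, -3, 0, 0) and
// ColumnVar(TEXT, -3, 1, 0). The negated node id stands in for a table id,
// which keeps values streamed from another query step distinct from physical
// tables (whose ids are positive).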
4700 
4701 } // namespace
4702 
4703 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
4704  const RelLogicalUnion* logical_union,
4705  const SortInfo& sort_info,
4706  const ExecutionOptions& eo) {
4707  std::vector<InputDescriptor> input_descs;
4708  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4709  // Map ra input ptr to index (0, 1).
4710  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
4711  std::tie(input_descs, input_col_descs, std::ignore) =
4712  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
4713  const auto query_infos = get_table_infos(input_descs, executor_);
4714  auto const max_num_tuples =
4715  std::accumulate(query_infos.cbegin(),
4716  query_infos.cend(),
4717  size_t(0),
4718  [](auto max, auto const& query_info) {
4719  return std::max(max, query_info.info.getNumTuples());
4720  });
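  // Worked example: for UNION inputs holding 100 and 250 tuples,
  // max_num_tuples == std::max(size_t(100), size_t(250)) == 250; it fills the
  // scan_limit slot of the execution unit built below.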
4721 
4722  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
4723  for (auto& pair : input_to_nest_level) {
4724  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
4725  << pair.second << ')';
4726  }
4727 
4728  RelAlgTranslator translator(
4729  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
4730 
4731  // For UNION queries, we need to keep the target_exprs from both subqueries since they
4732  // may differ on StringDictionaries.
4733  std::vector<Analyzer::Expr*> target_exprs_pair[2];
4734  for (unsigned i = 0; i < 2; ++i) {
4735  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
4736  CHECK(!input_exprs_owned.empty())
4737  << "No metainfo found for input node(" << i << ") "
4738  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
4739  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
4740  for (auto& input_expr : input_exprs_owned) {
4741  VLOG(3) << " " << input_expr->toString();
4742  }
4743  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
4744  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
4745  }
4746 
4747  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
4748  << " input_col_descs=" << shared::printContainer(input_col_descs)
4749  << " target_exprs.size()=" << target_exprs_pair[0].size()
4750  << " max_num_tuples=" << max_num_tuples;
4751 
4752  const RelAlgExecutionUnit exe_unit = {input_descs,
4753  input_col_descs,
4754  {}, // quals_cf.simple_quals,
4755  {}, // rewrite_quals(quals_cf.quals),
4756  {},
4757  {nullptr},
4758  target_exprs_pair[0],
4759  nullptr,
4760  sort_info,
4761  max_num_tuples,
4762  RegisteredQueryHint::defaults(),
4763  EMPTY_HASHED_PLAN_DAG_KEY,
4764  {},
4765  {},
4766  false,
4767  logical_union->isAll(),
4768  query_state_,
4769  target_exprs_pair[1]};
4770  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4771  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4772 
4773  RelAlgNode const* input0 = logical_union->getInput(0);
4774  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
4775  logical_union->setOutputMetainfo(
4776  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4777  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
4778  logical_union->setOutputMetainfo(
4779  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4780  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
4781  logical_union->setOutputMetainfo(
4782  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4783  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4784  logical_union->setOutputMetainfo(
4785  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4786  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
4787  logical_union->setOutputMetainfo(
4788  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4789  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
4790  logical_union->setOutputMetainfo(
4791  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4792  } else if (dynamic_cast<const RelSort*>(input0)) {
4793  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
4794  } else {
4795  throw QueryNotSupported("Unsupported input type: " +
4796  input0->toString(RelRexToStringConfig::defaults()));
4797  }
4798  VLOG(3) << "logical_union->getOutputMetainfo()="
4799  << shared::printContainer(logical_union->getOutputMetainfo())
4800  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
4801  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
4802 
4803  return {rewritten_exe_unit,
4804  logical_union,
4805  max_groups_buffer_entry_default_guess,
4806  std::move(query_rewriter)};
4807 }
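// A minimal sketch, assuming hypothetical tables a(x INT) and b(x INT):
//
//   SELECT x FROM a UNION ALL SELECT x FROM b;
//
// Both children are translated; target_exprs_pair[0] defines the unit's result
// shape while target_exprs_pair[1] is carried along since the two inputs may
// disagree on string-dictionary encodings. A LIMIT or OFFSET in either branch
// would surface as a RelSort input node and is rejected above with
// QueryNotSupported.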
4808 
4809 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
4810  const RelTableFunction* rel_table_func,
4811  const bool just_explain,
4812  const bool is_gpu) {
4813  std::vector<InputDescriptor> input_descs;
4814  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4815  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
4816  std::tie(input_descs, input_col_descs, std::ignore) =
4817  get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
4818  const auto query_infos = get_table_infos(input_descs, executor_);
4819  RelAlgTranslator translator(
4820  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
4821  const auto input_exprs_owned = translate_scalar_sources(
4822  rel_table_func, translator, ::ExecutorType::TableFunctions);
4823  target_exprs_owned_.insert(
4824  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
4825  auto input_exprs = get_raw_pointers(input_exprs_owned);
4826 
4827  const auto table_function_impl_and_type_infos = [=]() {
4828  if (is_gpu) {
4829  try {
4830  return bind_table_function(
4831  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4832  } catch (ExtensionFunctionBindingError& e) {
4833  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
4834  << " Redirecting " << rel_table_func->getFunctionName()
4835  << " step to run on CPU.";
4836  throw QueryMustRunOnCpu();
4837  }
4838  } else {
4839  try {
4840  return bind_table_function(
4841  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4842  } catch (ExtensionFunctionBindingError& e) {
4843  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
4844  throw;
4845  }
4846  }
4847  }();
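  // Note on the fallback above: when the GPU overload cannot be bound, the
  // QueryMustRunOnCpu exception signals the caller to re-run this step with
  // is_gpu == false; a CPU binding failure has no fallback and is rethrown.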
4848  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4849  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4850 
4851  size_t output_row_sizing_param = 0;
4852  if (table_function_impl
4853  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
4854  const auto parameter_index =
4855  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
4856  CHECK_GT(parameter_index, size_t(0));
4857  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
4858  const auto parameter_expr =
4859  rel_table_func->getTableFuncInputAt(parameter_index - 1);
4860  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
4861  if (!parameter_expr_literal) {
4862  throw std::runtime_error(
4863  "Provided output buffer sizing parameter is not a literal. Only literal "
4864  "values are supported with output buffer sizing configured table "
4865  "functions.");
4866  }
4867  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
4868  if (literal_val < 0) {
4869  throw std::runtime_error("Provided output sizing parameter " +
4870  std::to_string(literal_val) +
4871  " must be a positive integer.");
4872  }
4873  output_row_sizing_param = static_cast<size_t>(literal_val);
4874  } else {
4875  // RowMultiplier not specified in the SQL query. Set it to 1
4876  output_row_sizing_param = 1; // default value for RowMultiplier
4877  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
4878  static auto DEFAULT_ROW_MULTIPLIER_EXPR =
4879  makeExpr<Analyzer::Constant>(kINT, false, d);
4880  // Push the constant 1 to input_exprs
4881  input_exprs.insert(input_exprs.begin() + parameter_index - 1,
4882  DEFAULT_ROW_MULTIPLIER_EXPR.get());
4883  }
4884  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
4885  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
4886  } else {
4887  UNREACHABLE();
4888  }
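  // Worked example of the sizing rules above, assuming a hypothetical UDTF
  // my_udtf declared with a RowMultiplier parameter in its second position:
  //
  //   SELECT * FROM TABLE(my_udtf(CURSOR(SELECT x FROM t), 2));
  //
  // The literal 2 becomes output_row_sizing_param == 2, sizing the output
  // buffer in proportion to the input row count. If the multiplier is omitted,
  // a constant 1 is synthesized and spliced into input_exprs at the declared
  // position; functions with a non-user-specified output size instead read the
  // value from the implementation itself.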
4889 
4890  std::vector<Analyzer::ColumnVar*> input_col_exprs;
4891  size_t input_index = 0;
4892  size_t arg_index = 0;
4893  const auto table_func_args = table_function_impl.getInputArgs();
4894  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
4895  for (const auto& ti : table_function_type_infos) {
4896  if (ti.is_column_list()) {
4897  for (int i = 0; i < ti.get_dimension(); i++) {
4898  auto& input_expr = input_exprs[input_index];
4899  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4900  CHECK(col_var);
4901 
4902  // avoid setting type info to ti here since ti doesn't have all the
4903  // properties correctly set
4904  auto type_info = input_expr->get_type_info();
4905  type_info.set_subtype(type_info.get_type()); // set type to be subtype
4906  type_info.set_type(ti.get_type()); // set type to column list
4907  type_info.set_dimension(ti.get_dimension());
4908  input_expr->set_type_info(type_info);
4909 
4910  input_col_exprs.push_back(col_var);
4911  input_index++;
4912  }
4913  } else if (ti.is_column()) {
4914  auto& input_expr = input_exprs[input_index];
4915  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4916  CHECK(col_var);
4917 
4918  // same here! avoid setting type info to ti since it doesn't have all the
4919  // properties correctly set
4920  auto type_info = input_expr->get_type_info();
4921  type_info.set_subtype(type_info.get_type()); // set type to be subtype
4922  type_info.set_type(ti.get_type()); // set type to column
4923  input_expr->set_type_info(type_info);
4924 
4925  input_col_exprs.push_back(col_var);
4926  input_index++;
4927  } else {
4928  auto input_expr = input_exprs[input_index];
4929  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
4930  if (ext_func_arg_ti != input_expr->get_type_info()) {
4931  input_exprs[input_index] = input_expr->add_cast(ext_func_arg_ti).get();
4932  }
4933  input_index++;
4934  }
4935  arg_index++;
4936  }
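  // Sketch of the flattening handled above, assuming a hypothetical signature
  // my_udtf(ColumnList<double> xs, Column<int> y, int k) where xs binds to
  // three columns: input_exprs then holds five expressions; the loop consumes
  // three slots for xs (retagging each ColumnVar so its subtype records the
  // element type), one slot for y, and finally casts k if its type differs
  // from the declared argument type.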
4937  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
4938  std::vector<Analyzer::Expr*> table_func_outputs;
4939  constexpr int32_t transient_pos{-1};
4940  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
4941  auto ti = table_function_impl.getOutputSQLType(i);
4942  if (ti.is_dict_encoded_string()) {
4943  auto p = table_function_impl.getInputID(i);
4944 
4945  int32_t input_pos = p.first;
4946  if (input_pos == transient_pos) {
4947  ti.set_comp_param(TRANSIENT_DICT_ID);
4948  } else {
4949  // Iterate over the list of arguments to compute the offset. Use this offset to
4950  // get the corresponding input
4951  int32_t offset = 0;
4952  for (int j = 0; j < input_pos; j++) {
4953  const auto ti = table_function_type_infos[j];
4954  offset += ti.is_column_list() ? ti.get_dimension() : 1;
4955  }
4956  input_pos = offset + p.second;
4957 
4958  CHECK_LT(input_pos, input_exprs.size());
4959  int32_t comp_param =
4960  input_exprs_owned[input_pos]->get_type_info().get_comp_param();
4961  ti.set_comp_param(comp_param);
4962  }
4963  }
4964  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
4965  table_func_outputs.push_back(target_exprs_owned_.back().get());
4966  }
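  // Sketch of the comp_param propagation above, assuming a hypothetical
  // my_udtf(ColumnList<TEXT> xs /* 2 cols */, Column<TEXT> y) whose output i
  // reports getInputID(i) == {1, 0} (i.e. argument y): the offset loop skips
  // the two flattened ColumnList slots, so input_pos == 2 and the output
  // column inherits the string-dictionary id of input_exprs_owned[2]. A
  // transient position instead tags the output with TRANSIENT_DICT_ID.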
4967  const TableFunctionExecutionUnit exe_unit = {
4968  input_descs,
4969  input_col_descs,
4970  input_exprs, // table function inputs
4971  input_col_exprs, // table function column inputs (duplicates w/ above)
4972  table_func_outputs, // table function projected exprs
4973  output_row_sizing_param, // output buffer sizing param
4974  table_function_impl};
4975  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
4976  rel_table_func->setOutputMetainfo(targets_meta);
4977  return {exe_unit, rel_table_func};
4978 }
4979 
4980 namespace {
4981 
4982 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
4983 get_inputs_meta(const RelFilter* filter,
4984  const RelAlgTranslator& translator,
4985  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
4986  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4987  std::vector<TargetMetaInfo> in_metainfo;
4988  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
4989  const auto data_sink_node = get_data_sink(filter);
4990  auto input_it = inputs_owned.begin();
4991  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
4992  const auto source = data_sink_node->getInput(nest_level);
4993  const auto scan_source = dynamic_cast<const RelScan*>(source);
4994  if (scan_source) {
4995  CHECK(source->getOutputMetainfo().empty());
4996  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
4997  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
4998  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
4999  }
5000  const auto source_metadata =
5001  get_targets_meta(scan_source, get_raw_pointers(scalar_sources_owned));
5002  in_metainfo.insert(
5003  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5004  exprs_owned.insert(
5005  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5006  } else {
5007  const auto& source_metadata = source->getOutputMetainfo();
5008  input_it += source_metadata.size();
5009  in_metainfo.insert(
5010  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
5011  const auto scalar_sources_owned = synthesize_inputs(
5012  data_sink_node, nest_level, source_metadata, input_to_nest_level);
5013  exprs_owned.insert(
5014  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
5015  }
5016  }
5017  return std::make_pair(in_metainfo, exprs_owned);
5018 }
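// Sketch of the two branches above, for a filter whose sink joins a physical
// scan with an upstream projection step: the scan's used RexInputs are
// translated directly and its metadata is derived from the resulting
// expressions, while the projection already carries output metainfo, so
// matching ColumnVars are synthesized and input_it is simply advanced past
// that source's inputs.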
5019 
5020 } // namespace
5021 
5022 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
5023  const SortInfo& sort_info,
5024  const bool just_explain) {
5025  CHECK_EQ(size_t(1), filter->inputCount());
5026  std::vector<InputDescriptor> input_descs;
5027  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5028  std::vector<TargetMetaInfo> in_metainfo;
5029  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5030  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5031 
5032  const auto input_to_nest_level = get_input_nest_levels(filter, {});
5033  std::tie(input_descs, input_col_descs, used_inputs_owned) =
5034  get_input_desc(filter, input_to_nest_level, {}, cat_);
5035  const auto join_type = get_join_type(filter);
5036  RelAlgTranslator translator(cat_,
5037  query_state_,
5038  executor_,
5039  input_to_nest_level,
5040  {join_type},
5041  now_,
5042  just_explain);
5043  std::tie(in_metainfo, target_exprs_owned) =
5044  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
5045  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
5046  const auto query_infos = get_table_infos(input_descs, executor_);
5047 
5048  const auto qual = fold_expr(filter_expr.get());
5049  target_exprs_owned_.insert(
5050  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
5051 
5052  const auto target_exprs = get_raw_pointers(target_exprs_owned);
5053  filter->setOutputMetainfo(in_metainfo);
5054  const auto rewritten_qual = rewrite_expr(qual.get());
5055  auto query_hint = RegisteredQueryHint::defaults();
5056  if (query_dag_) {
5057  auto candidate = query_dag_->getQueryHint(filter);
5058  if (candidate) {
5059  query_hint = *candidate;
5060  }
5061  }
5062  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5063  filter, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
5064  return {{input_descs,
5065  input_col_descs,
5066  {},
5067  {rewritten_qual ? rewritten_qual : qual},
5068  {},
5069  {nullptr},
5070  target_exprs,
5071  nullptr,
5072  sort_info,
5073  0,
5074  query_hint,
5075  filter->getQueryPlanDagHash(),
5076  join_info.hash_table_plan_dag,
5077  join_info.table_id_to_node_map},
5078  filter,
5079  max_groups_buffer_entry_default_guess,
5080  nullptr};
5081 }
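// A minimal sketch, assuming a hypothetical table t(x INT): for a standalone
// filter with condition x > 2 + 3, translateScalarRex() builds the Analyzer
// tree, fold_expr() collapses the constant subexpression to x > 5, and
// rewrite_expr() may normalize it further (the code above falls back to the
// folded qual when no rewrite applies) before it lands in the unit's quals.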
5082 
5084 
5085 void RelAlgExecutor::initializeParallelismHints() {
5086  if (auto foreign_storage_mgr =
5087  cat_.getDataMgr().getPersistentStorageMgr()->getForeignStorageMgr()) {
5088  // Parallelism hints need to be reset to empty so that we don't accidentally re-use
5089  // them. This can cause attempts to fetch strings that do not shard to the correct
5090  // node in distributed mode.
5091  foreign_storage_mgr->setParallelismHints({});
5092  }
5093 }
5094 
5095 void RelAlgExecutor::setupCaching(const RelAlgNode* ra) {
5096  CHECK(executor_);
5097  const auto phys_inputs = get_physical_inputs(cat_, ra);
5098  const auto phys_table_ids = get_physical_table_inputs(ra);
5099  executor_->setCatalog(&cat_);
5100  executor_->setupCaching(phys_inputs, phys_table_ids);
5101 }
5102 
5103 void RelAlgExecutor::prepareForeignTables() {
5104  const auto& ra = query_dag_->getRootNode();
5105  prepare_foreign_table_for_execution(ra, cat_);
5106 }
5107 
5108 std::unordered_set<int> RelAlgExecutor::getPhysicalTableIds() const {
5109  return get_physical_table_inputs(&getRootRelAlgNode());
5110 }
5111 
5112 void RelAlgExecutor::prepareForSystemTableExecution(const CompilationOptions& co) {
5113  prepare_for_system_table_execution(getRootRelAlgNode(), cat_, co);
5114 }