OmniSciDB  16c4e035a1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RelAlgExecutor.h"
20 #include "Parser/ParserNode.h"
37 #include "QueryEngine/RexVisitor.h"
41 #include "Shared/measure.h"
42 #include "Shared/misc.h"
43 #include "Shared/shard_key.h"
44 
45 #include <boost/algorithm/cxx11/any_of.hpp>
46 #include <boost/range/adaptor/reversed.hpp>
47 
48 #include <algorithm>
49 #include <functional>
50 #include <numeric>
51 
53 bool g_enable_interop{false};
54 bool g_enable_union{false};
58 
59 extern bool g_enable_bump_allocator;
61 extern bool g_enable_system_tables;
62 
63 namespace {
64 
65 bool node_is_aggregate(const RelAlgNode* ra) {
66  const auto compound = dynamic_cast<const RelCompound*>(ra);
67  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
68  return ((compound && compound->isAggregate()) || aggregate);
69 }
70 
// Rewrites the physical inputs of a RelAlg tree so each column id is the
// actual (catalog) column id rather than the SPI id, by translating every
// input through getColumnIdBySpi().
// NOTE(review): a parameter line appears elided from this dump — the body
// references a catalog named `cat`; confirm the full signature upstream.
71 std::unordered_set<PhysicalInput> get_physical_inputs(
73  const RelAlgNode* ra) {
74  auto phys_inputs = get_physical_inputs(ra);
75  std::unordered_set<PhysicalInput> phys_inputs2;
76  for (auto& phi : phys_inputs) {
77  phys_inputs2.insert(
78  PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
79  }
80  return phys_inputs2;
81 }
82 
// Builds per-table parallelism hints for foreign-table inputs of the query:
// for every foreign-table (non-system) column chunk that is NOT already
// resident at CPU level, a hint is recorded so the foreign storage manager
// can prefetch those fragments in parallel.
// NOTE(review): two lines are elided in this dump (the hint construction at
// the insert() call, and the expression fetching the foreign storage
// manager) — confirm against the full source.
83 void set_parallelism_hints(const RelAlgNode& ra_node,
84  const Catalog_Namespace::Catalog& catalog) {
85  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
86  parallelism_hints_per_table;
87  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
88  int table_id = physical_input.table_id;
89  auto table = catalog.getMetadataForTable(table_id, false);
90  if (table && table->storageType == StorageType::FOREIGN_TABLE &&
91  !table->is_system_table) {
92  int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
93  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
94  auto foreign_table = catalog.getForeignTable(table_id);
95  for (const auto& fragment :
96  foreign_table->fragmenter->getFragmentsForQuery().fragments) {
97  Chunk_NS::Chunk chunk{col_desc};
98  ChunkKey chunk_key = {
99  catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};
100  // do not include chunk hints that are in CPU memory
101  if (!chunk.isChunkOnDevice(
102  &catalog.getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
103  parallelism_hints_per_table[{catalog.getDatabaseId(), table_id}].insert(
105  fragment.fragmentId});
106  }
107  }
108  }
109  }
110  if (!parallelism_hints_per_table.empty()) {
111  auto foreign_storage_mgr =
113  CHECK(foreign_storage_mgr);
114  foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
115  }
116 }
117 
// For each dictionary-encoded column of a foreign-table input whose fragment
// chunk metadata is still a placeholder, loads the chunk into CPU cache so
// real metadata (and the string dictionary) is materialized before the query
// runs. The loaded chunk stays in CPU cache but becomes evictable once the
// local shared_ptr goes out of scope.
// NOTE(review): the signature line is elided from this dump — presumably
// prepare_string_dictionaries(const RelAlgNode& ra_node, ...); confirm.
119  const Catalog_Namespace::Catalog& catalog) {
120  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
121  int table_id = physical_input.table_id;
122  auto table = catalog.getMetadataForTable(table_id, false);
123  if (table && table->storageType == StorageType::FOREIGN_TABLE) {
124  int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
125  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
126  auto foreign_table = catalog.getForeignTable(table_id);
127  if (col_desc->columnType.is_dict_encoded_type()) {
128  CHECK(foreign_table->fragmenter != nullptr);
129  for (const auto& fragment :
130  foreign_table->fragmenter->getFragmentsForQuery().fragments) {
131  ChunkKey chunk_key = {
132  catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};
133  const ChunkMetadataMap& metadata_map = fragment.getChunkMetadataMap();
134  CHECK(metadata_map.find(col_id) != metadata_map.end());
135  if (foreign_storage::is_metadata_placeholder(*(metadata_map.at(col_id)))) {
136  // When this goes out of scope it will stay in CPU cache but become
137  // evictable
138  std::shared_ptr<Chunk_NS::Chunk> chunk =
139  Chunk_NS::Chunk::getChunk(col_desc,
140  &(catalog.getDataMgr()),
141  chunk_key,
// NOTE(review): the memory-level argument line is elided in this dump.
143  0,
144  0,
145  0);
146  }
147  }
148  }
149  }
150  }
151 }
152 
// Pre-execution preparation for foreign-table inputs: registers parallelism
// hints and materializes placeholder string-dictionary metadata.
// NOTE(review): the signature line is elided from this dump; the body takes
// a RelAlgNode (ra_node) and a catalog — confirm the exact name upstream.
154  const Catalog_Namespace::Catalog& catalog) {
155  // Iterate through ra_node inputs for types that need to be loaded pre-execution
156  // If they do not have valid metadata, load them into CPU memory to generate
157  // the metadata and leave them ready to be used by the query
158  set_parallelism_hints(ra_node, catalog);
159  prepare_string_dictionaries(ra_node, catalog);
160 }
161 
// Preparation for queries that touch system tables: such queries must run on
// CPU (throws QueryMustRunOnCpu otherwise), and system-table chunks are
// cleared from cache and prefetched so chunk-statistics metadata reflects a
// fresh point-in-time snapshot.
// NOTE(review): the signature line and several statement lines (the GPU
// device-type condition, the cache-clearing call, and the getChunk receiver)
// are elided from this dump — confirm against the full source.
163  const Catalog_Namespace::Catalog& catalog,
164  const CompilationOptions& co) {
166  std::map<int32_t, std::vector<int32_t>> system_table_columns_by_table_id;
167  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
168  int table_id = physical_input.table_id;
169  auto table = catalog.getMetadataForTable(table_id, false);
170  if (table && table->is_system_table) {
171  auto column_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
172  system_table_columns_by_table_id[table_id].emplace_back(column_id);
173  }
174  }
175  // Execute on CPU for queries involving system tables
176  if (!system_table_columns_by_table_id.empty() &&
178  throw QueryMustRunOnCpu();
179  }
180 
181  for (const auto& [table_id, column_ids] : system_table_columns_by_table_id) {
182  // Clear any previously cached data, since system tables depend on point in
183  // time data snapshots.
185  ChunkKey{catalog.getDatabaseId(), table_id}, Data_Namespace::CPU_LEVEL);
186  auto td = catalog.getMetadataForTable(table_id);
187  CHECK(td);
188  CHECK(td->fragmenter);
189  auto fragment_count = td->fragmenter->getFragmentsForQuery().fragments.size();
// System tables are expected to have at most one fragment.
190  CHECK_LE(fragment_count, static_cast<size_t>(1));
191  if (fragment_count > 0) {
192  for (auto column_id : column_ids) {
193  // Prefetch system table chunks in order to force chunk statistics metadata
194  // computation.
195  auto cd = catalog.getMetadataForColumn(table_id, column_id);
196  ChunkKey chunk_key{catalog.getDatabaseId(), table_id, column_id, 0};
198  cd, &(catalog.getDataMgr()), chunk_key, Data_Namespace::CPU_LEVEL, 0, 0, 0);
199  }
200  }
201  }
202  }
203 }
204 
// Predicate: the extracted query-plan DAG is usable when it contains no
// unsupported RelAlg nodes and the extracted plan is non-empty.
// NOTE(review): the signature line is elided from this dump; the parameter
// is referenced as `dag` — confirm the exact name/type upstream.
206  return !dag.contain_not_supported_rel_node &&
207  dag.extracted_dag.compare(EMPTY_QUERY_PLAN) != 0;
208 }
209 
210 class RelLeftDeepTreeIdsCollector : public RelAlgVisitor<std::vector<unsigned>> {
211  public:
212  std::vector<unsigned> visitLeftDeepInnerJoin(
213  const RelLeftDeepInnerJoin* left_deep_join_tree) const override {
214  return {left_deep_join_tree->getId()};
215  }
216 
217  protected:
218  std::vector<unsigned> aggregateResult(
219  const std::vector<unsigned>& aggregate,
220  const std::vector<unsigned>& next_result) const override {
221  auto result = aggregate;
222  std::copy(next_result.begin(), next_result.end(), std::back_inserter(result));
223  return result;
224  }
225 };
226 
227 } // namespace
228 
// Computes the outer-table fragment count for a simple single-step
// projection or (non-aggregate) compound query; returns 0 for every case
// where per-fragment parallel dispatch does not apply (push-down candidates,
// explain, subqueries, multi-step sequences, no-ops, delete/update,
// aggregates).
// NOTE(review): the method-name line is elided from this dump — it takes
// (CompilationOptions?, const ExecutionOptions& eo); confirm upstream.
230  const ExecutionOptions& eo) {
231  if (eo.find_push_down_candidates) {
232  return 0;
233  }
234 
235  if (eo.just_explain) {
236  return 0;
237  }
238 
239  CHECK(query_dag_);
240 
241  query_dag_->resetQueryExecutionState();
242  const auto& ra = query_dag_->getRootNode();
243 
// Hold the execute mutex and set up caching as the real execution path does,
// so work-unit creation below sees consistent state.
244  auto lock = executor_->acquireExecuteMutex();
245  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
246  const auto phys_inputs = get_physical_inputs(cat_, &ra);
247  const auto phys_table_ids = get_physical_table_inputs(&ra);
248  executor_->setCatalog(&cat_);
249  executor_->setupCaching(phys_inputs, phys_table_ids);
250 
251  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
252  auto ed_seq = RaExecutionSequence(&ra);
253 
254  if (!getSubqueries().empty()) {
255  return 0;
256  }
257 
258  CHECK(!ed_seq.empty());
259  if (ed_seq.size() > 1) {
260  return 0;
261  }
262 
265  executor_->setCatalog(&cat_);
266  executor_->temporary_tables_ = &temporary_tables_;
267 
269  auto exec_desc_ptr = ed_seq.getDescriptor(0);
270  CHECK(exec_desc_ptr);
271  auto& exec_desc = *exec_desc_ptr;
272  const auto body = exec_desc.getBody();
273  if (body->isNop()) {
274  return 0;
275  }
276 
// Projection: fragment count comes from the first input table of the work unit.
277  const auto project = dynamic_cast<const RelProject*>(body);
278  if (project) {
279  auto work_unit =
280  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
281 
282  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
283  executor_);
284  }
285 
286  const auto compound = dynamic_cast<const RelCompound*>(body);
287  if (compound) {
288  if (compound->isDeleteViaSelect()) {
289  return 0;
290  } else if (compound->isUpdateViaSelect()) {
291  return 0;
292  } else {
293  if (compound->isAggregate()) {
294  return 0;
295  }
296 
297  const auto work_unit =
298  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
299 
300  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
301  executor_);
302  }
303  }
304 
305  return 0;
306 }
307 
// Top-level query entry point: runs the query once and, on
// QueryMustRunOnCpu (when g_allow_cpu_retry is set), retries the whole query
// with CPU-only compilation options.
// NOTE(review): the method-name line is elided from this dump, as is the
// `if (post_execution_callback_)` guard opening the brace closed before
// `return execution_result;` — confirm against the full source.
309  const ExecutionOptions& eo,
310  const bool just_explain_plan,
311  RenderInfo* render_info) {
312  CHECK(query_dag_);
313  auto timer = DEBUG_TIMER(__func__);
315 
316  auto run_query = [&](const CompilationOptions& co_in) {
317  auto execution_result =
318  executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);
319 
// Compile-time switch for verbose result-set summaries; off by default.
320  constexpr bool vlog_result_set_summary{false};
321  if constexpr (vlog_result_set_summary) {
322  VLOG(1) << execution_result.getRows()->summaryToString();
323  }
324 
326  VLOG(1) << "Running post execution callback.";
327  (*post_execution_callback_)();
328  }
329  return execution_result;
330  };
331 
332  try {
333  return run_query(co);
334  } catch (const QueryMustRunOnCpu&) {
335  if (!g_allow_cpu_retry) {
336  throw;
337  }
338  }
// Fall through here only when the GPU attempt failed and CPU retry is allowed.
339  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
340  auto co_cpu = CompilationOptions::makeCpuOnly(co);
341 
342  if (render_info) {
// Rendering cannot stay in-situ when execution moves off the GPU.
343  render_info->setForceNonInSituData();
344  }
345  return run_query(co_cpu);
346 }
347 
// Single-attempt query execution: sets up query-session bookkeeping and
// interrupt support, acquires the execute mutex, handles the
// just_explain_plan and filter-push-down paths, executes all subqueries,
// then runs the main execution sequence.
// NOTE(review): the method-name line and several statement lines are elided
// from this dump (e.g. the interrupt-reset scope opener, the
// QueryExecutionError error-code check, the foreign-table preparation call,
// and the push-down dispatch) — confirm against the full source.
349  const ExecutionOptions& eo,
350  const bool just_explain_plan,
351  RenderInfo* render_info) {
353  auto timer = DEBUG_TIMER(__func__);
354  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
355 
356  query_dag_->resetQueryExecutionState();
357  const auto& ra = query_dag_->getRootNode();
358 
359  // capture the lock acquistion time
360  auto clock_begin = timer_start();
362  executor_->resetInterrupt();
363  }
364  std::string query_session{""};
365  std::string query_str{"N/A"};
366  std::string query_submitted_time{""};
367  // gather necessary query's info
368  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
369  query_session = query_state_->getConstSessionInfo()->get_session_id();
370  query_str = query_state_->getQueryStr();
371  query_submitted_time = query_state_->getQuerySubmittedTime();
372  }
373 
// Runtime interrupts only apply to real (non-render, non-explain/validate)
// queries with a known session.
374  auto validate_or_explain_query =
375  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
376  auto interruptable = !render_info && !query_session.empty() &&
377  eo.allow_runtime_query_interrupt && !validate_or_explain_query;
378  if (interruptable) {
379  // if we reach here, the current query which was waiting an idle executor
380  // within the dispatch queue is now scheduled to the specific executor
381  // (not UNITARY_EXECUTOR)
382  // so we update the query session's status with the executor that takes this query
383  std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
384  query_session, query_str, query_submitted_time);
385 
386  // now the query is going to be executed, so update the status as
387  // "RUNNING_QUERY_KERNEL"
388  executor_->updateQuerySessionStatus(
389  query_session,
390  query_submitted_time,
391  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
392  }
393 
394  // so it should do cleanup session info after finishing its execution
395  ScopeGuard clearQuerySessionInfo =
396  [this, &query_session, &interruptable, &query_submitted_time] {
397  // reset the runtime query interrupt status after the end of query execution
398  if (interruptable) {
399  // cleanup running session's info
400  executor_->clearQuerySessionStatus(query_session, query_submitted_time);
401  }
402  };
403 
404  auto acquire_execute_mutex = [](Executor * executor) -> auto {
405  auto ret = executor->acquireExecuteMutex();
406  return ret;
407  };
408  // now we acquire executor lock in here to make sure that this executor holds
409  // all necessary resources and at the same time protect them against other executor
410  auto lock = acquire_execute_mutex(executor_);
411 
412  if (interruptable) {
413  // check whether this query session is "already" interrupted
414  // this case occurs when there is very short gap between being interrupted and
415  // taking the execute lock
416  // if so we have to remove "all" queries initiated by this session and we do in here
417  // without running the query
418  try {
419  executor_->checkPendingQueryStatus(query_session);
420  } catch (QueryExecutionError& e) {
422  throw std::runtime_error("Query execution has been interrupted (pending query)");
423  }
424  throw e;
425  } catch (...) {
426  throw std::runtime_error("Checking pending query status failed: unknown error");
427  }
428  }
429  int64_t queue_time_ms = timer_stop(clock_begin);
430 
432 
433  // Notify foreign tables to load prior to caching
435 
436  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
437  const auto phys_inputs = get_physical_inputs(cat_, &ra);
438  const auto phys_table_ids = get_physical_table_inputs(&ra);
439  executor_->setCatalog(&cat_);
440  executor_->setupCaching(phys_inputs, phys_table_ids);
441 
442  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
443  auto ed_seq = RaExecutionSequence(&ra);
444 
// just_explain_plan: render the execution-descriptor bodies (deepest last)
// plus any subqueries as text and return that as the result set.
445  if (just_explain_plan) {
446  std::stringstream ss;
447  std::vector<const RelAlgNode*> nodes;
448  for (size_t i = 0; i < ed_seq.size(); i++) {
449  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
450  }
451  size_t ctr = nodes.size();
452  size_t tab_ctr = 0;
453  for (auto& body : boost::adaptors::reverse(nodes)) {
454  const auto index = ctr--;
455  const auto tabs = std::string(tab_ctr++, '\t');
456  CHECK(body);
457  ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
458  if (auto sort = dynamic_cast<const RelSort*>(body)) {
459  ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
460  }
461  if (dynamic_cast<const RelProject*>(body) ||
462  dynamic_cast<const RelCompound*>(body)) {
463  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
464  ss << tabs << " : " << join->toString() << "\n";
465  }
466  }
467  }
468  const auto& subqueries = getSubqueries();
469  if (!subqueries.empty()) {
470  ss << "Subqueries: "
471  << "\n";
472  for (const auto& subquery : subqueries) {
473  const auto ra = subquery->getRelAlg();
474  ss << "\t" << ra->toString() << "\n";
475  }
476  }
477  auto rs = std::make_shared<ResultSet>(ss.str());
478  return {rs, {}};
479  }
480 
481  if (render_info) {
482  // set render to be non-insitu in certain situations.
484  ed_seq.size() > 1) {
485  // old logic
486  // disallow if more than one ED
487  render_info->setInSituDataIfUnset(false);
488  }
489  }
490 
491  if (eo.find_push_down_candidates) {
492  // this extra logic is mainly due to current limitations on multi-step queries
493  // and/or subqueries.
495  ed_seq, co, eo, render_info, queue_time_ms);
496  }
497  timer_setup.stop();
498 
499  // Dispatch the subqueries first
500  for (auto subquery : getSubqueries()) {
501  const auto subquery_ra = subquery->getRelAlg();
502  CHECK(subquery_ra);
// Skip subqueries that were already executed (context data present).
503  if (subquery_ra->hasContextData()) {
504  continue;
505  }
506  // Execute the subquery and cache the result.
507  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
508  RaExecutionSequence subquery_seq(subquery_ra);
509  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
510  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
511  }
512  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
513 }
514 
// Computes the aggregated column-range cache for the root node's physical
// inputs by delegating to the executor.
// NOTE(review): the signature line is elided from this dump. Also, the local
// `agg_col_range_cache` below appears unused — TODO confirm and remove in
// the full source.
516  AggregatedColRange agg_col_range_cache;
517  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
518  return executor_->computeColRangesCache(phys_inputs);
519 }
520 
// Computes string-dictionary generations for the root node's physical inputs
// via the executor. NOTE(review): signature line elided from this dump.
522  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
523  return executor_->computeStringDictionaryGenerations(phys_inputs);
524 }
525 
// Computes table generations for the root node's physical table inputs via
// the executor. NOTE(review): signature line elided from this dump.
527  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
528  return executor_->computeTableGenerations(phys_table_ids);
529 }
530 
531 Executor* RelAlgExecutor::getExecutor() const {
532  return executor_;
533 }
534 
// Post-execution cleanup: releases the executor's row-set memory owner so
// result-set buffers can be reclaimed.
// NOTE(review): the method-name line is elided from this dump.
536  CHECK(executor_);
537  executor_->row_set_mem_owner_ = nullptr;
538 }
539 
// Returns the ids of all left-deep join trees under root_node together with
// the per-nesting-level join quals gathered while creating a work unit.
// Returns empty results for a sort root (assumed absent in the test queries
// that use this).
// NOTE(review): the method-name line is elided from this dump; only the
// return type is visible.
540 std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
542  auto sort_node = dynamic_cast<const RelSort*>(root_node);
543  if (sort_node) {
544  // we assume that test query that needs join info does not contain any sort node
545  return {};
546  }
// Creating the work unit populates the left-deep join-tree info as a side
// effect; the returned work unit itself is not used.
547  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
548  RelLeftDeepTreeIdsCollector visitor;
549  auto left_deep_tree_ids = visitor.visit(root_node);
550  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
551 }
552 
553 namespace {
554 
// Validates a sort node's single input: a sort feeding another sort is not
// supported and raises a runtime error.
// NOTE(review): the function-name line is elided from this dump; the
// parameter is referenced as `sort`.
556  CHECK_EQ(size_t(1), sort->inputCount());
557  const auto source = sort->getInput(0);
558  if (dynamic_cast<const RelSort*>(source)) {
559  throw std::runtime_error("Sort node not supported as input to another sort");
560  }
561 }
562 
563 } // namespace
564 
// Executes one step of an execution sequence (leaf execution path). For an
// unsharded sort step, only the sort's input is executed on the leaf (the
// actual sort happens elsewhere); otherwise the step itself is executed via
// executeRelAlgSubSeq. The merge type is Reduce for unsharded aggregates,
// Union otherwise.
// NOTE(review): the method-name line and several statement lines (shard
// count computation, part of the eo_copy field list, the main sub-sequence
// call, and the post-execution-callback guard) are elided from this dump.
566  const RaExecutionSequence& seq,
567  const size_t step_idx,
568  const CompilationOptions& co,
569  const ExecutionOptions& eo,
570  RenderInfo* render_info) {
571  INJECT_TIMER(executeRelAlgQueryStep);
572 
573  auto exe_desc_ptr = seq.getDescriptor(step_idx);
574  CHECK(exe_desc_ptr);
575  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
576 
577  size_t shard_count{0};
578  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
579  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
580  };
581 
582  if (sort) {
584  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
586  source_work_unit.exe_unit, *executor_->getCatalog());
587  if (!shard_count) {
588  // No point in sorting on the leaf, only execute the input to the sort node.
589  CHECK_EQ(size_t(1), sort->inputCount());
590  const auto source = sort->getInput(0);
591  if (sort->collationCount() || node_is_aggregate(source)) {
592  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
593  CHECK_EQ(temp_seq.size(), size_t(1));
594  ExecutionOptions eo_copy = {
596  eo.allow_multifrag,
597  eo.just_explain,
598  eo.allow_loop_joins,
599  eo.with_watchdog,
600  eo.jit_debug,
601  eo.just_validate || sort->isEmptyResult(),
610  eo.executor_type,
611  };
612  // Use subseq to avoid clearing existing temporary tables
613  return {
614  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
615  merge_type(source),
616  source->getId(),
617  false};
618  }
619  }
620  }
623  std::make_pair(step_idx, step_idx + 1),
624  co,
625  eo,
626  render_info,
628  merge_type(exe_desc_ptr->getBody()),
629  exe_desc_ptr->getBody()->getId(),
630  false};
632  VLOG(1) << "Running post execution callback.";
633  (*post_execution_callback_)();
634  }
635  return result;
636 }
637 
// Prepares this executor for leaf execution in a distributed query: resets
// interrupts, records queue time, creates a fresh row-set memory owner, and
// installs the aggregator-provided column ranges, string-dictionary
// generations, and table generations.
// NOTE(review): the method-name line and the interrupt-scope opener are
// elided from this dump.
639  const AggregatedColRange& agg_col_range,
640  const StringDictionaryGenerations& string_dictionary_generations,
641  const TableGenerations& table_generations) {
642  // capture the lock acquistion time
643  auto clock_begin = timer_start();
645  executor_->resetInterrupt();
646  }
647  queue_time_ms_ = timer_stop(clock_begin);
648  executor_->row_set_mem_owner_ =
649  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
650  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
651  executor_->table_generations_ = table_generations;
652  executor_->agg_col_range_cache_ = agg_col_range;
653 }
654 
// Executes a full execution sequence step by step, rendering only on the
// last step. Per-step error recovery: retry on CPU for QueryMustRunOnCpu
// (when allowed), or retry with the external executor for
// NativeExecutionError when interop is enabled (except for grouped/aggregate
// compounds, which interop cannot handle).
// NOTE(review): the method-name line and several statement lines (temporary-
// table reset, left-deep-join-info bookkeeping, and the per-step CPU-retry
// condition) are elided from this dump.
656  const CompilationOptions& co,
657  const ExecutionOptions& eo,
658  RenderInfo* render_info,
659  const int64_t queue_time_ms,
660  const bool with_existing_temp_tables) {
662  auto timer = DEBUG_TIMER(__func__);
663  if (!with_existing_temp_tables) {
665  }
668  executor_->setCatalog(&cat_);
669  executor_->temporary_tables_ = &temporary_tables_;
670 
671  time(&now_);
672  CHECK(!seq.empty());
673 
// just_explain runs at most two descriptors: a leading RelLogicalValues (to
// produce the result set) plus the explain itself; otherwise all steps run.
674  auto get_descriptor_count = [&seq, &eo]() -> size_t {
675  if (eo.just_explain) {
676  if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
677  // run the logical values descriptor to generate the result set, then the next
678  // descriptor to generate the explain
679  CHECK_GE(seq.size(), size_t(2));
680  return 2;
681  } else {
682  return 1;
683  }
684  } else {
685  return seq.size();
686  }
687  };
688 
689  const auto exec_desc_count = get_descriptor_count();
690  // this join info needs to be maintained throughout an entire query runtime
691  for (size_t i = 0; i < exec_desc_count; i++) {
692  VLOG(1) << "Executing query step " << i;
693  // only render on the last step
694  try {
695  executeRelAlgStep(seq,
696  i,
697  co,
698  eo,
699  (i == exec_desc_count - 1) ? render_info : nullptr,
700  queue_time_ms);
701  } catch (const QueryMustRunOnCpu&) {
702  // Do not allow per-step retry if flag is off or in distributed mode
703  // TODO(todd): Determine if and when we can relax this restriction
704  // for distributed
707  throw;
708  }
709  LOG(INFO) << "Retrying current query step " << i << " on CPU";
710  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
711  executeRelAlgStep(seq,
712  i,
713  co_cpu,
714  eo,
715  (i == exec_desc_count - 1) ? render_info : nullptr,
716  queue_time_ms);
717  } catch (const NativeExecutionError&) {
718  if (!g_enable_interop) {
719  throw;
720  }
721  auto eo_extern = eo;
722  eo_extern.executor_type = ::ExecutorType::Extern;
723  auto exec_desc_ptr = seq.getDescriptor(i);
724  const auto body = exec_desc_ptr->getBody();
725  const auto compound = dynamic_cast<const RelCompound*>(body);
726  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
727  LOG(INFO) << "Also failed to run the query using interoperability";
728  throw;
729  }
730  executeRelAlgStep(seq,
731  i,
732  co,
733  eo_extern,
734  (i == exec_desc_count - 1) ? render_info : nullptr,
735  queue_time_ms);
736  }
737  }
738 
739  return seq.getDescriptor(exec_desc_count - 1)->getResult();
740 }
741 
// Executes the half-open interval [interval.first, interval.second) of an
// execution sequence without clearing existing temporary tables, rendering
// only on the last step and retrying individual steps on CPU when allowed.
// NOTE(review): the method-name line and a few statement lines (join-info
// bookkeeping and the CPU-retry condition) are elided from this dump.
743  const RaExecutionSequence& seq,
744  const std::pair<size_t, size_t> interval,
745  const CompilationOptions& co,
746  const ExecutionOptions& eo,
747  RenderInfo* render_info,
748  const int64_t queue_time_ms) {
750  executor_->setCatalog(&cat_);
751  executor_->temporary_tables_ = &temporary_tables_;
753  time(&now_);
754  for (size_t i = interval.first; i < interval.second; i++) {
755  // only render on the last step
756  try {
757  executeRelAlgStep(seq,
758  i,
759  co,
760  eo,
761  (i == interval.second - 1) ? render_info : nullptr,
762  queue_time_ms);
763  } catch (const QueryMustRunOnCpu&) {
764  // Do not allow per-step retry if flag is off or in distributed mode
765  // TODO(todd): Determine if and when we can relax this restriction
766  // for distributed
769  throw;
770  }
771  LOG(INFO) << "Retrying current query step " << i << " on CPU";
772  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
773  executeRelAlgStep(seq,
774  i,
775  co_cpu,
776  eo,
777  (i == interval.second - 1) ? render_info : nullptr,
778  queue_time_ms);
779  }
780  }
781 
782  return seq.getDescriptor(interval.second - 1)->getResult();
783 }
784 
// Executes a single step of an execution sequence: applies query hints
// (CPU mode, columnar/rowwise output), then dispatches on the body's node
// type (compound, project, aggregate, filter, sort, logical values, modify,
// union, table function), storing each result as a temporary table keyed by
// the negated node id. No-op steps forward the previous result.
// NOTE(review): the method-name line and several statement lines (parts of
// the eo_work_unit field list and the foreign-table preparation call) are
// elided from this dump.
786  const size_t step_idx,
787  const CompilationOptions& co,
788  const ExecutionOptions& eo,
789  RenderInfo* render_info,
790  const int64_t queue_time_ms) {
792  auto timer = DEBUG_TIMER(__func__);
794  auto exec_desc_ptr = seq.getDescriptor(step_idx);
795  CHECK(exec_desc_ptr);
796  auto& exec_desc = *exec_desc_ptr;
797  const auto body = exec_desc.getBody();
798  if (body->isNop()) {
799  handleNop(exec_desc);
800  return;
801  }
802 
803  const ExecutionOptions eo_work_unit{
805  eo.allow_multifrag,
806  eo.just_explain,
807  eo.allow_loop_joins,
// Watchdog only for the first step or projection steps.
808  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
809  eo.jit_debug,
810  eo.just_validate,
819  eo.executor_type,
820  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
821 
// Applies registered query hints (targeting the sort's input when the body
// is a sort) to copies of the compilation and execution options.
822  auto handle_hint = [co,
823  eo_work_unit,
824  body,
825  this]() -> std::pair<CompilationOptions, ExecutionOptions> {
826  ExecutionOptions eo_hint_applied = eo_work_unit;
827  CompilationOptions co_hint_applied = co;
828  auto target_node = body;
829  if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
830  target_node = sort_body->getInput(0);
831  }
832  auto query_hints = getParsedQueryHint(target_node);
833  auto columnar_output_hint_enabled = false;
834  auto rowwise_output_hint_enabled = false;
835  if (query_hints) {
836  if (query_hints->isHintRegistered(QueryHint::kCpuMode)) {
837  VLOG(1) << "A user forces to run the query on the CPU execution mode";
838  co_hint_applied.device_type = ExecutorDeviceType::CPU;
839  }
840  if (query_hints->isHintRegistered(QueryHint::kColumnarOutput)) {
841  VLOG(1) << "A user forces the query to run with columnar output";
842  columnar_output_hint_enabled = true;
843  } else if (query_hints->isHintRegistered(QueryHint::kRowwiseOutput)) {
844  VLOG(1) << "A user forces the query to run with rowwise output";
845  rowwise_output_hint_enabled = true;
846  }
847  }
848  auto columnar_output_enabled = eo_work_unit.output_columnar_hint
849  ? !rowwise_output_hint_enabled
850  : columnar_output_hint_enabled;
851  if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
852  LOG(INFO) << "Currently, we do not support applying query hint to change query "
853  "output layout in distributed mode.";
854  }
855  eo_hint_applied.output_columnar_hint = columnar_output_enabled;
856  return std::make_pair(co_hint_applied, eo_hint_applied);
857  };
858 
859  // Notify foreign tables to load prior to execution
861  auto hint_applied = handle_hint();
862  const auto compound = dynamic_cast<const RelCompound*>(body);
863  if (compound) {
864  if (compound->isDeleteViaSelect()) {
865  executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
866  } else if (compound->isUpdateViaSelect()) {
867  executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
868  } else {
869  exec_desc.setResult(executeCompound(
870  compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
871  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
872  << static_cast<int>(-compound->getId()) << ", ...)"
873  << " exec_desc.getResult().getDataPtr()->rowCount()="
874  << exec_desc.getResult().getDataPtr()->rowCount();
875  if (exec_desc.getResult().isFilterPushDownEnabled()) {
876  return;
877  }
878  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
879  }
880  return;
881  }
882  const auto project = dynamic_cast<const RelProject*>(body);
883  if (project) {
884  if (project->isDeleteViaSelect()) {
885  executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
886  } else if (project->isUpdateViaSelect()) {
887  executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
888  } else {
889  std::optional<size_t> prev_count;
890  // Disabling the intermediate count optimization in distributed, as the previous
891  // execution descriptor will likely not hold the aggregated result.
892  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
893  // If the previous node produced a reliable count, skip the pre-flight count.
894  RelAlgNode const* const prev_body = project->getInput(0);
895  if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
896  if (RaExecutionDesc const* const prev_exec_desc =
897  prev_body->hasContextData()
898  ? prev_body->getContextData()
899  : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
900  const auto& prev_exe_result = prev_exec_desc->getResult();
901  const auto prev_result = prev_exe_result.getRows();
902  if (prev_result) {
903  prev_count = prev_result->rowCount();
904  VLOG(3) << "Setting output row count for projection node to previous node ("
905  << prev_exec_desc->getBody()->toString() << ") to " << *prev_count;
906  }
907  }
908  }
909  }
910  exec_desc.setResult(executeProject(project,
911  hint_applied.first,
912  hint_applied.second,
913  render_info,
914  queue_time_ms,
915  prev_count));
916  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
917  << static_cast<int>(-project->getId()) << ", ...)"
918  << " exec_desc.getResult().getDataPtr()->rowCount()="
919  << exec_desc.getResult().getDataPtr()->rowCount();
920  if (exec_desc.getResult().isFilterPushDownEnabled()) {
921  return;
922  }
923  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
924  }
925  return;
926  }
927  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
928  if (aggregate) {
929  exec_desc.setResult(executeAggregate(
930  aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
931  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
932  return;
933  }
934  const auto filter = dynamic_cast<const RelFilter*>(body);
935  if (filter) {
936  exec_desc.setResult(executeFilter(
937  filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
938  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
939  return;
940  }
941  const auto sort = dynamic_cast<const RelSort*>(body);
942  if (sort) {
943  exec_desc.setResult(executeSort(
944  sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
945  if (exec_desc.getResult().isFilterPushDownEnabled()) {
946  return;
947  }
948  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
949  return;
950  }
951  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
952  if (logical_values) {
953  exec_desc.setResult(executeLogicalValues(logical_values, hint_applied.second));
954  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
955  return;
956  }
957  const auto modify = dynamic_cast<const RelModify*>(body);
958  if (modify) {
959  exec_desc.setResult(executeModify(modify, hint_applied.second));
960  return;
961  }
962  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
963  if (logical_union) {
964  exec_desc.setResult(executeUnion(logical_union,
965  seq,
966  hint_applied.first,
967  hint_applied.second,
968  render_info,
969  queue_time_ms));
970  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
971  return;
972  }
973  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
974  if (table_func) {
975  exec_desc.setResult(executeTableFunction(
976  table_func, hint_applied.first, hint_applied.second, queue_time_ms));
977  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
978  return;
979  }
980  LOG(FATAL) << "Unhandled body type: " << body->toString();
981 }
982 
  // Forward the previous step's result unchanged: a no-op aggregate simply
  // re-exposes its single input's output and temporary table.
  // just set the result of the previous node as the result of no op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  // Temporary tables are keyed by negated node id.
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up temp table as it could be used by the outer query or next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}
997 
998 namespace {
999 
// Rex visitor collecting every RexInput referenced by an expression tree.
// Inputs naming a geo column on a physical scan are expanded into RexInputs
// for the backing physical columns; those synthesized nodes are owned by the
// visitor (see get_inputs_owned), since they are not part of the original
// tree.
class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:

  // Ownership of RexInputs synthesized during visitation; keep the visitor
  // alive (or copy this vector) for as long as the returned pointers are used.
  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        // SPI lookup is 1-based, rex indices are 0-based.
        const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          // Expand the logical geo column into its physical columns.
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  // Result combination: union of the input sets.
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  // mutable: visitInput is const but must record synthesized ownership.
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
};
1047 
1048 const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
1049  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
1050  return table_func;
1051  }
1052  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
1053  CHECK_EQ(size_t(2), join->inputCount());
1054  return join;
1055  }
1056  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1057  CHECK_EQ(size_t(1), ra_node->inputCount());
1058  }
1059  auto only_src = ra_node->getInput(0);
1060  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
1061  dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
1062  return is_join ? only_src : ra_node;
1063 }
1064 
// Collects every RexInput referenced by a compound node's filter expression
// and scalar sources. Geo columns are expanded into their physical columns by
// RexUsedInputsVisitor; ownership of the synthesized inputs is returned in
// the second pair element.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  RexUsedInputsVisitor visitor(cat);
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  // Copy ownership out before the visitor goes out of scope.
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}
1079 
// Collects inputs for a standalone aggregate node: one synthesized RexInput
// per group-by key plus one per aggregate operand, all referring to the
// aggregate's single source. Ownership of the synthesized inputs is returned
// alongside the set.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  // Group-by keys occupy the leading source columns.
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}
1105 
// Collects every RexInput referenced by a project node's expressions,
// expanding geo columns into their physical columns via RexUsedInputsVisitor.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  // Copy ownership out before the visitor goes out of scope.
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}
1117 
// Collects every RexInput referenced by a table function's input expressions.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  // Copy ownership out before the visitor goes out of scope.
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}
1130 
// Collects used inputs for a filter by synthesizing a RexInput for every
// column of every input of the filter's data sink: all scan columns when the
// source is a physical scan, otherwise one per output-metainfo entry.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      // Physical scans never carry output metainfo.
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}
1157 
// Collects used inputs for a union: one RexInput per column of every input.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  // NOTE: the constructor argument is the initial bucket count, not contents.
  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  used_inputs_owned.reserve(logical_union->inputCount());
  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
  auto const n_inputs = logical_union->inputCount();
  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
    auto input = logical_union->getInput(nest_level);
    for (size_t i = 0; i < input->size(); ++i) {
      used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
      used_inputs.insert(used_inputs_owned.back().get());
    }
  }
  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
}
1174 
1175 int table_id_from_ra(const RelAlgNode* ra_node) {
1176  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
1177  if (scan_ra) {
1178  const auto td = scan_ra->getTableDescriptor();
1179  CHECK(td);
1180  return td->tableId;
1181  }
1182  return -ra_node->getId();
1183 }
1184 
1185 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
1186  const RelAlgNode* ra_node,
1187  const std::vector<size_t>& input_permutation) {
1188  const auto data_sink_node = get_data_sink(ra_node);
1189  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1190  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1191  const auto input_node_idx =
1192  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1193  const auto input_ra = data_sink_node->getInput(input_node_idx);
1194  // Having a non-zero mapped value (input_idx) results in the query being interpretted
1195  // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
1196  // mapped value (input_idx) which originates here. This would be incorrect for UNION.
1197  size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
1198  const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1199  CHECK(it_ok.second);
1200  LOG_IF(INFO, !input_permutation.empty())
1201  << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
1202  }
1203  return input_to_nest_level;
1204 }
1205 
// Collects inputs referenced by join conditions: the single condition of a
// binary join, or the inner condition plus all per-level outer conditions of
// a left-deep join. Non-join data sinks contribute nothing (after sanity
// checks on the node's input count).
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  const auto data_sink_node = get_data_sink(ra_node);
  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
    CHECK_EQ(join->inputCount(), 2u);
    const auto condition = join->getCondition();
    RexUsedInputsVisitor visitor(cat);
    auto condition_inputs = visitor.visit(condition);
    std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
        visitor.get_inputs_owned());
    return std::make_pair(condition_inputs, condition_inputs_owned);
  }

  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
    CHECK_GE(left_deep_join->inputCount(), 2u);
    const auto condition = left_deep_join->getInnerCondition();
    RexUsedInputsVisitor visitor(cat);
    auto result = visitor.visit(condition);
    // Outer conditions start at nesting level 1.
    for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
         ++nesting_level) {
      const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
      if (outer_condition) {
        const auto outer_result = visitor.visit(outer_condition);
        result.insert(outer_result.begin(), outer_result.end());
      }
    }
    std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
    return std::make_pair(result, used_inputs_owned);
  }

  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_GT(ra_node->inputCount(), 1u) << ra_node->toString();
  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
    // no-op
    CHECK_GE(ra_node->inputCount(), 0u) << ra_node->toString();
  } else {
    CHECK_EQ(ra_node->inputCount(), 1u) << ra_node->toString();
  }
  return std::make_pair(std::unordered_set<const RexInput*>{},
                        std::vector<std::shared_ptr<RexInput>>{});
}
1248 
// Converts each used RexInput into an InputColDescriptor keyed by table id,
// column id (SPI-translated for physical scans), and the source's nest level.
// Inputs whose source has no assigned nest level are only tolerated under a
// union root; anywhere else they indicate an unsupported bushy join.
    std::vector<InputDescriptor>& input_descs,
    std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
    const RelAlgNode* ra_node,
    const std::unordered_set<const RexInput*>& source_used_inputs,
    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
  VLOG(3) << "ra_node=" << ra_node->toString()
          << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
          << " source_used_inputs.size()=" << source_used_inputs.size();
  for (const auto used_input : source_used_inputs) {
    const auto input_ra = used_input->getSourceNode();
    const int table_id = table_id_from_ra(input_ra);
    const auto col_id = used_input->getIndex();
    auto it = input_to_nest_level.find(input_ra);
    if (it != input_to_nest_level.end()) {
      const int input_desc = it->second;
      // Physical scans translate the 0-based rex index through the SPI map.
      input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
          dynamic_cast<const RelScan*>(input_ra)
              ? cat.getColumnIdBySpi(table_id, col_id + 1)
              : col_id,
          table_id,
          input_desc));
    } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
      throw std::runtime_error("Bushy joins not supported");
    }
  }
}
1277 
// Builds the per-nest-level input descriptors for ra_node's data sink, then
// gathers column descriptors for the directly-used inputs and for the inputs
// referenced by join conditions. The column descriptors are returned sorted
// by (nest level, column id, table id) so downstream code sees a
// deterministic order.
template <class RA>
std::pair<std::vector<InputDescriptor>,
          std::list<std::shared_ptr<const InputColDescriptor>>>
get_input_desc_impl(const RA* ra_node,
                    const std::unordered_set<const RexInput*>& used_inputs,
                    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
                    const std::vector<size_t>& input_permutation,
  std::vector<InputDescriptor> input_descs;
  const auto data_sink_node = get_data_sink(ra_node);
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    auto input_ra = data_sink_node->getInput(input_node_idx);
    const int table_id = table_id_from_ra(input_ra);
    input_descs.emplace_back(table_id, input_idx);
  }
  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          used_inputs,
                          input_to_nest_level);
  std::unordered_set<const RexInput*> join_source_used_inputs;
  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
      get_join_source_used_inputs(ra_node, cat);
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          join_source_used_inputs,
                          input_to_nest_level);
  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
      input_col_descs_unique.begin(), input_col_descs_unique.end());

  // Deterministic ordering: nest level first, then column id, then table id.
  std::sort(input_col_descs.begin(),
            input_col_descs.end(),
            [](std::shared_ptr<const InputColDescriptor> const& lhs,
               std::shared_ptr<const InputColDescriptor> const& rhs) {
              return std::make_tuple(lhs->getScanDesc().getNestLevel(),
                                     lhs->getColId(),
                                     lhs->getScanDesc().getTableId()) <
                     std::make_tuple(rhs->getScanDesc().getNestLevel(),
                                     rhs->getColId(),
                                     rhs->getScanDesc().getTableId());
            });
  return {input_descs,
          std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
                                                               input_col_descs.end())};
}
1330 
1331 template <class RA>
1332 std::tuple<std::vector<InputDescriptor>,
1333  std::list<std::shared_ptr<const InputColDescriptor>>,
1334  std::vector<std::shared_ptr<RexInput>>>
1335 get_input_desc(const RA* ra_node,
1336  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1337  const std::vector<size_t>& input_permutation,
1338  const Catalog_Namespace::Catalog& cat) {
1339  std::unordered_set<const RexInput*> used_inputs;
1340  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1341  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
1342  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1343  auto input_desc_pair = get_input_desc_impl(
1344  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1345  return std::make_tuple(
1346  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1347 }
1348 
// Number of scalar sources for a compound node.
size_t get_scalar_sources_size(const RelCompound* compound) {
  return compound->getScalarSourcesSize();
}
1352 
// Number of scalar sources (projected expressions) for a project node.
size_t get_scalar_sources_size(const RelProject* project) {
  return project->size();
}
1356 
// Number of scalar sources (input expressions) for a table function node.
size_t get_scalar_sources_size(const RelTableFunction* table_func) {
  return table_func->getTableFuncInputsSize();
}
1360 
// i-th scalar source of a compound node.
const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
  return compound->getScalarSource(i);
}
1364 
// i-th projected expression of a project node.
const RexScalar* scalar_at(const size_t i, const RelProject* project) {
  return project->getProjectAt(i);
}
1368 
// i-th input expression of a table function node.
const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
  return table_func->getTableFuncInputAt(i);
}
1372 
1373 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1374  const std::shared_ptr<Analyzer::Expr> expr) {
1375  const auto& ti = expr->get_type_info();
1376  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1377  return expr;
1378  }
1379  auto transient_dict_ti = ti;
1380  transient_dict_ti.set_compression(kENCODING_DICT);
1381  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1382  transient_dict_ti.set_fixed_size();
1383  return expr->add_cast(transient_dict_ti);
1384 }
1385 
// Appends the folded expression to scalar_sources, preferring a transient
// dictionary encoding; falls back to the plain folded expression if the
// transient-dict cast throws.
    std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::shared_ptr<Analyzer::Expr>& expr) {
  try {
    scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
  } catch (...) {
    // Best effort: keep the expression without the transient dict.
    scalar_sources.push_back(fold_expr(expr.get()));
  }
}
1395 
1396 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1397  const std::shared_ptr<Analyzer::Expr>& input) {
1398  const auto& input_ti = input->get_type_info();
1399  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1400  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1401  }
1402  return input;
1403 }
1404 
1405 template <class RA>
1406 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1407  const RA* ra_node,
1408  const RelAlgTranslator& translator,
1409  const ::ExecutorType executor_type) {
1410  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1411  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1412  VLOG(3) << "get_scalar_sources_size(" << ra_node->toString()
1413  << ") = " << scalar_sources_size;
1414  for (size_t i = 0; i < scalar_sources_size; ++i) {
1415  const auto scalar_rex = scalar_at(i, ra_node);
1416  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1417  // RexRef are synthetic scalars we append at the end of the real ones
1418  // for the sake of taking memory ownership, no real work needed here.
1419  continue;
1420  }
1421 
1422  const auto scalar_expr =
1423  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
1424  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1425  if (executor_type == ExecutorType::Native) {
1426  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1427  } else if (executor_type == ExecutorType::TableFunctions) {
1428  scalar_sources.push_back(fold_expr(rewritten_expr.get()));
1429  } else {
1430  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1431  }
1432  }
1433 
1434  return scalar_sources;
1435 }
1436 
1437 template <class RA>
1438 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1439  const RA* ra_node,
1440  const RelAlgTranslator& translator,
1441  int32_t tableId,
1442  const Catalog_Namespace::Catalog& cat,
1443  const ColumnNameList& colNames,
1444  size_t starting_projection_column_idx) {
1445  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1446  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1447  const auto scalar_rex = scalar_at(i, ra_node);
1448  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1449  // RexRef are synthetic scalars we append at the end of the real ones
1450  // for the sake of taking memory ownership, no real work needed here.
1451  continue;
1452  }
1453 
1454  std::shared_ptr<Analyzer::Expr> translated_expr;
1455  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1456  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
1457  tableId,
1458  cat,
1459  colNames[i - starting_projection_column_idx]);
1460  } else {
1461  translated_expr = translator.translateScalarRex(scalar_rex);
1462  }
1463  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1464  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1465  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1466  }
1467 
1468  return scalar_sources;
1469 }
1470 
1471 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1472  const RelCompound* compound,
1473  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1474  if (!compound->isAggregate()) {
1475  return {nullptr};
1476  }
1477  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1478  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1479  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1480  }
1481  return groupby_exprs;
1482 }
1483 
1484 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1485  const RelAggregate* aggregate,
1486  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1487  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1488  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1489  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1490  }
1491  return groupby_exprs;
1492 }
1493 
// Translates the compound's filter expression (if any) into conjunctive
// normal form, constant-folding it first.
    const RelAlgTranslator& translator) {
  const auto filter_rex = compound->getFilterExpr();
  const auto filter_expr =
      filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
}
1502 
1503 std::vector<Analyzer::Expr*> translate_targets(
1504  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1505  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1506  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1507  const RelCompound* compound,
1508  const RelAlgTranslator& translator,
1509  const ExecutorType executor_type) {
1510  std::vector<Analyzer::Expr*> target_exprs;
1511  for (size_t i = 0; i < compound->size(); ++i) {
1512  const auto target_rex = compound->getTargetExpr(i);
1513  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1514  std::shared_ptr<Analyzer::Expr> target_expr;
1515  if (target_rex_agg) {
1516  target_expr =
1517  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1518  } else {
1519  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1520  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1521  if (target_rex_ref) {
1522  const auto ref_idx = target_rex_ref->getIndex();
1523  CHECK_GE(ref_idx, size_t(1));
1524  CHECK_LE(ref_idx, groupby_exprs.size());
1525  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1526  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1527  } else {
1528  target_expr = translator.translateScalarRex(target_rex_scalar);
1529  auto rewritten_expr = rewrite_expr(target_expr.get());
1530  target_expr = fold_expr(rewritten_expr.get());
1531  if (executor_type == ExecutorType::Native) {
1532  try {
1533  target_expr = set_transient_dict(target_expr);
1534  } catch (...) {
1535  // noop
1536  }
1537  } else {
1538  target_expr = cast_dict_to_none(target_expr);
1539  }
1540  }
1541  }
1542  CHECK(target_expr);
1543  target_exprs_owned.push_back(target_expr);
1544  target_exprs.push_back(target_expr.get());
1545  }
1546  return target_exprs;
1547 }
1548 
1549 std::vector<Analyzer::Expr*> translate_targets(
1550  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1551  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1552  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1553  const RelAggregate* aggregate,
1554  const RelAlgTranslator& translator) {
1555  std::vector<Analyzer::Expr*> target_exprs;
1556  size_t group_key_idx = 1;
1557  for (const auto& groupby_expr : groupby_exprs) {
1558  auto target_expr =
1559  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1560  target_exprs_owned.push_back(target_expr);
1561  target_exprs.push_back(target_expr.get());
1562  }
1563 
1564  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1565  auto target_expr =
1566  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1567  CHECK(target_expr);
1568  target_expr = fold_expr(target_expr.get());
1569  target_exprs_owned.push_back(target_expr);
1570  target_exprs.push_back(target_expr.get());
1571  }
1572  return target_exprs;
1573 }
1574 
  // True iff expr is an aggregate marked DISTINCT (e.g. COUNT(DISTINCT x)).
  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
  return agg_expr && agg_expr->get_is_distinct();
}
1579 
1580 bool is_agg(const Analyzer::Expr* expr) {
1581  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1582  if (agg_expr && agg_expr->get_contains_agg()) {
1583  auto agg_type = agg_expr->get_aggtype();
1584  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1585  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1586  return true;
1587  }
1588  }
1589  return false;
1590 }
1591 
  // COUNT DISTINCT always surfaces as a non-nullable BIGINT.
  if (is_count_distinct(&expr)) {
    return SQLTypeInfo(kBIGINT, false);
  } else if (is_agg(&expr)) {
  }
  // Fall through: the logical (uncompressed) type of the expression.
  return get_logical_type_info(expr.get_type_info());
}
1600 
1601 template <class RA>
1602 std::vector<TargetMetaInfo> get_targets_meta(
1603  const RA* ra_node,
1604  const std::vector<Analyzer::Expr*>& target_exprs) {
1605  std::vector<TargetMetaInfo> targets_meta;
1606  CHECK_EQ(ra_node->size(), target_exprs.size());
1607  for (size_t i = 0; i < ra_node->size(); ++i) {
1608  CHECK(target_exprs[i]);
1609  // TODO(alex): remove the count distinct type fixup.
1610  targets_meta.emplace_back(ra_node->getFieldName(i),
1611  get_logical_type_for_expr(*target_exprs[i]),
1612  target_exprs[i]->get_type_info());
1613  }
1614  return targets_meta;
1615 }
1616 
1617 template <>
1618 std::vector<TargetMetaInfo> get_targets_meta(
1619  const RelFilter* filter,
1620  const std::vector<Analyzer::Expr*>& target_exprs) {
1621  RelAlgNode const* input0 = filter->getInput(0);
1622  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1623  return get_targets_meta(input, target_exprs);
1624  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1625  return get_targets_meta(input, target_exprs);
1626  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1627  return get_targets_meta(input, target_exprs);
1628  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1629  return get_targets_meta(input, target_exprs);
1630  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1631  return get_targets_meta(input, target_exprs);
1632  }
1633  UNREACHABLE() << "Unhandled node type: " << input0->toString();
1634  return {};
1635 }
1636 
1637 } // namespace
1638 
// Executes an UPDATE rooted at a compound or project node. Temporary-table
// updates are rewritten into a full-column columnar projection (single
// column only); persistent tables run the work unit directly, and touched
// fragments are vacuumed in the post-execution callback.
    const CompilationOptions& co_in,
    const ExecutionOptions& eo_in,
    const int64_t queue_time_ms) {
  CHECK(node);
  auto timer = DEBUG_TIMER(__func__);

  auto co = co_in;
  co.hoist_literals = false;  // disable literal hoisting as it interferes with dict
  // encoded string updates

  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
                                                     auto& work_unit,
                                                     const bool is_aggregate) {
    auto table_descriptor = node->getModifiedTableDescriptor();
    CHECK(table_descriptor);
    // Varlen updates are implemented as delete+insert, which needs the
    // deleted column.
    if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
      throw std::runtime_error(
          "UPDATE queries involving variable length columns are only supported on tables "
          "with the vacuum attribute set to 'delayed'");
    }

    if (table_descriptor->fragmenter) {
      auto table_key = boost::hash_value(
          table_descriptor->fragmenter->getFragmentsForQuery().chunkKeyPrefix);
    } else {
    }

    auto updated_table_desc = node->getModifiedTableDescriptor();
        std::make_unique<UpdateTransactionParameters>(updated_table_desc,
                                                      node->getTargetColumns(),
                                                      node->getOutputMetainfo(),
                                                      node->isVarlenUpdateRequired());

    const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);

    auto execute_update_ra_exe_unit =
        [this, &co, &eo_in, &table_infos, &updated_table_desc](
            const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {

          auto eo = eo_in;
          if (dml_transaction_parameters_->tableIsTemporary()) {
            eo.output_columnar_hint = true;
            co_project.allow_lazy_fetch = false;
            co_project.filter_on_deleted_column =
                false;  // project the entire delete column for columnar update
          }

          auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
          CHECK(update_transaction_parameters);
          auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
          try {
            auto table_update_metadata =
                executor_->executeUpdate(ra_exe_unit,
                                         table_infos,
                                         updated_table_desc,
                                         co_project,
                                         eo,
                                         cat_,
                                         executor_->row_set_mem_owner_,
                                         update_callback,
                                         is_aggregate);
            // After execution: finalize the transaction and vacuum fragments
            // whose deleted-row selectivity crossed the threshold.
            post_execution_callback_ = [table_update_metadata, this]() {
              dml_transaction_parameters_->finalizeTransaction(cat_);
              TableOptimizer table_optimizer{
                  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
              table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
            };
          } catch (const QueryExecutionError& e) {
            throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
          }
        };

    if (dml_transaction_parameters_->tableIsTemporary()) {
      // hold owned target exprs during execution if rewriting
      auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
      // rewrite temp table updates to generate the full column by moving the where
      // clause into a case if such a rewrite is not possible, bail on the update
      // operation build an expr for the update target
      auto update_transaction_params =
      CHECK(update_transaction_params);
      const auto td = update_transaction_params->getTableDescriptor();
      CHECK(td);
      const auto update_column_names = update_transaction_params->getUpdateColumnNames();
      if (update_column_names.size() > 1) {
        throw std::runtime_error(
            "Multi-column update is not yet supported for temporary tables.");
      }

      auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
      CHECK(cd);
      auto projected_column_to_update =
          makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
      const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
          work_unit.exe_unit, projected_column_to_update);
      if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
        throw std::runtime_error(
            "Variable length updates not yet supported on temporary tables.");
      }
      execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
    } else {
      execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
    }
  };

  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
    auto work_unit =
        createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);

    execute_update_for_node(compound, work_unit, compound->isAggregate());
  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);

    if (project->isSimple()) {
      CHECK_EQ(size_t(1), project->inputCount());
      const auto input_ra = project->getInput(0);
      if (dynamic_cast<const RelSort*>(input_ra)) {
        // A simple project over a sort must preserve the sort's row count.
        const auto& input_table =
            get_temporary_table(&temporary_tables_, -input_ra->getId());
        CHECK(input_table);
        work_unit.exe_unit.scan_limit = input_table->rowCount();
      }
    }

    execute_update_for_node(project, work_unit, false);
  } else {
    throw std::runtime_error("Unsupported parent node for update: " + node->toString());
  }
}
1775 
// RelAlgExecutor::executeDelete — runs a DELETE by flagging rows in the
// table's hidden "deleted" column through the update execution path.
// NOTE(review): this doxygen-extracted listing dropped several source lines
// (1776, 1797, 1799, 1807, 1810, 1813); consult the original file for the
// complete text. The dropped line 1776 is the function signature
// (presumably `void RelAlgExecutor::executeDelete(const RelAlgNode* node,`).
1777  const CompilationOptions& co,
1778  const ExecutionOptions& eo_in,
1779  const int64_t queue_time_ms) {
1780  CHECK(node);
1781  auto timer = DEBUG_TIMER(__func__);
1782 
// Per-node worker: validates the target table, then executes the delete.
1783  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1784  auto& work_unit,
1785  const bool is_aggregate) {
1786  auto* table_descriptor = node->getModifiedTableDescriptor();
1787  CHECK(table_descriptor);
// Delete support requires the table's hidden deleted column (vacuum 'delayed').
1788  if (!table_descriptor->hasDeletedCol) {
1789  throw std::runtime_error(
1790  "DELETE queries are only supported on tables with the vacuum attribute set to "
1791  "'delayed'");
1792  }
1793 
// Presumably marks cached items for this table dirty — the calls on dropped
// lines 1797/1799 are missing from this listing; verify against the original.
1794  if (table_descriptor->fragmenter) {
1795  auto table_key = boost::hash_value(
1796  table_descriptor->fragmenter->getFragmentsForQuery().chunkKeyPrefix);
1798  } else {
1800  }
1801 
1802  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1803 
// Executes one RelAlgExecutionUnit worth of delete work; dropped lines
// 1807/1810/1813 likely set up dml_transaction_parameters_ and co_delete.
1804  auto execute_delete_ra_exe_unit =
1805  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1806  const bool is_aggregate) {
1808  std::make_unique<DeleteTransactionParameters>(table_descriptor);
1809  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
1811  CHECK(delete_params);
1812  auto delete_callback = yieldDeleteCallback(*delete_params);
1814 
1815  auto eo = eo_in;
1816  if (dml_transaction_parameters_->tableIsTemporary()) {
1817  eo.output_columnar_hint = true;
1818  co_delete.filter_on_deleted_column =
1819  false; // project the entire delete column for columnar update
1820  } else {
1821  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1822  }
1823 
1824  try {
1825  auto table_update_metadata =
1826  executor_->executeUpdate(exe_unit,
1827  table_infos,
1828  table_descriptor,
1829  co_delete,
1830  eo,
1831  cat_,
1832  executor_->row_set_mem_owner_,
1833  delete_callback,
1834  is_aggregate);
// After execution: finalize the transaction and vacuum fragments whose
// deleted-row selectivity crossed the threshold.
1835  post_execution_callback_ = [table_update_metadata, this]() {
1836  dml_transaction_parameters_->finalizeTransaction(cat_);
1837  TableOptimizer table_optimizer{
1838  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1839  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1840  };
1841  } catch (const QueryExecutionError& e) {
1842  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1843  }
1844  };
1845 
// Temporary tables cannot delete in place; rewrite to a columnar delete on
// the deleted column instead.
1846  if (table_is_temporary(table_descriptor)) {
1847  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1848  auto cd = cat_.getDeletedColumn(table_descriptor);
1849  CHECK(cd);
1850  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1851  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1852  const auto rewritten_exe_unit =
1853  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1854  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1855  } else {
1856  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1857  }
1858  };
1859 
// Dispatch: only RelCompound and RelProject parents are supported for DELETE.
1860  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1861  const auto work_unit =
1862  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1863  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1864  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1865  auto work_unit =
1866  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1867  if (project->isSimple()) {
1868  CHECK_EQ(size_t(1), project->inputCount());
1869  const auto input_ra = project->getInput(0);
// A simple projection over a sort reads a materialized temporary table; cap
// the scan at that table's row count.
1870  if (dynamic_cast<const RelSort*>(input_ra)) {
1871  const auto& input_table =
1872  get_temporary_table(&temporary_tables_, -input_ra->getId());
1873  CHECK(input_table);
1874  work_unit.exe_unit.scan_limit = input_table->rowCount();
1875  }
1876  }
1877  execute_delete_for_node(project, work_unit, false);
1878  } else {
1879  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1880  }
1881 }
1882 
// RelAlgExecutor::executeCompound — builds a work unit for a RelCompound node
// and forwards it to executeWorkUnit.
// NOTE(review): the signature line (source line 1883, presumably
// `ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,`)
// was dropped by the doc-extraction tool.
1884  const CompilationOptions& co,
1885  const ExecutionOptions& eo,
1886  RenderInfo* render_info,
1887  const int64_t queue_time_ms) {
1888  auto timer = DEBUG_TIMER(__func__);
1889  const auto work_unit =
1890  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
// Local copy of the compilation options (passed by non-const reference below).
1891  CompilationOptions co_compound = co;
1892  return executeWorkUnit(work_unit,
1893  compound->getOutputMetainfo(),
1894  compound->isAggregate(),
1895  co_compound,
1896  eo,
1897  render_info,
1898  queue_time_ms);
1899 }
1900 
// RelAlgExecutor::executeAggregate — builds a work unit for a RelAggregate
// node and forwards it to executeWorkUnit with is_agg=true.
// NOTE(review): the signature line (source line 1901, presumably
// `ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,`)
// was dropped by the doc-extraction tool.
1902  const CompilationOptions& co,
1903  const ExecutionOptions& eo,
1904  RenderInfo* render_info,
1905  const int64_t queue_time_ms) {
1906  auto timer = DEBUG_TIMER(__func__);
1907  const auto work_unit = createAggregateWorkUnit(
1908  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1909  return executeWorkUnit(work_unit,
1910  aggregate->getOutputMetainfo(),
// Aggregate nodes always execute as aggregates.
1911  true,
1912  co,
1913  eo,
1914  render_info,
1915  queue_time_ms);
1916 }
1917 
1918 namespace {
1919 
1920 // Returns true iff the execution unit contains window functions.
// NOTE(review): the signature line (source line 1921, presumably
// `bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {`)
// was dropped by the doc-extraction tool. The body scans the unit's target
// expressions for any Analyzer::WindowFunction.
1922  return std::any_of(ra_exe_unit.target_exprs.begin(),
1923  ra_exe_unit.target_exprs.end(),
1924  [](const Analyzer::Expr* expr) {
1925  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1926  });
1927 }
1928 
1929 } // namespace
1930 
// RelAlgExecutor::executeProject — builds and executes a projection work unit.
// NOTE(review): the first signature line (source line 1931, presumably
// `ExecutionResult RelAlgExecutor::executeProject(`) was dropped by the
// doc-extraction tool.
1932  const RelProject* project,
1933  const CompilationOptions& co,
1934  const ExecutionOptions& eo,
1935  RenderInfo* render_info,
1936  const int64_t queue_time_ms,
1937  const std::optional<size_t> previous_count) {
1938  auto timer = DEBUG_TIMER(__func__);
1939  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1940  CompilationOptions co_project = co;
1941  if (project->isSimple()) {
1942  CHECK_EQ(size_t(1), project->inputCount());
1943  const auto input_ra = project->getInput(0);
// A simple projection over a sort reads the sort's materialized temporary
// table: force CPU execution and cap the scan at min(limit, row count).
1944  if (dynamic_cast<const RelSort*>(input_ra)) {
1945  co_project.device_type = ExecutorDeviceType::CPU;
1946  const auto& input_table =
1947  get_temporary_table(&temporary_tables_, -input_ra->getId());
1948  CHECK(input_table);
1949  work_unit.exe_unit.scan_limit =
1950  std::min(input_table->getLimit(), input_table->rowCount());
1951  }
1952  }
1953  return executeWorkUnit(work_unit,
1954  project->getOutputMetainfo(),
1955  false,
1956  co_project,
1957  eo,
1958  render_info,
1959  queue_time_ms,
1960  previous_count);
1961 }
1962 
// RelAlgExecutor::executeTableFunction — compiles and runs a table function
// node, returning its ResultSet.
// NOTE(review): the doc-extraction tool dropped source lines 1963 (signature,
// presumably `ExecutionResult RelAlgExecutor::executeTableFunction(`), 1967,
// 1990, and 2002-2003 (likely the out-of-GPU-memory error-code check in the
// catch block); consult the original file.
1964  const CompilationOptions& co_in,
1965  const ExecutionOptions& eo,
1966  const int64_t queue_time_ms) {
1968  auto timer = DEBUG_TIMER(__func__);
1969 
1970  auto co = co_in;
1971 
// Table functions are gated: unsupported in distributed mode and behind a
// feature flag.
1972  if (g_cluster) {
1973  throw std::runtime_error("Table functions not supported in distributed mode yet");
1974  }
1975  if (!g_enable_table_functions) {
1976  throw std::runtime_error("Table function support is disabled");
1977  }
1978  auto table_func_work_unit = createTableFunctionWorkUnit(
1979  table_func,
1980  eo.just_explain,
1981  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
1982  const auto body = table_func_work_unit.body;
1983  CHECK(body);
1984 
1985  const auto table_infos =
1986  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1987 
// Pre-seed the result with an empty ResultSet; replaced on success below.
1988  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1989  co.device_type,
1991  nullptr,
1992  executor_->getCatalog(),
1993  executor_->blockSize(),
1994  executor_->gridSize()),
1995  {}};
1996 
1997  try {
1998  result = {executor_->executeTableFunction(
1999  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
2000  body->getOutputMetainfo()};
2001  } catch (const QueryExecutionError& e) {
2004  throw std::runtime_error("Table function ran out of memory during execution");
2005  }
2006  result.setQueueTime(queue_time_ms);
2007  return result;
2008 }
2009 
2010 namespace {
2011 
2012 // Creates a new expression which has the range table index set to 1. This is needed to
2013 // reuse the hash join construction helpers to generate a hash table for the window
2014 // function partition: create an equals expression with left and right sides identical
2015 // except for the range table index.
2016 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
2017  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
2018  if (tuple) {
2019  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
2020  for (const auto& element : tuple->getTuple()) {
2021  transformed_tuple.push_back(transform_to_inner(element.get()));
2022  }
2023  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
2024  }
2025  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
2026  if (!col) {
2027  throw std::runtime_error("Only columns supported in the window partition for now");
2028  }
2029  return makeExpr<Analyzer::ColumnVar>(
2030  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
2031 }
2032 
2033 } // namespace
2034 
// RelAlgExecutor::computeWindow — pre-computes a WindowFunctionContext for
// every window function target in the execution unit.
// NOTE(review): the signature line (source line 2035, presumably
// `void RelAlgExecutor::computeWindow(const RelAlgExecutionUnit& ra_exe_unit,`)
// was dropped by the doc-extraction tool.
2036  const CompilationOptions& co,
2037  const ExecutionOptions& eo,
2038  ColumnCacheMap& column_cache_map,
2039  const int64_t queue_time_ms) {
2040  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
2041  CHECK_EQ(query_infos.size(), size_t(1));
2042  if (query_infos.front().info.fragments.size() != 1) {
2043  throw std::runtime_error(
2044  "Only single fragment tables supported for window functions for now");
2045  }
2046  if (eo.executor_type == ::ExecutorType::Extern) {
2047  return;
2048  }
// Duplicate the single input so the hash-join helpers see an "inner" side.
2049  query_infos.push_back(query_infos.front());
2050  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2051  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
2052  ++target_index) {
2053  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
2054  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2055  if (!window_func) {
2056  continue;
2057  }
2058  // Always use baseline layout hash tables for now, make the expression a tuple.
2059  const auto& partition_keys = window_func->getPartitionKeys();
2060  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2061  if (partition_keys.size() >= 1) {
2062  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2063  if (partition_keys.size() > 1) {
2064  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2065  } else {
2066  CHECK_EQ(partition_keys.size(), size_t(1));
2067  partition_key_tuple = partition_keys.front();
2068  }
2069  // Creates a tautology equality with the partition expression on both sides.
2070  partition_key_cond =
2071  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2072  kBW_EQ,
2073  kONE,
2074  partition_key_tuple,
2075  transform_to_inner(partition_key_tuple.get()));
2076  }
2077  auto context =
2078  createWindowFunctionContext(window_func,
2079  partition_key_cond /*nullptr if no partition key*/,
2080  ra_exe_unit,
2081  query_infos,
2082  co,
2083  column_cache_map,
2084  executor_->getRowSetMemoryOwner());
// Eagerly compute the window partition/ordering before the main kernel runs.
2085  context->compute();
2086  window_project_node_context->addWindowFunctionContext(std::move(context),
2087  target_index);
2088  }
2089 }
2090 
// Builds the WindowFunctionContext for one window function: a OneToMany hash
// table over the partition key (when present) plus the materialized order-by
// columns.
// NOTE(review): the doc-extraction tool dropped source lines 2101-2102 (the
// branches of the memory_level ternary), 2110, and 2139 (presumably the call
// that fetches the order column, e.g. ColumnFetcher::getOneColumnFragment);
// consult the original file.
2091 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
2092  const Analyzer::WindowFunction* window_func,
2093  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2094  const RelAlgExecutionUnit& ra_exe_unit,
2095  const std::vector<InputTableInfo>& query_infos,
2096  const CompilationOptions& co,
2097  ColumnCacheMap& column_cache_map,
2098  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
// Single-fragment invariant is enforced by the caller (computeWindow).
2099  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2100  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2103  std::unique_ptr<WindowFunctionContext> context;
2104  if (partition_key_cond) {
2105  const auto join_table_or_err =
2106  executor_->buildHashTableForQualifier(partition_key_cond,
2107  query_infos,
2108  memory_level,
2109  JoinType::INVALID, // for window function
2111  column_cache_map,
2112  ra_exe_unit.hash_table_build_plan_dag,
2113  ra_exe_unit.query_hint,
2114  ra_exe_unit.table_id_to_node_map);
2115  if (!join_table_or_err.fail_reason.empty()) {
2116  throw std::runtime_error(join_table_or_err.fail_reason);
2117  }
2118  CHECK(join_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2119  context = std::make_unique<WindowFunctionContext>(window_func,
2120  join_table_or_err.hash_table,
2121  elem_count,
2122  co.device_type,
2123  row_set_mem_owner);
2124  } else {
// No partition key: the whole input is a single partition.
2125  context = std::make_unique<WindowFunctionContext>(
2126  window_func, elem_count, co.device_type, row_set_mem_owner);
2127  }
2128  const auto& order_keys = window_func->getOrderKeys();
2129  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2130  for (const auto& order_key : order_keys) {
2131  const auto order_col =
2132  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2133  if (!order_col) {
2134  throw std::runtime_error("Only order by columns supported for now");
2135  }
2136  const int8_t* column;
2137  size_t join_col_elem_count;
2138  std::tie(column, join_col_elem_count) =
2140  *order_col,
2141  query_infos.front().info.fragments.front(),
2142  memory_level,
2143  0,
2144  nullptr,
2145  /*thread_idx=*/0,
2146  chunks_owner,
2147  column_cache_map);
2148 
2149  CHECK_EQ(join_col_elem_count, elem_count);
2150  context->addOrderColumn(column, order_col.get(), chunks_owner);
2151  }
2152  return context;
2153 }
2154 
// RelAlgExecutor::executeFilter — builds a filter work unit and executes it.
// NOTE(review): the signature line (source line 2155, presumably
// `ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,`)
// was dropped by the doc-extraction tool.
2156  const CompilationOptions& co,
2157  const ExecutionOptions& eo,
2158  RenderInfo* render_info,
2159  const int64_t queue_time_ms) {
2160  auto timer = DEBUG_TIMER(__func__);
2161  const auto work_unit =
2162  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2163  return executeWorkUnit(
2164  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2165 }
2166 
2167 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
2168  std::vector<TargetMetaInfo> const& rhs) {
2169  if (lhs.size() == rhs.size()) {
2170  for (size_t i = 0; i < lhs.size(); ++i) {
2171  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2172  return false;
2173  }
2174  }
2175  return true;
2176  }
2177  return false;
2178 }
2179 
2180 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2181  return target_meta_info.get_type_info().is_geometry();
2182 }
2183 
// RelAlgExecutor::executeUnion — executes UNION ALL; plain UNION and geometry
// columns are rejected.
// NOTE(review): the doc-extraction tool dropped source lines 2184 (signature,
// presumably `ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,`)
// and 2205 (the compilation-options argument of the executeWorkUnit call);
// consult the original file.
2185  const RaExecutionSequence& seq,
2186  const CompilationOptions& co,
2187  const ExecutionOptions& eo,
2188  RenderInfo* render_info,
2189  const int64_t queue_time_ms) {
2190  auto timer = DEBUG_TIMER(__func__);
2191  if (!logical_union->isAll()) {
2192  throw std::runtime_error("UNION without ALL is not supported yet.");
2193  }
2194  // Will throw a std::runtime_error if types don't match.
2195  logical_union->checkForMatchingMetaInfoTypes();
// Output shape is taken from the first input (inputs were just verified to match).
2196  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2197  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2198  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2199  }
2200  auto work_unit =
2201  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2202  return executeWorkUnit(work_unit,
2203  logical_union->getOutputMetainfo(),
2204  false,
2206  eo,
2207  render_info,
2208  queue_time_ms);
2209 }
2210 
// RelAlgExecutor::executeLogicalValues — materializes a VALUES clause into a
// ResultSet without touching the execution engine.
// NOTE(review): the doc-extraction tool dropped source lines 2211 (signature,
// presumably `ExecutionResult RelAlgExecutor::executeLogicalValues(`), 2215
// and 2217 (the QueryMemoryDescriptor construction around `query_mem_desc`),
// and 2250-2251 (two ResultSetLogicalValuesBuilder arguments); consult the
// original file.
2212  const RelLogicalValues* logical_values,
2213  const ExecutionOptions& eo) {
2214  auto timer = DEBUG_TIMER(__func__);
2216  logical_values->getNumRows(),
2218  /*is_table_function=*/false);
2219 
2220  auto tuple_type = logical_values->getTupleType();
2221  for (size_t i = 0; i < tuple_type.size(); ++i) {
2222  auto& target_meta_info = tuple_type[i];
2223  if (target_meta_info.get_type_info().is_varlen()) {
2224  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2225  }
2226  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2227  // replace w/ bigint
2228  tuple_type[i] =
2229  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2230  }
2231  query_mem_desc.addColSlotInfo(
2232  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2233  }
2234  logical_values->setOutputMetainfo(tuple_type);
2235 
// One placeholder (non-aggregate) TargetInfo per output column.
2236  std::vector<TargetInfo> target_infos;
2237  for (const auto& tuple_type_component : tuple_type) {
2238  target_infos.emplace_back(TargetInfo{false,
2239  kCOUNT,
2240  tuple_type_component.get_type_info(),
2241  SQLTypeInfo(kNULLT, false),
2242  false,
2243  false,
2244  /*is_varlen_projection=*/false});
2245  }
2246 
2247  std::shared_ptr<ResultSet> rs{
2248  ResultSetLogicalValuesBuilder{logical_values,
2249  target_infos,
2252  executor_->getRowSetMemoryOwner(),
2253  executor_}
2254  .build()};
2255 
2256  return {rs, tuple_type};
2257 }
2258 
2259 namespace {
2260 
2261 template <class T>
2262 int64_t insert_one_dict_str(T* col_data,
2263  const std::string& columnName,
2264  const SQLTypeInfo& columnType,
2265  const Analyzer::Constant* col_cv,
2266  const Catalog_Namespace::Catalog& catalog) {
2267  if (col_cv->get_is_null()) {
2268  *col_data = inline_fixed_encoding_null_val(columnType);
2269  } else {
2270  const int dict_id = columnType.get_comp_param();
2271  const auto col_datum = col_cv->get_constval();
2272  const auto& str = *col_datum.stringval;
2273  const auto dd = catalog.getMetadataForDict(dict_id);
2274  CHECK(dd && dd->stringDict);
2275  int32_t str_id = dd->stringDict->getOrAdd(str);
2276  if (!dd->dictIsTemp) {
2277  const auto checkpoint_ok = dd->stringDict->checkpoint();
2278  if (!checkpoint_ok) {
2279  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2280  columnName);
2281  }
2282  }
2283  const bool invalid = str_id > max_valid_int_value<T>();
2284  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2285  if (invalid) {
2286  LOG(ERROR) << "Could not encode string: " << str
2287  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2288  << " bits. Will store NULL instead.";
2289  }
2290  str_id = inline_fixed_encoding_null_val(columnType);
2291  }
2292  *col_data = str_id;
2293  }
2294  return *col_data;
2295 }
2296 
2297 template <class T>
2298 int64_t insert_one_dict_str(T* col_data,
2299  const ColumnDescriptor* cd,
2300  const Analyzer::Constant* col_cv,
2301  const Catalog_Namespace::Catalog& catalog) {
2302  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2303 }
2304 
2305 } // namespace
2306 
// RelAlgExecutor::executeModify — terminal node of an UPDATE/DELETE plan;
// returns an empty ResultSet (the actual work happened upstream).
// NOTE(review): the doc-extraction tool dropped source lines 2307 (signature,
// presumably `ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,`)
// and 2315-2316 (two ResultSet constructor arguments); consult the original
// file.
2308  const ExecutionOptions& eo) {
2309  auto timer = DEBUG_TIMER(__func__);
2310  if (eo.just_explain) {
2311  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2312  }
2313 
2314  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2317  executor_->getRowSetMemoryOwner(),
2318  executor_->getCatalog(),
2319  executor_->blockSize(),
2320  executor_->gridSize());
2321 
2322  std::vector<TargetMetaInfo> empty_targets;
2323  return {rs, empty_targets};
2324 }
2325 
2326 namespace {
2327 int64_t int_value_from_numbers_ptr(const SQLTypeInfo& type_info, const int8_t* data) {
2328  size_t sz = 0;
2329  switch (type_info.get_type()) {
2330  case kTINYINT:
2331  case kSMALLINT:
2332  case kINT:
2333  case kBIGINT:
2334  case kTIMESTAMP:
2335  case kTIME:
2336  case kDATE:
2337  sz = type_info.get_logical_size();
2338  break;
2339  case kTEXT:
2340  case kVARCHAR:
2341  case kCHAR:
2342  CHECK(type_info.get_compression() == kENCODING_DICT);
2343  sz = type_info.get_size();
2344  break;
2345  default:
2346  CHECK(false) << "Unexpected sharding key datatype";
2347  }
2348 
2349  switch (sz) {
2350  case 1:
2351  return *(reinterpret_cast<const int8_t*>(data));
2352  case 2:
2353  return *(reinterpret_cast<const int16_t*>(data));
2354  case 4:
2355  return *(reinterpret_cast<const int32_t*>(data));
2356  case 8:
2357  return *(reinterpret_cast<const int64_t*>(data));
2358  default:
2359  CHECK(false);
2360  return 0;
2361  }
2362 }
2363 
// Resolves which physical shard table an insert row belongs to by hashing the
// row's sharding-key column value. Returns nullptr if the shard column is not
// present in |data|.
// NOTE(review): the doc-extraction tool dropped source lines 2364-2365 (the
// first signature lines, presumably
// `const TableDescriptor* get_shard_for_key(const TableDescriptor* td,`
// `const Catalog_Namespace::Catalog& cat,`); consult the original file.
2366  const Fragmenter_Namespace::InsertData& data) {
2367  auto shard_column_md = cat.getShardColumnMetadataForTable(td);
2368  CHECK(shard_column_md);
2369  auto sharded_column_id = shard_column_md->columnId;
2370  const TableDescriptor* shard{nullptr};
// Scan the insert payload for the shard column, then hash its value.
2371  for (size_t i = 0; i < data.columnIds.size(); ++i) {
2372  if (data.columnIds[i] == sharded_column_id) {
2373  const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
2374  const auto shard_count = shard_tables.size();
2375  CHECK(data.data[i].numbersPtr);
2376  auto value = int_value_from_numbers_ptr(shard_column_md->columnType,
2377  data.data[i].numbersPtr);
2378  const size_t shard_idx = SHARD_FOR_KEY(value, shard_count);
2379  shard = shard_tables[shard_idx];
2380  break;
2381  }
2382  }
2383  return shard;
2384 }
2385 
2386 } // namespace
2387 
// RelAlgExecutor::executeSimpleInsert — inserts a single literal row
// (INSERT ... VALUES) by encoding each constant into per-column staging
// buffers and handing them to the fragmenter.
// NOTE(review): the doc-extraction tool dropped several source lines: 2388
// (signature, presumably
// `ExecutionResult RelAlgExecutor::executeSimpleInsert(const Analyzer::Query& query)`),
// 2446 (presumably the `Fragmenter_Namespace::InsertData insert_data;`
// declaration), 2453 (a cache-invalidation call using `table_key`),
// 2532/2536/2540 (the insert_one_dict_str calls for each dictionary width),
// and 2668-2669 (two ResultSet constructor arguments). Consult the original
// file for the complete text.
2389  // Note: We currently obtain an executor for this method, but we do not need it.
2390  // Therefore, we skip the executor state setup in the regular execution path. In the
2391  // future, we will likely want to use the executor to evaluate expressions in the insert
2392  // statement.
2393 
2394  const auto& targets = query.get_targetlist();
2395  const int table_id = query.get_result_table_id();
2396  const auto& col_id_list = query.get_result_col_list();
2397 
// Per-column staging buffers: fixed-width values, strings, and array payloads.
2398  std::vector<const ColumnDescriptor*> col_descriptors;
2399  std::vector<int> col_ids;
2400  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2401  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2402  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2403 
// Allocate a staging buffer of the right flavor for each target column.
2404  for (const int col_id : col_id_list) {
2405  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2406  const auto col_enc = cd->columnType.get_compression();
2407  if (cd->columnType.is_string()) {
2408  switch (col_enc) {
2409  case kENCODING_NONE: {
2410  auto it_ok =
2411  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2412  CHECK(it_ok.second);
2413  break;
2414  }
2415  case kENCODING_DICT: {
2416  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2417  CHECK(dd);
2418  const auto it_ok = col_buffers.emplace(
2419  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2420  CHECK(it_ok.second);
2421  break;
2422  }
2423  default:
2424  CHECK(false);
2425  }
2426  } else if (cd->columnType.is_geometry()) {
// Geometry values travel as their WKT/serialized string representation.
2427  auto it_ok =
2428  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2429  CHECK(it_ok.second);
2430  } else if (cd->columnType.is_array()) {
2431  auto it_ok =
2432  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2433  CHECK(it_ok.second);
2434  } else {
2435  const auto it_ok = col_buffers.emplace(
2436  col_id,
2437  std::unique_ptr<uint8_t[]>(
2438  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2439  // the buffer
2440  CHECK(it_ok.second);
2441  }
2442  col_descriptors.push_back(cd);
2443  col_ids.push_back(col_id);
2444  }
2445  size_t col_idx = 0;
2447  insert_data.databaseId = cat_.getCurrentDB().dbId;
2448  insert_data.tableId = table_id;
2449 
2450  // mark the target table's cached item as dirty
2451  std::vector<int> table_chunk_key_prefix{insert_data.databaseId, insert_data.tableId};
2452  auto table_key = boost::hash_value(table_chunk_key_prefix);
2454 
// Encode each constant target value into its column's staging buffer. A
// non-constant target must be a CAST wrapping a constant.
2455  for (auto target_entry : targets) {
2456  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2457  if (!col_cv) {
2458  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2459  CHECK(col_cast);
2460  CHECK_EQ(kCAST, col_cast->get_optype());
2461  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2462  }
2463  CHECK(col_cv);
2464  const auto cd = col_descriptors[col_idx];
2465  auto col_datum = col_cv->get_constval();
2466  auto col_type = cd->columnType.get_type();
2467  uint8_t* col_data_bytes{nullptr};
// Fixed-width destinations (everything except none-encoded strings,
// geometry, and arrays) resolve to a raw byte buffer.
2468  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2469  (!cd->columnType.is_string() ||
2470  cd->columnType.get_compression() == kENCODING_DICT)) {
2471  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2472  CHECK(col_data_bytes_it != col_buffers.end());
2473  col_data_bytes = col_data_bytes_it->second.get();
2474  }
2475  switch (col_type) {
2476  case kBOOLEAN: {
2477  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2478  auto null_bool_val =
2479  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2480  *col_data = col_cv->get_is_null() || null_bool_val
2481  ? inline_fixed_encoding_null_val(cd->columnType)
2482  : (col_datum.boolval ? 1 : 0);
2483  break;
2484  }
2485  case kTINYINT: {
2486  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2487  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2488  : col_datum.tinyintval;
2489  break;
2490  }
2491  case kSMALLINT: {
2492  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2493  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2494  : col_datum.smallintval;
2495  break;
2496  }
2497  case kINT: {
2498  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2499  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2500  : col_datum.intval;
2501  break;
2502  }
2503  case kBIGINT:
2504  case kDECIMAL:
2505  case kNUMERIC: {
2506  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2507  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2508  : col_datum.bigintval;
2509  break;
2510  }
2511  case kFLOAT: {
2512  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2513  *col_data = col_datum.floatval;
2514  break;
2515  }
2516  case kDOUBLE: {
2517  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2518  *col_data = col_datum.doubleval;
2519  break;
2520  }
2521  case kTEXT:
2522  case kVARCHAR:
2523  case kCHAR: {
// Strings: none-encoded go to the string buffer; dictionary-encoded are
// translated to ids of the column's physical width (calls dropped at
// source lines 2532/2536/2540 are insert_one_dict_str instantiations).
2524  switch (cd->columnType.get_compression()) {
2525  case kENCODING_NONE:
2526  str_col_buffers[col_ids[col_idx]].push_back(
2527  col_datum.stringval ? *col_datum.stringval : "");
2528  break;
2529  case kENCODING_DICT: {
2530  switch (cd->columnType.get_size()) {
2531  case 1:
2533  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2534  break;
2535  case 2:
2537  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2538  break;
2539  case 4:
2541  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2542  break;
2543  default:
2544  CHECK(false);
2545  }
2546  break;
2547  }
2548  default:
2549  CHECK(false);
2550  }
2551  break;
2552  }
2553  case kTIME:
2554  case kTIMESTAMP:
2555  case kDATE: {
2556  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2557  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2558  : col_datum.bigintval;
2559  break;
2560  }
2561  case kARRAY: {
2562  const auto is_null = col_cv->get_is_null();
2563  const auto size = cd->columnType.get_size();
2564  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2565  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2566  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2567  if (is_null && !is_point_coords) {
2568  if (size > 0) {
2569  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2570  int8_t* buf = (int8_t*)checked_malloc(size);
2571  put_null_array(static_cast<void*>(buf), elem_ti, "");
2572  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2573  p += elem_ti.get_size()) {
2574  put_null(static_cast<void*>(p), elem_ti, "");
2575  }
2576  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2577  } else {
2578  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2579  }
2580  break;
2581  }
2582  const auto l = col_cv->get_value_list();
2583  size_t len = l.size() * elem_ti.get_size();
2584  if (size > 0 && static_cast<size_t>(size) != len) {
2585  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2586  std::to_string(size / elem_ti.get_size()) +
2587  " values, " + "received " + std::to_string(l.size()));
2588  }
2589  if (elem_ti.is_string()) {
// Array-of-string elements are always 4-byte dictionary ids.
2590  CHECK(kENCODING_DICT == elem_ti.get_compression());
2591  CHECK(4 == elem_ti.get_size());
2592 
2593  int8_t* buf = (int8_t*)checked_malloc(len);
2594  int32_t* p = reinterpret_cast<int32_t*>(buf);
2595 
2596  int elemIndex = 0;
2597  for (auto& e : l) {
2598  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2599  CHECK(c);
2600  insert_one_dict_str(&p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2601  elemIndex++;
2602  }
2603  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2604 
2605  } else {
2606  int8_t* buf = (int8_t*)checked_malloc(len);
2607  int8_t* p = buf;
2608  for (auto& e : l) {
2609  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2610  CHECK(c);
2611  p = append_datum(p, c->get_constval(), elem_ti);
2612  CHECK(p);
2613  }
2614  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2615  }
2616  break;
2617  }
2618  case kPOINT:
2619  case kLINESTRING:
2620  case kPOLYGON:
2621  case kMULTIPOLYGON:
2622  str_col_buffers[col_ids[col_idx]].push_back(
2623  col_datum.stringval ? *col_datum.stringval : "");
2624  break;
2625  default:
2626  CHECK(false);
2627  }
2628  ++col_idx;
2629  }
// Wire the staged buffers into the InsertData block list.
2630  for (const auto& kv : col_buffers) {
2631  insert_data.columnIds.push_back(kv.first);
2632  DataBlockPtr p;
2633  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2634  insert_data.data.push_back(p);
2635  }
2636  for (auto& kv : str_col_buffers) {
2637  insert_data.columnIds.push_back(kv.first);
2638  DataBlockPtr p;
2639  p.stringsPtr = &kv.second;
2640  insert_data.data.push_back(p);
2641  }
2642  for (auto& kv : arr_col_buffers) {
2643  insert_data.columnIds.push_back(kv.first);
2644  DataBlockPtr p;
2645  p.arraysPtr = &kv.second;
2646  insert_data.data.push_back(p);
2647  }
2648  insert_data.numRows = 1;
// Fill defaults for columns the INSERT omitted, then route to the proper
// shard (if the table is sharded) and hand off to the fragmenter.
2649  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
2650  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2651  CHECK(table_descriptor);
2652  if (table_descriptor->nShards > 0) {
2653  auto shard = get_shard_for_key(table_descriptor, cat_, insert_data);
2654  CHECK(shard);
2655  shard->fragmenter->insertDataNoCheckpoint(insert_data);
2656  } else {
2657  table_descriptor->fragmenter->insertDataNoCheckpoint(insert_data);
2658  }
2659 
2660  // Ensure checkpoint happens across all shards, if not in distributed
2661  // mode (aggregator handles checkpointing in distributed mode)
2662  if (!g_cluster &&
2663  table_descriptor->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
2664  const_cast<Catalog_Namespace::Catalog&>(cat_).checkpointWithAutoRollback(table_id);
2665  }
2666 
// INSERT produces no result rows; return an empty ResultSet.
2667  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2670  executor_->getRowSetMemoryOwner(),
2671  nullptr,
2672  0,
2673  0);
2674  std::vector<TargetMetaInfo> empty_targets;
2675  return {rs, empty_targets};
2676 }
2677 
2678 namespace {
2679 
2680 // TODO(alex): Once we're fully migrated to the relational algebra model, change
2681 // the executor interface to use the collation directly and remove this conversion.
2682 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
2683  std::list<Analyzer::OrderEntry> result;
2684  for (size_t i = 0; i < sort->collationCount(); ++i) {
2685  const auto sort_field = sort->getCollation(i);
2686  result.emplace_back(sort_field.getField() + 1,
2687  sort_field.getSortDir() == SortDirection::Descending,
2688  sort_field.getNullsPosition() == NullSortedPosition::First);
2689  }
2690  return result;
2691 }
2692 
2693 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
2694  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
2695  if (aggregate) {
2696  return 0;
2697  }
2698  const auto compound = dynamic_cast<const RelCompound*>(ra);
2699  return (compound && compound->isAggregate()) ? 0 : limit;
2700 }
2701 
2702 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
2703  return !order_entries.empty() && order_entries.front().is_desc;
2704 }
2705 
2706 } // namespace
2707 
// NOTE(review): the hyperlinked signature line was dropped by the doc
// generator; per the call structure this is
// RelAlgExecutor::executeSort(const RelSort* sort, ...) -- confirm against
// the repository source. Lines 2714 and 2759 are also missing here.
// Executes a RelSort node. If aggregated leaf results are already cached for
// this sort, ORDER BY / LIMIT / OFFSET are applied directly to them;
// otherwise the sort's input is executed as a work unit and the resulting
// rows are sorted (unless speculative top-n already handled it). A
// SpeculativeTopNFailed error blacklists the group-by expression and retries
// once.
2709  const CompilationOptions& co,
2710  const ExecutionOptions& eo,
2711  RenderInfo* render_info,
2712  const int64_t queue_time_ms) {
2713  auto timer = DEBUG_TIMER(__func__);
2715  const auto source = sort->getInput(0);
2716  const bool is_aggregate = node_is_aggregate(source);
2717  auto it = leaf_results_.find(sort->getId());
2718  if (it != leaf_results_.end()) {
2719  // Add any transient string literals to the sdp on the agg
2720  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2721  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
2722  executor_->row_set_mem_owner_);
2723  // Handle push-down for LIMIT for multi-node
2724  auto& aggregated_result = it->second;
2725  auto& result_rows = aggregated_result.rs;
2726  const size_t limit = sort->getLimit();
2727  const size_t offset = sort->getOffset();
2728  const auto order_entries = get_order_entries(sort);
2729  if (limit || offset) {
2730  if (!order_entries.empty()) {
2731  result_rows->sort(order_entries, limit + offset, executor_);
2732  }
2733  result_rows->dropFirstN(offset);
2734  if (limit) {
2735  result_rows->keepFirstN(limit);
2736  }
2737  }
2738  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2739  sort->setOutputMetainfo(aggregated_result.targets_meta);
2740  return result;
2741  }
2742 
2743  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2744  bool is_desc{false};
2745 
  // Runs the input work unit, then applies sort / limit / offset to its rows;
  // captured state (groupby_exprs, is_desc) is used by the retry path below.
2746  auto execute_sort_query = [this,
2747  sort,
2748  &source,
2749  &is_aggregate,
2750  &eo,
2751  &co,
2752  render_info,
2753  queue_time_ms,
2754  &groupby_exprs,
2755  &is_desc]() -> ExecutionResult {
2756  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2757  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2758  ExecutionOptions eo_copy = {
2760  eo.allow_multifrag,
2761  eo.just_explain,
2762  eo.allow_loop_joins,
2763  eo.with_watchdog,
2764  eo.jit_debug,
2765  eo.just_validate || sort->isEmptyResult(),
2766  eo.with_dynamic_watchdog,
2767  eo.dynamic_watchdog_time_limit,
2768  eo.find_push_down_candidates,
2769  eo.just_calcite_explain,
2770  eo.gpu_input_mem_limit_percent,
2771  eo.allow_runtime_query_interrupt,
2772  eo.running_query_interrupt_freq,
2773  eo.pending_query_interrupt_freq,
2774  eo.executor_type,
2775  };
2776 
2777  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2778  auto source_result = executeWorkUnit(source_work_unit,
2779  source->getOutputMetainfo(),
2780  is_aggregate,
2781  co,
2782  eo_copy,
2783  render_info,
2784  queue_time_ms);
2785  if (render_info && render_info->isPotentialInSituRender()) {
2786  return source_result;
2787  }
2788  if (source_result.isFilterPushDownEnabled()) {
2789  return source_result;
2790  }
2791  auto rows_to_sort = source_result.getRows();
2792  if (eo.just_explain) {
2793  return {rows_to_sort, {}};
2794  }
2795  const size_t limit = sort->getLimit();
2796  const size_t offset = sort->getOffset();
  // Sort here only when speculative top-n did not already produce ordered
  // output during execution.
2797  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2798  !use_speculative_top_n(source_work_unit.exe_unit,
2799  rows_to_sort->getQueryMemDesc())) {
2800  const size_t top_n = limit == 0 ? 0 : limit + offset;
2801  rows_to_sort->sort(
2802  source_work_unit.exe_unit.sort_info.order_entries, top_n, executor_);
2803  }
2804  if (limit || offset) {
2805  if (g_cluster && sort->collationCount() == 0) {
2806  if (offset >= rows_to_sort->rowCount()) {
2807  rows_to_sort->dropFirstN(offset);
2808  } else {
2809  rows_to_sort->keepFirstN(limit + offset);
2810  }
2811  } else {
2812  rows_to_sort->dropFirstN(offset);
2813  if (limit) {
2814  rows_to_sort->keepFirstN(limit);
2815  }
2816  }
2817  }
2818  return {rows_to_sort, source_result.getTargetsMeta()};
2819  };
2820 
  // On speculative top-n failure, blacklist the (single) group-by expression
  // and retry once; createSortInputWorkUnit will then fall back to the
  // default sort algorithm.
2821  try {
2822  return execute_sort_query();
2823  } catch (const SpeculativeTopNFailed& e) {
2824  CHECK_EQ(size_t(1), groupby_exprs.size());
2825  CHECK(groupby_exprs.front());
2826  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2827  return execute_sort_query();
2828  }
2829 }
2830 
// NOTE(review): the signature line (2831) was dropped by the doc generator;
// per the call sites this is RelAlgExecutor::createSortInputWorkUnit. Line
// 2842, presumably declaring the mutable `sort_algorithm` variable, is also
// missing -- confirm against the repository source.
// Builds the work unit for the *input* of a RelSort, folding LIMIT/OFFSET
// into a scan limit when the input is not an aggregate, validating ORDER BY
// column types, and choosing the sort algorithm.
2832  const RelSort* sort,
2833  const ExecutionOptions& eo) {
2834  const auto source = sort->getInput(0);
2835  const size_t limit = sort->getLimit();
2836  const size_t offset = sort->getOffset();
2837  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2838  const size_t scan_total_limit =
2839  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2840  size_t max_groups_buffer_entry_guess{
2841  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
2843  const auto order_entries = get_order_entries(sort);
2844  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2845  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2846  const auto& source_exe_unit = source_work_unit.exe_unit;
2847 
2848  // we do not allow sorting geometry or array types
2849  for (auto order_entry : order_entries) {
2850  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
2851  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2852  const auto& ti = get_target_info(te, false);
2853  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2854  throw std::runtime_error(
2855  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2856  }
2857  }
2858 
  // Single group-by slot: a pure projection (null group-by) uses streaming
  // top-n; a real group-by falls back to the default algorithm when this
  // expression was blacklisted for speculative top-n.
2859  if (source_exe_unit.groupby_exprs.size() == 1) {
2860  if (!source_exe_unit.groupby_exprs.front()) {
2861  sort_algorithm = SortAlgorithm::StreamingTopN;
2862  } else {
2863  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2864  first_oe_is_desc(order_entries))) {
2865  sort_algorithm = SortAlgorithm::Default;
2866  }
2867  }
2868  }
2869 
2870  sort->setOutputMetainfo(source->getOutputMetainfo());
2871  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2872  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2873  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2874  std::move(source_exe_unit.input_col_descs),
2875  source_exe_unit.simple_quals,
2876  source_exe_unit.quals,
2877  source_exe_unit.join_quals,
2878  source_exe_unit.groupby_exprs,
2879  source_exe_unit.target_exprs,
2880  nullptr,
2881  {sort_info.order_entries, sort_algorithm, limit, offset},
2882  scan_total_limit,
2883  source_exe_unit.query_hint,
2884  source_exe_unit.query_plan_dag,
2885  source_exe_unit.hash_table_build_plan_dag,
2886  source_exe_unit.table_id_to_node_map,
2887  source_exe_unit.use_bump_allocator,
2888  source_exe_unit.union_all,
2889  source_exe_unit.query_state},
2890  source,
2891  max_groups_buffer_entry_guess,
2892  std::move(source_work_unit.query_rewriter),
2893  source_work_unit.input_permutation,
2894  source_work_unit.left_deep_join_input_sizes};
2895 }
2896 
2897 namespace {
2898 
2905 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
2906  CHECK(!table_infos.empty());
2907  const auto& first_table = table_infos.front();
2908  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
2909  for (const auto& table_info : table_infos) {
2910  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
2911  max_num_groups = table_info.info.getNumTuplesUpperBound();
2912  }
2913  }
2914  return std::max(max_num_groups, size_t(1));
2915 }
2916 
2917 bool is_projection(const RelAlgExecutionUnit& ra_exe_unit) {
2918  return ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front();
2919 }
2920 
// NOTE(review): the first signature line (2921) was dropped by the doc
// generator; per the call site below this is
// should_output_columnar(const RelAlgExecutionUnit& ra_exe_unit, ...).
// Returns true when a large projection should produce columnar output: only
// for unsorted projections that are not in-situ render candidates and whose
// scan limit reaches g_columnar_large_projections_threshold.
2922  const RenderInfo* render_info) {
2923  if (!is_projection(ra_exe_unit)) {
2924  return false;
2925  }
2926  if (render_info && render_info->isPotentialInSituRender()) {
2927  return false;
2928  }
2929  if (!ra_exe_unit.sort_info.order_entries.empty()) {
2930  // disable output columnar when we have top-sort node query
2931  return false;
2932  }
2933  return ra_exe_unit.scan_limit >= g_columnar_large_projections_threshold;
2934 }
2935 
// NOTE(review): the doxygen comment block and signature (lines 2936-2942)
// were dropped by the doc generator; per the call site this is
// compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) -- confirm.
// Returns true for aggregate-free projections with no scan limit (or one
// above Executor::high_scan_limit), i.e. queries whose output buffer size
// must be determined up front.
2943  for (const auto target_expr : ra_exe_unit.target_exprs) {
2944  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
2945  return false;
2946  }
2947  }
2948  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
2949  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
2950  return true;
2951  }
2952  return false;
2953 }
2954 
2955 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
2956  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
2957  ra_exe_unit.simple_quals.empty());
2958 }
2959 
// NOTE(review): the return-type/name line (2960) was dropped by the doc
// generator; per the call sites this is
// decide_approx_count_distinct_implementation(...), returning a (possibly
// rewritten) RelAlgExecutionUnit.
// For each APPROX_COUNT_DISTINCT target whose argument has a known integer
// range, compares the padded size of the approximate (HLL) bitmap against a
// precise count-distinct bitmap; when the approximate bitmap would be at
// least as large, the target is rewritten to a precise COUNT DISTINCT.
2961  const RelAlgExecutionUnit& ra_exe_unit_in,
2962  const std::vector<InputTableInfo>& table_infos,
2963  const Executor* executor,
2964  const ExecutorDeviceType device_type_in,
2965  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
2966  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
2967  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
2968  const auto target_expr = ra_exe_unit.target_exprs[i];
2969  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2970  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
2971  continue;
2972  }
2973  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
2974  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
2975  CHECK(arg);
2976  const auto& arg_ti = arg->get_type_info();
2977  // Avoid calling getExpressionRange for variable length types (string and array),
2978  // it'd trigger an assertion since that API expects to be called only for types
2979  // for which the notion of range is well-defined. A bit of a kludge, but the
2980  // logic to reject these types anyway is at lower levels in the stack and not
2981  // really worth pulling into a separate function for now.
2982  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
2983  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
2984  continue;
2985  }
2986  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
2987  if (arg_range.getType() != ExpressionRangeType::Integer) {
2988  continue;
2989  }
2990  // When running distributed, the threshold for using the precise implementation
2991  // must be consistent across all leaves, otherwise we could have a mix of precise
2992  // and approximate bitmaps and we cannot aggregate them.
2993  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
2994  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
2995  const auto sub_bitmap_count =
2996  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
2997  int64_t approx_bitmap_sz_bits{0};
  // An explicit error-rate argument selects the HLL precision; otherwise the
  // global default precision is used.
2998  const auto error_rate = static_cast<Analyzer::AggExpr*>(target_expr)->get_arg1();
2999  if (error_rate) {
3000  CHECK(error_rate->get_type_info().get_type() == kINT);
3001  CHECK_GE(error_rate->get_constval().intval, 1);
3002  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
3003  } else {
3004  approx_bitmap_sz_bits = g_hll_precision_bits;
3005  }
3006  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
3007  arg_range.getIntMin(),
3008  approx_bitmap_sz_bits,
3009  true,
3010  device_type,
3011  sub_bitmap_count};
3012  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
3013  arg_range.getIntMin(),
3014  bitmap_sz_bits,
3015  false,
3016  device_type,
3017  sub_bitmap_count};
  // Prefer the precise bitmap whenever it is no larger than the approximate
  // one: same memory, exact answer.
3018  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
3019  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
3020  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
3021  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
3022  target_exprs_owned.push_back(precise_count_distinct);
3023  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
3024  }
3025  }
3026  return ra_exe_unit;
3027 }
3028 
// NOTE(review): the first signature line (3029) was dropped by the doc
// generator; per the call sites this is
// build_render_targets(RenderInfo& render_info, ...).
// Rebuilds render_info.targets from the work unit's target expressions,
// pairing each expression with its output column name from targets_meta.
3030  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
3031  const std::vector<TargetMetaInfo>& targets_meta) {
3032  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
3033  render_info.targets.clear();
3034  for (size_t i = 0; i < targets_meta.size(); ++i) {
3035  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
3036  targets_meta[i].get_resname(),
3037  work_unit_target_exprs[i]->get_shared_ptr(),
3038  false));
3039  }
3040 }
3041 
// Whether the bump-allocator output path may be used for this unit.
// NOTE(review): line 3045 -- the start of the return expression, presumably
// checking g_enable_bump_allocator and the device type -- was dropped by the
// doc generator; confirm against the repository source. The visible part
// additionally requires no columnar-output hint and no ORDER BY entries.
3042 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
3043  const CompilationOptions& co,
3044  const ExecutionOptions& eo) {
3046  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
3047 }
3048 
3049 } // namespace
3050 
// NOTE(review): the first signature line (3051) was dropped by the doc
// generator; per the declaration order this is
// RelAlgExecutor::executeWorkUnit(...). Several other hyperlinked lines
// (e.g. 3060, 3067, 3070, 3103, 3140, 3145, 3152, 3182, 3190, 3205, 3220)
// are also missing from this listing -- confirm against the repository source.
// Orchestrates execution of one work unit: window-function handling, filter
// push-down candidate selection, leaf-result reuse, query-hint registration,
// scan-limit / output-buffer sizing, columnar-output preference, execution
// with cardinality caching, and render handling.
3052  const RelAlgExecutor::WorkUnit& work_unit,
3053  const std::vector<TargetMetaInfo>& targets_meta,
3054  const bool is_agg,
3055  const CompilationOptions& co_in,
3056  const ExecutionOptions& eo_in,
3057  RenderInfo* render_info,
3058  const int64_t queue_time_ms,
3059  const std::optional<size_t> previous_count) {
3061  auto timer = DEBUG_TIMER(__func__);
3062 
3063  auto co = co_in;
3064  auto eo = eo_in;
3065  ColumnCacheMap column_cache;
3066  if (is_window_execution_unit(work_unit.exe_unit)) {
3068  throw std::runtime_error("Window functions support is disabled");
3069  }
3071  co.allow_lazy_fetch = false;
3072  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
3073  }
3074  if (!eo.just_explain && eo.find_push_down_candidates) {
3075  // find potential candidates:
3076  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3077  if (!selected_filters.empty() || eo.just_calcite_explain) {
3078  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3079  }
3080  }
3081  if (render_info && render_info->isPotentialInSituRender()) {
3082  co.allow_lazy_fetch = false;
3083  }
3084  const auto body = work_unit.body;
3085  CHECK(body);
  // Reuse aggregated leaf results when available (distributed execution).
3086  auto it = leaf_results_.find(body->getId());
3087  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
3088  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3089  if (it != leaf_results_.end()) {
3090  executor_->addTransientStringLiterals(work_unit.exe_unit,
3091  executor_->row_set_mem_owner_);
3092  auto& aggregated_result = it->second;
3093  auto& result_rows = aggregated_result.rs;
3094  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3095  body->setOutputMetainfo(aggregated_result.targets_meta);
3096  if (render_info) {
3097  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3098  }
3099  return result;
3100  }
3101  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3102 
3104  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3105 
3106  // register query hint if query_dag_ is valid
3107  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3108  if (query_dag_) {
3109  auto candidate = query_dag_->getQueryHint(body);
3110  if (candidate) {
3111  ra_exe_unit.query_hint = *candidate;
3112  }
3113  }
3114  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3115  if (is_window_execution_unit(ra_exe_unit)) {
3116  CHECK_EQ(table_infos.size(), size_t(1));
3117  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3118  max_groups_buffer_entry_guess =
3119  table_infos.front().info.fragments.front().getNumTuples();
3120  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3121  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3122  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3123  ra_exe_unit.scan_limit = *previous_count;
3124  } else {
3125  // TODO(adb): enable bump allocator path for render queries
3126  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3127  ra_exe_unit.scan_limit = 0;
3128  ra_exe_unit.use_bump_allocator = true;
3129  } else if (eo.executor_type == ::ExecutorType::Extern) {
3130  ra_exe_unit.scan_limit = 0;
3131  } else if (!eo.just_explain) {
3132  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
3133  if (filter_count_all) {
3134  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3135  }
3136  }
3137  }
3138  }
3139 
3141  const auto prefer_columnar = should_output_columnar(ra_exe_unit, render_info);
3142  if (prefer_columnar) {
3143  VLOG(1) << "Using columnar layout for projection as output size of "
3144  << ra_exe_unit.scan_limit << " rows exceeds threshold of "
3146  eo.output_columnar_hint = true;
3147  }
3148  }
3149 
3150  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3151  co.device_type,
3153  nullptr,
3154  executor_->getCatalog(),
3155  executor_->blockSize(),
3156  executor_->gridSize()),
3157  {}};
3158 
  // Executes the unit, falling back to handleOutOfMemoryRetry on OOM;
  // negative error codes without an NDV estimation trigger a
  // CardinalityEstimationRequired retry below.
3159  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
3160  const bool has_cardinality_estimation,
3161  const bool has_ndv_estimation) -> ExecutionResult {
3162  // Note that the groups buffer entry guess may be modified during query execution.
3163  // Create a local copy so we can track those changes if we need to attempt a retry
3164  // due to OOM
3165  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3166  try {
3167  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3168  is_agg,
3169  table_infos,
3170  ra_exe_unit,
3171  co,
3172  eo,
3173  cat_,
3174  render_info,
3175  has_cardinality_estimation,
3176  column_cache),
3177  targets_meta};
3178  } catch (const QueryExecutionError& e) {
3179  if (!has_ndv_estimation && e.getErrorCode() < 0) {
3180  throw CardinalityEstimationRequired(/*range=*/0);
3181  }
3183  return handleOutOfMemoryRetry(
3184  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
3185  targets_meta,
3186  is_agg,
3187  co,
3188  eo,
3189  render_info,
3191  queue_time_ms);
3192  }
3193  };
3194 
  // Use a cached cardinality when available; otherwise run with the guess,
  // and on CardinalityEstimationRequired compute an NDV-based estimate and
  // cache it for subsequent executions.
3195  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
3196  try {
3197  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3198  auto card = cached_cardinality.second;
3199  if (cached_cardinality.first && card >= 0) {
3200  result = execute_and_handle_errors(
3201  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3202  } else {
3203  result = execute_and_handle_errors(
3204  max_groups_buffer_entry_guess,
3206  /*has_ndv_estimation=*/false);
3207  }
3208  } catch (const CardinalityEstimationRequired& e) {
3209  // check the cardinality cache
3210  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3211  auto card = cached_cardinality.second;
3212  if (cached_cardinality.first && card >= 0) {
3213  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3214  } else {
3215  const auto ndv_groups_estimation =
3216  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3217  const auto estimated_groups_buffer_entry_guess =
3218  ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3219  : std::min(groups_approx_upper_bound(table_infos),
3221  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3222  result = execute_and_handle_errors(
3223  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3224  if (!(eo.just_validate || eo.just_explain)) {
3225  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3226  }
3227  }
3228  }
3229 
3230  result.setQueueTime(queue_time_ms);
3231  if (render_info) {
3232  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3233  if (render_info->isPotentialInSituRender()) {
3234  // return an empty result (with the same queue time, and zero render time)
3235  return {std::make_shared<ResultSet>(
3236  queue_time_ms,
3237  0,
3238  executor_->row_set_mem_owner_
3239  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3240  : nullptr),
3241  {}};
3242  }
3243  }
3244  return result;
3245 }
3246 
3247 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
3248  const bool is_agg,
3249  const CompilationOptions& co,
3250  const ExecutionOptions& eo) {
3251  const auto count =
3252  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
3253  kCOUNT,
3254  nullptr,
3255  false,
3256  nullptr);
3257  const auto count_all_exe_unit =
3258  work_unit.exe_unit.createCountAllExecutionUnit(count.get());
3259  size_t one{1};
3260  ResultSetPtr count_all_result;
3261  try {
3262  ColumnCacheMap column_cache;
3263  count_all_result =
3264  executor_->executeWorkUnit(one,
3265  is_agg,
3266  get_table_infos(work_unit.exe_unit, executor_),
3267  count_all_exe_unit,
3268  co,
3269  eo,
3270  cat_,
3271  nullptr,
3272  false,
3273  column_cache);
3274  } catch (const foreign_storage::ForeignStorageException& error) {
3275  throw error;
3276  } catch (const QueryMustRunOnCpu&) {
3277  // force a retry of the top level query on CPU
3278  throw;
3279  } catch (const std::exception& e) {
3280  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
3281  return std::nullopt;
3282  }
3283  const auto count_row = count_all_result->getNextRow(false, false);
3284  CHECK_EQ(size_t(1), count_row.size());
3285  const auto& count_tv = count_row.front();
3286  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
3287  CHECK(count_scalar_tv);
3288  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
3289  CHECK(count_ptr);
3290  CHECK_GE(*count_ptr, 0);
3291  auto count_upper_bound = static_cast<size_t>(*count_ptr);
3292  return std::max(count_upper_bound, size_t(1));
3293 }
3294 
3295 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
3296  const auto& ra_exe_unit = work_unit.exe_unit;
3297  if (ra_exe_unit.input_descs.size() != 1) {
3298  return false;
3299  }
3300  const auto& table_desc = ra_exe_unit.input_descs.front();
3301  if (table_desc.getSourceType() != InputSourceType::TABLE) {
3302  return false;
3303  }
3304  const int table_id = table_desc.getTableId();
3305  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
3306  const auto comp_expr =
3307  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3308  if (!comp_expr || comp_expr->get_optype() != kEQ) {
3309  return false;
3310  }
3311  const auto lhs = comp_expr->get_left_operand();
3312  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3313  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3314  return false;
3315  }
3316  const auto rhs = comp_expr->get_right_operand();
3317  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3318  if (!rhs_const) {
3319  return false;
3320  }
3321  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
3322  if (cd->isVirtualCol) {
3323  CHECK_EQ("rowid", cd->columnName);
3324  return true;
3325  }
3326  }
3327  return false;
3328 }
3329 
// NOTE(review): the first signature line (3330) was dropped by the doc
// generator; per the declaration order this is
// RelAlgExecutor::handleOutOfMemoryRetry(...). Lines 3347-3348, 3364-3371,
// 3373, 3397, 3418 and 3451 (hyperlinked identifiers) are also missing --
// confirm against the repository source.
// OOM recovery path: disables the bump allocator and multifragment kernels,
// retries once per-fragment when the failed launch was multifrag, then falls
// back to CPU-only execution, doubling the groups-buffer entry guess at most
// twice before giving up.
3331  const RelAlgExecutor::WorkUnit& work_unit,
3332  const std::vector<TargetMetaInfo>& targets_meta,
3333  const bool is_agg,
3334  const CompilationOptions& co,
3335  const ExecutionOptions& eo,
3336  RenderInfo* render_info,
3337  const bool was_multifrag_kernel_launch,
3338  const int64_t queue_time_ms) {
3339  // Disable the bump allocator
3340  // Note that this will have basically the same effect as using the bump allocator for
3341  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
3342  // path and the bump allocator path for kernel per fragment execution.
3343  auto ra_exe_unit_in = work_unit.exe_unit;
3344  ra_exe_unit_in.use_bump_allocator = false;
3345 
3346  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3347  co.device_type,
3349  nullptr,
3350  executor_->getCatalog(),
3351  executor_->blockSize(),
3352  executor_->gridSize()),
3353  {}};
3354 
3355  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
3356  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3357  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
3358  false,
3359  false,
3360  eo.allow_loop_joins,
3361  eo.with_watchdog,
3362  eo.jit_debug,
3363  false,
3366  false,
3367  false,
3372  eo.executor_type,
3374 
3375  if (was_multifrag_kernel_launch) {
3376  try {
3377  // Attempt to retry using the kernel per fragment path. The smaller input size
3378  // required may allow the entire kernel to execute in GPU memory.
3379  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
3380  "kernels disabled.";
3381  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3382  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
3383  ColumnCacheMap column_cache;
3384  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3385  is_agg,
3386  table_infos,
3387  ra_exe_unit,
3388  co,
3389  eo_no_multifrag,
3390  cat_,
3391  nullptr,
3392  true,
3393  column_cache),
3394  targets_meta};
3395  result.setQueueTime(queue_time_ms);
3396  } catch (const QueryExecutionError& e) {
3398  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
3399  }
3400  }
3401 
3402  if (render_info) {
3403  render_info->setForceNonInSituData();
3404  }
3405 
  // CPU fallback: retry with a CPU-only compilation of the same unit.
3406  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
3407  // Only reset the group buffer entry guess if we ran out of slots, which
3408  // suggests a
3409  // highly pathological input which prevented a good estimation of distinct tuple
3410  // count. For projection queries, this will force a per-fragment scan limit, which is
3411  // compatible with the CPU path
3412  VLOG(1) << "Resetting max groups buffer entry guess.";
3413  max_groups_buffer_entry_guess = 0;
3414 
3415  int iteration_ctr = -1;
3416  while (true) {
3417  iteration_ctr++;
3419  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
3420  ColumnCacheMap column_cache;
3421  try {
3422  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3423  is_agg,
3424  table_infos,
3425  ra_exe_unit,
3426  co_cpu,
3427  eo_no_multifrag,
3428  cat_,
3429  nullptr,
3430  true,
3431  column_cache),
3432  targets_meta};
3433  } catch (const QueryExecutionError& e) {
3434  // Ran out of slots
3435  if (e.getErrorCode() < 0) {
3436  // Even the conservative guess failed; it should only happen when we group
3437  // by a huge cardinality array. Maybe we should throw an exception instead?
3438  // Such a heavy query is entirely capable of exhausting all the host memory.
3439  CHECK(max_groups_buffer_entry_guess);
3440  // Only allow two iterations of increasingly large entry guesses up to a maximum
3441  // of 512MB per column per kernel
3442  if (g_enable_watchdog || iteration_ctr > 1) {
3443  throw std::runtime_error("Query ran out of output slots in the result");
3444  }
3445  max_groups_buffer_entry_guess *= 2;
3446  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
3447  "groups buffer entry "
3448  "guess equal to "
3449  << max_groups_buffer_entry_guess;
3450  } else {
3452  }
3453  continue;
3454  }
3455  result.setQueueTime(queue_time_ms);
3456  return result;
3457  }
3458  return result;
3459 }
3460 
3461 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
3462  LOG(ERROR) << "Query execution failed with error "
3463  << getErrorMessageFromCode(error_code);
3464  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
3465  // We ran out of GPU memory, this doesn't count as an error if the query is
3466  // allowed to continue on CPU because retry on CPU is explicitly allowed through
3467  // --allow-cpu-retry.
3468  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
3469  if (!g_allow_cpu_retry) {
3470  throw std::runtime_error(
3471  "Query ran out of GPU memory, unable to automatically retry on CPU");
3472  }
3473  return;
3474  }
3475  throw std::runtime_error(getErrorMessageFromCode(error_code));
3476 }
3477 
3478 namespace {
// Symbolic error-code name and its human-readable description; both members
// are left null for unknown codes (see getErrorDescription's default case).
3479 struct ErrorInfo {
3480  const char* code{nullptr};
3481  const char* description{nullptr};
3482 };
// Maps a positive executor error code to its symbolic name and description.
// NOTE(review): the hyperlinked `case Executor::ERR_...:` labels (lines 3487,
// 3489, 3493, 3495, 3497, 3499, 3501, 3503, 3506, 3508, 3512, 3517, 3520,
// 3525) were dropped by the doc generator -- each `return` below belongs to
// the case whose name matches its string; confirm against the repository
// source.
3483 ErrorInfo getErrorDescription(const int32_t error_code) {
3484  // 'designated initializers' don't compile on Windows for std 17
3485  // They require /std:c++20. They have been removed for the Windows port.
3486  switch (error_code) {
3488  return {"ERR_DIV_BY_ZERO", "Division by zero"};
3490  return {"ERR_OUT_OF_GPU_MEM",
3491 
3492  "Query couldn't keep the entire working set of columns in GPU memory"};
3494  return {"ERR_UNSUPPORTED_SELF_JOIN", "Self joins not supported yet"};
3496  return {"ERR_OUT_OF_CPU_MEM", "Not enough host memory to execute the query"};
3498  return {"ERR_OVERFLOW_OR_UNDERFLOW", "Overflow or underflow"};
3500  return {"ERR_OUT_OF_TIME", "Query execution has exceeded the time limit"};
3502  return {"ERR_INTERRUPTED", "Query execution has been interrupted"};
3504  return {"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
3505  "Columnar conversion not supported for variable length types"};
3507  return {"ERR_TOO_MANY_LITERALS", "Too many literals in the query"};
3509  return {"ERR_STRING_CONST_IN_RESULTSET",
3510 
3511  "NONE ENCODED String types are not supported as input result set."};
3513  return {"ERR_OUT_OF_RENDER_MEM",
3514 
3515  "Insufficient GPU memory for query results in render output buffer "
3516  "sized by render-mem-bytes"};
3518  return {"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
3519  "Streaming-Top-N not supported in Render Query"};
3521  return {"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
3522  "Multiple distinct values encountered"};
3523  case Executor::ERR_GEOS:
3524  return {"ERR_GEOS", "ERR_GEOS"};
3526  return {"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
3527 
3528  "Arguments of WIDTH_BUCKET function does not satisfy the condition"};
3529  default:
  // Unknown code: caller (getErrorMessageFromCode) renders a generic message.
3530  return {nullptr, nullptr};
3531  }
3532 }
3533 
3534 } // namespace
3535 
3536 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
3537  if (error_code < 0) {
3538  return "Ran out of slots in the query output buffer";
3539  }
3540  const auto errorInfo = getErrorDescription(error_code);
3541 
3542  if (errorInfo.code) {
3543  return errorInfo.code + ": "s + errorInfo.description;
3544  } else {
3545  return "Other error: code "s + std::to_string(error_code);
3546  }
3547 }
3548 
3551  VLOG(1) << "Running post execution callback.";
3552  (*post_execution_callback_)();
3553  }
3554 }
3555 
3557  const SortInfo& sort_info,
3558  const ExecutionOptions& eo) {
3559  const auto compound = dynamic_cast<const RelCompound*>(node);
3560  if (compound) {
3561  return createCompoundWorkUnit(compound, sort_info, eo);
3562  }
3563  const auto project = dynamic_cast<const RelProject*>(node);
3564  if (project) {
3565  return createProjectWorkUnit(project, sort_info, eo);
3566  }
3567  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3568  if (aggregate) {
3569  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3570  }
3571  const auto filter = dynamic_cast<const RelFilter*>(node);
3572  if (filter) {
3573  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3574  }
3575  LOG(FATAL) << "Unhandled node type: " << node->toString();
3576  return {};
3577 }
3578 
3579 namespace {
3580 
3582  auto sink = get_data_sink(ra);
3583  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
3584  return join->getJoinType();
3585  }
3586  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
3587  return JoinType::INNER;
3588  }
3589 
3590  return JoinType::INVALID;
3591 }
3592 
3593 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
3594  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3595  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
3596  return nullptr;
3597  }
3598  const auto equi_join_condition =
3599  dynamic_cast<const RexOperator*>(condition->getOperand(0));
3600  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
3601  return nullptr;
3602  }
3603  const auto both_are_null_condition =
3604  dynamic_cast<const RexOperator*>(condition->getOperand(1));
3605  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
3606  both_are_null_condition->size() != 2) {
3607  return nullptr;
3608  }
3609  const auto lhs_is_null =
3610  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
3611  const auto rhs_is_null =
3612  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
3613  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
3614  rhs_is_null->getOperator() != kISNULL) {
3615  return nullptr;
3616  }
3617  CHECK_EQ(size_t(1), lhs_is_null->size());
3618  CHECK_EQ(size_t(1), rhs_is_null->size());
3619  CHECK_EQ(size_t(2), equi_join_condition->size());
3620  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
3621  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
3622  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
3623  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
3624  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
3625  return nullptr;
3626  }
3627  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
3628  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
3629  RexDeepCopyVisitor deep_copy_visitor;
3630  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
3631  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
3632  eq_operands.emplace_back(lhs_op_copy.release());
3633  eq_operands.emplace_back(rhs_op_copy.release());
3634  return boost::make_unique<const RexOperator>(
3635  kBW_EQ, eq_operands, equi_join_condition->getType());
3636  }
3637  return nullptr;
3638 }
3639 
3640 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
3641  const RexScalar* scalar) {
3642  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3643  if (condition && condition->getOperator() == kAND) {
3644  CHECK_GE(condition->size(), size_t(2));
3645  auto acc = get_bitwise_equals(condition->getOperand(0));
3646  if (!acc) {
3647  return nullptr;
3648  }
3649  for (size_t i = 1; i < condition->size(); ++i) {
3650  std::vector<std::unique_ptr<const RexScalar>> and_operands;
3651  and_operands.emplace_back(std::move(acc));
3652  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
3653  acc =
3654  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
3655  }
3656  return acc;
3657  }
3658  return get_bitwise_equals(scalar);
3659 }
3660 
3661 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
3662  CHECK_GE(left_deep_join->inputCount(), size_t(2));
3663  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
3664  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
3665  ++nesting_level) {
3666  if (left_deep_join->getOuterCondition(nesting_level)) {
3667  join_types[nesting_level - 1] = JoinType::LEFT;
3668  }
3669  auto cur_level_join_type = left_deep_join->getJoinType(nesting_level);
3670  if (cur_level_join_type == JoinType::SEMI || cur_level_join_type == JoinType::ANTI) {
3671  join_types[nesting_level - 1] = cur_level_join_type;
3672  }
3673  }
3674  return join_types;
3675 }
3676 
3677 template <class RA>
3678 std::vector<size_t> do_table_reordering(
3679  std::vector<InputDescriptor>& input_descs,
3680  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
3681  const JoinQualsPerNestingLevel& left_deep_join_quals,
3682  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3683  const RA* node,
3684  const std::vector<InputTableInfo>& query_infos,
3685  const Executor* executor) {
3686  if (g_cluster) {
3687  // Disable table reordering in distributed mode. The aggregator does not have enough
3688  // information to break ties
3689  return {};
3690  }
3691  const auto& cat = *executor->getCatalog();
3692  for (const auto& table_info : query_infos) {
3693  if (table_info.table_id < 0) {
3694  continue;
3695  }
3696  const auto td = cat.getMetadataForTable(table_info.table_id);
3697  CHECK(td);
3698  if (table_is_replicated(td)) {
3699  return {};
3700  }
3701  }
3702  const auto input_permutation =
3703  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
3704  input_to_nest_level = get_input_nest_levels(node, input_permutation);
3705  std::tie(input_descs, input_col_descs, std::ignore) =
3706  get_input_desc(node, input_to_nest_level, input_permutation, cat);
3707  return input_permutation;
3708 }
3709 
3711  const RelLeftDeepInnerJoin* left_deep_join) {
3712  std::vector<size_t> input_sizes;
3713  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
3714  const auto inputs = get_node_output(left_deep_join->getInput(i));
3715  input_sizes.push_back(inputs.size());
3716  }
3717  return input_sizes;
3718 }
3719 
3720 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
3721  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
3722  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
3723  for (const auto& qual : quals) {
3724  const auto rewritten_qual = rewrite_expr(qual.get());
3725  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
3726  }
3727  return rewritten_quals;
3728 }
3729 
3730 } // namespace
3731 
// Builds the execution unit for a RelCompound node: resolves input
// descriptors, translates join quals (with optional input reordering),
// scalar sources, group-by keys, filters and targets, then runs the query
// rewriter and extracts the query plan DAG.
// NOTE(review): the function signature line and several interior lines were
// lost in this rendering of the source; remaining tokens are kept verbatim.
    const RelCompound* compound,
    const SortInfo& sort_info,
    const ExecutionOptions& eo) {
  std::vector<InputDescriptor> input_descs;
  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
  // Map each input node to its nesting level, then build the input/column
  // descriptors the executor consumes.
  auto input_to_nest_level = get_input_nest_levels(compound, {});
  std::tie(input_descs, input_col_descs, std::ignore) =
      get_input_desc(compound, input_to_nest_level, {}, cat_);
  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
  const auto query_infos = get_table_infos(input_descs, executor_);
  CHECK_EQ(size_t(1), compound->inputCount());
  const auto left_deep_join =
      dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
  JoinQualsPerNestingLevel left_deep_join_quals;
  // A left-deep join tree supplies per-level join types; otherwise use the
  // single join type of the node's data sink.
  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
                                         : std::vector<JoinType>{get_join_type(compound)};
  std::vector<size_t> input_permutation;
  std::vector<size_t> left_deep_join_input_sizes;
  std::optional<unsigned> left_deep_tree_id;
  if (left_deep_join) {
    left_deep_tree_id = left_deep_join->getId();
    left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
    left_deep_join_quals = translateLeftDeepJoinFilter(
        left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
    // NOTE(review): the opening of this condition is missing in this
    // rendering — presumably it also tests a table-reordering flag; confirm
    // against upstream. Reordering only happens when no LEFT join is present.
        std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
            join_types.end()) {
      // Reorder join inputs by cardinality heuristics, then rebuild the
      // descriptors and re-translate join quals for the new input order.
      input_permutation = do_table_reordering(input_descs,
                                              input_col_descs,
                                              left_deep_join_quals,
                                              input_to_nest_level,
                                              compound,
                                              query_infos,
                                              executor_);
      input_to_nest_level = get_input_nest_levels(compound, input_permutation);
      std::tie(input_descs, input_col_descs, std::ignore) =
          get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
      left_deep_join_quals = translateLeftDeepJoinFilter(
          left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
    }
  }
  RelAlgTranslator translator(cat_,
                              query_state_,
                              executor_,
                              input_to_nest_level,
                              join_types,
                              now_,
                              eo.just_explain);
  // Translate projections, group-by keys, filters and aggregate targets.
  const auto scalar_sources =
      translate_scalar_sources(compound, translator, eo.executor_type);
  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
  const auto quals_cf = translate_quals(compound, translator);
  const auto target_exprs = translate_targets(target_exprs_owned_,
                                              scalar_sources,
                                              groupby_exprs,
                                              compound,
                                              translator,
                                              eo.executor_type);
  // Pick up any query hint registered for this node in the DAG.
  auto query_hint = RegisteredQueryHint::defaults();
  if (query_dag_) {
    auto candidate = query_dag_->getQueryHint(compound);
    if (candidate) {
      query_hint = *candidate;
    }
  }
  CHECK_EQ(compound->size(), target_exprs.size());
  // NOTE(review): one initializer line appears to be missing right after
  // `query_hint,` below (likely the query-plan-DAG slot); confirm upstream.
  const RelAlgExecutionUnit exe_unit = {input_descs,
                                        input_col_descs,
                                        quals_cf.simple_quals,
                                        rewrite_quals(quals_cf.quals),
                                        left_deep_join_quals,
                                        groupby_exprs,
                                        target_exprs,
                                        nullptr,
                                        sort_info,
                                        0,
                                        query_hint,
                                        {},
                                        {},
                                        false,
                                        std::nullopt,
                                        query_state_};
  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
  compound->setOutputMetainfo(targets_meta);
  // Record the rewritten join quals keyed by the left-deep tree id so later
  // steps can look them up.
  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
    left_deep_trees_info.emplace(left_deep_tree_id.value(),
                                 rewritten_exe_unit.join_quals);
  }
  // NOTE(review): one argument line appears to be missing from this call in
  // this rendering; confirm against upstream.
  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(compound,
                                                             cat_,
                                                             left_deep_tree_id,
                                                             left_deep_trees_info,
                                                             executor_,
                                                             translator);
  if (is_extracted_dag_valid(dag_info)) {
    rewritten_exe_unit.query_plan_dag = dag_info.extracted_dag;
    rewritten_exe_unit.hash_table_build_plan_dag = dag_info.hash_table_plan_dag;
    rewritten_exe_unit.table_id_to_node_map = dag_info.table_id_to_node_map;
  }
  // NOTE(review): one element appears to be missing from this return list
  // (between `compound,` and the query rewriter); confirm against upstream.
  return {rewritten_exe_unit,
          compound,
          std::move(query_rewriter),
          input_permutation,
          left_deep_join_input_sizes};
}
3844 
3845 std::shared_ptr<RelAlgTranslator> RelAlgExecutor::getRelAlgTranslator(
3846  const RelAlgNode* node) {
3847  auto input_to_nest_level = get_input_nest_levels(node, {});
3848  const auto left_deep_join =
3849  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
3850  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3851  : std::vector<JoinType>{get_join_type(node)};
3852  return std::make_shared<RelAlgTranslator>(
3853  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, false);
3854 }
3855 
3856 namespace {
3857 
3858 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
3859  CHECK(qual_expr);
3860  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
3861  if (!bin_oper || bin_oper->getOperator() != kAND) {
3862  return {qual_expr};
3863  }
3864  CHECK_GE(bin_oper->size(), size_t(2));
3865  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
3866  for (size_t i = 1; i < bin_oper->size(); ++i) {
3867  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
3868  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
3869  }
3870  return lhs_cf;
3871 }
3872 
3873 std::shared_ptr<Analyzer::Expr> build_logical_expression(
3874  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
3875  const SQLOps sql_op) {
3876  CHECK(!factors.empty());
3877  auto acc = factors.front();
3878  for (size_t i = 1; i < factors.size(); ++i) {
3879  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
3880  }
3881  return acc;
3882 }
3883 
3884 template <class QualsList>
3885 bool list_contains_expression(const QualsList& haystack,
3886  const std::shared_ptr<Analyzer::Expr>& needle) {
3887  for (const auto& qual : haystack) {
3888  if (*qual == *needle) {
3889  return true;
3890  }
3891  }
3892  return false;
3893 }
3894 
// Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
// evaluations of `p` and allows use of the original form in joins if `p`
// can be used for hash joins. Returns the input unchanged when no factor is
// common to every disjunctive term.
std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
    const std::shared_ptr<Analyzer::Expr>& expr) {
  // Break the expression into its top-level OR terms.
  const auto expr_terms = qual_to_disjunctive_form(expr);
  CHECK_GE(expr_terms.size(), size_t(1));
  const auto& first_term = expr_terms.front();
  const auto first_term_factors = qual_to_conjunctive_form(first_term);
  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
  // First, collect the conjunctive components common to all the disjunctive
  // components. Don't do it for simple qualifiers, we only care about
  // expensive or join qualifiers. A factor of the first term is "common" only
  // if every other term also contains it.
  for (const auto& first_term_factor : first_term_factors.quals) {
    bool is_common =
        expr_terms.size() > 1;  // Only report common factors for disjunction.
    for (size_t i = 1; i < expr_terms.size(); ++i) {
      const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
      if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
        is_common = false;
        break;
      }
    }
    if (is_common) {
      common_factors.push_back(first_term_factor);
    }
  }
  if (common_factors.empty()) {
    // Nothing to factor out; hand back the original expression.
    return expr;
  }
  // Now that the common expressions are known, collect the remaining
  // expressions (the `q` / `r` residue of each term).
  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
  for (const auto& term : expr_terms) {
    const auto term_cf = qual_to_conjunctive_form(term);
    std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
        term_cf.simple_quals.begin(), term_cf.simple_quals.end());
    for (const auto& qual : term_cf.quals) {
      if (!list_contains_expression(common_factors, qual)) {
        remaining_quals.push_back(qual);
      }
    }
    if (!remaining_quals.empty()) {
      remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
    }
  }
  // Reconstruct the expression with the transformation applied:
  // common AND (residue_1 OR residue_2 OR ...).
  const auto common_expr = build_logical_expression(common_factors, kAND);
  if (remaining_terms.empty()) {
    // Every term was fully covered by the common factors.
    return common_expr;
  }
  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
}
3947 
3948 } // namespace
3949 
3950 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
3951  const RexScalar* join_condition,
3952  const std::vector<JoinType>& join_types,
3953  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3954  const bool just_explain) const {
3955  RelAlgTranslator translator(
3956  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
3957  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
3958  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
3959  for (const auto rex_condition_component : rex_condition_cf) {
3960  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
3961  const auto join_condition =
3963  bw_equals ? bw_equals.get() : rex_condition_component));
3964  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
3965  join_condition_quals.insert(join_condition_quals.end(),
3966  join_condition_cf.quals.begin(),
3967  join_condition_cf.quals.end());
3968  join_condition_quals.insert(join_condition_quals.end(),
3969  join_condition_cf.simple_quals.begin(),
3970  join_condition_cf.simple_quals.end());
3971  }
3972  return combine_equi_join_conditions(join_condition_quals);
3973 }
3974 
3975 // Translate left deep join filter and separate the conjunctive form qualifiers
3976 // per nesting level. The code generated for hash table lookups on each level
3977 // must dominate its uses in deeper nesting levels.
3979  const RelLeftDeepInnerJoin* join,
3980  const std::vector<InputDescriptor>& input_descs,
3981  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3982  const bool just_explain) {
3983  const auto join_types = left_deep_join_types(join);
3984  const auto join_condition_quals = makeJoinQuals(
3985  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
3986  MaxRangeTableIndexVisitor rte_idx_visitor;
3987  JoinQualsPerNestingLevel result(input_descs.size() - 1);
3988  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
3989  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
3990  const auto outer_condition = join->getOuterCondition(rte_idx);
3991  if (outer_condition) {
3992  result[rte_idx - 1].quals =
3993  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
3994  CHECK_LE(rte_idx, join_types.size());
3995  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
3996  result[rte_idx - 1].type = JoinType::LEFT;
3997  continue;
3998  }
3999  for (const auto& qual : join_condition_quals) {
4000  if (visited_quals.count(qual)) {
4001  continue;
4002  }
4003  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4004  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4005  const auto it_ok = visited_quals.emplace(qual);
4006  CHECK(it_ok.second);
4007  result[rte_idx - 1].quals.push_back(qual);
4008  }
4009  }
4010  CHECK_LE(rte_idx, join_types.size());
4011  CHECK(join_types[rte_idx - 1] == JoinType::INNER ||
4012  join_types[rte_idx - 1] == JoinType::SEMI ||
4013  join_types[rte_idx - 1] == JoinType::ANTI);
4014  result[rte_idx - 1].type = join_types[rte_idx - 1];
4015  }
4016  return result;
4017 }
4018 
4019 namespace {
4020 
4021 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
4022  const RelAlgNode* ra_node,
4023  const size_t nest_level,
4024  const std::vector<TargetMetaInfo>& in_metainfo,
4025  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4026  CHECK_LE(size_t(1), ra_node->inputCount());
4027  CHECK_GE(size_t(2), ra_node->inputCount());
4028  const auto input = ra_node->getInput(nest_level);
4029  const auto it_rte_idx = input_to_nest_level.find(input);
4030  CHECK(it_rte_idx != input_to_nest_level.end());
4031  const int rte_idx = it_rte_idx->second;
4032  const int table_id = table_id_from_ra(input);
4033  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
4034  const auto scan_ra = dynamic_cast<const RelScan*>(input);
4035  int input_idx = 0;
4036  for (const auto& input_meta : in_metainfo) {
4037  inputs.push_back(
4038  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
4039  table_id,
4040  scan_ra ? input_idx + 1 : input_idx,
4041  rte_idx));
4042  ++input_idx;
4043  }
4044  return inputs;
4045 }
4046 
4047 std::vector<Analyzer::Expr*> get_raw_pointers(
4048  std::vector<std::shared_ptr<Analyzer::Expr>> const& input) {
4049  std::vector<Analyzer::Expr*> output(input.size());
4050  auto const raw_ptr = [](auto& shared_ptr) { return shared_ptr.get(); };
4051  std::transform(input.cbegin(), input.cend(), output.begin(), raw_ptr);
4052  return output;
4053 }
4054 
4055 } // namespace
4056 
// Builds the execution unit for a standalone RelAggregate node. Inputs are
// synthesized from the source node's output metadata rather than translated
// from scalar sources.
// NOTE(review): the function signature line and a few argument/element lines
// were lost in this rendering; remaining tokens are kept verbatim.
    const RelAggregate* aggregate,
    const SortInfo& sort_info,
    const bool just_explain) {
  std::vector<InputDescriptor> input_descs;
  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
  std::tie(input_descs, input_col_descs, used_inputs_owned) =
      get_input_desc(aggregate, input_to_nest_level, {}, cat_);
  const auto join_type = get_join_type(aggregate);

  RelAlgTranslator translator(cat_,
                              query_state_,
                              executor_,
                              input_to_nest_level,
                              {join_type},
                              now_,
                              just_explain);
  CHECK_EQ(size_t(1), aggregate->inputCount());
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  // Fabricate column references to the source's outputs; these feed both the
  // group-by keys and the aggregate targets.
  const auto scalar_sources =
      synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
  const auto target_exprs = translate_targets(
      target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
  aggregate->setOutputMetainfo(targets_meta);
  // NOTE(review): argument lines appear to be missing from this call in this
  // rendering; confirm against upstream.
  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(aggregate,
                                                             cat_,
                                                             std::nullopt,
                                                             executor_,
                                                             translator);
  // Pick up any query hint registered for this node in the DAG.
  auto query_hint = RegisteredQueryHint::defaults();
  if (query_dag_) {
    auto candidate = query_dag_->getQueryHint(aggregate);
    if (candidate) {
      query_hint = *candidate;
    }
  }
  // NOTE(review): an element appears to be missing before the trailing
  // `nullptr` of this return list; confirm against upstream.
  return {RelAlgExecutionUnit{input_descs,
                              input_col_descs,
                              {},
                              {},
                              {},
                              groupby_exprs,
                              target_exprs,
                              nullptr,
                              sort_info,
                              0,
                              query_hint,
                              dag_info.extracted_dag,
                              dag_info.hash_table_plan_dag,
                              dag_info.table_id_to_node_map,
                              false,
                              std::nullopt,
                              query_state_},
          aggregate,
          nullptr};
}
4121 
// Builds the execution unit for a RelProject node: resolves input
// descriptors, translates join quals (with optional input reordering) and
// the projected expressions, then runs the query rewriter and extracts the
// query plan DAG.
// NOTE(review): the function signature line and several interior lines were
// lost in this rendering; remaining tokens are kept verbatim.
    const RelProject* project,
    const SortInfo& sort_info,
    const ExecutionOptions& eo) {
  std::vector<InputDescriptor> input_descs;
  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
  auto input_to_nest_level = get_input_nest_levels(project, {});
  std::tie(input_descs, input_col_descs, std::ignore) =
      get_input_desc(project, input_to_nest_level, {}, cat_);
  const auto query_infos = get_table_infos(input_descs, executor_);

  const auto left_deep_join =
      dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
  JoinQualsPerNestingLevel left_deep_join_quals;
  // Per-level join types from a left-deep tree, or the single sink type.
  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
                                         : std::vector<JoinType>{get_join_type(project)};
  std::vector<size_t> input_permutation;
  std::vector<size_t> left_deep_join_input_sizes;
  std::optional<unsigned> left_deep_tree_id;
  if (left_deep_join) {
    left_deep_tree_id = left_deep_join->getId();
    left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
    const auto query_infos = get_table_infos(input_descs, executor_);
    left_deep_join_quals = translateLeftDeepJoinFilter(
        left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
    // NOTE(review): the condition guarding this reordering block is missing
    // in this rendering; confirm against upstream.
      // Reorder join inputs, then rebuild descriptors and re-translate the
      // join quals for the new order.
      input_permutation = do_table_reordering(input_descs,
                                              input_col_descs,
                                              left_deep_join_quals,
                                              input_to_nest_level,
                                              project,
                                              query_infos,
                                              executor_);
      input_to_nest_level = get_input_nest_levels(project, input_permutation);
      std::tie(input_descs, input_col_descs, std::ignore) =
          get_input_desc(project, input_to_nest_level, input_permutation, cat_);
      left_deep_join_quals = translateLeftDeepJoinFilter(
          left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
    }
  }

  RelAlgTranslator translator(cat_,
                              query_state_,
                              executor_,
                              input_to_nest_level,
                              join_types,
                              now_,
                              eo.just_explain);
  // Translate projected expressions; ownership is retained by the executor
  // in target_exprs_owned_.
  const auto target_exprs_owned =
      translate_scalar_sources(project, translator, eo.executor_type);
  target_exprs_owned_.insert(
      target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
  const auto target_exprs = get_raw_pointers(target_exprs_owned);
  // Pick up any query hint registered for this node in the DAG.
  auto query_hint = RegisteredQueryHint::defaults();
  if (query_dag_) {
    auto candidate = query_dag_->getQueryHint(project);
    if (candidate) {
      query_hint = *candidate;
    }
  }
  // NOTE(review): one initializer line appears to be missing right after
  // `query_hint,` below (likely the query-plan-DAG slot); confirm upstream.
  const RelAlgExecutionUnit exe_unit = {input_descs,
                                        input_col_descs,
                                        {},
                                        {},
                                        left_deep_join_quals,
                                        {nullptr},
                                        target_exprs,
                                        nullptr,
                                        sort_info,
                                        0,
                                        query_hint,
                                        {},
                                        {},
                                        false,
                                        std::nullopt,
                                        query_state_};
  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
  project->setOutputMetainfo(targets_meta);
  // Record the rewritten join quals keyed by the left-deep tree id.
  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
    left_deep_trees_info.emplace(left_deep_tree_id.value(),
                                 rewritten_exe_unit.join_quals);
  }
  // NOTE(review): one argument line appears to be missing from this call in
  // this rendering; confirm against upstream.
  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(project,
                                                             cat_,
                                                             left_deep_tree_id,
                                                             left_deep_trees_info,
                                                             executor_,
                                                             translator);
  if (is_extracted_dag_valid(dag_info)) {
    rewritten_exe_unit.query_plan_dag = dag_info.extracted_dag;
    rewritten_exe_unit.hash_table_build_plan_dag = dag_info.hash_table_plan_dag;
    rewritten_exe_unit.table_id_to_node_map = dag_info.table_id_to_node_map;
  }
  // NOTE(review): one element appears to be missing from this return list
  // (between `project,` and the query rewriter); confirm against upstream.
  return {rewritten_exe_unit,
          project,
          std::move(query_rewriter),
          input_permutation,
          left_deep_join_input_sizes};
}
4227 
4228 namespace {
4229 
4230 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
4231  RelAlgNode const* input_node) {
4232  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
4233  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
4234  const int negative_node_id = -input_node->getId();
4235  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4236  target_exprs.reserve(tmis.size());
4237  for (size_t i = 0; i < tmis.size(); ++i) {
4238  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4239  tmis[i].get_type_info(), negative_node_id, i, 0));
4240  }
4241  return target_exprs;
4242 }
4243 
4244 } // namespace
4245 
// Builds the execution unit for a RelLogicalUnion node. Targets are kept for
// BOTH inputs (they may differ on string dictionaries), and the output
// metadata is copied from whichever concrete node type input 0 turns out to
// be.
// NOTE(review): the function signature line and a few interior lines were
// lost in this rendering; remaining tokens are kept verbatim.
    const RelLogicalUnion* logical_union,
    const SortInfo& sort_info,
    const ExecutionOptions& eo) {
  std::vector<InputDescriptor> input_descs;
  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
  // Map ra input ptr to index (0, 1).
  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
  std::tie(input_descs, input_col_descs, std::ignore) =
      get_input_desc(logical_union, input_to_nest_level, {}, cat_);
  const auto query_infos = get_table_infos(input_descs, executor_);
  // Largest tuple count over all inputs; stored in the execution unit as the
  // scan limit slot below.
  auto const max_num_tuples =
      std::accumulate(query_infos.cbegin(),
                      query_infos.cend(),
                      size_t(0),
                      [](auto max, auto const& query_info) {
                        return std::max(max, query_info.info.getNumTuples());
                      });

  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
  for (auto& pair : input_to_nest_level) {
    VLOG(3) << "  (" << pair.first->toString() << ", " << pair.second << ')';
  }

  RelAlgTranslator translator(
      cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);

  // For UNION queries, we need to keep the target_exprs from both subqueries since they
  // may differ on StringDictionaries.
  std::vector<Analyzer::Expr*> target_exprs_pair[2];
  for (unsigned i = 0; i < 2; ++i) {
    auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
    CHECK(!input_exprs_owned.empty()) << "No metainfo found for input node(" << i << ") "
                                      << logical_union->getInput(i)->toString();
    VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
    for (auto& input_expr : input_exprs_owned) {
      VLOG(3) << "  " << input_expr->toString();
    }
    target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
    // Ownership moves into the executor-wide list.
    shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
  }

  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
          << " input_col_descs=" << shared::printContainer(input_col_descs)
          << " target_exprs.size()=" << target_exprs_pair[0].size()
          << " max_num_tuples=" << max_num_tuples;

  // NOTE(review): initializer lines appear to be missing right after
  // `max_num_tuples,` below (likely the query-hint and plan-DAG slots);
  // confirm against upstream.
  const RelAlgExecutionUnit exe_unit = {input_descs,
                                        input_col_descs,
                                        {},  // quals_cf.simple_quals,
                                        {},  // rewrite_quals(quals_cf.quals),
                                        {},
                                        {nullptr},
                                        target_exprs_pair[0],
                                        nullptr,
                                        sort_info,
                                        max_num_tuples,
                                        {},
                                        {},
                                        false,
                                        logical_union->isAll(),
                                        query_state_,
                                        target_exprs_pair[1]};
  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);

  // Propagate output metadata from input 0, whatever its concrete node type;
  // LIMIT/OFFSET (RelSort) under UNION is rejected.
  RelAlgNode const* input0 = logical_union->getInput(0);
  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (dynamic_cast<const RelSort*>(input0)) {
    throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
  } else {
    throw QueryNotSupported("Unsupported input type: " + input0->toString());
  }
  VLOG(3) << "logical_union->getOutputMetainfo()="
          << shared::printContainer(logical_union->getOutputMetainfo())
          << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
          << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();

  // NOTE(review): one element appears to be missing from this return list
  // (between `logical_union,` and the query rewriter); confirm upstream.
  return {rewritten_exe_unit,
          logical_union,
          std::move(query_rewriter)};
}
4348 
// Builds a TableFunctionExecutionUnit for a table-function (UDTF) step:
// translates the function's input expressions, binds the concrete function
// implementation (a GPU binding failure redirects the step to CPU), resolves
// the output-buffer sizing parameter, patches column/column-list type infos on
// the inputs, and synthesizes the output target expressions.
// NOTE(review): the opening signature line is elided in this dump — presumably
// `RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(`;
// confirm against the full source.
4350  const RelTableFunction* rel_table_func,
4351  const bool just_explain,
4352  const bool is_gpu) {
4353  std::vector<InputDescriptor> input_descs;
4354  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4355  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
4356  std::tie(input_descs, input_col_descs, std::ignore) =
4357  get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
4358  const auto query_infos = get_table_infos(input_descs, executor_);
4359  RelAlgTranslator translator(
4360  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
// Translate the scalar input sources under the TableFunctions executor type and
// keep them alive for the lifetime of the executor via target_exprs_owned_.
4361  const auto input_exprs_owned = translate_scalar_sources(
4362  rel_table_func, translator, ::ExecutorType::TableFunctions);
4363  target_exprs_owned_.insert(
4364  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
4365  auto input_exprs = get_raw_pointers(input_exprs_owned);
4366 
// Bind the named table function to a concrete implementation + argument type
// infos. On GPU, a binding error downgrades the step to CPU (QueryMustRunOnCpu)
// instead of failing the query; on CPU the binding error is fatal and rethrown.
4367  const auto table_function_impl_and_type_infos = [=]() {
4368  if (is_gpu) {
4369  try {
4370  return bind_table_function(
4371  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4372  } catch (ExtensionFunctionBindingError& e) {
4373  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
4374  << " Redirecting " << rel_table_func->getFunctionName()
4375  << " step to run on CPU.";
4376  throw QueryMustRunOnCpu();
4377  }
4378  } else {
4379  try {
4380  return bind_table_function(
4381  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4382  } catch (ExtensionFunctionBindingError& e) {
4383  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
4384  throw;
4385  }
4386  }
4387  }();
4388  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4389  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4390 
// Resolve the output-buffer sizing parameter. Either the SQL query supplies a
// literal row multiplier / constant, or the function declares its own fixed
// output size; anything else is a programming error (UNREACHABLE below).
4391  size_t output_row_sizing_param = 0;
4392  if (table_function_impl
4393  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
4394  const auto parameter_index =
4395  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
4396  CHECK_GT(parameter_index, size_t(0));
4397  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
// Sizing parameter was given explicitly in SQL — it must be a non-negative
// integer literal.
4398  const auto parameter_expr =
4399  rel_table_func->getTableFuncInputAt(parameter_index - 1);
4400  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
4401  if (!parameter_expr_literal) {
4402  throw std::runtime_error(
4403  "Provided output buffer sizing parameter is not a literal. Only literal "
4404  "values are supported with output buffer sizing configured table "
4405  "functions.");
4406  }
4407  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
4408  if (literal_val < 0) {
4409  throw std::runtime_error("Provided output sizing parameter " +
4410  std::to_string(literal_val) +
4411  " must be positive integer.");
4412  }
4413  output_row_sizing_param = static_cast<size_t>(literal_val);
4414  } else {
4415  // RowMultiplier not specified in the SQL query. Set it to 1
4416  output_row_sizing_param = 1; // default value for RowMultiplier
// Shared singleton constant expression for the implicit multiplier; inserted
// into input_exprs at the declared parameter position.
4417  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
4418  static auto DEFAULT_ROW_MULTIPLIER_EXPR =
4419  makeExpr<Analyzer::Constant>(kINT, false, d);
4420  // Push the constant 1 to input_exprs
4421  input_exprs.insert(input_exprs.begin() + parameter_index - 1,
4422  DEFAULT_ROW_MULTIPLIER_EXPR.get());
4423  }
4424  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
4425  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
4426  } else {
4427  UNREACHABLE();
4428  }
4429 
// Walk the declared argument type infos in parallel with input_exprs:
// COLUMN_LIST args consume get_dimension() consecutive inputs, COLUMN args
// consume one; both get their type_info rewritten in place (original type
// demoted to subtype). Scalar args are cast to the declared extension-function
// argument type when it differs.
4430  std::vector<Analyzer::ColumnVar*> input_col_exprs;
4431  size_t input_index = 0;
4432  size_t arg_index = 0;
4433  const auto table_func_args = table_function_impl.getInputArgs();
4434  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
4435  for (const auto& ti : table_function_type_infos) {
4436  if (ti.is_column_list()) {
4437  for (int i = 0; i < ti.get_dimension(); i++) {
4438  auto& input_expr = input_exprs[input_index];
4439  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4440  CHECK(col_var);
4441 
4442  // avoid setting type info to ti here since ti doesn't have all the
4443  // properties correctly set
4444  auto type_info = input_expr->get_type_info();
4445  type_info.set_subtype(type_info.get_type()); // set type to be subtype
4446  type_info.set_type(ti.get_type()); // set type to column list
4447  type_info.set_dimension(ti.get_dimension());
4448  input_expr->set_type_info(type_info);
4449 
4450  input_col_exprs.push_back(col_var);
4451  input_index++;
4452  }
4453  } else if (ti.is_column()) {
4454  auto& input_expr = input_exprs[input_index];
4455  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4456  CHECK(col_var);
4457 
4458  // same here! avoid setting type info to ti since it doesn't have all the
4459  // properties correctly set
4460  auto type_info = input_expr->get_type_info();
4461  type_info.set_subtype(type_info.get_type()); // set type to be subtype
4462  type_info.set_type(ti.get_type()); // set type to column
4463  input_expr->set_type_info(type_info);
4464 
4465  input_col_exprs.push_back(col_var);
4466  input_index++;
4467  } else {
4468  auto input_expr = input_exprs[input_index];
4469  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
4470  if (ext_func_arg_ti != input_expr->get_type_info()) {
4471  input_exprs[input_index] = input_expr->add_cast(ext_func_arg_ti).get();
4472  }
4473  input_index++;
4474  }
4475  arg_index++;
4476  }
4477  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
// Synthesize one output ColumnVar per declared output. Dict-encoded string
// outputs inherit their dictionary (comp_param) from the input argument the
// implementation points at via getInputID; the offset loop converts that
// (argument, member) pair into a flat input_exprs position, since column-list
// arguments occupy multiple slots.
4478  std::vector<Analyzer::Expr*> table_func_outputs;
4479  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
4480  auto ti = table_function_impl.getOutputSQLType(i);
4481  if (ti.is_dict_encoded_string()) {
4482  auto p = table_function_impl.getInputID(i);
4483 
4484  int32_t input_pos = p.first;
4485  // Iterate over the list of arguments to compute the offset. Use this offset to
4486  // get the corresponding input
4487  int32_t offset = 0;
4488  for (int j = 0; j < input_pos; j++) {
4489  const auto ti = table_function_type_infos[j];
4490  offset += ti.is_column_list() ? ti.get_dimension() : 1;
4491  }
4492  input_pos = offset + p.second;
4493 
4494  CHECK_LT(input_pos, input_exprs.size());
4495  int32_t comp_param = input_exprs_owned[input_pos]->get_type_info().get_comp_param();
4496  ti.set_comp_param(comp_param);
4497  }
4498  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
4499  table_func_outputs.push_back(target_exprs_owned_.back().get());
4500  }
4501  const TableFunctionExecutionUnit exe_unit = {
4502  input_descs,
4503  input_col_descs,
4504  input_exprs, // table function inputs
4505  input_col_exprs, // table function column inputs (duplicates w/ above)
4506  table_func_outputs, // table function projected exprs
4507  output_row_sizing_param, // output buffer sizing param
4508  table_function_impl};
4509  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
4510  rel_table_func->setOutputMetainfo(targets_meta);
4511  return {exe_unit, rel_table_func};
4512 }
4513 
4514 namespace {
4515 
// Collects (target metadata, translated input expressions) for every input of
// the filter's data-sink node. Scan sources are translated fresh from
// inputs_owned; non-scan sources reuse their previously computed output
// metainfo and get synthesized input expressions instead. input_it must advance
// exactly size()/metainfo-size entries per source, so the two branches keep it
// in lockstep with inputs_owned.
// NOTE(review): the function-name line is elided in this dump — presumably
// `get_inputs_meta(const RelFilter* filter,`; confirm against the full source.
4516 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
4518  const RelAlgTranslator& translator,
4519  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
4520  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4521  std::vector<TargetMetaInfo> in_metainfo;
4522  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
4523  const auto data_sink_node = get_data_sink(filter);
4524  auto input_it = inputs_owned.begin();
4525  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
4526  const auto source = data_sink_node->getInput(nest_level);
4527  const auto scan_source = dynamic_cast<const RelScan*>(source);
4528  if (scan_source) {
// A scan has no precomputed metainfo; translate each RexInput and derive the
// metadata from the translated expressions.
4529  CHECK(source->getOutputMetainfo().empty());
4530  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
4531  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
4532  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
4533  }
4534  const auto source_metadata =
4535  get_targets_meta(scan_source, get_raw_pointers(scalar_sources_owned));
4536  in_metainfo.insert(
4537  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4538  exprs_owned.insert(
4539  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4540  } else {
// Non-scan source: reuse its existing output metainfo and skip the
// corresponding entries in inputs_owned, then synthesize input expressions
// for this nest level.
4541  const auto& source_metadata = source->getOutputMetainfo();
4542  input_it += source_metadata.size();
4543  in_metainfo.insert(
4544  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4545  const auto scalar_sources_owned = synthesize_inputs(
4546  data_sink_node, nest_level, source_metadata, input_to_nest_level);
4547  exprs_owned.insert(
4548  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4549  }
4550  }
4551  return std::make_pair(in_metainfo, exprs_owned);
4552 }
4553 
4554 } // namespace
4555 
// Builds a WorkUnit for a standalone RelFilter node: translates and folds the
// filter condition into a qual, derives the target expressions/metainfo from
// the filter's single input, extracts the query-plan DAG for caching, and
// attaches any registered query hint.
// NOTE(review): the function-name line is elided in this dump — presumably
// `RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,`;
// confirm against the full source. Lines 4589-4590 (two extractQueryPlanDag
// arguments) and 4615 (a return field) are also elided below.
4557  const SortInfo& sort_info,
4558  const bool just_explain) {
4559  CHECK_EQ(size_t(1), filter->inputCount());
4560  std::vector<InputDescriptor> input_descs;
4561  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4562  std::vector<TargetMetaInfo> in_metainfo;
4563  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4564  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
4565 
4566  const auto input_to_nest_level = get_input_nest_levels(filter, {});
4567  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4568  get_input_desc(filter, input_to_nest_level, {}, cat_);
4569  const auto join_type = get_join_type(filter);
4570  RelAlgTranslator translator(cat_,
4571  query_state_,
4572  executor_,
4573  input_to_nest_level,
4574  {join_type},
4575  now_,
4576  just_explain);
// Targets pass through from the input; ownership is transferred to
// target_exprs_owned_ so the raw pointers below stay valid.
4577  std::tie(in_metainfo, target_exprs_owned) =
4578  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
// Translate the filter condition and constant-fold it; rewrite_expr may return
// null, in which case the folded qual is used as-is in the return below.
4579  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
4580  const auto qual = fold_expr(filter_expr.get());
4581  target_exprs_owned_.insert(
4582  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4583  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4584  filter->setOutputMetainfo(in_metainfo);
4585  const auto rewritten_qual = rewrite_expr(qual.get());
4586  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(filter,
4587  cat_,
4588  std::nullopt,
4591  executor_,
4592  translator);
// Use the hint registered for this node if one exists, otherwise defaults.
4593  auto query_hint = RegisteredQueryHint::defaults();
4594  if (query_dag_) {
4595  auto candidate = query_dag_->getQueryHint(filter);
4596  if (candidate) {
4597  query_hint = *candidate;
4598  }
4599  }
// Aggregate-initialize the RelAlgExecutionUnit: no simple quals, the
// (possibly rewritten) filter qual, no group-by, scan limit 0.
4600  return {{input_descs,
4601  input_col_descs,
4602  {},
4603  {rewritten_qual ? rewritten_qual : qual},
4604  {},
4605  {nullptr},
4606  target_exprs,
4607  nullptr,
4608  sort_info,
4609  0,
4610  query_hint,
4611  dag_info.extracted_dag,
4612  dag_info.hash_table_plan_dag,
4613  dag_info.table_id_to_node_map},
4614  filter,
4616  nullptr};
4617 }
4618 
const size_t getGroupByCount() const
bool isAll() const
bool should_output_columnar(const RelAlgExecutionUnit &ra_exe_unit, const RenderInfo *render_info)
bool is_agg(const Analyzer::Expr *expr)
Analyzer::ExpressionPtr rewrite_array_elements(Analyzer::Expr const *expr)
std::vector< Analyzer::Expr * > target_exprs
SortField getCollation(const size_t i) const
static void invalidateCachesByTable(size_t table_key)
const foreign_storage::ForeignTable * getForeignTable(const std::string &tableName) const
Definition: Catalog.cpp:1502
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:219
void collect_used_input_desc(std::vector< InputDescriptor > &input_descs, const Catalog_Namespace::Catalog &cat, std::unordered_set< std::shared_ptr< const InputColDescriptor >> &input_col_descs_unique, const RelAlgNode *ra_node, const std::unordered_set< const RexInput * > &source_used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
size_t getOffset() const
std::vector< int > ChunkKey
Definition: types.h:37
std::optional< std::function< void()> > post_execution_callback_
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:108
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool g_enable_watchdog
int8_t * append_datum(int8_t *buf, const Datum &d, const SQLTypeInfo &ti)
Definition: Datum.cpp:524
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
std::string cat(Ts &&...args)
void prepare_for_system_table_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog, const CompilationOptions &co)
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
const Rex * getTargetExpr(const size_t i) const
AggregatedColRange computeColRangesCache()
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1260
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1172
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:114
Definition: sqltypes.h:49
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
size_t size() const override
int hll_size_for_rate(const int err_percent)
Definition: HyperLogLog.h:115
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:227
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const RexScalar * getFilterExpr() const
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:3183
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:228
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
const RelAlgNode * body
ExecutorDeviceType
ErrorInfo getErrorDescription(const int32_t error_code)
void setForceNonInSituData()
Definition: RenderInfo.cpp:44
size_t getIndex() const
#define SPIMAP_GEO_PHYSICAL_INPUT(c, i)
Definition: Catalog.h:77
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:226
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
bool g_skip_intermediate_count
std::unique_ptr< const RexOperator > get_bitwise_equals_conjunction(const RexScalar *scalar)
RexUsedInputsVisitor(const Catalog_Namespace::Catalog &cat)
static ExtractedPlanDag extractQueryPlanDag(const RelAlgNode *node, const Catalog_Namespace::Catalog &catalog, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > &left_deep_tree_infos, const TemporaryTables &temporary_tables, Executor *executor, const RelAlgTranslator &rel_alg_translator)
const RexScalar * getOuterCondition(const size_t nesting_level) const
TableGenerations computeTableGenerations()
SQLTypeInfo get_nullable_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1085
std::shared_ptr< Analyzer::Expr > translateScalarRex(const RexScalar *rex) const
#define LOG(tag)
Definition: Logger.h:205
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:92
size_t size() const override
std::vector< size_t > outer_fragment_indices
static SpeculativeTopNBlacklist speculative_topn_blacklist_
size_t get_scalar_sources_size(const RelCompound *compound)
RelAlgExecutionUnit exe_unit
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< unsigned > visitLeftDeepInnerJoin(const RelLeftDeepInnerJoin *left_deep_join_tree) const override
std::vector< std::shared_ptr< Analyzer::TargetEntry > > targets
Definition: RenderInfo.h:36
SQLOps
Definition: sqldefs.h:29
TemporaryTables temporary_tables_
size_t getNumRows() const
const std::list< Analyzer::OrderEntry > order_entries
PersistentStorageMgr * getPersistentStorageMgr() const
Definition: DataMgr.cpp:644
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
bool setInSituDataIfUnset(const bool is_in_situ_data)
Definition: RenderInfo.cpp:97
const RexScalar * getCondition() const
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5958
std::string join(T const &container, std::string const &delim)
const std::vector< TargetMetaInfo > getTupleType() const
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources_for_update(const RA *ra_node, const RelAlgTranslator &translator, int32_t tableId, const Catalog_Namespace::Catalog &cat, const ColumnNameList &colNames, size_t starting_projection_column_idx)
bool get_is_null() const
Definition: Analyzer.h:333
Definition: sqldefs.h:38
const QueryPlan extracted_dag
std::vector< InputDescriptor > input_descs
std::shared_ptr< Analyzer::Var > var_ref(const Analyzer::Expr *expr, const Analyzer::Var::WhichRow which_row, const int varno)
Definition: Analyzer.h:1976
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
#define UNREACHABLE()
Definition: Logger.h:255
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
#define CHECK_GE(x, y)
Definition: Logger.h:224
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
static WindowProjectNodeContext * create(Executor *executor)
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1064
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
Definition: sqldefs.h:49
Definition: sqldefs.h:30
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
int64_t int_value_from_numbers_ptr(const SQLTypeInfo &type_info, const int8_t *data)
void prepare_string_dictionaries(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
static const int32_t ERR_GEOS
Definition: Execute.h:1178
SQLTypeInfo get_agg_type(const SQLAgg agg_kind, const Analyzer::Expr *arg_expr)
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:1174
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
std::shared_ptr< Analyzer::Expr > set_transient_dict(const std::shared_ptr< Analyzer::Expr > expr)
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
static void invalidateCaches()
int g_hll_precision_bits
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
const SQLTypeInfo & get_type_info() const
void checkForMatchingMetaInfoTypes() const
const ColumnDescriptor * getShardColumnMetadataForTable(const TableDescriptor *td) const
Definition: Catalog.cpp:4107
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1615
#define CHECK_GT(x, y)
Definition: Logger.h:223
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
std::vector< const RexScalar * > rex_to_conjunctive_form(const RexScalar *qual_expr)
bool is_count_distinct(const Analyzer::Expr *expr)
QueryStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::string to_string(char const *&&v)
double running_query_interrupt_freq
foreign_storage::ForeignStorageMgr * getForeignStorageMgr() const
void handleNop(RaExecutionDesc &ed)
#define LOG_IF(severity, condition)
Definition: Logger.h:301
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::shared_ptr< Analyzer::Expr > cast_dict_to_none(const std::shared_ptr< Analyzer::Expr > &input)
bool disallow_in_situ_only_if_final_ED_is_aggregate
Definition: RenderInfo.h:40
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:472
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:68
bool list_contains_expression(const QualsList &haystack, const std::shared_ptr< Analyzer::Expr > &needle)
size_t get_count_distinct_sub_bitmap_count(const size_t bitmap_sz_bits, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:1175
Definition: sqldefs.h:73
ExecutorType
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:208
size_t getColInputsSize() const
std::map< int, std::shared_ptr< ChunkMetadata >> ChunkMetadataMap
bool g_enable_interop
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:70
virtual T visit(const RexScalar *rex_scalar) const
Definition: RexVisitor.h:27
const size_t getScalarSourcesSize() const
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:1176
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:1173
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:229
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
size_t append_move(std::vector< T > &destination, std::vector< T > &&source)
Definition: misc.h:76
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
unsigned getIndex() const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1164
size_t getOuterFragmentCount(const CompilationOptions &co, const ExecutionOptions &eo)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool g_from_table_reordering
Definition: Execute.cpp:86
CONSTEXPR DEVICE bool is_null(const T &value)
unsigned getId() const
Classes representing a parse tree.
int get_logical_size() const
Definition: sqltypes.h:349
bool isGeometry(TargetMetaInfo const &target_meta_info)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:225
ExecutorType executor_type
bool g_enable_system_tables
Definition: SysCatalog.cpp:65
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1168
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
int count