OmniSciDB  21ac014ffc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RelAlgExecutor.h"
20 #include "Parser/ParserNode.h"
37 #include "QueryEngine/RexVisitor.h"
41 #include "Shared/measure.h"
42 #include "Shared/misc.h"
43 #include "Shared/shard_key.h"
44 
45 #include <boost/algorithm/cxx11/any_of.hpp>
46 #include <boost/range/adaptor/reversed.hpp>
47 
48 #include <algorithm>
49 #include <functional>
50 #include <numeric>
51 
// Enables fallback to the external executor (ExecutorType::Extern) when a
// NativeExecutionError is thrown (see the retry path in executeRelAlgSeq).
bool g_enable_interop{false};
// Enables UNION support; also gates the parent-node restriction on the
// intermediate-count optimization in the projection path of executeRelAlgStep.
bool g_enable_union{false};

extern bool g_enable_bump_allocator;
59 
60 namespace {
61 
62 bool node_is_aggregate(const RelAlgNode* ra) {
63  const auto compound = dynamic_cast<const RelCompound*>(ra);
64  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
65  return ((compound && compound->isAggregate()) || aggregate);
66 }
67 
// Translates a RA tree's physical inputs from schema-level (SPI) column ids to
// actual catalog column ids, returning a new set of PhysicalInput.
// NOTE(review): this excerpt appears to have dropped a Catalog parameter from
// the signature -- the body references `cat`; confirm against the full source.
std::unordered_set<PhysicalInput> get_physical_inputs(
    const RelAlgNode* ra) {
  // Start from the raw (SPI-based) physical inputs of the tree.
  auto phys_inputs = get_physical_inputs(ra);
  std::unordered_set<PhysicalInput> phys_inputs2;
  for (auto& phi : phys_inputs) {
    phys_inputs2.insert(
        PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
  }
  return phys_inputs2;
}
79 
// For every foreign-table column referenced by the query, collects
// per-fragment parallelism hints for chunks that are NOT already resident in
// CPU memory, then hands them to the ForeignStorageMgr so fetches can be
// parallelized.
void set_parallelism_hints(const RelAlgNode& ra_node,
                           const Catalog_Namespace::Catalog& catalog) {
  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
      parallelism_hints_per_table;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    int table_id = physical_input.table_id;
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE) {
      // Map the schema-level (SPI) column id to the actual column id.
      int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
      const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
      auto foreign_table = catalog.getForeignTable(table_id);
      for (const auto& fragment :
           foreign_table->fragmenter->getFragmentsForQuery().fragments) {
        Chunk_NS::Chunk chunk{col_desc};
        ChunkKey chunk_key = {
            catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};
        // do not include chunk hints that are in CPU memory
        if (!chunk.isChunkOnDevice(
                &catalog.getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
          // NOTE(review): part of the inserted hint value was elided from this
          // excerpt (the line between insert( and fragment.fragmentId}).
          parallelism_hints_per_table[{catalog.getDatabaseId(), table_id}].insert(
              fragment.fragmentId});
        }
      }
    }
  }
  if (!parallelism_hints_per_table.empty()) {
    // NOTE(review): the initializer of foreign_storage_mgr is elided in this
    // excerpt -- presumably obtained via the catalog's DataMgr; confirm.
    auto foreign_storage_mgr =
    CHECK(foreign_storage_mgr);
    foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
  }
}
113 
// Eagerly materializes dictionary-encoded foreign-table chunks whose metadata
// is still a placeholder, so real metadata exists before query execution.
// Fetched chunks stay in CPU cache but become evictable once local handles go
// out of scope. (Called as prepare_string_dictionaries elsewhere in this file.)
// NOTE(review): the first signature line is elided in this excerpt.
    const Catalog_Namespace::Catalog& catalog) {
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    int table_id = physical_input.table_id;
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE) {
      // Map the schema-level (SPI) column id to the actual column id.
      int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
      const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
      auto foreign_table = catalog.getForeignTable(table_id);
      if (col_desc->columnType.is_dict_encoded_type()) {
        CHECK(foreign_table->fragmenter != nullptr);
        for (const auto& fragment :
             foreign_table->fragmenter->getFragmentsForQuery().fragments) {
          ChunkKey chunk_key = {
              catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};
          const ChunkMetadataMap& metadata_map = fragment.getChunkMetadataMap();
          CHECK(metadata_map.find(col_id) != metadata_map.end());
          if (foreign_storage::is_metadata_placeholder(*(metadata_map.at(col_id)))) {
            // When this goes out of scope it will stay in CPU cache but become
            // evictable
            std::shared_ptr<Chunk_NS::Chunk> chunk =
                Chunk_NS::Chunk::getChunk(col_desc,
                                          &(catalog.getDataMgr()),
                                          chunk_key,
                                          0,
                                          0,
                                          0);
          }
        }
      }
    }
  }
}
148 
// Pre-execution preparation for foreign-table inputs: registers parallelism
// hints and materializes placeholder dictionary metadata.
// NOTE(review): the first signature line (function name / ra_node parameter)
// is elided in this excerpt.
    const Catalog_Namespace::Catalog& catalog) {
  // Iterate through ra_node inputs for types that need to be loaded pre-execution
  // If they do not have valid metadata, load them into CPU memory to generate
  // the metadata and leave them ready to be used by the query
  set_parallelism_hints(ra_node, catalog);
  prepare_string_dictionaries(ra_node, catalog);
}
157 
// Predicate over an extracted query DAG: valid only when every RA node was
// supported during extraction AND the extracted plan is non-empty.
// NOTE(review): the signature line is elided in this excerpt; `dag` is
// presumably an ExtractedPlanDag-like value -- confirm against full source.
  return !dag.contain_not_supported_rel_node &&
         dag.extracted_dag.compare(EMPTY_QUERY_PLAN) != 0;
}
162 
163 class RelLeftDeepTreeIdsCollector : public RelAlgVisitor<std::vector<unsigned>> {
164  public:
165  std::vector<unsigned> visitLeftDeepInnerJoin(
166  const RelLeftDeepInnerJoin* left_deep_join_tree) const override {
167  return {left_deep_join_tree->getId()};
168  }
169 
170  protected:
171  std::vector<unsigned> aggregateResult(
172  const std::vector<unsigned>& aggregate,
173  const std::vector<unsigned>& next_result) const override {
174  auto result = aggregate;
175  std::copy(next_result.begin(), next_result.end(), std::back_inserter(result));
176  return result;
177  }
178 };
179 
180 } // namespace
181 
// Returns the fragment count of the outer input table for a simple query --
// single execution step, no subqueries, non-aggregate projection or compound --
// and 0 for every other shape (push-down candidates, explain, multi-step,
// update/delete-via-select, aggregates, nops).
// NOTE(review): the first signature line is elided in this excerpt.
    const ExecutionOptions& eo) {
  if (eo.find_push_down_candidates) {
    return 0;
  }

  if (eo.just_explain) {
    return 0;
  }

  CHECK(query_dag_);

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  // Serialize with other executions; guards below restore executor state and
  // release the row-set memory owner on every exit path.
  auto lock = executor_->acquireExecuteMutex();
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  const auto phys_inputs = get_physical_inputs(cat_, &ra);
  const auto phys_table_ids = get_physical_table_inputs(&ra);
  executor_->setCatalog(&cat_);
  executor_->setupCaching(phys_inputs, phys_table_ids);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra);

  // Queries with subqueries or multiple steps are not eligible.
  if (!getSubqueries().empty()) {
    return 0;
  }

  CHECK(!ed_seq.empty());
  if (ed_seq.size() > 1) {
    return 0;
  }

  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  auto exec_desc_ptr = ed_seq.getDescriptor(0);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    return 0;
  }

  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);

    // Fragment count of the first (outer) input table of the work unit.
    return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                   executor_);
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      return 0;
    } else if (compound->isUpdateViaSelect()) {
      return 0;
    } else {
      if (compound->isAggregate()) {
        return 0;
      }

      const auto work_unit =
          createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);

      return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                     executor_);
    }
  }

  return 0;
}
260 
// Top-level query entry point: runs the query with the given compilation
// options and, if GPU execution fails with QueryMustRunOnCpu and CPU retry is
// allowed, retries once with a CPU-only compilation (forcing non-in-situ
// rendering if a render target is present).
// NOTE(review): the first signature line (function name / CompilationOptions
// parameter) is elided in this excerpt.
    const ExecutionOptions& eo,
    const bool just_explain_plan,
    RenderInfo* render_info) {
  CHECK(query_dag_);
  auto timer = DEBUG_TIMER(__func__);

  auto run_query = [&](const CompilationOptions& co_in) {
    auto execution_result =
        executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);
      // NOTE(review): the guard `if (post_execution_callback_) {` was elided
      // from this excerpt; the callback is only invoked when set.
      VLOG(1) << "Running post execution callback.";
      (*post_execution_callback_)();
    }
    return execution_result;
  };

  try {
    return run_query(co);
  } catch (const QueryMustRunOnCpu&) {
    // Only fall through to the CPU retry when the retry flag allows it.
    if (!g_allow_cpu_retry) {
      throw;
    }
  }
  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
  auto co_cpu = CompilationOptions::makeCpuOnly(co);

  if (render_info) {
    render_info->setForceNonInSituData();
  }
  return run_query(co_cpu);
}
294 
// Single-attempt query execution: manages query-session registration and
// runtime-interrupt bookkeeping, acquires the executor lock, sets up input
// caching, handles the just-explain-plan path, optionally delegates to the
// filter-push-down path, executes all subqueries, then runs the main
// execution sequence.
// NOTE(review): the first signature line is elided in this excerpt, as are a
// few interior lines (marked below).
    const ExecutionOptions& eo,
    const bool just_explain_plan,
    RenderInfo* render_info) {
  auto timer = DEBUG_TIMER(__func__);
  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  // capture the lock acquistion time
  auto clock_begin = timer_start();
    // NOTE(review): the opening line of this scope was elided from the
    // excerpt (likely a conditional guarding the interrupt reset).
    executor_->resetInterrupt();
  }
  std::string query_session{""};
  std::string query_str{"N/A"};
  std::string query_submitted_time{""};
  // gather necessary query's info
  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
    query_session = query_state_->getConstSessionInfo()->get_session_id();
    query_str = query_state_->getQueryStr();
    query_submitted_time = query_state_->getQuerySubmittedTime();
  }

  bool acquire_spin_lock = false;
  // Explain/validate-style queries bypass interrupt session tracking.
  auto validate_or_explain_query =
      just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
  ScopeGuard clearRuntimeInterruptStatus = [this,
                                            &query_session,
                                            &render_info,
                                            &eo,
                                            &acquire_spin_lock,
                                            &query_submitted_time,
                                            &validate_or_explain_query] {
    // reset the runtime query interrupt status after the end of query execution
    if (!render_info && !query_session.empty() && eo.allow_runtime_query_interrupt &&
        !validate_or_explain_query) {
      executor_->clearQuerySessionStatus(
          query_session, query_submitted_time, acquire_spin_lock);
    }
  };

  if (!render_info && eo.allow_runtime_query_interrupt && !validate_or_explain_query) {
    // if we reach here, the current query which was waiting an idle executor
    // within the dispatch queue is now scheduled to the specific executor
    // (not UNITARY_EXECUTOR)
    // so we update the query session's status with the executor that takes this query
    std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
        query_session, query_str, query_submitted_time);

    // For now we can run a single query at a time, so if other query is already
    // executed by different executor (i.e., when we turn on parallel executor),
    // we can check the possibility of running this query by using the spinlock.
    // If it fails to acquire a lock, it sleeps {g_pending_query_interrupt_freq} ms.
    // And then try to get the lock (and again and again...)
    if (!query_session.empty()) {
      while (executor_->execute_spin_lock_.test_and_set(std::memory_order_acquire)) {
        try {
          executor_->checkPendingQueryStatus(query_session);
        } catch (QueryExecutionError& e) {
            // NOTE(review): the condition opening this scope (presumably a
            // check for an interrupted-error code) was elided in this excerpt.
            throw std::runtime_error(
                "Query execution has been interrupted (pending query)");
          }
          throw e;
        } catch (...) {
          throw std::runtime_error("Checking pending query status failed: unknown error");
        }
        // here it fails to acquire the lock, so sleep...
        std::this_thread::sleep_for(
            std::chrono::milliseconds(g_pending_query_interrupt_freq));
      }
      acquire_spin_lock = true;
      // now the query is going to be executed, so update the status as "RUNNING"
      executor_->updateQuerySessionStatus(query_state_,
                                          QuerySessionStatus::QueryStatus::RUNNING);
    }
  }
  auto acquire_execute_mutex = [](Executor * executor) -> auto {
    auto ret = executor->acquireExecuteMutex();
    return ret;
  };
  // now we get the spinlock that means this query is ready to run by the executor
  // so we acquire executor lock in here to make sure that this executor holds
  // all necessary resources and at the same time protect them against other executor
  auto lock = acquire_execute_mutex(executor_);

  if (!render_info && !query_session.empty() && eo.allow_runtime_query_interrupt &&
      !validate_or_explain_query) {
    // check whether this query session is "already" interrupted
    // this case occurs when there is very short gap between being interrupted and
    // taking the execute lock
    // for instance, the session can fire multiple queries that other queries are waiting
    // in the spinlock but the user can request the query interruption
    // if so we have to remove "all" queries initiated by this session and we do in here
    // without running the query
    try {
      executor_->checkPendingQueryStatus(query_session);
    } catch (QueryExecutionError& e) {
        // NOTE(review): the condition opening this scope was elided in this
        // excerpt (see the matching pattern above).
        throw std::runtime_error("Query execution has been interrupted (pending query)");
      }
      throw e;
    } catch (...) {
      throw std::runtime_error("Checking pending query status failed: unknown error");
    }
  }

  // Notify foreign tables to load prior to caching
  // NOTE(review): the call performing this notification was elided here.

  int64_t queue_time_ms = timer_stop(clock_begin);
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  const auto phys_inputs = get_physical_inputs(cat_, &ra);
  const auto phys_table_ids = get_physical_table_inputs(&ra);
  executor_->setCatalog(&cat_);
  executor_->setupCaching(phys_inputs, phys_table_ids);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra);

  if (just_explain_plan) {
    // Produce a textual plan: steps in reverse (root first), each indented one
    // level deeper, with sort inputs and join trees expanded inline.
    std::stringstream ss;
    std::vector<const RelAlgNode*> nodes;
    for (size_t i = 0; i < ed_seq.size(); i++) {
      nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
    }
    size_t ctr = nodes.size();
    size_t tab_ctr = 0;
    for (auto& body : boost::adaptors::reverse(nodes)) {
      const auto index = ctr--;
      const auto tabs = std::string(tab_ctr++, '\t');
      CHECK(body);
      ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
      if (auto sort = dynamic_cast<const RelSort*>(body)) {
        ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
      }
      if (dynamic_cast<const RelProject*>(body) ||
          dynamic_cast<const RelCompound*>(body)) {
        if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
          ss << tabs << " : " << join->toString() << "\n";
        }
      }
    }
    const auto& subqueries = getSubqueries();
    if (!subqueries.empty()) {
      ss << "Subqueries: "
         << "\n";
      for (const auto& subquery : subqueries) {
        const auto ra = subquery->getRelAlg();
        ss << "\t" << ra->toString() << "\n";
      }
    }
    auto rs = std::make_shared<ResultSet>(ss.str());
    return {rs, {}};
  }

  if (render_info) {
    // set render to be non-insitu in certain situations.
    if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
        ed_seq.size() > 1) {
      // old logic
      // disallow if more than one ED
      render_info->setInSituDataIfUnset(false);
    }
  }

  if (eo.find_push_down_candidates) {
    // this extra logic is mainly due to current limitations on multi-step queries
    // and/or subqueries.
    // NOTE(review): the call receiving these arguments (presumably the
    // filter-push-down execution entry point) was elided from this excerpt.
        ed_seq, co, eo, render_info, queue_time_ms);
  }
  timer_setup.stop();

  // Dispatch the subqueries first
  for (auto subquery : getSubqueries()) {
    const auto subquery_ra = subquery->getRelAlg();
    CHECK(subquery_ra);
    // Skip subqueries already executed (context data present).
    if (subquery_ra->hasContextData()) {
      continue;
    }
    // Execute the subquery and cache the result.
    RelAlgExecutor ra_executor(executor_, cat_, query_state_);
    RaExecutionSequence subquery_seq(subquery_ra);
    auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
    subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
  }
  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
}
487 
// Computes the aggregated column-range cache for the root RA tree's physical
// inputs by delegating to the executor.
// NOTE(review): the signature line is elided in this excerpt; also,
// `agg_col_range_cache` below is declared but never used -- flag upstream.
  AggregatedColRange agg_col_range_cache;
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeColRangesCache(phys_inputs);
}
493 
// Computes string-dictionary generations for the root RA tree's physical
// inputs by delegating to the executor.
// NOTE(review): the signature line is elided in this excerpt.
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeStringDictionaryGenerations(phys_inputs);
}
498 
// Computes table generations for the root RA tree's physical table inputs by
// delegating to the executor.
// NOTE(review): the signature line is elided in this excerpt.
  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
  return executor_->computeTableGenerations(phys_table_ids);
}
503 
// Accessor for the (non-owning) Executor this RelAlgExecutor dispatches to.
Executor* RelAlgExecutor::getExecutor() const {
  return executor_;
}
507 
// Post-execution cleanup: releases the executor's row-set memory owner.
// Installed as a ScopeGuard around query execution in this file.
// NOTE(review): the signature line is elided in this excerpt.
  CHECK(executor_);
  executor_->row_set_mem_owner_ = nullptr;
}
512 
// Returns the ids of left-deep join trees under root_node together with the
// per-nesting-level join quals info. Returns empty results for sort roots
// (test queries needing join info are assumed not to contain sorts).
// NOTE(review): the line carrying the function name/parameter is elided in
// this excerpt between the return type and the body.
std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
  auto sort_node = dynamic_cast<const RelSort*>(root_node);
  if (sort_node) {
    // we assume that test query that needs join info does not contain any sort node
    return {};
  }
  // Building the work unit populates the left-deep join trees info as a side
  // effect, which getLeftDeepJoinTreesInfo() then reads.
  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
  RelLeftDeepTreeIdsCollector visitor;
  auto left_deep_tree_ids = visitor.visit(root_node);
  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
}
525 
526 namespace {
527 
// Rejects a sort whose single input is itself a sort -- nested sorts are not
// supported.
// NOTE(review): the signature line is elided in this excerpt; the parameter
// is a `const RelSort* sort`.
  CHECK_EQ(size_t(1), sort->inputCount());
  const auto source = sort->getInput(0);
  if (dynamic_cast<const RelSort*>(source)) {
    throw std::runtime_error("Sort node not supported as input to another sort");
  }
}
535 
536 } // namespace
537 
// Executes a single step of the execution sequence (leaf/distributed path).
// For an unsharded sort, skips the sort itself and executes only its input,
// since sorting on the leaf is pointless; results are tagged with the merge
// type (Reduce for unsharded aggregates, Union otherwise).
// NOTE(review): the first signature line and several interior lines are
// elided in this excerpt (marked below).
    const RaExecutionSequence& seq,
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryStep);

  auto exe_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exe_desc_ptr);
  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());

  size_t shard_count{0};
  // Unsharded aggregates reduce across leaves; everything else unions.
  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
    return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
  };

  if (sort) {
    const auto source_work_unit = createSortInputWorkUnit(sort, eo);
    // NOTE(review): the expression assigning shard_count from these arguments
    // was elided in this excerpt.
        source_work_unit.exe_unit, *executor_->getCatalog());
    if (!shard_count) {
      // No point in sorting on the leaf, only execute the input to the sort node.
      CHECK_EQ(size_t(1), sort->inputCount());
      const auto source = sort->getInput(0);
      if (sort->collationCount() || node_is_aggregate(source)) {
        auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
        CHECK_EQ(temp_seq.size(), size_t(1));
        // NOTE(review): several ExecutionOptions fields were elided from this
        // initializer in the excerpt.
        ExecutionOptions eo_copy = {
            eo.allow_multifrag,
            eo.just_explain,
            eo.allow_loop_joins,
            eo.with_watchdog,
            eo.jit_debug,
            eo.just_validate || sort->isEmptyResult(),
            eo.executor_type,
        };
        // Use subseq to avoid clearing existing temporary tables
        return {
            executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
            merge_type(source),
            source->getId(),
            false};
      }
    }
  }
  // NOTE(review): the lines constructing `result` via executeRelAlgSubSeq were
  // partially elided from this excerpt.
      std::make_pair(step_idx, step_idx + 1),
      co,
      eo,
      render_info,
      merge_type(exe_desc_ptr->getBody()),
      exe_desc_ptr->getBody()->getId(),
      false};
    // NOTE(review): the guard `if (post_execution_callback_) {` was elided.
    VLOG(1) << "Running post execution callback.";
    (*post_execution_callback_)();
  }
  return result;
}
610 
// Seeds the executor with externally-computed state before leaf execution:
// column ranges, string-dictionary generations, and table generations; also
// records queue time and creates a fresh row-set memory owner.
// NOTE(review): the first signature line and the line opening the interrupt
// scope are elided in this excerpt.
    const AggregatedColRange& agg_col_range,
    const StringDictionaryGenerations& string_dictionary_generations,
    const TableGenerations& table_generations) {
  // capture the lock acquistion time
  auto clock_begin = timer_start();
    executor_->resetInterrupt();
  }
  queue_time_ms_ = timer_stop(clock_begin);
  executor_->row_set_mem_owner_ =
      std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
  executor_->table_generations_ = table_generations;
  executor_->agg_col_range_cache_ = agg_col_range;
}
627 
// Runs every step of an execution sequence in order, rendering only on the
// last step. On NativeExecutionError, retries the failed step with the
// external executor (interop) when enabled and the step is not a grouped or
// aggregate compound. Returns the last step's result.
// NOTE(review): the first signature line and a few interior lines are elided
// in this excerpt (marked below).
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms,
    const bool with_existing_temp_tables) {
  auto timer = DEBUG_TIMER(__func__);
  if (!with_existing_temp_tables) {
    // NOTE(review): the statement clearing temporary_tables_ was elided here.
  }
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!seq.empty());

  // For just-explain queries only the first descriptor needs to run -- or the
  // first two when a logical-values step must produce the result set first.
  auto get_descriptor_count = [&seq, &eo]() -> size_t {
    if (eo.just_explain) {
      if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
        // run the logical values descriptor to generate the result set, then the next
        // descriptor to generate the explain
        CHECK_GE(seq.size(), size_t(2));
        return 2;
      } else {
        return 1;
      }
    } else {
      return seq.size();
    }
  };

  const auto exec_desc_count = get_descriptor_count();
  // this join info needs to be maintained throughout an entire query runtime
  for (size_t i = 0; i < exec_desc_count; i++) {
    VLOG(1) << "Executing query step " << i;
    // only render on the last step
    try {
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo,
                        (i == exec_desc_count - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const NativeExecutionError&) {
      if (!g_enable_interop) {
        throw;
      }
      // Retry the step through the external executor; grouped/aggregate
      // compounds are not supported there.
      auto eo_extern = eo;
      eo_extern.executor_type = ::ExecutorType::Extern;
      auto exec_desc_ptr = seq.getDescriptor(i);
      const auto body = exec_desc_ptr->getBody();
      const auto compound = dynamic_cast<const RelCompound*>(body);
      if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
        LOG(INFO) << "Also failed to run the query using interoperability";
        throw;
      }
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo_extern,
                        (i == exec_desc_count - 1) ? render_info : nullptr,
                        queue_time_ms);
    }
  }

  return seq.getDescriptor(exec_desc_count - 1)->getResult();
}
698 
// Runs the half-open interval [interval.first, interval.second) of steps from
// an existing sequence without clearing temporary tables, rendering only on
// the last step; returns the last step's result.
// NOTE(review): the first signature line is elided in this excerpt.
    const RaExecutionSequence& seq,
    const std::pair<size_t, size_t> interval,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;
  time(&now_);
  for (size_t i = interval.first; i < interval.second; i++) {
    // only render on the last step
    executeRelAlgStep(seq,
                      i,
                      co,
                      eo,
                      (i == interval.second - 1) ? render_info : nullptr,
                      queue_time_ms);
  }

  return seq.getDescriptor(interval.second - 1)->getResult();
}
723 
// Dispatches one execution step to the handler matching its body's RA node
// type (compound, project, aggregate, filter, sort, logical values, modify,
// union, table function), storing the result as a temporary table keyed by
// the negated node id. Update/delete-via-select are routed to executeUpdate/
// executeDelete. Nop steps forward the previous result via handleNop.
// NOTE(review): the first signature line and several ExecutionOptions fields
// are elided in this excerpt (marked below).
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  auto timer = DEBUG_TIMER(__func__);
  auto exec_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    handleNop(exec_desc);
    return;
  }
  // Per-step options: watchdog only on the first step or projections; outer
  // fragment indices only apply to the first step.
  // NOTE(review): several fields were elided from this initializer in the
  // excerpt.
  const ExecutionOptions eo_work_unit{
      eo.allow_multifrag,
      eo.just_explain,
      eo.allow_loop_joins,
      eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
      eo.jit_debug,
      eo.just_validate,
      eo.executor_type,
      step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};

  // Notify foreign tables to load prior to execution
  // NOTE(review): the call performing this notification was elided here.

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      executeDelete(compound, co, eo_work_unit, queue_time_ms);
    } else if (compound->isUpdateViaSelect()) {
      executeUpdate(compound, co, eo_work_unit, queue_time_ms);
    } else {
      exec_desc.setResult(
          executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
      VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
              << static_cast<int>(-compound->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    if (project->isDeleteViaSelect()) {
      executeDelete(project, co, eo_work_unit, queue_time_ms);
    } else if (project->isUpdateViaSelect()) {
      executeUpdate(project, co, eo_work_unit, queue_time_ms);
    } else {
      std::optional<size_t> prev_count;
      // Disabling the intermediate count optimization in distributed, as the previous
      // execution descriptor will likely not hold the aggregated result.
      if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
        auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
        CHECK(prev_exec_desc);
        RelAlgNode const* prev_body = prev_exec_desc->getBody();
        // This optimization needs to be restricted in its application for UNION, which
        // can have 2 input nodes in which neither should restrict the count of the other.
        // However some non-UNION queries are measurably slower with this restriction, so
        // it is only applied when g_enable_union is true.
        bool const parent_check =
            !g_enable_union || project->getInput(0)->getId() == prev_body->getId();
        // If the previous node produced a reliable count, skip the pre-flight count
        if (parent_check && (dynamic_cast<const RelCompound*>(prev_body) ||
                             dynamic_cast<const RelLogicalValues*>(prev_body))) {
          const auto& prev_exe_result = prev_exec_desc->getResult();
          const auto prev_result = prev_exe_result.getRows();
          if (prev_result) {
            prev_count = prev_result->rowCount();
            VLOG(3) << "Setting output row count for projection node to previous node ("
                    << prev_exec_desc->getBody()->toString() << ") to " << *prev_count;
          }
        }
      }
      exec_desc.setResult(executeProject(
          project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
      VLOG(3) << "Returned from executeProject(), addTemporaryTable("
              << static_cast<int>(-project->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
  if (aggregate) {
    exec_desc.setResult(
        executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
    addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto filter = dynamic_cast<const RelFilter*>(body);
  if (filter) {
    exec_desc.setResult(
        executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
    addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto sort = dynamic_cast<const RelSort*>(body);
  if (sort) {
    exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
    if (exec_desc.getResult().isFilterPushDownEnabled()) {
      return;
    }
    addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
  if (logical_values) {
    exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
    addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto modify = dynamic_cast<const RelModify*>(body);
  if (modify) {
    exec_desc.setResult(executeModify(modify, eo_work_unit));
    return;
  }
  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
  if (logical_union) {
    exec_desc.setResult(
        executeUnion(logical_union, seq, co, eo_work_unit, render_info, queue_time_ms));
    addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
  if (table_func) {
    exec_desc.setResult(
        executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
    addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  LOG(FATAL) << "Unhandled body type: " << body->toString();
}
878 
// Forwards the result of a nop step's single input as the step's own result,
// copying output metainfo and registering the input's temp table under the
// nop node's (negated) id so later steps can reference it.
// NOTE(review): the signature line is elided in this excerpt; the parameter
// is an execution descriptor `ed`.
  // just set the result of the previous node as the result of no op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up temp table as it could be used by the outer query or next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}
893 
894 namespace {
895 
// Rex visitor that collects the set of RexInput nodes used by an expression.
// For scan inputs on geo columns (physical_cols > 0) it synthesizes one
// RexInput per physical geo column instead of the logical input, keeping
// ownership of the synthesized nodes in get_inputs_owned().
// NOTE(review): the constructor (taking a Catalog, stored as `cat_`) and the
// `cat_` member declaration are elided in this excerpt.
class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:

  // Owned storage for the RexInputs synthesized in visitInput().
  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        // +1: RexInput indices are 0-based, catalog SPI column ids 1-based.
        const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          // Replace the logical geo input with its physical columns.
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  // Set-union of results from sibling subexpressions.
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  // mutable: visitInput() is const but must record synthesized ownership.
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
};
943 
944 const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
945  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
946  return table_func;
947  }
948  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
949  CHECK_EQ(size_t(2), join->inputCount());
950  return join;
951  }
952  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
953  CHECK_EQ(size_t(1), ra_node->inputCount());
954  }
955  auto only_src = ra_node->getInput(0);
956  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
957  dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
958  return is_join ? only_src : ra_node;
959 }
960 
// Collects the RexInputs referenced by a RelCompound: inputs used by the
// filter expression plus those used by every scalar source. The second
// element owns any inputs synthesized by the visitor (geo physical columns).
// NOTE(review): the function signature line was elided from this listing.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  RexUsedInputsVisitor visitor(cat);
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}
975 
// Collects the inputs used by a RelAggregate: one synthesized RexInput per
// group-by key plus one per aggregate operand, all referencing the single
// source node. The second element owns the synthesized nodes.
// NOTE(review): the function signature line was elided from this listing.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  // Group-by keys are the first group_count columns of the source.
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  // Then every operand of every aggregate expression.
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}
1001 
// Collects the RexInputs referenced by each projected expression of a
// RelProject, along with ownership of any visitor-synthesized inputs.
// NOTE(review): the function signature line was elided from this listing.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}
1013 
// Collects the RexInputs referenced by each input expression of a table
// function invocation.
// NOTE(review): the function signature lines were elided from this listing.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}
1026 
// Synthesizes a RexInput for every column of every input of the filter's data
// sink: all columns of a scan source (which has no output metainfo yet), or
// all output columns of an already-executed source.
// NOTE(review): the function signature line was elided from this listing.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}
1053 
// Synthesizes one RexInput per column of each input of the union.
// NOTE(review): the function signature line was elided from this listing.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  // inputCount() is passed as the unordered_set bucket-count hint here, not
  // as an element.
  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  used_inputs_owned.reserve(logical_union->inputCount());
  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
  auto const n_inputs = logical_union->inputCount();
  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
    auto input = logical_union->getInput(nest_level);
    for (size_t i = 0; i < input->size(); ++i) {
      used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
      used_inputs.insert(used_inputs_owned.back().get());
    }
  }
  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
}
1070 
1071 int table_id_from_ra(const RelAlgNode* ra_node) {
1072  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
1073  if (scan_ra) {
1074  const auto td = scan_ra->getTableDescriptor();
1075  CHECK(td);
1076  return td->tableId;
1077  }
1078  return -ra_node->getId();
1079 }
1080 
1081 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
1082  const RelAlgNode* ra_node,
1083  const std::vector<size_t>& input_permutation) {
1084  const auto data_sink_node = get_data_sink(ra_node);
1085  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1086  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1087  const auto input_node_idx =
1088  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1089  const auto input_ra = data_sink_node->getInput(input_node_idx);
1090  // Having a non-zero mapped value (input_idx) results in the query being interpretted
1091  // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
1092  // mapped value (input_idx) which originates here. This would be incorrect for UNION.
1093  size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
1094  const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1095  CHECK(it_ok.second);
1096  LOG_IF(INFO, !input_permutation.empty())
1097  << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
1098  }
1099  return input_to_nest_level;
1100 }
1101 
// Collects inputs referenced by join conditions: the single condition of a
// RelJoin, or the inner condition plus each per-level outer condition of a
// left-deep inner join. Non-join sinks contribute nothing, after sanity
// checks on the node's input count.
// NOTE(review): the function signature lines were elided from this listing.
std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
  const auto data_sink_node = get_data_sink(ra_node);
  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
    CHECK_EQ(join->inputCount(), 2u);
    const auto condition = join->getCondition();
    RexUsedInputsVisitor visitor(cat);
    auto condition_inputs = visitor.visit(condition);
    std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
        visitor.get_inputs_owned());
    return std::make_pair(condition_inputs, condition_inputs_owned);
  }

  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
    CHECK_GE(left_deep_join->inputCount(), 2u);
    const auto condition = left_deep_join->getInnerCondition();
    RexUsedInputsVisitor visitor(cat);
    auto result = visitor.visit(condition);
    // Outer conditions start at nesting level 1.
    for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
         ++nesting_level) {
      const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
      if (outer_condition) {
        const auto outer_result = visitor.visit(outer_condition);
        result.insert(outer_result.begin(), outer_result.end());
      }
    }
    std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
    return std::make_pair(result, used_inputs_owned);
  }

  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_GT(ra_node->inputCount(), 1u) << ra_node->toString();
  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
    CHECK_GT(ra_node->inputCount(), 0u) << ra_node->toString();
  } else {
    CHECK_EQ(ra_node->inputCount(), 1u) << ra_node->toString();
  }
  return std::make_pair(std::unordered_set<const RexInput*>{},
                        std::vector<std::shared_ptr<RexInput>>{});
}
1143 
// Adds an InputColDescriptor for every used input whose source node has an
// assigned nest level; scan-table columns are remapped through the catalog
// SPI (+1 because catalog column ids are 1-based). Inputs from unknown
// sources are only tolerated under a logical union.
// NOTE(review): the function name line and the catalog parameter line were
// elided from this listing; input_descs is not used in the visible body —
// confirm against the full source.
    std::vector<InputDescriptor>& input_descs,
    std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
    const RelAlgNode* ra_node,
    const std::unordered_set<const RexInput*>& source_used_inputs,
    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
  VLOG(3) << "ra_node=" << ra_node->toString()
          << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
          << " source_used_inputs.size()=" << source_used_inputs.size();
  for (const auto used_input : source_used_inputs) {
    const auto input_ra = used_input->getSourceNode();
    const int table_id = table_id_from_ra(input_ra);
    const auto col_id = used_input->getIndex();
    auto it = input_to_nest_level.find(input_ra);
    if (it != input_to_nest_level.end()) {
      const int input_desc = it->second;
      input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
          dynamic_cast<const RelScan*>(input_ra)
              ? cat.getColumnIdBySpi(table_id, col_id + 1)
              : col_id,
          table_id,
          input_desc));
    } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
      throw std::runtime_error("Bushy joins not supported");
    }
  }
}
1172 
// Builds the InputDescriptor list (one per data-sink input, optionally
// permuted) and the sorted, de-duplicated InputColDescriptor list covering
// both the node's own used inputs and those referenced by join conditions.
// NOTE(review): the final parameter line of the signature (presumably the
// catalog) was elided from this listing.
template <class RA>
std::pair<std::vector<InputDescriptor>,
          std::list<std::shared_ptr<const InputColDescriptor>>>
get_input_desc_impl(const RA* ra_node,
                    const std::unordered_set<const RexInput*>& used_inputs,
                    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
                    const std::vector<size_t>& input_permutation,
  std::vector<InputDescriptor> input_descs;
  const auto data_sink_node = get_data_sink(ra_node);
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    auto input_ra = data_sink_node->getInput(input_node_idx);
    const int table_id = table_id_from_ra(input_ra);
    input_descs.emplace_back(table_id, input_idx);
  }
  std::sort(input_descs.begin(),
            input_descs.end(),
            [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
              return lhs.getNestLevel() < rhs.getNestLevel();
            });
  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          used_inputs,
                          input_to_nest_level);
  // Join conditions may reference columns not mentioned by the node itself.
  std::unordered_set<const RexInput*> join_source_used_inputs;
  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
      get_join_source_used_inputs(ra_node, cat);
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          join_source_used_inputs,
                          input_to_nest_level);
  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
      input_col_descs_unique.begin(), input_col_descs_unique.end());

  // Deterministic ordering: nest level, then column id, then table id.
  std::sort(input_col_descs.begin(),
            input_col_descs.end(),
            [](std::shared_ptr<const InputColDescriptor> const& lhs,
               std::shared_ptr<const InputColDescriptor> const& rhs) {
              return std::make_tuple(lhs->getScanDesc().getNestLevel(),
                                     lhs->getColId(),
                                     lhs->getScanDesc().getTableId()) <
                     std::make_tuple(rhs->getScanDesc().getNestLevel(),
                                     rhs->getColId(),
                                     rhs->getScanDesc().getTableId());
            });
  return {input_descs,
          std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
                                                               input_col_descs.end())};
}
1230 
1231 template <class RA>
1232 std::tuple<std::vector<InputDescriptor>,
1233  std::list<std::shared_ptr<const InputColDescriptor>>,
1234  std::vector<std::shared_ptr<RexInput>>>
1235 get_input_desc(const RA* ra_node,
1236  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1237  const std::vector<size_t>& input_permutation,
1238  const Catalog_Namespace::Catalog& cat) {
1239  std::unordered_set<const RexInput*> used_inputs;
1240  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1241  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
1242  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1243  auto input_desc_pair = get_input_desc_impl(
1244  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1245  return std::make_tuple(
1246  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1247 }
1248 
// Overload set: number of scalar sources for each node kind, used by the
// translate_scalar_sources* templates below.
size_t get_scalar_sources_size(const RelCompound* compound) {
  return compound->getScalarSourcesSize();
}

size_t get_scalar_sources_size(const RelProject* project) {
  return project->size();
}

size_t get_scalar_sources_size(const RelTableFunction* table_func) {
  return table_func->getTableFuncInputsSize();
}
1260 
// Overload set: the i-th scalar source for each node kind, used by the
// translate_scalar_sources* templates below.
const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
  return compound->getScalarSource(i);
}

const RexScalar* scalar_at(const size_t i, const RelProject* project) {
  return project->getProjectAt(i);
}

const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
  return table_func->getTableFuncInputAt(i);
}
1272 
1273 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1274  const std::shared_ptr<Analyzer::Expr> expr) {
1275  const auto& ti = expr->get_type_info();
1276  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1277  return expr;
1278  }
1279  auto transient_dict_ti = ti;
1280  transient_dict_ti.set_compression(kENCODING_DICT);
1281  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1282  transient_dict_ti.set_fixed_size();
1283  return expr->add_cast(transient_dict_ti);
1284 }
1285 
// Folds the expression and appends it to scalar_sources, applying transient
// dictionary encoding when possible; falls back to the plain folded
// expression if the dictionary cast throws.
// NOTE(review): the function signature line was elided from this listing.
    std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::shared_ptr<Analyzer::Expr>& expr) {
  try {
    scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
  } catch (...) {
    // Deliberate best-effort: keep the unencoded folded expression.
    scalar_sources.push_back(fold_expr(expr.get()));
  }
}
1295 
1296 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1297  const std::shared_ptr<Analyzer::Expr>& input) {
1298  const auto& input_ti = input->get_type_info();
1299  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1300  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1301  }
1302  return input;
1303 }
1304 
1305 template <class RA>
1306 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1307  const RA* ra_node,
1308  const RelAlgTranslator& translator,
1309  const ::ExecutorType executor_type) {
1310  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1311  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1312  VLOG(3) << "get_scalar_sources_size(" << ra_node->toString()
1313  << ") = " << scalar_sources_size;
1314  for (size_t i = 0; i < scalar_sources_size; ++i) {
1315  const auto scalar_rex = scalar_at(i, ra_node);
1316  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1317  // RexRef are synthetic scalars we append at the end of the real ones
1318  // for the sake of taking memory ownership, no real work needed here.
1319  continue;
1320  }
1321 
1322  const auto scalar_expr =
1323  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
1324  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1325  if (executor_type == ExecutorType::Native) {
1326  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1327  } else {
1328  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1329  }
1330  }
1331 
1332  return scalar_sources;
1333 }
1334 
1335 template <class RA>
1336 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1337  const RA* ra_node,
1338  const RelAlgTranslator& translator,
1339  int32_t tableId,
1340  const Catalog_Namespace::Catalog& cat,
1341  const ColumnNameList& colNames,
1342  size_t starting_projection_column_idx) {
1343  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1344  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1345  const auto scalar_rex = scalar_at(i, ra_node);
1346  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1347  // RexRef are synthetic scalars we append at the end of the real ones
1348  // for the sake of taking memory ownership, no real work needed here.
1349  continue;
1350  }
1351 
1352  std::shared_ptr<Analyzer::Expr> translated_expr;
1353  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1354  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
1355  tableId,
1356  cat,
1357  colNames[i - starting_projection_column_idx]);
1358  } else {
1359  translated_expr = translator.translateScalarRex(scalar_rex);
1360  }
1361  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1362  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1363  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1364  }
1365 
1366  return scalar_sources;
1367 }
1368 
1369 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1370  const RelCompound* compound,
1371  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1372  if (!compound->isAggregate()) {
1373  return {nullptr};
1374  }
1375  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1376  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1377  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1378  }
1379  return groupby_exprs;
1380 }
1381 
1382 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1383  const RelAggregate* aggregate,
1384  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1385  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1386  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1387  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1388  }
1389  return groupby_exprs;
1390 }
1391 
// Translates a compound node's filter expression (if any) into folded,
// conjunctive-form quals.
// NOTE(review): the signature line and the empty-list else-branch were
// elided from this listing.
    const RelAlgTranslator& translator) {
  const auto filter_rex = compound->getFilterExpr();
  const auto filter_expr =
      filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
}
1400 
// Builds the target expression list for a RelCompound. Each target is either
// an aggregate, a RexRef back to a group-by key, or a plain scalar. Raw
// pointers are returned in target order while ownership is appended to
// target_exprs_owned (which must outlive the returned vector).
std::vector<Analyzer::Expr*> translate_targets(
    std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
    const RelCompound* compound,
    const RelAlgTranslator& translator,
    const ExecutorType executor_type) {
  std::vector<Analyzer::Expr*> target_exprs;
  for (size_t i = 0; i < compound->size(); ++i) {
    const auto target_rex = compound->getTargetExpr(i);
    const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
    std::shared_ptr<Analyzer::Expr> target_expr;
    if (target_rex_agg) {
      target_expr =
          RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
    } else {
      const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
      const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
      if (target_rex_ref) {
        // RexRef indexes into the group-by keys are 1-based.
        const auto ref_idx = target_rex_ref->getIndex();
        CHECK_GE(ref_idx, size_t(1));
        CHECK_LE(ref_idx, groupby_exprs.size());
        const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
        target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
      } else {
        target_expr = translator.translateScalarRex(target_rex_scalar);
        auto rewritten_expr = rewrite_expr(target_expr.get());
        target_expr = fold_expr(rewritten_expr.get());
        if (executor_type == ExecutorType::Native) {
          // Best effort: the transient-dict cast may throw for unsupported
          // types; keep the unencoded expression in that case.
          try {
            target_expr = set_transient_dict(target_expr);
          } catch (...) {
            // noop
          }
        } else {
          // Non-native executor types receive none-encoded strings.
          target_expr = cast_dict_to_none(target_expr);
        }
      }
    }
    CHECK(target_expr);
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }
  return target_exprs;
}
1446 
1447 std::vector<Analyzer::Expr*> translate_targets(
1448  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1449  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1450  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1451  const RelAggregate* aggregate,
1452  const RelAlgTranslator& translator) {
1453  std::vector<Analyzer::Expr*> target_exprs;
1454  size_t group_key_idx = 1;
1455  for (const auto& groupby_expr : groupby_exprs) {
1456  auto target_expr =
1457  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1458  target_exprs_owned.push_back(target_expr);
1459  target_exprs.push_back(target_expr.get());
1460  }
1461 
1462  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1463  auto target_expr =
1464  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1465  CHECK(target_expr);
1466  target_expr = fold_expr(target_expr.get());
1467  target_exprs_owned.push_back(target_expr);
1468  target_exprs.push_back(target_expr.get());
1469  }
1470  return target_exprs;
1471 }
1472 
// True iff the expression is a DISTINCT aggregate (e.g. COUNT DISTINCT).
// NOTE(review): the function signature line was elided from this listing.
  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
  return agg_expr && agg_expr->get_is_distinct();
}
1477 
1478 bool is_agg(const Analyzer::Expr* expr) {
1479  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1480  if (agg_expr && agg_expr->get_contains_agg()) {
1481  auto agg_type = agg_expr->get_aggtype();
1482  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1483  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1484  return true;
1485  }
1486  }
1487  return false;
1488 }
1489 
// Logical (client-visible) type for a target expression: COUNT DISTINCT is
// fixed up to BIGINT; the is_agg branch's return was elided from this
// listing; everything else maps through get_logical_type_info.
// NOTE(review): the function signature line was also elided.
  if (is_count_distinct(&expr)) {
    return SQLTypeInfo(kBIGINT, false);
  } else if (is_agg(&expr)) {
  }
  return get_logical_type_info(expr.get_type_info());
}
1498 
1499 template <class RA>
1500 std::vector<TargetMetaInfo> get_targets_meta(
1501  const RA* ra_node,
1502  const std::vector<Analyzer::Expr*>& target_exprs) {
1503  std::vector<TargetMetaInfo> targets_meta;
1504  CHECK_EQ(ra_node->size(), target_exprs.size());
1505  for (size_t i = 0; i < ra_node->size(); ++i) {
1506  CHECK(target_exprs[i]);
1507  // TODO(alex): remove the count distinct type fixup.
1508  targets_meta.emplace_back(ra_node->getFieldName(i),
1509  get_logical_type_for_expr(*target_exprs[i]),
1510  target_exprs[i]->get_type_info());
1511  }
1512  return targets_meta;
1513 }
1514 
1515 template <>
1516 std::vector<TargetMetaInfo> get_targets_meta(
1517  const RelFilter* filter,
1518  const std::vector<Analyzer::Expr*>& target_exprs) {
1519  RelAlgNode const* input0 = filter->getInput(0);
1520  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1521  return get_targets_meta(input, target_exprs);
1522  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1523  return get_targets_meta(input, target_exprs);
1524  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1525  return get_targets_meta(input, target_exprs);
1526  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1527  return get_targets_meta(input, target_exprs);
1528  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1529  return get_targets_meta(input, target_exprs);
1530  }
1531  UNREACHABLE() << "Unhandled node type: " << input0->toString();
1532  return {};
1533 }
1534 
1535 } // namespace
1536 
// RelAlgExecutor::executeUpdate: runs an UPDATE rooted at a compound or
// project node, rewriting to a columnar update for temporary tables.
// NOTE(review): the opening signature line and several interior lines (cache
// invalidation / transaction-parameter assignments / CompilationOptions
// declarations) were elided from this listing; the visible body begins with
// the trailing parameters.
    const CompilationOptions& co_in,
    const ExecutionOptions& eo_in,
    const int64_t queue_time_ms) {
  CHECK(node);
  auto timer = DEBUG_TIMER(__func__);

  // NOTE(review): a statement was elided from the listing here.

  auto co = co_in;
  co.hoist_literals = false;  // disable literal hoisting as it interferes with dict
                              // encoded string updates

  // Builds transaction parameters and executes the update for one node.
  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
                                                     auto& work_unit,
                                                     const bool is_aggregate) {
    auto table_descriptor = node->getModifiedTableDescriptor();
    CHECK(table_descriptor);
    // Varlen updates require the hidden delete column ("delayed" vacuum).
    if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
      throw std::runtime_error(
          "UPDATE queries involving variable length columns are only supported on tables "
          "with the vacuum attribute set to 'delayed'");
    }
    auto updated_table_desc = node->getModifiedTableDescriptor();
    // NOTE(review): the assignment target (presumably dml_transaction_parameters_)
    // was elided from the listing.
        std::make_unique<UpdateTransactionParameters>(updated_table_desc,
                                                      node->getTargetColumns(),
                                                      node->getOutputMetainfo(),
                                                      node->isVarlenUpdateRequired());

    const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);

    auto execute_update_ra_exe_unit =
        [this, &co, &eo_in, &table_infos, &updated_table_desc](
            const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
          // NOTE(review): the declaration of co_project was elided from the listing.

          auto eo = eo_in;
          if (dml_transaction_parameters_->tableIsTemporary()) {
            eo.output_columnar_hint = true;
            co_project.allow_lazy_fetch = false;
            co_project.filter_on_deleted_column =
                false;  // project the entire delete column for columnar update
          }

          auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
          CHECK(update_transaction_parameters);
          auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
          try {
            auto table_update_metadata =
                executor_->executeUpdate(ra_exe_unit,
                                         table_infos,
                                         updated_table_desc,
                                         co_project,
                                         eo,
                                         cat_,
                                         executor_->row_set_mem_owner_,
                                         update_callback,
                                         is_aggregate);
            // Vacuum fragments only after the transaction is finalized.
            post_execution_callback_ = [table_update_metadata, this]() {
              dml_transaction_parameters_->finalizeTransaction(cat_);
              TableOptimizer table_optimizer{
                  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
              table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
            };
          } catch (const QueryExecutionError& e) {
            throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
          }
        };

    if (dml_transaction_parameters_->tableIsTemporary()) {
      // hold owned target exprs during execution if rewriting
      auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
      // rewrite temp table updates to generate the full column by moving the where
      // clause into a case if such a rewrite is not possible, bail on the update
      // operation build an expr for the update target
      auto update_transaction_params =
      // NOTE(review): the cast of dml_transaction_parameters_ was elided from
      // the listing.
      CHECK(update_transaction_params);
      const auto td = update_transaction_params->getTableDescriptor();
      CHECK(td);
      const auto update_column_names = update_transaction_params->getUpdateColumnNames();
      if (update_column_names.size() > 1) {
        throw std::runtime_error(
            "Multi-column update is not yet supported for temporary tables.");
      }

      auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
      CHECK(cd);
      auto projected_column_to_update =
          makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
      const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
          work_unit.exe_unit, projected_column_to_update);
      if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
        throw std::runtime_error(
            "Variable length updates not yet supported on temporary tables.");
      }
      execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
    } else {
      execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
    }
  };

  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
    auto work_unit =
        createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);

    execute_update_for_node(compound, work_unit, compound->isAggregate());
  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);

    if (project->isSimple()) {
      CHECK_EQ(size_t(1), project->inputCount());
      const auto input_ra = project->getInput(0);
      if (dynamic_cast<const RelSort*>(input_ra)) {
        // Simple project over a sort: carry the sorted row count forward as
        // the scan limit.
        const auto& input_table =
            get_temporary_table(&temporary_tables_, -input_ra->getId());
        CHECK(input_table);
        work_unit.exe_unit.scan_limit = input_table->rowCount();
      }
    }

    execute_update_for_node(project, work_unit, false);
  } else {
    throw std::runtime_error("Unsupported parent node for update: " + node->toString());
  }
}
1666 
// RelAlgExecutor::executeDelete: runs a DELETE rooted at a compound or
// project node, rewriting to a columnar delete for temporary tables.
// NOTE(review): the opening signature line and several interior lines
// (transaction-parameter assignment / CompilationOptions declaration) were
// elided from this listing; the visible body begins with the trailing
// parameters.
    const CompilationOptions& co,
    const ExecutionOptions& eo_in,
    const int64_t queue_time_ms) {
  CHECK(node);
  auto timer = DEBUG_TIMER(__func__);

  // NOTE(review): a statement was elided from the listing here.

  // Builds transaction parameters and executes the delete for one node.
  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
                                                     auto& work_unit,
                                                     const bool is_aggregate) {
    auto* table_descriptor = node->getModifiedTableDescriptor();
    CHECK(table_descriptor);
    // Deletes flip the hidden delete column, so it must exist.
    if (!table_descriptor->hasDeletedCol) {
      throw std::runtime_error(
          "DELETE queries are only supported on tables with the vacuum attribute set to "
          "'delayed'");
    }

    const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);

    auto execute_delete_ra_exe_unit =
        [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
                                                             const bool is_aggregate) {
          // NOTE(review): the assignment target (presumably
          // dml_transaction_parameters_) was elided from the listing.
              std::make_unique<DeleteTransactionParameters>(table_descriptor);
          auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
          CHECK(delete_params);
          auto delete_callback = yieldDeleteCallback(*delete_params);
          // NOTE(review): the declaration of co_delete was elided from the
          // listing.

          auto eo = eo_in;
          if (dml_transaction_parameters_->tableIsTemporary()) {
            eo.output_columnar_hint = true;
            co_delete.filter_on_deleted_column =
                false;  // project the entire delete column for columnar update
          } else {
            CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
          }

          try {
            auto table_update_metadata =
                executor_->executeUpdate(exe_unit,
                                         table_infos,
                                         table_descriptor,
                                         co_delete,
                                         eo,
                                         cat_,
                                         executor_->row_set_mem_owner_,
                                         delete_callback,
                                         is_aggregate);
            // Vacuum fragments only after the transaction is finalized.
            post_execution_callback_ = [table_update_metadata, this]() {
              dml_transaction_parameters_->finalizeTransaction(cat_);
              TableOptimizer table_optimizer{
                  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
              table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
            };
          } catch (const QueryExecutionError& e) {
            throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
          }
        };

    if (table_is_temporary(table_descriptor)) {
      // Temporary tables use the columnar path: project the delete column.
      auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
      auto cd = cat_.getDeletedColumn(table_descriptor);
      CHECK(cd);
      auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
          cd->columnType, table_descriptor->tableId, cd->columnId, 0);
      const auto rewritten_exe_unit =
          query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
      execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
    } else {
      execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
    }
  };

  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
    const auto work_unit =
        createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
    execute_delete_for_node(compound, work_unit, compound->isAggregate());
  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
    if (project->isSimple()) {
      CHECK_EQ(size_t(1), project->inputCount());
      const auto input_ra = project->getInput(0);
      if (dynamic_cast<const RelSort*>(input_ra)) {
        // Simple project over a sort: carry the sorted row count forward as
        // the scan limit.
        const auto& input_table =
            get_temporary_table(&temporary_tables_, -input_ra->getId());
        CHECK(input_table);
        work_unit.exe_unit.scan_limit = input_table->rowCount();
      }
    }
    execute_delete_for_node(project, work_unit, false);
  } else {
    throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
  }
}
1767 
1769  const CompilationOptions& co,
1770  const ExecutionOptions& eo,
1771  RenderInfo* render_info,
1772  const int64_t queue_time_ms) {
1773  auto timer = DEBUG_TIMER(__func__);
  // Build a work unit for the fused filter/project/aggregate node. Sort info
  // is empty ({{}, Default, 0, 0}): ordering is handled by an enclosing
  // RelSort, not here.
1774  const auto work_unit =
1775  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
  // Local copy of the compilation options (not modified on this visible path).
1776  CompilationOptions co_compound = co;
  // Whether the unit executes as an aggregate is driven by the compound
  // node's own isAggregate() flag.
1777  return executeWorkUnit(work_unit,
1778  compound->getOutputMetainfo(),
1779  compound->isAggregate(),
1780  co_compound,
1781  eo,
1782  render_info,
1783  queue_time_ms);
1784 }
1785 
1787  const CompilationOptions& co,
1788  const ExecutionOptions& eo,
1789  RenderInfo* render_info,
1790  const int64_t queue_time_ms) {
1791  auto timer = DEBUG_TIMER(__func__);
  // A RelAggregate always executes as an aggregate, hence the hard-wired
  // `true` passed to executeWorkUnit below. Sort info is empty; ordering is
  // applied by an enclosing RelSort if present.
1792  const auto work_unit = createAggregateWorkUnit(
1793  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1794  return executeWorkUnit(work_unit,
1795  aggregate->getOutputMetainfo(),
1796  true,
1797  co,
1798  eo,
1799  render_info,
1800  queue_time_ms);
1801 }
1802 
1803 namespace {
1804 
1805 // Returns true iff the execution unit contains window functions.
  // Implementation: scan the unit's target expressions for any node that is
  // an Analyzer::WindowFunction.
1807  return std::any_of(ra_exe_unit.target_exprs.begin(),
1808  ra_exe_unit.target_exprs.end(),
1809  [](const Analyzer::Expr* expr) {
1810  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1811  });
1812 }
1813 
1814 } // namespace
1815 
1817  const RelProject* project,
1818  const CompilationOptions& co,
1819  const ExecutionOptions& eo,
1820  RenderInfo* render_info,
1821  const int64_t queue_time_ms,
1822  const std::optional<size_t> previous_count) {
1823  auto timer = DEBUG_TIMER(__func__);
1824  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1825  CompilationOptions co_project = co;
  // A "simple" projection sitting directly on top of a RelSort reads the
  // sorted temporary table: force CPU execution and cap the scan limit at
  // the smaller of the sort's LIMIT and the materialized row count so we do
  // not read past the sorted result.
1826  if (project->isSimple()) {
1827  CHECK_EQ(size_t(1), project->inputCount());
1828  const auto input_ra = project->getInput(0);
1829  if (dynamic_cast<const RelSort*>(input_ra)) {
1830  co_project.device_type = ExecutorDeviceType::CPU;
1831  const auto& input_table =
1832  get_temporary_table(&temporary_tables_, -input_ra->getId());
1833  CHECK(input_table);
1834  work_unit.exe_unit.scan_limit =
1835  std::min(input_table->getLimit(), input_table->rowCount());
1836  }
1837  }
  // Projections are never aggregates (is_agg = false).
1838  return executeWorkUnit(work_unit,
1839  project->getOutputMetainfo(),
1840  false,
1841  co_project,
1842  eo,
1843  render_info,
1844  queue_time_ms,
1845  previous_count);
1846 }
1847 
1849  const CompilationOptions& co_in,
1850  const ExecutionOptions& eo,
1851  const int64_t queue_time_ms) {
1853  auto timer = DEBUG_TIMER(__func__);
1854 
1855  auto co = co_in;
1856 
  // Table functions are single-node, opt-in functionality for now.
1857  if (g_cluster) {
1858  throw std::runtime_error("Table functions not supported in distributed mode yet");
1859  }
1860  if (!g_enable_table_functions) {
1861  throw std::runtime_error("Table function support is disabled");
1862  }
1863  auto table_func_work_unit = createTableFunctionWorkUnit(
1864  table_func,
1865  eo.just_explain,
1866  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
1867  const auto body = table_func_work_unit.body;
1868  CHECK(body);
1869 
1870  const auto table_infos =
1871  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1872 
  // Placeholder (empty) result set; replaced below on successful execution.
1873  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1874  co.device_type,
1876  nullptr,
1877  executor_->getCatalog(),
1878  executor_->blockSize(),
1879  executor_->gridSize()),
1880  {}};
1881 
1882  try {
1883  result = {executor_->executeTableFunction(
1884  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1885  body->getOutputMetainfo()};
1886  } catch (const QueryExecutionError& e) {
  // NOTE(review): some lines are elided in this rendering; on this visible
  // path the caught execution error is surfaced as an OOM failure.
1889  throw std::runtime_error("Table function ran out of memory during execution");
1890  }
1891  result.setQueueTime(queue_time_ms);
1892  return result;
1893 }
1894 
1895 namespace {
1896 
1897 // Creates a new expression which has the range table index set to 1. This is needed to
1898 // reuse the hash join construction helpers to generate a hash table for the window
1899 // function partition: create an equals expression with left and right sides identical
1900 // except for the range table index.
1901 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
1902  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
1903  if (tuple) {
1904  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
1905  for (const auto& element : tuple->getTuple()) {
1906  transformed_tuple.push_back(transform_to_inner(element.get()));
1907  }
1908  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
1909  }
1910  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
1911  if (!col) {
1912  throw std::runtime_error("Only columns supported in the window partition for now");
1913  }
1914  return makeExpr<Analyzer::ColumnVar>(
1915  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
1916 }
1917 
1918 } // namespace
1919 
1921  const CompilationOptions& co,
1922  const ExecutionOptions& eo,
1923  ColumnCacheMap& column_cache_map,
1924  const int64_t queue_time_ms) {
1925  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1926  CHECK_EQ(query_infos.size(), size_t(1));
1927  if (query_infos.front().info.fragments.size() != 1) {
1928  throw std::runtime_error(
1929  "Only single fragment tables supported for window functions for now");
1930  }
  // External executors handle window functions themselves; nothing to do.
1931  if (eo.executor_type == ::ExecutorType::Extern) {
1932  return;
1933  }
  // Duplicate the single input so the hash-join machinery sees both sides of
  // the self-join used to build window partitions (see transform_to_inner).
1934  query_infos.push_back(query_infos.front());
1935  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
  // One context per window-function target expression; non-window targets
  // are skipped.
1936  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1937  ++target_index) {
1938  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1939  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1940  if (!window_func) {
1941  continue;
1942  }
1943  // Always use baseline layout hash tables for now, make the expression a tuple.
1944  const auto& partition_keys = window_func->getPartitionKeys();
1945  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1946  if (partition_keys.size() > 1) {
1947  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1948  } else {
1949  if (partition_keys.empty()) {
1950  throw std::runtime_error(
1951  "Empty window function partitions are not supported yet");
1952  }
1953  CHECK_EQ(partition_keys.size(), size_t(1));
1954  partition_key_tuple = partition_keys.front();
1955  }
1956  // Creates a tautology equality with the partition expression on both sides.
1957  const auto partition_key_cond =
1958  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1959  kBW_EQ,
1960  kONE,
1961  partition_key_tuple,
1962  transform_to_inner(partition_key_tuple.get()));
1963  auto context = createWindowFunctionContext(window_func,
1964  partition_key_cond,
1965  ra_exe_unit,
1966  query_infos,
1967  co,
1968  column_cache_map,
1969  executor_->getRowSetMemoryOwner());
  // Window results are computed up front, before the main work unit runs.
1970  context->compute();
1971  window_project_node_context->addWindowFunctionContext(std::move(context),
1972  target_index);
1973  }
1974 }
1975 
1976 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
1977  const Analyzer::WindowFunction* window_func,
1978  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
1979  const RelAlgExecutionUnit& ra_exe_unit,
1980  const std::vector<InputTableInfo>& query_infos,
1981  const CompilationOptions& co,
1982  ColumnCacheMap& column_cache_map,
1983  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
  // Memory level follows the device type (the ternary's branches are elided
  // in this rendering).
1984  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
  // Build a hash table keyed on the tautological partition-key equality;
  // a OneToMany table is required (checked below).
1987  const auto join_table_or_err =
1988  executor_->buildHashTableForQualifier(partition_key_cond,
1989  query_infos,
1990  memory_level,
1991  JoinType::INVALID, // for window function
1993  column_cache_map,
1994  ra_exe_unit.query_hint);
1995  if (!join_table_or_err.fail_reason.empty()) {
1996  throw std::runtime_error(join_table_or_err.fail_reason);
1997  }
1998  CHECK(join_table_or_err.hash_table->getHashType() == HashType::OneToMany);
1999  const auto& order_keys = window_func->getOrderKeys();
2000  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  // Single-fragment assumption (enforced by the caller): the element count
  // of the sole fragment is the partition domain size.
2001  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2002  auto context = std::make_unique<WindowFunctionContext>(window_func,
2003  join_table_or_err.hash_table,
2004  elem_count,
2005  co.device_type,
2006  row_set_mem_owner);
  // Materialize each ORDER BY column and attach it to the context. Only
  // plain column references are accepted as order keys for now.
2007  for (const auto& order_key : order_keys) {
2008  const auto order_col =
2009  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2010  if (!order_col) {
2011  throw std::runtime_error("Only order by columns supported for now");
2012  }
2013  const int8_t* column;
2014  size_t join_col_elem_count;
2015  std::tie(column, join_col_elem_count) =
2017  *order_col,
2018  query_infos.front().info.fragments.front(),
2019  memory_level,
2020  0,
2021  nullptr,
2022  /*thread_idx=*/0,
2023  chunks_owner,
2024  column_cache_map);
2025  CHECK_EQ(join_col_elem_count, elem_count);
2026  context->addOrderColumn(column, order_col.get(), chunks_owner);
2027  }
2028  return context;
2029 }
2030 
2032  const CompilationOptions& co,
2033  const ExecutionOptions& eo,
2034  RenderInfo* render_info,
2035  const int64_t queue_time_ms) {
2036  auto timer = DEBUG_TIMER(__func__);
2037  const auto work_unit =
2038  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
  // Filters are never aggregates: is_agg is false below.
2039  return executeWorkUnit(
2040  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2041 }
2042 
2043 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
2044  std::vector<TargetMetaInfo> const& rhs) {
2045  if (lhs.size() == rhs.size()) {
2046  for (size_t i = 0; i < lhs.size(); ++i) {
2047  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2048  return false;
2049  }
2050  }
2051  return true;
2052  }
2053  return false;
2054 }
2055 
2056 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2057  return target_meta_info.get_type_info().is_geometry();
2058 }
2059 
2061  const RaExecutionSequence& seq,
2062  const CompilationOptions& co,
2063  const ExecutionOptions& eo,
2064  RenderInfo* render_info,
2065  const int64_t queue_time_ms) {
2066  auto timer = DEBUG_TIMER(__func__);
2067  if (!logical_union->isAll()) {
2068  throw std::runtime_error("UNION without ALL is not supported yet.")
;
2069  }
2070  // Will throw a std::runtime_error if types don't match.
2071  logical_union->checkForMatchingMetaInfoTypes();
  // The union's output schema is taken from its first input.
2072  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2073  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2074  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2075  }
2076  // Only Projections and Aggregates from a UNION are supported for now.
  // Walk the whole DAG and reject any consumer of this union that is not a
  // projection, aggregate, or another union.
2077  query_dag_->eachNode([logical_union](RelAlgNode const* node) {
2078  if (node->hasInput(logical_union) &&
2079  !shared::dynamic_castable_to_any<RelProject, RelLogicalUnion, RelAggregate>(
2080  node)) {
2081  throw std::runtime_error("UNION ALL not yet supported in this context.");
2082  }
2083  });
2084 
2085  auto work_unit =
2086  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2087  return executeWorkUnit(work_unit,
2088  logical_union->getOutputMetainfo(),
2089  false,
2091  eo,
2092  render_info,
2093  queue_time_ms);
2094 }
2095 
2097  const RelLogicalValues* logical_values,
2098  const ExecutionOptions& eo) {
2099  auto timer = DEBUG_TIMER(__func__);
2101  logical_values->getNumRows(),
2103  /*is_table_function=*/false);
2104 
  // Validate and normalize the tuple schema: variable-length types are
  // rejected, and untyped (kNULLT) columns are widened to BIGINT.
2105  auto tuple_type = logical_values->getTupleType();
2106  for (size_t i = 0; i < tuple_type.size(); ++i) {
2107  auto& target_meta_info = tuple_type[i];
2108  if (target_meta_info.get_type_info().is_varlen()) {
2109  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2110  }
2111  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2112  // replace w/ bigint
2113  tuple_type[i] =
2114  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2115  }
2116  query_mem_desc.addColSlotInfo(
2117  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2118  }
2119  logical_values->setOutputMetainfo(tuple_type);
2120 
  // One TargetInfo per column; NOTE(review): kCOUNT with is_agg=false here
  // appears to serve as a placeholder agg kind for plain values — confirm.
2121  std::vector<TargetInfo> target_infos;
2122  for (const auto& tuple_type_component : tuple_type) {
2123  target_infos.emplace_back(TargetInfo{false,
2124  kCOUNT,
2125  tuple_type_component.get_type_info(),
2126  SQLTypeInfo(kNULLT, false),
2127  false,
2128  false,
2129  /*is_varlen_projection=*/false});
2130  }
2131 
2132  std::shared_ptr<ResultSet> rs{
2133  ResultSetLogicalValuesBuilder{logical_values,
2134  target_infos,
2137  executor_->getRowSetMemoryOwner(),
2138  executor_}
2139  .build()};
2140 
2141  return {rs, tuple_type};
2142 }
2143 
2144 namespace {
2145 
2146 template <class T>
2147 int64_t insert_one_dict_str(T* col_data,
2148  const std::string& columnName,
2149  const SQLTypeInfo& columnType,
2150  const Analyzer::Constant* col_cv,
2151  const Catalog_Namespace::Catalog& catalog) {
2152  if (col_cv->get_is_null()) {
2153  *col_data = inline_fixed_encoding_null_val(columnType);
2154  } else {
2155  const int dict_id = columnType.get_comp_param();
2156  const auto col_datum = col_cv->get_constval();
2157  const auto& str = *col_datum.stringval;
2158  const auto dd = catalog.getMetadataForDict(dict_id);
2159  CHECK(dd && dd->stringDict);
2160  int32_t str_id = dd->stringDict->getOrAdd(str);
2161  if (!dd->dictIsTemp) {
2162  const auto checkpoint_ok = dd->stringDict->checkpoint();
2163  if (!checkpoint_ok) {
2164  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2165  columnName);
2166  }
2167  }
2168  const bool invalid = str_id > max_valid_int_value<T>();
2169  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2170  if (invalid) {
2171  LOG(ERROR) << "Could not encode string: " << str
2172  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2173  << " bits. Will store NULL instead.";
2174  }
2175  str_id = inline_fixed_encoding_null_val(columnType);
2176  }
2177  *col_data = str_id;
2178  }
2179  return *col_data;
2180 }
2181 
2182 template <class T>
2183 int64_t insert_one_dict_str(T* col_data,
2184  const ColumnDescriptor* cd,
2185  const Analyzer::Constant* col_cv,
2186  const Catalog_Namespace::Catalog& catalog) {
2187  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2188 }
2189 
2190 } // namespace
2191 
2193  const ExecutionOptions& eo) {
2194  auto timer = DEBUG_TIMER(__func__);
2195  if (eo.just_explain) {
2196  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2197  }
2198 
  // Modify nodes produce no result rows; hand back an empty result set with
  // an empty target list.
2199  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2202  executor_->getRowSetMemoryOwner(),
2203  executor_->getCatalog(),
2204  executor_->blockSize(),
2205  executor_->gridSize());
2206 
2207  std::vector<TargetMetaInfo> empty_targets;
2208  return {rs, empty_targets};
2209 }
2210 
2211 namespace {
2212 int64_t int_value_from_numbers_ptr(const SQLTypeInfo& type_info, const int8_t* data) {
2213  size_t sz = 0;
2214  switch (type_info.get_type()) {
2215  case kTINYINT:
2216  case kSMALLINT:
2217  case kINT:
2218  case kBIGINT:
2219  case kTIMESTAMP:
2220  case kTIME:
2221  case kDATE:
2222  sz = type_info.get_logical_size();
2223  break;
2224  case kTEXT:
2225  case kVARCHAR:
2226  case kCHAR:
2227  CHECK(type_info.get_compression() == kENCODING_DICT);
2228  sz = type_info.get_size();
2229  break;
2230  default:
2231  CHECK(false) << "Unexpected sharding key datatype";
2232  }
2233 
2234  switch (sz) {
2235  case 1:
2236  return *(reinterpret_cast<const int8_t*>(data));
2237  case 2:
2238  return *(reinterpret_cast<const int16_t*>(data));
2239  case 4:
2240  return *(reinterpret_cast<const int32_t*>(data));
2241  case 8:
2242  return *(reinterpret_cast<const int64_t*>(data));
2243  default:
2244  CHECK(false);
2245  return 0;
2246  }
2247 }
2248 
2251  const Fragmenter_Namespace::InsertData& data) {
  // Locate the shard (physical table) that should receive `data` by hashing
  // the sharding-key column's value. Returns nullptr when the sharding
  // column is not present in the insert data.
2252  auto shard_column_md = cat.getShardColumnMetadataForTable(td);
2253  CHECK(shard_column_md);
2254  auto sharded_column_id = shard_column_md->columnId;
2255  const TableDescriptor* shard{nullptr};
2256  for (size_t i = 0; i < data.columnIds.size(); ++i) {
2257  if (data.columnIds[i] == sharded_column_id) {
2258  const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
2259  const auto shard_count = shard_tables.size();
2260  CHECK(data.data[i].numbersPtr);
  // Decode the raw key bytes, then map key -> shard index.
2261  auto value = int_value_from_numbers_ptr(shard_column_md->columnType,
2262  data.data[i].numbersPtr);
2263  const size_t shard_idx = SHARD_FOR_KEY(value, shard_count);
2264  shard = shard_tables[shard_idx];
2265  break;
2266  }
2267  }
2268  return shard;
2269 }
2270 
2271 } // namespace
2272 
2274  // Note: We currently obtain an executor for this method, but we do not need it.
2275  // Therefore, we skip the executor state setup in the regular execution path. In the
2276  // future, we will likely want to use the executor to evaluate expressions in the insert
2277  // statement.
2278 
  // The target list carries one constant (possibly wrapped in a cast) per
  // inserted column; see the encoding loop below.
2279  const auto& targets = query.get_targetlist();
2280  const int table_id = query.get_result_table_id();
2281  const auto& col_id_list = query.get_result_col_list();
2282 
  // Per-column staging buffers: raw bytes for fixed-width values, string
  // vectors for none-encoded strings and geo, ArrayDatum vectors for arrays.
2283  std::vector<const ColumnDescriptor*> col_descriptors;
2284  std::vector<int> col_ids;
2285  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2286  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2287  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2288 
  // Allocate one staging buffer per inserted column, keyed by column id.
2289  for (const int col_id : col_id_list) {
2290  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2291  const auto col_enc = cd->columnType.get_compression();
2292  if (cd->columnType.is_string()) {
2293  switch (col_enc) {
2294  case kENCODING_NONE: {
2295  auto it_ok =
2296  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2297  CHECK(it_ok.second);
2298  break;
2299  }
2300  case kENCODING_DICT: {
2301  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2302  CHECK(dd);
2303  const auto it_ok = col_buffers.emplace(
2304  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2305  CHECK(it_ok.second);
2306  break;
2307  }
2308  default:
2309  CHECK(false);
2310  }
2311  } else if (cd->columnType.is_geometry()) {
2312  auto it_ok =
2313  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2314  CHECK(it_ok.second);
2315  } else if (cd->columnType.is_array()) {
2316  auto it_ok =
2317  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2318  CHECK(it_ok.second);
2319  } else {
2320  const auto it_ok = col_buffers.emplace(
2321  col_id,
2322  std::unique_ptr<uint8_t[]>(
2323  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2324  // the buffer
2325  CHECK(it_ok.second);
2326  }
2327  col_descriptors.push_back(cd);
2328  col_ids.push_back(col_id);
2329  }
2330  size_t col_idx = 0;
2332  insert_data.databaseId = cat_.getCurrentDB().dbId;
2333  insert_data.tableId = table_id;
  // Encode each target constant into its column's staging buffer. Constants
  // may arrive wrapped in a CAST; unwrap to the underlying literal first.
2334  for (auto target_entry : targets) {
2335  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2336  if (!col_cv) {
2337  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2338  CHECK(col_cast);
2339  CHECK_EQ(kCAST, col_cast->get_optype());
2340  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2341  }
2342  CHECK(col_cv);
2343  const auto cd = col_descriptors[col_idx];
2344  auto col_datum = col_cv->get_constval();
2345  auto col_type = cd->columnType.get_type();
2346  uint8_t* col_data_bytes{nullptr};
  // Fixed-width destinations (everything except arrays, geo, and
  // none-encoded strings) write through this raw byte pointer.
2347  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2348  (!cd->columnType.is_string() ||
2349  cd->columnType.get_compression() == kENCODING_DICT)) {
2350  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2351  CHECK(col_data_bytes_it != col_buffers.end());
2352  col_data_bytes = col_data_bytes_it->second.get();
2353  }
2354  switch (col_type) {
2355  case kBOOLEAN: {
2356  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2357  auto null_bool_val =
2358  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2359  *col_data = col_cv->get_is_null() || null_bool_val
2360  ? inline_fixed_encoding_null_val(cd->columnType)
2361  : (col_datum.boolval ? 1 : 0);
2362  break;
2363  }
2364  case kTINYINT: {
2365  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2366  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2367  : col_datum.tinyintval;
2368  break;
2369  }
2370  case kSMALLINT: {
2371  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2372  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2373  : col_datum.smallintval;
2374  break;
2375  }
2376  case kINT: {
2377  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2378  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2379  : col_datum.intval;
2380  break;
2381  }
2382  case kBIGINT:
2383  case kDECIMAL:
2384  case kNUMERIC: {
2385  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2386  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2387  : col_datum.bigintval;
2388  break;
2389  }
2390  case kFLOAT: {
2391  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2392  *col_data = col_datum.floatval;
2393  break;
2394  }
2395  case kDOUBLE: {
2396  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2397  *col_data = col_datum.doubleval;
2398  break;
2399  }
2400  case kTEXT:
2401  case kVARCHAR:
2402  case kCHAR: {
  // Strings: none-encoded go into the string vector verbatim; dictionary
  // encoded are translated to ids sized to the column's physical width.
2403  switch (cd->columnType.get_compression()) {
2404  case kENCODING_NONE:
2405  str_col_buffers[col_ids[col_idx]].push_back(
2406  col_datum.stringval ? *col_datum.stringval : "");
2407  break;
2408  case kENCODING_DICT: {
2409  switch (cd->columnType.get_size()) {
2410  case 1:
2412  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2413  break;
2414  case 2:
2416  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2417  break;
2418  case 4:
2420  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2421  break;
2422  default:
2423  CHECK(false);
2424  }
2425  break;
2426  }
2427  default:
2428  CHECK(false);
2429  }
2430  break;
2431  }
2432  case kTIME:
2433  case kTIMESTAMP:
2434  case kDATE: {
2435  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2436  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2437  : col_datum.bigintval;
2438  break;
2439  }
2440  case kARRAY: {
2441  const auto is_null = col_cv->get_is_null();
2442  const auto size = cd->columnType.get_size();
2443  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2444  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2445  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2446  if (is_null && !is_point_coords) {
2447  if (size > 0) {
2448  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2449  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2450  throw std::runtime_error("Column " + cd->columnName +
2451  " doesn't accept NULL values");
2452  }
2453  int8_t* buf = (int8_t*)checked_malloc(size);
2454  put_null_array(static_cast<void*>(buf), elem_ti, "");
2455  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2456  p += elem_ti.get_size()) {
2457  put_null(static_cast<void*>(p), elem_ti, "");
2458  }
2459  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2460  } else {
2461  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2462  }
2463  break;
2464  }
2465  const auto l = col_cv->get_value_list();
2466  size_t len = l.size() * elem_ti.get_size();
  // Fixed-length arrays must match the declared element count exactly.
2467  if (size > 0 && static_cast<size_t>(size) != len) {
2468  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2469  std::to_string(size / elem_ti.get_size()) +
2470  " values, " + "received " + std::to_string(l.size()));
2471  }
2472  if (elem_ti.is_string()) {
2473  CHECK(kENCODING_DICT == elem_ti.get_compression());
2474  CHECK(4 == elem_ti.get_size());
2475 
2476  int8_t* buf = (int8_t*)checked_malloc(len);
2477  int32_t* p = reinterpret_cast<int32_t*>(buf);
2478 
2479  int elemIndex = 0;
2480  for (auto& e : l) {
2481  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2482  CHECK(c);
2483  insert_one_dict_str(&p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2484  elemIndex++;
2485  }
2486  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2487 
2488  } else {
2489  int8_t* buf = (int8_t*)checked_malloc(len);
2490  int8_t* p = buf;
2491  for (auto& e : l) {
2492  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2493  CHECK(c);
2494  p = appendDatum(p, c->get_constval(), elem_ti);
2495  }
2496  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2497  }
2498  break;
2499  }
2500  case kPOINT:
2501  case kLINESTRING:
2502  case kPOLYGON:
2503  case kMULTIPOLYGON:
  // Geo columns arrive as WKT-style strings and are staged as strings.
2504  str_col_buffers[col_ids[col_idx]].push_back(
2505  col_datum.stringval ? *col_datum.stringval : "");
2506  break;
2507  default:
2508  CHECK(false);
2509  }
2510  ++col_idx;
2511  }
  // Wire the staging buffers into the InsertData column blocks.
2512  for (const auto& kv : col_buffers) {
2513  insert_data.columnIds.push_back(kv.first);
2514  DataBlockPtr p;
2515  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2516  insert_data.data.push_back(p);
2517  }
2518  for (auto& kv : str_col_buffers) {
2519  insert_data.columnIds.push_back(kv.first);
2520  DataBlockPtr p;
2521  p.stringsPtr = &kv.second;
2522  insert_data.data.push_back(p);
2523  }
2524  for (auto& kv : arr_col_buffers) {
2525  insert_data.columnIds.push_back(kv.first);
2526  DataBlockPtr p;
2527  p.arraysPtr = &kv.second;
2528  insert_data.data.push_back(p);
2529  }
2530  insert_data.numRows = 1;
  // Supply data for any table columns not referenced by the statement, then
  // route the single row to the proper shard (if the table is sharded).
2531  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
2532  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2533  CHECK(table_descriptor);
2534  if (table_descriptor->nShards > 0) {
2535  auto shard = get_shard_for_key(table_descriptor, cat_, insert_data);
2536  CHECK(shard);
2537  shard->fragmenter->insertDataNoCheckpoint(insert_data);
2538  } else {
2539  table_descriptor->fragmenter->insertDataNoCheckpoint(insert_data);
2540  }
2541 
2542  // Ensure checkpoint happens across all shards, if not in distributed
2543  // mode (aggregator handles checkpointing in distributed mode)
2544  if (!g_cluster &&
2545  table_descriptor->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
2546  const_cast<Catalog_Namespace::Catalog&>(cat_).checkpointWithAutoRollback(table_id);
2547  }
2548 
  // INSERT produces no result rows; return an empty result set.
2549  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2552  executor_->getRowSetMemoryOwner(),
2553  nullptr,
2554  0,
2555  0);
2556  std::vector<TargetMetaInfo> empty_targets;
2557  return {rs, empty_targets};
2558 }
2559 
2560 namespace {
2561 
2562 // TODO(alex): Once we're fully migrated to the relational algebra model, change
2563 // the executor interface to use the collation directly and remove this conversion.
2564 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
2565  std::list<Analyzer::OrderEntry> result;
2566  for (size_t i = 0; i < sort->collationCount(); ++i) {
2567  const auto sort_field = sort->getCollation(i);
2568  result.emplace_back(sort_field.getField() + 1,
2569  sort_field.getSortDir() == SortDirection::Descending,
2570  sort_field.getNullsPosition() == NullSortedPosition::First);
2571  }
2572  return result;
2573 }
2574 
2575 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
2576  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
2577  if (aggregate) {
2578  return 0;
2579  }
2580  const auto compound = dynamic_cast<const RelCompound*>(ra);
2581  return (compound && compound->isAggregate()) ? 0 : limit;
2582 }
2583 
2584 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
2585  return !order_entries.empty() && order_entries.front().is_desc;
2586 }
2587 
2588 } // namespace
2589 
// Executes a RelSort node. In distributed mode a pre-aggregated leaf result
// may already be cached in leaf_results_; otherwise the sort's input is
// executed as a work unit and the rows are sorted/limited here, with a retry
// path when speculative top-n fails.
//
// NOTE(review): the opening line of this definition (presumably
// `ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,`) and
// original rendered lines 2596 and 2641 were elided by the extraction tool.
                                          const CompilationOptions& co,
                                          const ExecutionOptions& eo,
                                          RenderInfo* render_info,
                                          const int64_t queue_time_ms) {
  auto timer = DEBUG_TIMER(__func__);
  const auto source = sort->getInput(0);
  const bool is_aggregate = node_is_aggregate(source);
  auto it = leaf_results_.find(sort->getId());
  if (it != leaf_results_.end()) {
    // Leaf branch: the aggregated result was already computed; only the
    // LIMIT/OFFSET push-down and sort remain to be applied here.
    // Add any transient string literals to the sdp on the agg
    const auto source_work_unit = createSortInputWorkUnit(sort, eo);
    executor_->addTransientStringLiterals(source_work_unit.exe_unit,
                                          executor_->row_set_mem_owner_);
    // Handle push-down for LIMIT for multi-node
    auto& aggregated_result = it->second;
    auto& result_rows = aggregated_result.rs;
    const size_t limit = sort->getLimit();
    const size_t offset = sort->getOffset();
    const auto order_entries = get_order_entries(sort);
    if (limit || offset) {
      if (!order_entries.empty()) {
        // Only the first limit+offset rows are needed after sorting.
        result_rows->sort(order_entries, limit + offset, executor_);
      }
      result_rows->dropFirstN(offset);
      if (limit) {
        result_rows->keepFirstN(limit);
      }
    }
    ExecutionResult result(result_rows, aggregated_result.targets_meta);
    sort->setOutputMetainfo(aggregated_result.targets_meta);
    return result;
  }

  // Captured by reference in the lambda below so that the
  // SpeculativeTopNFailed handler can blacklist the offending
  // (group-by expression, sort direction) pair before retrying.
  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
  bool is_desc{false};

  auto execute_sort_query = [this,
                             sort,
                             &source,
                             &is_aggregate,
                             &eo,
                             &co,
                             render_info,
                             queue_time_ms,
                             &groupby_exprs,
                             &is_desc]() -> ExecutionResult {
    const auto source_work_unit = createSortInputWorkUnit(sort, eo);
    is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
    // Copy of the execution options, forcing just_validate when the sort is
    // statically known to produce an empty result.
    // NOTE(review): the first initializer (original rendered line 2641,
    // presumably `eo.output_columnar_hint,`) was elided from this extract.
    ExecutionOptions eo_copy = {
        eo.allow_multifrag,
        eo.just_explain,
        eo.allow_loop_joins,
        eo.with_watchdog,
        eo.jit_debug,
        eo.just_validate || sort->isEmptyResult(),
        eo.with_dynamic_watchdog,
        eo.dynamic_watchdog_time_limit,
        eo.find_push_down_candidates,
        eo.just_calcite_explain,
        eo.gpu_input_mem_limit_percent,
        eo.allow_runtime_query_interrupt,
        eo.running_query_interrupt_freq,
        eo.pending_query_interrupt_freq,
        eo.executor_type,
    };

    groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
    auto source_result = executeWorkUnit(source_work_unit,
                                         source->getOutputMetainfo(),
                                         is_aggregate,
                                         co,
                                         eo_copy,
                                         render_info,
                                         queue_time_ms);
    if (render_info && render_info->isPotentialInSituRender()) {
      // In-situ render results are returned without host-side sort/limit.
      return source_result;
    }
    if (source_result.isFilterPushDownEnabled()) {
      return source_result;
    }
    auto rows_to_sort = source_result.getRows();
    if (eo.just_explain) {
      return {rows_to_sort, {}};
    }
    const size_t limit = sort->getLimit();
    const size_t offset = sort->getOffset();
    if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
        !use_speculative_top_n(source_work_unit.exe_unit,
                               rows_to_sort->getQueryMemDesc())) {
      // top_n of 0 when there is no LIMIT — presumably requests a full sort;
      // TODO(review): confirm against ResultSet::sort.
      const size_t top_n = limit == 0 ? 0 : limit + offset;
      rows_to_sort->sort(
          source_work_unit.exe_unit.sort_info.order_entries, top_n, executor_);
    }
    if (limit || offset) {
      if (g_cluster && sort->collationCount() == 0) {
        // Distributed, unsorted case: trim without materializing the offset.
        if (offset >= rows_to_sort->rowCount()) {
          rows_to_sort->dropFirstN(offset);
        } else {
          rows_to_sort->keepFirstN(limit + offset);
        }
      } else {
        rows_to_sort->dropFirstN(offset);
        if (limit) {
          rows_to_sort->keepFirstN(limit);
        }
      }
    }
    return {rows_to_sort, source_result.getTargetsMeta()};
  };

  try {
    return execute_sort_query();
  } catch (const SpeculativeTopNFailed& e) {
    // Speculative top-n guessed wrong: blacklist this group-by expression /
    // sort direction pair, then re-run (the blacklist makes the retry use the
    // default sort algorithm — see createSortInputWorkUnit).
    CHECK_EQ(size_t(1), groupby_exprs.size());
    CHECK(groupby_exprs.front());
    speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
    return execute_sort_query();
  }
}
2712 
// Builds the WorkUnit for a sort's input node, wiring the sort's order
// entries, limit and offset into the unit's SortInfo and choosing the sort
// algorithm.
//
// NOTE(review): the opening line of this definition (presumably
// `RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(`) and
// original rendered line 2724 (presumably the declaration/initialization of
// `sort_algorithm`, e.g. `SortAlgorithm sort_algorithm{...};`) were elided by
// the extraction tool.
    const RelSort* sort,
    const ExecutionOptions& eo) {
  const auto source = sort->getInput(0);
  const size_t limit = sort->getLimit();
  const size_t offset = sort->getOffset();
  // Only apply a scan limit when there is no collation; a sort must see the
  // full input.
  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
  const size_t scan_total_limit =
      scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
  size_t max_groups_buffer_entry_guess{
      scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
  const auto order_entries = get_order_entries(sort);
  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
  auto source_work_unit = createWorkUnit(source, sort_info, eo);
  const auto& source_exe_unit = source_work_unit.exe_unit;

  // we do not allow sorting geometry or array types
  for (auto order_entry : order_entries) {
    CHECK_GT(order_entry.tle_no, 0);  // tle_no is a 1-base index
    const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
    const auto& ti = get_target_info(te, false);
    if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
      throw std::runtime_error(
          "Columns with geometry or array types cannot be used in an ORDER BY clause.");
    }
  }

  if (source_exe_unit.groupby_exprs.size() == 1) {
    if (!source_exe_unit.groupby_exprs.front()) {
      // Single null group-by slot (no grouping): use streaming top-n.
      sort_algorithm = SortAlgorithm::StreamingTopN;
    } else {
      // Single group-by expression: fall back to the default sort when this
      // expression/direction pair previously failed speculative top-n.
      if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
                                               first_oe_is_desc(order_entries))) {
        sort_algorithm = SortAlgorithm::Default;
      }
    }
  }

  sort->setOutputMetainfo(source->getOutputMetainfo());
  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
                              std::move(source_exe_unit.input_col_descs),
                              source_exe_unit.simple_quals,
                              source_exe_unit.quals,
                              source_exe_unit.join_quals,
                              source_exe_unit.groupby_exprs,
                              source_exe_unit.target_exprs,
                              nullptr,
                              {sort_info.order_entries, sort_algorithm, limit, offset},
                              scan_total_limit,
                              source_exe_unit.query_hint,
                              EMPTY_QUERY_PLAN, /* skip sort node's recycling */
                              {},
                              source_exe_unit.use_bump_allocator,
                              source_exe_unit.union_all,
                              source_exe_unit.query_state},
          source,
          max_groups_buffer_entry_guess,
          std::move(source_work_unit.query_rewriter),
          source_work_unit.input_permutation,
          source_work_unit.left_deep_join_input_sizes};
}
2777 
2778 namespace {
2779 
2786 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
2787  CHECK(!table_infos.empty());
2788  const auto& first_table = table_infos.front();
2789  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
2790  for (const auto& table_info : table_infos) {
2791  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
2792  max_num_groups = table_info.info.getNumTuplesUpperBound();
2793  }
2794  }
2795  return std::max(max_num_groups, size_t(1));
2796 }
2797 
// NOTE(review): the signature of this helper (presumably
// `bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit)`)
// and its doc comment (original rendered lines 2798-2804) were elided by the
// extraction tool. Based on the visible body and its call site, it decides
// whether the output buffer must be explicitly sized (e.g. via a pre-flight
// count) — TODO confirm against the full source.
  // Any aggregate target means output size is determined by grouping, not by
  // the scanned row count.
  for (const auto target_expr : ra_exe_unit.target_exprs) {
    if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
      return false;
    }
  }
  // Pure projection (single null group-by slot) with no scan limit, or a
  // limit exceeding Executor::high_scan_limit.
  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
      (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
    return true;
  }
  return false;
}
2816 
2817 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
2818  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
2819  ra_exe_unit.simple_quals.empty());
2820 }
2821 
// For each APPROX_COUNT_DISTINCT target, decides whether to keep the
// approximate implementation or rewrite it into a precise COUNT(DISTINCT):
// when the precise bitmap would be no larger than the approximate one,
// precision costs nothing extra, so the target expression is replaced.
// Returns a (possibly modified) copy of the input execution unit; new
// expressions are kept alive via target_exprs_owned.
//
// NOTE(review): the opening line of this definition (presumably
// `RelAlgExecutionUnit decide_approx_count_distinct_implementation(`) was
// elided by the extraction tool.
    const RelAlgExecutionUnit& ra_exe_unit_in,
    const std::vector<InputTableInfo>& table_infos,
    const Executor* executor,
    const ExecutorDeviceType device_type_in,
    std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
    const auto target_expr = ra_exe_unit.target_exprs[i];
    const auto agg_info = get_target_info(target_expr, g_bigint_count);
    if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
      continue;
    }
    CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
    const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
    CHECK(arg);
    const auto& arg_ti = arg->get_type_info();
    // Avoid calling getExpressionRange for variable length types (string and array),
    // it'd trigger an assertion since that API expects to be called only for types
    // for which the notion of range is well-defined. A bit of a kludge, but the
    // logic to reject these types anyway is at lower levels in the stack and not
    // really worth pulling into a separate function for now.
    if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
          (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
      continue;
    }
    const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
    if (arg_range.getType() != ExpressionRangeType::Integer) {
      continue;
    }
    // When running distributed, the threshold for using the precise implementation
    // must be consistent across all leaves, otherwise we could have a mix of precise
    // and approximate bitmaps and we cannot aggregate them.
    const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
    // One bit per distinct value in the argument's integer range.
    const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
    const auto sub_bitmap_count =
        get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
    int64_t approx_bitmap_sz_bits{0};
    // An explicit error-rate argument overrides the global HLL precision.
    const auto error_rate = static_cast<Analyzer::AggExpr*>(target_expr)->get_arg1();
    if (error_rate) {
      CHECK(error_rate->get_type_info().get_type() == kINT);
      CHECK_GE(error_rate->get_constval().intval, 1);
      approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
    } else {
      approx_bitmap_sz_bits = g_hll_precision_bits;
    }
    CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
                                                       arg_range.getIntMin(),
                                                       approx_bitmap_sz_bits,
                                                       true,
                                                       device_type,
                                                       sub_bitmap_count};
    CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
                                                        arg_range.getIntMin(),
                                                        bitmap_sz_bits,
                                                        false,
                                                        device_type,
                                                        sub_bitmap_count};
    if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
        precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
      // Precise is no bigger than approximate: rewrite to a real
      // COUNT(DISTINCT arg).
      auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
          get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
      target_exprs_owned.push_back(precise_count_distinct);
      ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
    }
  }
  return ra_exe_unit;
}
2890 
// Rebuilds render_info.targets: one TargetEntry per output column, pairing
// each work-unit target expression with its result-set column name.
//
// NOTE(review): the opening line of this definition (presumably
// `void build_render_targets(RenderInfo& render_info,`) was elided by the
// extraction tool.
    const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
    const std::vector<TargetMetaInfo>& targets_meta) {
  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
  render_info.targets.clear();
  for (size_t i = 0; i < targets_meta.size(); ++i) {
    render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
        targets_meta[i].get_resname(),
        work_unit_target_exprs[i]->get_shared_ptr(),
        false));
  }
}
2903 
// Whether the bump allocator may be used for this unit's output buffer.
// Visible conditions: no columnar output hint and no ORDER BY; further
// conditions are on the elided line noted below.
inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
                                   const CompilationOptions& co,
                                   const ExecutionOptions& eo) {
  // NOTE(review): the first line of this return expression (original rendered
  // line 2907, presumably `return g_enable_bump_allocator && ...`) was elided
  // by the extraction tool.
      !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
}
2910 
2911 } // namespace
2912 
// Executes a single work unit: handles window functions, filter push-down
// candidates, cached leaf results, output-buffer sizing (bump allocator /
// pre-flight count / cardinality estimation with caching) and render targets.
//
// NOTE(review): several original rendered lines were elided by the extraction
// tool: the signature line (presumably
// `ExecutionResult RelAlgExecutor::executeWorkUnit(`), plus lines 2922, 2928,
// 2931, 2964, 2998, 3028, 3036 and 3051; inline notes mark each gap.
    const RelAlgExecutor::WorkUnit& work_unit,
    const std::vector<TargetMetaInfo>& targets_meta,
    const bool is_agg,
    const CompilationOptions& co_in,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms,
    const std::optional<size_t> previous_count) {
  auto timer = DEBUG_TIMER(__func__);

  auto co = co_in;
  ColumnCacheMap column_cache;
  if (is_window_execution_unit(work_unit.exe_unit)) {
    // NOTE(review): the guard line (original rendered line 2928, presumably
    // `if (!g_enable_window_functions) {`) was elided.
    throw std::runtime_error("Window functions support is disabled");
  }
  // NOTE(review): original rendered line 2931 was elided here.
  co.allow_lazy_fetch = false;
  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
  }
  if (!eo.just_explain && eo.find_push_down_candidates) {
    // find potential candidates:
    auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
    if (!selected_filters.empty() || eo.just_calcite_explain) {
      return ExecutionResult(selected_filters, eo.find_push_down_candidates);
    }
  }
  if (render_info && render_info->isPotentialInSituRender()) {
    // Lazy fetch would leave columns on device; disable for in-situ render.
    co.allow_lazy_fetch = false;
  }
  const auto body = work_unit.body;
  CHECK(body);
  auto it = leaf_results_.find(body->getId());
  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
          << " it==leaf_results_.end()=" << (it == leaf_results_.end());
  if (it != leaf_results_.end()) {
    // Result for this node was already aggregated from the leaves.
    executor_->addTransientStringLiterals(work_unit.exe_unit,
                                          executor_->row_set_mem_owner_);
    auto& aggregated_result = it->second;
    auto& result_rows = aggregated_result.rs;
    ExecutionResult result(result_rows, aggregated_result.targets_meta);
    body->setOutputMetainfo(aggregated_result.targets_meta);
    if (render_info) {
      build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
    }
    return result;
  }
  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);

  // NOTE(review): original rendered line 2964 (presumably
  // `auto ra_exe_unit = decide_approx_count_distinct_implementation(`) was
  // elided; the following line is its argument list.
      work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);

  // register query hint if query_dag_ is valid
  ra_exe_unit.query_hint =
      query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults();

  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
  if (is_window_execution_unit(ra_exe_unit)) {
    // Window execution expects a single fragment; size the buffer to it.
    CHECK_EQ(table_infos.size(), size_t(1));
    CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
    max_groups_buffer_entry_guess =
        table_infos.front().info.fragments.front().getNumTuples();
    ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
    if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
      ra_exe_unit.scan_limit = *previous_count;
    } else {
      // TODO(adb): enable bump allocator path for render queries
      if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
        ra_exe_unit.scan_limit = 0;
        ra_exe_unit.use_bump_allocator = true;
      } else if (eo.executor_type == ::ExecutorType::Extern) {
        ra_exe_unit.scan_limit = 0;
      } else if (!eo.just_explain) {
        // Size the projection buffer with a pre-flight filtered COUNT(*).
        const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
        if (filter_count_all) {
          ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
        }
      }
    }
  }
  // Placeholder empty result; replaced below on successful execution.
  // NOTE(review): original rendered line 2998 (a constructor argument,
  // presumably a QueryMemoryDescriptor) was elided.
  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
                                                     co.device_type,
                                                     nullptr,
                                                     executor_->getCatalog(),
                                                     executor_->blockSize(),
                                                     executor_->gridSize()),
                         {}};

  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
                                       const bool has_cardinality_estimation,
                                       const bool has_ndv_estimation) -> ExecutionResult {
    // Note that the groups buffer entry guess may be modified during query execution.
    // Create a local copy so we can track those changes if we need to attempt a retry
    // due to OOM
    auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
    try {
      return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
                                         is_agg,
                                         table_infos,
                                         ra_exe_unit,
                                         co,
                                         eo,
                                         cat_,
                                         render_info,
                                         has_cardinality_estimation,
                                         column_cache),
              targets_meta};
    } catch (const QueryExecutionError& e) {
      if (!has_ndv_estimation && e.getErrorCode() < 0) {
        // Negative code without an NDV estimate: request one and retry.
        throw CardinalityEstimationRequired(/*range=*/0);
      }
      // NOTE(review): original rendered line 3028 (presumably
      // `handlePersistentError(e.getErrorCode());`) was elided.
      return handleOutOfMemoryRetry(
          {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
          targets_meta,
          is_agg,
          co,
          eo,
          render_info,
          // NOTE(review): original rendered line 3036 (an argument, presumably
          // whether the failed launch was multifrag) was elided.
          queue_time_ms);
    }
  };

  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
  try {
    // Prefer a previously cached cardinality for this execution unit.
    auto cached_cardinality = executor_->getCachedCardinality(cache_key);
    auto card = cached_cardinality.second;
    if (cached_cardinality.first && card >= 0) {
      result = execute_and_handle_errors(
          card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
    } else {
      // NOTE(review): original rendered line 3051 (presumably
      // `/*has_cardinality_estimation=*/false,`) was elided.
      result = execute_and_handle_errors(
          max_groups_buffer_entry_guess,
          /*has_ndv_estimation=*/false);
    }
  } catch (const CardinalityEstimationRequired& e) {
    // check the cardinality cache
    auto cached_cardinality = executor_->getCachedCardinality(cache_key);
    auto card = cached_cardinality.second;
    if (cached_cardinality.first && card >= 0) {
      result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
    } else {
      // Run an NDV estimator and cache the resulting guess.
      const auto ndv_groups_estimation =
          getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
      // NOTE(review): an argument line to std::min (original rendered line
      // 3066) was elided below.
      const auto estimated_groups_buffer_entry_guess =
          ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
                                    : std::min(groups_approx_upper_bound(table_infos),
      CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
      result = execute_and_handle_errors(
          estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
      if (!(eo.just_validate || eo.just_explain)) {
        executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
      }
    }
  }

  result.setQueueTime(queue_time_ms);
  if (render_info) {
    build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
    if (render_info->isPotentialInSituRender()) {
      // return an empty result (with the same queue time, and zero render time)
      return {std::make_shared<ResultSet>(
                  queue_time_ms,
                  0,
                  executor_->row_set_mem_owner_
                      ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
                      : nullptr),
              {}};
    }
  }
  return result;
}
3092 
// Runs a pre-flight COUNT(*) over the work unit's filters to size the output
// buffer for a projection. Returns at least 1 on success, std::nullopt when
// the count itself fails with a generic error; foreign-storage and
// must-run-on-CPU errors propagate to the caller.
std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
                                                          const bool is_agg,
                                                          const CompilationOptions& co,
                                                          const ExecutionOptions& eo) {
  // COUNT(*) target; BIGINT or INT depending on g_bigint_count.
  const auto count =
      makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
                                  kCOUNT,
                                  nullptr,
                                  false,
                                  nullptr);
  // NOTE(review): the right-hand side of this initialization (original
  // rendered line 3104, presumably a call building a count-all execution unit
  // from work_unit.exe_unit and `count`) was elided by the extraction tool.
  const auto count_all_exe_unit =
  size_t one{1};
  ResultSetPtr count_all_result;
  try {
    ColumnCacheMap column_cache;
    count_all_result =
        executor_->executeWorkUnit(one,
                                   is_agg,
                                   get_table_infos(work_unit.exe_unit, executor_),
                                   count_all_exe_unit,
                                   co,
                                   eo,
                                   cat_,
                                   nullptr,
                                   false,
                                   column_cache);
  } catch (const foreign_storage::ForeignStorageException& error) {
    // NOTE(review): `throw error;` re-throws a copy, slicing any derived
    // exception type; plain `throw;` would preserve the dynamic type —
    // confirm whether the copy is intentional.
    throw error;
  } catch (const QueryMustRunOnCpu&) {
    // force a retry of the top level query on CPU
    throw;
  } catch (const std::exception& e) {
    LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
    return std::nullopt;
  }
  // Extract the single scalar count from the one-row result.
  const auto count_row = count_all_result->getNextRow(false, false);
  CHECK_EQ(size_t(1), count_row.size());
  const auto& count_tv = count_row.front();
  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
  CHECK(count_scalar_tv);
  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
  CHECK(count_ptr);
  CHECK_GE(*count_ptr, 0);
  auto count_upper_bound = static_cast<size_t>(*count_ptr);
  // Never return zero; callers use the value as a buffer size.
  return std::max(count_upper_bound, size_t(1));
}
3140 
3141 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
3142  const auto& ra_exe_unit = work_unit.exe_unit;
3143  if (ra_exe_unit.input_descs.size() != 1) {
3144  return false;
3145  }
3146  const auto& table_desc = ra_exe_unit.input_descs.front();
3147  if (table_desc.getSourceType() != InputSourceType::TABLE) {
3148  return false;
3149  }
3150  const int table_id = table_desc.getTableId();
3151  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
3152  const auto comp_expr =
3153  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3154  if (!comp_expr || comp_expr->get_optype() != kEQ) {
3155  return false;
3156  }
3157  const auto lhs = comp_expr->get_left_operand();
3158  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3159  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3160  return false;
3161  }
3162  const auto rhs = comp_expr->get_right_operand();
3163  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3164  if (!rhs_const) {
3165  return false;
3166  }
3167  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
3168  if (cd->isVirtualCol) {
3169  CHECK_EQ("rowid", cd->columnName);
3170  return true;
3171  }
3172  }
3173  return false;
3174 }
3175 
// Retries a work unit that failed with an out-of-memory style error: first
// with multifragment kernels disabled (if the failing launch was multifrag),
// then on CPU with a growing groups-buffer guess.
//
// NOTE(review): several original rendered lines were elided by the extraction
// tool: the signature line (presumably
// `ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(`), plus lines
// 3194, 3210-3211, 3214-3217, 3219, 3243, 3264 and 3297; inline notes mark
// each gap.
    const RelAlgExecutor::WorkUnit& work_unit,
    const std::vector<TargetMetaInfo>& targets_meta,
    const bool is_agg,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const bool was_multifrag_kernel_launch,
    const int64_t queue_time_ms) {
  // Disable the bump allocator
  // Note that this will have basically the same affect as using the bump allocator for
  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
  // path and the bump allocator path for kernel per fragment execution.
  auto ra_exe_unit_in = work_unit.exe_unit;
  ra_exe_unit_in.use_bump_allocator = false;

  // Placeholder empty result; replaced on successful retry.
  // NOTE(review): original rendered line 3194 (a constructor argument,
  // presumably a QueryMemoryDescriptor) was elided.
  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
                                                            co.device_type,
                                                            nullptr,
                                                            executor_->getCatalog(),
                                                            executor_->blockSize(),
                                                            executor_->gridSize()),
                                {}};

  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
  // Execution options with multifragment kernels disabled.
  // NOTE(review): original rendered lines 3210-3211, 3214-3217 and 3219
  // (several initializers and the closing brace) were elided.
  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
                                   false,
                                   false,
                                   eo.allow_loop_joins,
                                   eo.with_watchdog,
                                   eo.jit_debug,
                                   false,
                                   false,
                                   false,
                                   eo.executor_type,

  if (was_multifrag_kernel_launch) {
    try {
      // Attempt to retry using the kernel per fragment path. The smaller input size
      // required may allow the entire kernel to execute in GPU memory.
      LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
                      "kernels disabled.";
      const auto ra_exe_unit = decide_approx_count_distinct_implementation(
          ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
      ColumnCacheMap column_cache;
      result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
                                           is_agg,
                                           table_infos,
                                           ra_exe_unit,
                                           co,
                                           eo_no_multifrag,
                                           cat_,
                                           nullptr,
                                           true,
                                           column_cache),
                targets_meta};
      result.setQueueTime(queue_time_ms);
    } catch (const QueryExecutionError& e) {
      // NOTE(review): original rendered line 3243 (presumably
      // `handlePersistentError(e.getErrorCode());`) was elided.
      LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
    }
  }

  if (render_info) {
    render_info->setForceNonInSituData();
  }

  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
  // Only reset the group buffer entry guess if we ran out of slots, which
  // suggests a
  // highly pathological input which prevented a good estimation of distinct tuple
  // count. For projection queries, this will force a per-fragment scan limit, which is
  // compatible with the CPU path
  VLOG(1) << "Resetting max groups buffer entry guess.";
  max_groups_buffer_entry_guess = 0;

  int iteration_ctr = -1;
  while (true) {
    iteration_ctr++;
    // NOTE(review): original rendered line 3264 (presumably
    // `auto ra_exe_unit = decide_approx_count_distinct_implementation(`) was
    // elided; the following line is its argument list.
        ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
    ColumnCacheMap column_cache;
    try {
      result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
                                           is_agg,
                                           table_infos,
                                           ra_exe_unit,
                                           co_cpu,
                                           eo_no_multifrag,
                                           cat_,
                                           nullptr,
                                           true,
                                           column_cache),
                targets_meta};
    } catch (const QueryExecutionError& e) {
      // Ran out of slots
      if (e.getErrorCode() < 0) {
        // Even the conservative guess failed; it should only happen when we group
        // by a huge cardinality array. Maybe we should throw an exception instead?
        // Such a heavy query is entirely capable of exhausting all the host memory.
        CHECK(max_groups_buffer_entry_guess);
        // Only allow two iterations of increasingly large entry guesses up to a maximum
        // of 512MB per column per kernel
        if (g_enable_watchdog || iteration_ctr > 1) {
          throw std::runtime_error("Query ran out of output slots in the result");
        }
        max_groups_buffer_entry_guess *= 2;
        LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
                        "groups buffer entry "
                        "guess equal to "
                     << max_groups_buffer_entry_guess;
      } else {
        // NOTE(review): original rendered line 3297 (presumably
        // `handlePersistentError(e.getErrorCode());`) was elided.
      }
      continue;
    }
    result.setQueueTime(queue_time_ms);
    return result;
  }
  return result;
}
3306 
3307 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
3308  LOG(ERROR) << "Query execution failed with error "
3309  << getErrorMessageFromCode(error_code);
3310  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
3311  // We ran out of GPU memory, this doesn't count as an error if the query is
3312  // allowed to continue on CPU because retry on CPU is explicitly allowed through
3313  // --allow-cpu-retry.
3314  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
3315  if (!g_allow_cpu_retry) {
3316  throw std::runtime_error(
3317  "Query ran out of GPU memory, unable to automatically retry on CPU");
3318  }
3319  return;
3320  }
3321  throw std::runtime_error(getErrorMessageFromCode(error_code));
3322 }
3323 
3324 namespace {
// Pairs a symbolic error-code name with a human-readable description.
// Both pointers are nullptr when the error code is unrecognized.
struct ErrorInfo {
  const char* code{nullptr};         // e.g. "ERR_DIV_BY_ZERO"
  const char* description{nullptr};  // short user-facing explanation
};
// Maps an Executor error code to its symbolic name plus description; returns
// {nullptr, nullptr} for codes it does not recognize.
//
// NOTE(review): the `case Executor::ERR_...:` labels of this switch were
// elided by the extraction tool (only the ERR_GEOS label survived); each
// return statement below corresponds to the code named in its .code field.
ErrorInfo getErrorDescription(const int32_t error_code) {
  switch (error_code) {
      return {.code = "ERR_DIV_BY_ZERO", .description = "Division by zero"};
      return {.code = "ERR_OUT_OF_GPU_MEM",
              .description =
                  "Query couldn't keep the entire working set of columns in GPU memory"};
      return {.code = "ERR_UNSUPPORTED_SELF_JOIN",
              .description = "Self joins not supported yet"};
      return {.code = "ERR_OUT_OF_CPU_MEM",
              .description = "Not enough host memory to execute the query"};
      return {.code = "ERR_OVERFLOW_OR_UNDERFLOW",
              .description = "Overflow or underflow"};
      return {.code = "ERR_OUT_OF_TIME",
              .description = "Query execution has exceeded the time limit"};
      return {.code = "ERR_INTERRUPTED",
              .description = "Query execution has been interrupted"};
      return {
          .code = "ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
          .description = "Columnar conversion not supported for variable length types"};
      return {.code = "ERR_TOO_MANY_LITERALS",
              .description = "Too many literals in the query"};
      return {.code = "ERR_STRING_CONST_IN_RESULTSET",
              .description =
                  "NONE ENCODED String types are not supported as input result set."};
      return {.code = "ERR_OUT_OF_RENDER_MEM",
              .description =
                  "Insufficient GPU memory for query results in render output buffer "
                  "sized by render-mem-bytes"};
      return {.code = "ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
              .description = "Streaming-Top-N not supported in Render Query"};
      return {.code = "ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
              .description = "Multiple distinct values encountered"};
    case Executor::ERR_GEOS:
      return {.code = "ERR_GEOS", .description = "ERR_GEOS"};
    default:
      return {.code = nullptr, .description = nullptr};
  }
}
3380 
3381 } // namespace
3382 
3383 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
3384  if (error_code < 0) {
3385  return "Ran out of slots in the query output buffer";
3386  }
3387  const auto errorInfo = getErrorDescription(error_code);
3388 
3389  if (errorInfo.code) {
3390  return errorInfo.code + ": "s + errorInfo.description;
3391  } else {
3392  return "Other error: code "s + std::to_string(error_code);
3393  }
3394 }
3395 
// NOTE(review): the enclosing signature and guard (presumably
// `void RelAlgExecutor::executePostExecutionCallback() {` and
// `if (post_execution_callback_) {`, original rendered lines 3396-3397) were
// elided by the extraction tool; the body below runs the registered callback.
    VLOG(1) << "Running post execution callback.";
    (*post_execution_callback_)();
  }
}
3402 
// Dispatches work-unit creation on the concrete relational-algebra node type
// (compound, project, aggregate, filter); any other node type is fatal.
//
// NOTE(review): the opening line of this definition (presumably
// `RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,`)
// was elided by the extraction tool.
    const SortInfo& sort_info,
    const ExecutionOptions& eo) {
  const auto compound = dynamic_cast<const RelCompound*>(node);
  if (compound) {
    return createCompoundWorkUnit(compound, sort_info, eo);
  }
  const auto project = dynamic_cast<const RelProject*>(node);
  if (project) {
    return createProjectWorkUnit(project, sort_info, eo);
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
  if (aggregate) {
    return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
  }
  const auto filter = dynamic_cast<const RelFilter*>(node);
  if (filter) {
    return createFilterWorkUnit(filter, sort_info, eo.just_explain);
  }
  // Unreachable for well-formed plans; LOG(FATAL) aborts.
  LOG(FATAL) << "Unhandled node type: " << node->toString();
  return {};
}
3425 
3426 namespace {
3427 
// Determines the join type of the node's data sink: a RelJoin reports its
// own type, a left-deep inner join is INNER, anything else is INVALID.
//
// NOTE(review): the opening line of this definition (presumably
// `JoinType get_join_type(const RelAlgNode* ra) {`) was elided by the
// extraction tool.
  auto sink = get_data_sink(ra);
  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
    return join->getJoinType();
  }
  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
    return JoinType::INNER;
  }

  return JoinType::INVALID;
}
3439 
// Recognizes the null-safe equality pattern
//   `lhs = rhs OR (lhs IS NULL AND rhs IS NULL)`
// over column inputs and rewrites it into a single bitwise-equals (kBW_EQ)
// operator. Returns nullptr whenever `scalar` does not match the pattern
// exactly.
std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
  const auto condition = dynamic_cast<const RexOperator*>(scalar);
  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
    return nullptr;
  }
  // First OR operand must be a plain equality...
  const auto equi_join_condition =
      dynamic_cast<const RexOperator*>(condition->getOperand(0));
  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
    return nullptr;
  }
  // ...and the second an AND of exactly two operands...
  const auto both_are_null_condition =
      dynamic_cast<const RexOperator*>(condition->getOperand(1));
  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
      both_are_null_condition->size() != 2) {
    return nullptr;
  }
  // ...each of which is an IS NULL check.
  const auto lhs_is_null =
      dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
  const auto rhs_is_null =
      dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
      rhs_is_null->getOperator() != kISNULL) {
    return nullptr;
  }
  CHECK_EQ(size_t(1), lhs_is_null->size());
  CHECK_EQ(size_t(1), rhs_is_null->size());
  CHECK_EQ(size_t(2), equi_join_condition->size());
  // All four leaves must be input column references.
  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
    return nullptr;
  }
  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
  // The IS NULL checks must test exactly the equality's operands; only then
  // is the rewrite to BW_EQ semantics-preserving.
  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
    RexDeepCopyVisitor deep_copy_visitor;
    auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
    auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
    eq_operands.emplace_back(lhs_op_copy.release());
    eq_operands.emplace_back(rhs_op_copy.release());
    return boost::make_unique<const RexOperator>(
        kBW_EQ, eq_operands, equi_join_condition->getType());
  }
  return nullptr;
}
3486 
// Applies get_bitwise_equals across a conjunction: for `c0 AND c1 AND ...`
// each conjunct is rewritten and the results are re-ANDed left to right; for
// a non-AND input it falls through to the single-condition rewrite. Returns
// nullptr when the first conjunct does not match the null-safe-equality
// pattern.
// NOTE(review): recursive calls for conjuncts past the first may return
// nullptr, which is emplaced into the AND's operand list as-is — confirm that
// downstream consumers tolerate a null operand or that callers rule it out.
std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
    const RexScalar* scalar) {
  const auto condition = dynamic_cast<const RexOperator*>(scalar);
  if (condition && condition->getOperator() == kAND) {
    CHECK_GE(condition->size(), size_t(2));
    auto acc = get_bitwise_equals(condition->getOperand(0));
    if (!acc) {
      return nullptr;
    }
    for (size_t i = 1; i < condition->size(); ++i) {
      std::vector<std::unique_ptr<const RexScalar>> and_operands;
      and_operands.emplace_back(std::move(acc));
      and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
      acc =
          boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
    }
    return acc;
  }
  return get_bitwise_equals(scalar);
}
3507 
3508 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
3509  CHECK_GE(left_deep_join->inputCount(), size_t(2));
3510  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
3511  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
3512  ++nesting_level) {
3513  if (left_deep_join->getOuterCondition(nesting_level)) {
3514  join_types[nesting_level - 1] = JoinType::LEFT;
3515  }
3516  }
3517  return join_types;
3518 }
3519 
// Reorders the join inputs using the permutation computed by
// get_node_input_permutation(), rewriting input_descs, input_col_descs and
// input_to_nest_level in place to match the new order. Returns the applied
// permutation, or an empty vector when reordering is skipped.
template <class RA>
std::vector<size_t> do_table_reordering(
    std::vector<InputDescriptor>& input_descs,
    std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
    const JoinQualsPerNestingLevel& left_deep_join_quals,
    std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
    const RA* node,
    const std::vector<InputTableInfo>& query_infos,
    const Executor* executor) {
  if (g_cluster) {
    // Disable table reordering in distributed mode. The aggregator does not have enough
    // information to break ties
    return {};
  }
  const auto& cat = *executor->getCatalog();
  for (const auto& table_info : query_infos) {
    if (table_info.table_id < 0) {
      // Negative table ids -- presumably intermediate results rather than
      // physical tables -- have no catalog metadata to inspect.
      continue;
    }
    const auto td = cat.getMetadataForTable(table_info.table_id);
    CHECK(td);
    if (table_is_replicated(td)) {
      // Bail out entirely if any replicated table participates in the join.
      return {};
    }
  }
  const auto input_permutation =
      get_node_input_permutation(left_deep_join_quals, query_infos, executor);
  // Rebuild the nest levels and input descriptors under the permuted order.
  input_to_nest_level = get_input_nest_levels(node, input_permutation);
  std::tie(input_descs, input_col_descs, std::ignore) =
      get_input_desc(node, input_to_nest_level, input_permutation, cat);
  return input_permutation;
}
3552 
// Returns, for each input of the left-deep join, the number of output
// columns that input produces.
// NOTE(review): the function header line (original 3553 -- return type and
// name, get_left_deep_join_input_sizes) was lost in this listing; confirm
// against the repository source.
    const RelLeftDeepInnerJoin* left_deep_join) {
  std::vector<size_t> input_sizes;
  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
    const auto inputs = get_node_output(left_deep_join->getInput(i));
    input_sizes.push_back(inputs.size());
  }
  return input_sizes;
}
3562 
3563 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
3564  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
3565  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
3566  for (const auto& qual : quals) {
3567  const auto rewritten_qual = rewrite_expr(qual.get());
3568  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
3569  }
3570  return rewritten_quals;
3571 }
3572 
3573 } // namespace
3574 
// Builds the work unit (execution unit + metadata) for a RelCompound node:
// resolves inputs, translates join/filter/groupby/target expressions, applies
// optional join-input reordering, and extracts the query plan DAG.
// NOTE(review): the function header line (original 3575) was lost in this
// listing; this is RelAlgExecutor's compound work-unit builder -- confirm the
// exact signature against the repository source.
    const RelCompound* compound,
    const SortInfo& sort_info,
    const ExecutionOptions& eo) {
  std::vector<InputDescriptor> input_descs;
  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
  // Nest levels with no permutation applied yet.
  auto input_to_nest_level = get_input_nest_levels(compound, {});
  std::tie(input_descs, input_col_descs, std::ignore) =
      get_input_desc(compound, input_to_nest_level, {}, cat_);
  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
  const auto query_infos = get_table_infos(input_descs, executor_);
  CHECK_EQ(size_t(1), compound->inputCount());
  const auto left_deep_join =
      dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
  JoinQualsPerNestingLevel left_deep_join_quals;
  // One join type per nesting level for a left-deep join; otherwise a single
  // join type for the lone input.
  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
                                         : std::vector<JoinType>{get_join_type(compound)};
  std::vector<size_t> input_permutation;
  std::vector<size_t> left_deep_join_input_sizes;
  std::optional<unsigned> left_deep_tree_id;
  if (left_deep_join) {
    left_deep_tree_id = left_deep_join->getId();
    left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
    left_deep_join_quals = translateLeftDeepJoinFilter(
        left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
    // NOTE(review): the first part of the reordering guard (original line
    // 3600) was lost in this listing; only the "no LEFT join" check survives
    // below. Confirm the full condition against the repository source.
        std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
            join_types.end()) {
      // Reorder the inputs, then rebuild descriptors/nest levels and
      // re-translate the join filter against the permuted order.
      input_permutation = do_table_reordering(input_descs,
                                              input_col_descs,
                                              left_deep_join_quals,
                                              input_to_nest_level,
                                              compound,
                                              query_infos,
                                              executor_);
      input_to_nest_level = get_input_nest_levels(compound, input_permutation);
      std::tie(input_descs, input_col_descs, std::ignore) =
          get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
      left_deep_join_quals = translateLeftDeepJoinFilter(
          left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
    }
  }
  RelAlgTranslator translator(cat_,
                              query_state_,
                              executor_,
                              input_to_nest_level,
                              join_types,
                              now_,
                              eo.just_explain);
  const auto scalar_sources =
      translate_scalar_sources(compound, translator, eo.executor_type);
  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
  const auto quals_cf = translate_quals(compound, translator);
  const auto target_exprs = translate_targets(target_exprs_owned_,
                                              scalar_sources,
                                              groupby_exprs,
                                              compound,
                                              translator,
                                              eo.executor_type);
  CHECK_EQ(compound->size(), target_exprs.size());
  const RelAlgExecutionUnit exe_unit = {
      input_descs,
      input_col_descs,
      quals_cf.simple_quals,
      rewrite_quals(quals_cf.quals),
      left_deep_join_quals,
      groupby_exprs,
      target_exprs,
      nullptr,
      sort_info,
      0,
      query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
      // NOTE(review): one aggregate-initializer line (original 3647) was lost
      // here in this listing.
      {},
      false,
      std::nullopt,
      query_state_};
  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
  compound->setOutputMetainfo(targets_meta);
  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
  // NOTE(review): `left_deep_tree_id && left_deep_tree_id.has_value()` is
  // redundant -- optional's operator bool and has_value() are equivalent.
  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
    left_deep_trees_info.emplace(left_deep_tree_id.value(),
                                 rewritten_exe_unit.join_quals);
  }
  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(compound,
                                                             cat_,
                                                             left_deep_tree_id,
                                                             left_deep_trees_info,
                                                             // NOTE(review): one
                                                             // argument line
                                                             // (original 3665)
                                                             // lost in this
                                                             // listing.
                                                             executor_,
                                                             translator);
  if (is_extracted_dag_valid(dag_info)) {
    rewritten_exe_unit.query_plan_dag = dag_info.extracted_dag;
    rewritten_exe_unit.hash_table_build_plan_dag = dag_info.hash_table_plan_dag;
  }
  return {rewritten_exe_unit,
          compound,
          // NOTE(review): one member line (original 3674) lost in this
          // listing.
          std::move(query_rewriter),
          input_permutation,
          left_deep_join_input_sizes};
}
3679 
3680 std::shared_ptr<RelAlgTranslator> RelAlgExecutor::getRelAlgTranslator(
3681  const RelAlgNode* node) {
3682  auto input_to_nest_level = get_input_nest_levels(node, {});
3683  const auto left_deep_join =
3684  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
3685  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3686  : std::vector<JoinType>{get_join_type(node)};
3687  return std::make_shared<RelAlgTranslator>(
3688  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, false);
3689 }
3690 
3691 namespace {
3692 
3693 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
3694  CHECK(qual_expr);
3695  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
3696  if (!bin_oper || bin_oper->getOperator() != kAND) {
3697  return {qual_expr};
3698  }
3699  CHECK_GE(bin_oper->size(), size_t(2));
3700  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
3701  for (size_t i = 1; i < bin_oper->size(); ++i) {
3702  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
3703  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
3704  }
3705  return lhs_cf;
3706 }
3707 
3708 std::shared_ptr<Analyzer::Expr> build_logical_expression(
3709  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
3710  const SQLOps sql_op) {
3711  CHECK(!factors.empty());
3712  auto acc = factors.front();
3713  for (size_t i = 1; i < factors.size(); ++i) {
3714  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
3715  }
3716  return acc;
3717 }
3718 
3719 template <class QualsList>
3720 bool list_contains_expression(const QualsList& haystack,
3721  const std::shared_ptr<Analyzer::Expr>& needle) {
3722  for (const auto& qual : haystack) {
3723  if (*qual == *needle) {
3724  return true;
3725  }
3726  }
3727  return false;
3728 }
3729 
// Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
// evaluations of `p` and allows use of the original form in joins if `p`
// can be used for hash joins.
std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
    const std::shared_ptr<Analyzer::Expr>& expr) {
  // Decompose the expression into its top-level OR terms.
  const auto expr_terms = qual_to_disjunctive_form(expr);
  CHECK_GE(expr_terms.size(), size_t(1));
  const auto& first_term = expr_terms.front();
  const auto first_term_factors = qual_to_conjunctive_form(first_term);
  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
  // First, collect the conjunctive components common to all the disjunctive components.
  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
  for (const auto& first_term_factor : first_term_factors.quals) {
    bool is_common =
        expr_terms.size() > 1;  // Only report common factors for disjunction.
    for (size_t i = 1; i < expr_terms.size(); ++i) {
      const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
      if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
        is_common = false;
        break;
      }
    }
    if (is_common) {
      common_factors.push_back(first_term_factor);
    }
  }
  if (common_factors.empty()) {
    // Nothing to factor out; return the expression unchanged.
    return expr;
  }
  // Now that the common expressions are known, collect the remaining expressions.
  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
  for (const auto& term : expr_terms) {
    const auto term_cf = qual_to_conjunctive_form(term);
    // Simple quals are never extracted as common factors (see above), so they
    // always stay attached to their own term.
    std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
        term_cf.simple_quals.begin(), term_cf.simple_quals.end());
    for (const auto& qual : term_cf.quals) {
      if (!list_contains_expression(common_factors, qual)) {
        remaining_quals.push_back(qual);
      }
    }
    if (!remaining_quals.empty()) {
      remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
    }
  }
  // Reconstruct the expression with the transformation applied.
  const auto common_expr = build_logical_expression(common_factors, kAND);
  if (remaining_terms.empty()) {
    // Every term was fully covered by the common factors.
    return common_expr;
  }
  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
}
3782 
3783 } // namespace
3784 
// Converts a join condition rex tree into a flat list of analyzer
// qualifiers: the condition is split into top-level conjuncts, each conjunct
// that matches the null-aware equality pattern is rewritten to a
// bitwise-equals form, and the per-conjunct conjunctive forms are
// concatenated before being combined by combine_equi_join_conditions().
std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
    const RexScalar* join_condition,
    const std::vector<JoinType>& join_types,
    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
    const bool just_explain) const {
  RelAlgTranslator translator(
      cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
  for (const auto rex_condition_component : rex_condition_cf) {
    const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
    const auto join_condition =
        // NOTE(review): the line wrapping this ternary (original 3797) was
        // lost in this listing; it presumably translated the chosen rex
        // expression into an Analyzer::Expr -- confirm against the
        // repository source.
        bw_equals ? bw_equals.get() : rex_condition_component));
    auto join_condition_cf = qual_to_conjunctive_form(join_condition);
    // Keep complex quals first, then the simple ones, preserving each group's
    // internal order.
    join_condition_quals.insert(join_condition_quals.end(),
                                join_condition_cf.quals.begin(),
                                join_condition_cf.quals.end());
    join_condition_quals.insert(join_condition_quals.end(),
                                join_condition_cf.simple_quals.begin(),
                                join_condition_cf.simple_quals.end());
  }
  return combine_equi_join_conditions(join_condition_quals);
}
3809 
// Translate left deep join filter and separate the conjunctive form qualifiers
// per nesting level. The code generated for hash table lookups on each level
// must dominate its uses in deeper nesting levels.
// NOTE(review): the function header line (original 3813) was lost in this
// listing; this is RelAlgExecutor::translateLeftDeepJoinFilter -- confirm the
// exact signature against the repository source.
    const RelLeftDeepInnerJoin* join,
    const std::vector<InputDescriptor>& input_descs,
    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
    const bool just_explain) {
  const auto join_types = left_deep_join_types(join);
  // Quals from the inner condition, to be distributed across nesting levels.
  const auto join_condition_quals = makeJoinQuals(
      join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
  MaxRangeTableIndexVisitor rte_idx_visitor;
  JoinQualsPerNestingLevel result(input_descs.size() - 1);
  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
    const auto outer_condition = join->getOuterCondition(rte_idx);
    if (outer_condition) {
      // An outer condition marks this level as a LEFT join; its quals come
      // from the outer condition alone, never from the inner condition.
      result[rte_idx - 1].quals =
          makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
      CHECK_LE(rte_idx, join_types.size());
      CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
      result[rte_idx - 1].type = JoinType::LEFT;
      continue;
    }
    // Assign each not-yet-placed inner qual to the shallowest nesting level
    // whose range covers the maximum range-table index the qual references.
    for (const auto& qual : join_condition_quals) {
      if (visited_quals.count(qual)) {
        continue;
      }
      const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
      if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
        const auto it_ok = visited_quals.emplace(qual);
        CHECK(it_ok.second);  // each qual is placed exactly once
        result[rte_idx - 1].quals.push_back(qual);
      }
    }
    CHECK_LE(rte_idx, join_types.size());
    CHECK(join_types[rte_idx - 1] == JoinType::INNER);
    result[rte_idx - 1].type = JoinType::INNER;
  }
  return result;
}
3851 
3852 namespace {
3853 
3854 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
3855  const RelAlgNode* ra_node,
3856  const size_t nest_level,
3857  const std::vector<TargetMetaInfo>& in_metainfo,
3858  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3859  CHECK_LE(size_t(1), ra_node->inputCount());
3860  CHECK_GE(size_t(2), ra_node->inputCount());
3861  const auto input = ra_node->getInput(nest_level);
3862  const auto it_rte_idx = input_to_nest_level.find(input);
3863  CHECK(it_rte_idx != input_to_nest_level.end());
3864  const int rte_idx = it_rte_idx->second;
3865  const int table_id = table_id_from_ra(input);
3866  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
3867  const auto scan_ra = dynamic_cast<const RelScan*>(input);
3868  int input_idx = 0;
3869  for (const auto& input_meta : in_metainfo) {
3870  inputs.push_back(
3871  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
3872  table_id,
3873  scan_ra ? input_idx + 1 : input_idx,
3874  rte_idx));
3875  ++input_idx;
3876  }
3877  return inputs;
3878 }
3879 
3880 } // namespace
3881 
// Builds the work unit for a standalone RelAggregate node: synthesizes input
// column refs from the source's output metainfo, translates groupbys and
// aggregate targets, and extracts the query plan DAG.
// NOTE(review): the function header line (original 3882) was lost in this
// listing; this is RelAlgExecutor's aggregate work-unit builder -- confirm
// the exact signature against the repository source.
    const RelAggregate* aggregate,
    const SortInfo& sort_info,
    const bool just_explain) {
  std::vector<InputDescriptor> input_descs;
  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
  std::tie(input_descs, input_col_descs, used_inputs_owned) =
      get_input_desc(aggregate, input_to_nest_level, {}, cat_);
  const auto join_type = get_join_type(aggregate);

  RelAlgTranslator translator(cat_,
                              query_state_,
                              executor_,
                              input_to_nest_level,
                              {join_type},
                              now_,
                              just_explain);
  CHECK_EQ(size_t(1), aggregate->inputCount());
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  // Aggregate inputs are column refs into the source's output.
  const auto scalar_sources =
      synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
  const auto target_exprs = translate_targets(
      target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
  aggregate->setOutputMetainfo(targets_meta);
  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(aggregate,
                                                             cat_,
                                                             std::nullopt,
                                                             // NOTE(review):
                                                             // two argument
                                                             // lines (original
                                                             // 3914-3915) lost
                                                             // in this listing.
                                                             executor_,
                                                             translator);
  return {RelAlgExecutionUnit{
              input_descs,
              input_col_descs,
              {},
              {},
              {},
              groupby_exprs,
              target_exprs,
              nullptr,
              sort_info,
              0,
              query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
              dag_info.extracted_dag,
              dag_info.hash_table_plan_dag,
              false,
              std::nullopt,
              query_state_},
          aggregate,
          // NOTE(review): one member line (original 3936) lost in this
          // listing.
          nullptr};
}
3939 
// Builds the work unit for a RelProject node: resolves inputs, translates the
// projected expressions, applies optional join-input reordering for a
// left-deep join input, and extracts the query plan DAG.
// NOTE(review): the function header line (original 3940) was lost in this
// listing; this is RelAlgExecutor's project work-unit builder -- confirm the
// exact signature against the repository source.
    const RelProject* project,
    const SortInfo& sort_info,
    const ExecutionOptions& eo) {
  std::vector<InputDescriptor> input_descs;
  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
  auto input_to_nest_level = get_input_nest_levels(project, {});
  std::tie(input_descs, input_col_descs, std::ignore) =
      get_input_desc(project, input_to_nest_level, {}, cat_);
  const auto query_infos = get_table_infos(input_descs, executor_);

  const auto left_deep_join =
      dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
  JoinQualsPerNestingLevel left_deep_join_quals;
  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
                                         : std::vector<JoinType>{get_join_type(project)};
  std::vector<size_t> input_permutation;
  std::vector<size_t> left_deep_join_input_sizes;
  std::optional<unsigned> left_deep_tree_id;
  if (left_deep_join) {
    left_deep_tree_id = left_deep_join->getId();
    left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
    // NOTE(review): this re-declaration shadows the identical `query_infos`
    // computed above -- redundant work, worth cleaning up at the repository.
    const auto query_infos = get_table_infos(input_descs, executor_);
    left_deep_join_quals = translateLeftDeepJoinFilter(
        left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
    // NOTE(review): the condition guarding table reordering (original line
    // 3965) was lost in this listing -- confirm against the repository
    // source.
      // Reorder the inputs, then rebuild descriptors/nest levels and
      // re-translate the join filter against the permuted order.
      input_permutation = do_table_reordering(input_descs,
                                              input_col_descs,
                                              left_deep_join_quals,
                                              input_to_nest_level,
                                              project,
                                              query_infos,
                                              executor_);
      input_to_nest_level = get_input_nest_levels(project, input_permutation);
      std::tie(input_descs, input_col_descs, std::ignore) =
          get_input_desc(project, input_to_nest_level, input_permutation, cat_);
      left_deep_join_quals = translateLeftDeepJoinFilter(
          left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
    }
  }

  RelAlgTranslator translator(cat_,
                              query_state_,
                              executor_,
                              input_to_nest_level,
                              join_types,
                              now_,
                              eo.just_explain);
  const auto target_exprs_owned =
      translate_scalar_sources(project, translator, eo.executor_type);
  // Keep ownership alive for the lifetime of the executor's expression pool.
  target_exprs_owned_.insert(
      target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);

  const RelAlgExecutionUnit exe_unit = {
      input_descs,
      input_col_descs,
      {},
      {},
      left_deep_join_quals,
      {nullptr},
      target_exprs,
      nullptr,
      sort_info,
      0,
      query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
      // NOTE(review): one aggregate-initializer line (original 4006) was lost
      // here in this listing.
      {},
      false,
      std::nullopt,
      query_state_};
  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
  project->setOutputMetainfo(targets_meta);
  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
  // NOTE(review): `left_deep_tree_id && left_deep_tree_id.has_value()` is
  // redundant -- optional's operator bool and has_value() are equivalent.
  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
    left_deep_trees_info.emplace(left_deep_tree_id.value(),
                                 rewritten_exe_unit.join_quals);
  }
  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(project,
                                                             cat_,
                                                             left_deep_tree_id,
                                                             left_deep_trees_info,
                                                             // NOTE(review): one
                                                             // argument line
                                                             // (original 4024)
                                                             // lost in this
                                                             // listing.
                                                             executor_,
                                                             translator);
  if (is_extracted_dag_valid(dag_info)) {
    rewritten_exe_unit.query_plan_dag = dag_info.extracted_dag;
    rewritten_exe_unit.hash_table_build_plan_dag = dag_info.hash_table_plan_dag;
  }
  return {rewritten_exe_unit,
          project,
          // NOTE(review): one member line (original 4033) lost in this
          // listing.
          std::move(query_rewriter),
          input_permutation,
          left_deep_join_input_sizes};
}
4038 
4039 namespace {
4040 
4041 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
4042  RelAlgNode const* input_node) {
4043  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
4044  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
4045  const int negative_node_id = -input_node->getId();
4046  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4047  target_exprs.reserve(tmis.size());
4048  for (size_t i = 0; i < tmis.size(); ++i) {
4049  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4050  tmis[i].get_type_info(), negative_node_id, i, 0));
4051  }
4052  return target_exprs;
4053 }
4054 
4055 } // namespace
4056 
// Builds the work unit for a RelLogicalUnion node. The union's target
// expressions are synthesized from the first input's output metainfo, and
// the scan limit is set to the largest input cardinality.
// NOTE(review): the function header line (original 4057) was lost in this
// listing; this is RelAlgExecutor's union work-unit builder -- confirm the
// exact signature against the repository source.
    const RelLogicalUnion* logical_union,
    const SortInfo& sort_info,
    const ExecutionOptions& eo) {
  std::vector<InputDescriptor> input_descs;
  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
  // Map ra input ptr to index (0, 1).
  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
  std::tie(input_descs, input_col_descs, std::ignore) =
      get_input_desc(logical_union, input_to_nest_level, {}, cat_);
  const auto query_infos = get_table_infos(input_descs, executor_);
  // Largest tuple count over all inputs; used as the scan limit below.
  auto const max_num_tuples =
      std::accumulate(query_infos.cbegin(),
                      query_infos.cend(),
                      size_t(0),
                      [](auto max, auto const& query_info) {
                        return std::max(max, query_info.info.getNumTuples());
                      });

  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
  for (auto& pair : input_to_nest_level) {
    VLOG(3) << "  (" << pair.first->toString() << ", " << pair.second << ')';
  }

  RelAlgTranslator translator(
      cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);

  // Targets are column refs into the first input's output.
  auto const input_exprs_owned = target_exprs_for_union(logical_union->getInput(0));
  CHECK(!input_exprs_owned.empty())
      << "No metainfo found for input node " << logical_union->getInput(0)->toString();
  VLOG(3) << "input_exprs_owned.size()=" << input_exprs_owned.size();
  for (auto& input_expr : input_exprs_owned) {
    VLOG(3) << "  " << input_expr->toString();
  }
  target_exprs_owned_.insert(
      target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
  const auto target_exprs = get_exprs_not_owned(input_exprs_owned);

  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
          << " input_col_descs=" << shared::printContainer(input_col_descs)
          << " target_exprs.size()=" << target_exprs.size()
          << " max_num_tuples=" << max_num_tuples;

  const RelAlgExecutionUnit exe_unit = {
      input_descs,
      input_col_descs,
      {},  // quals_cf.simple_quals,
      {},  // rewrite_quals(quals_cf.quals),
      {},
      {nullptr},
      target_exprs,
      nullptr,
      sort_info,
      max_num_tuples,
      query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
      // NOTE(review): one aggregate-initializer line (original 4112) was lost
      // here in this listing.
      {},
      false,
      logical_union->isAll(),
      query_state_};
  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);

  // Propagate output metainfo from the first input; the dynamic_cast chain
  // dispatches to the statically-typed get_targets_meta overloads.
  RelAlgNode const* input0 = logical_union->getInput(0);
  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
    logical_union->setOutputMetainfo(
        get_targets_meta(node, rewritten_exe_unit.target_exprs));
  } else if (dynamic_cast<const RelSort*>(input0)) {
    throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
  } else {
    throw QueryNotSupported("Unsupported input type: " + input0->toString());
  }
  VLOG(3) << "logical_union->getOutputMetainfo()="
          << shared::printContainer(logical_union->getOutputMetainfo())
          << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
          << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();

  return {rewritten_exe_unit,
          logical_union,
          // NOTE(review): one member line (original 4151) lost in this
          // listing.
          std::move(query_rewriter)};
}
4154 
// Builds the execution unit for a table function call: binds the function
// implementation for the requested device, resolves the output-buffer sizing
// parameter, adjusts COLUMN/COLUMN_LIST argument type infos, and synthesizes
// the output column expressions.
// NOTE(review): the function header line (original 4155) was lost in this
// listing; this is RelAlgExecutor's table-function work-unit builder --
// confirm the exact signature against the repository source.
    const RelTableFunction* rel_table_func,
    const bool just_explain,
    const bool is_gpu) {
  std::vector<InputDescriptor> input_descs;
  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
  std::tie(input_descs, input_col_descs, std::ignore) =
      get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
  const auto query_infos = get_table_infos(input_descs, executor_);
  RelAlgTranslator translator(
      cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
  const auto input_exprs_owned =
      translate_scalar_sources(rel_table_func, translator, ::ExecutorType::Native);
  target_exprs_owned_.insert(
      target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
  auto input_exprs = get_exprs_not_owned(input_exprs_owned);

  // Bind the implementation for the requested device. A GPU binding failure
  // redirects the query to CPU; a CPU binding failure is fatal.
  const auto table_function_impl_and_type_infos = [=]() {
    if (is_gpu) {
      try {
        return bind_table_function(
            rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
      } catch (ExtensionFunctionBindingError& e) {
        LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
                     << " Redirecting " << rel_table_func->getFunctionName()
                     << " to run on CPU.";
        throw QueryMustRunOnCpu();
      }
    } else {
      try {
        return bind_table_function(
            rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
      } catch (ExtensionFunctionBindingError& e) {
        LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
        throw;
      }
    }
  }();
  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);

  // Resolve the output-buffer sizing parameter, either from a user-supplied
  // literal argument or from the implementation's declared constant.
  size_t output_row_sizing_param = 0;
  if (table_function_impl.hasUserSpecifiedOutputSizeMultiplier() ||
      table_function_impl.hasUserSpecifiedOutputSizeConstant()) {
    const auto parameter_index =
        table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
    CHECK_GT(parameter_index, size_t(0));
    if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
      const auto parameter_expr =
          rel_table_func->getTableFuncInputAt(parameter_index - 1);
      const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
      if (!parameter_expr_literal) {
        throw std::runtime_error(
            "Provided output buffer sizing parameter is not a literal. Only literal "
            "values are supported with output buffer sizing configured table "
            "functions.");
      }
      int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
      if (literal_val < 0) {
        throw std::runtime_error("Provided output sizing parameter " +
                                 std::to_string(literal_val) +
                                 " must be positive integer.");
      }
      output_row_sizing_param = static_cast<size_t>(literal_val);
    } else {
      // RowMultiplier not specified in the SQL query. Set it to 1
      output_row_sizing_param = 1;  // default value for RowMultiplier
      // NOTE(review): the line declaring `d` (original 4223, presumably a
      // Datum holding the default multiplier) was lost in this listing --
      // confirm against the repository source.
      static auto DEFAULT_ROW_MULTIPLIER_EXPR =
          makeExpr<Analyzer::Constant>(kINT, false, d);
      // Push the constant 1 to input_exprs
      input_exprs.insert(input_exprs.begin() + parameter_index - 1,
                         DEFAULT_ROW_MULTIPLIER_EXPR.get());
    }
  } else if (table_function_impl.hasNonUserSpecifiedOutputSizeConstant()) {
    output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
  } else {
    UNREACHABLE();
  }

  // Rewrite the type info of COLUMN / COLUMN_LIST arguments: the original
  // element type becomes the subtype and the wrapper kind becomes the type.
  std::vector<Analyzer::ColumnVar*> input_col_exprs;
  std::optional<int32_t> dict_id;
  size_t input_index = 0;
  for (const auto& ti : table_function_type_infos) {
    if (ti.is_column_list()) {
      for (int i = 0; i < ti.get_dimension(); i++) {
        auto& input_expr = input_exprs[input_index];
        auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
        CHECK(col_var);

        // avoid setting type info to ti here since ti doesn't have all the
        // properties correctly set
        auto type_info = input_expr->get_type_info();
        type_info.set_subtype(type_info.get_type());  // set type to be subtype
        type_info.set_type(ti.get_type());            // set type to column list
        type_info.set_dimension(ti.get_dimension());
        input_expr->set_type_info(type_info);

        input_col_exprs.push_back(col_var);
        input_index++;
      }
    } else if (ti.is_column()) {
      auto& input_expr = input_exprs[input_index];
      auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
      CHECK(col_var);

      // same here! avoid setting type info to ti since it doesn't have all the
      // properties correctly set
      auto type_info = input_expr->get_type_info();
      type_info.set_subtype(type_info.get_type());  // set type to be subtype
      type_info.set_type(ti.get_type());            // set type to column
      input_expr->set_type_info(type_info);

      input_col_exprs.push_back(col_var);
      input_index++;
    } else {
      // Scalar argument: nothing to rewrite.
      input_index++;
    }
  }
  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
  // Synthesize one output ColumnVar per declared output. Dict-encoded string
  // outputs inherit the dictionary (comp_param) of the mapped input column.
  std::vector<Analyzer::Expr*> table_func_outputs;
  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
    auto ti = table_function_impl.getOutputSQLType(i);
    if (ti.is_dict_encoded_string()) {
      auto p = table_function_impl.getInputID(i);

      int32_t input_pos = p.first;
      // Iterate over the list of arguments to compute the offset. Use this offset to
      // get the corresponding input
      int32_t offset = 0;
      for (int j = 0; j < input_pos; j++) {
        const auto ti = table_function_type_infos[j];
        offset += ti.is_column_list() ? ti.get_dimension() : 1;
      }
      input_pos = offset + p.second;

      CHECK_LT(input_pos, input_exprs.size());
      int32_t comp_param = input_exprs_owned[input_pos]->get_type_info().get_comp_param();
      ti.set_comp_param(comp_param);
    }
    target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
    table_func_outputs.push_back(target_exprs_owned_.back().get());
  }
  const TableFunctionExecutionUnit exe_unit = {
      input_descs,
      input_col_descs,
      input_exprs,              // table function inputs
      input_col_exprs,          // table function column inputs (duplicates w/ above)
      table_func_outputs,       // table function projected exprs
      output_row_sizing_param,  // output buffer sizing param
      table_function_impl};
  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
  rel_table_func->setOutputMetainfo(targets_meta);
  return {exe_unit, rel_table_func};
}
4311 
4312 namespace {
4313 
4314 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
4316  const RelAlgTranslator& translator,
4317  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
4318  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4319  std::vector<TargetMetaInfo> in_metainfo;
4320  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
4321  const auto data_sink_node = get_data_sink(filter);
4322  auto input_it = inputs_owned.begin();
4323  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
4324  const auto source = data_sink_node->getInput(nest_level);
4325  const auto scan_source = dynamic_cast<const RelScan*>(source);
4326  if (scan_source) {
4327  CHECK(source->getOutputMetainfo().empty());
4328  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
4329  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
4330  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
4331  }
4332  const auto source_metadata =
4333  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
4334  in_metainfo.insert(
4335  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4336  exprs_owned.insert(
4337  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4338  } else {
4339  const auto& source_metadata = source->getOutputMetainfo();
4340  input_it += source_metadata.size();
4341  in_metainfo.insert(
4342  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4343  const auto scalar_sources_owned = synthesize_inputs(
4344  data_sink_node, nest_level, source_metadata, input_to_nest_level);
4345  exprs_owned.insert(
4346  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4347  }
4348  }
4349  return std::make_pair(in_metainfo, exprs_owned);
4350 }
4351 
4352 } // namespace
4353 
4355  const SortInfo& sort_info,
4356  const bool just_explain) {
4357  CHECK_EQ(size_t(1), filter->inputCount());
4358  std::vector<InputDescriptor> input_descs;
4359  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4360  std::vector<TargetMetaInfo> in_metainfo;
4361  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4362  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
4363 
4364  const auto input_to_nest_level = get_input_nest_levels(filter, {});
4365  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4366  get_input_desc(filter, input_to_nest_level, {}, cat_);
4367  const auto join_type = get_join_type(filter);
4368  RelAlgTranslator translator(cat_,
4369  query_state_,
4370  executor_,
4371  input_to_nest_level,
4372  {join_type},
4373  now_,
4374  just_explain);
4375  std::tie(in_metainfo, target_exprs_owned) =
4376  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
4377  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
4378  const auto qual = fold_expr(filter_expr.get());
4379  target_exprs_owned_.insert(
4380  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4381  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
4382  filter->setOutputMetainfo(in_metainfo);
4383  const auto rewritten_qual = rewrite_expr(qual.get());
4384  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(filter,
4385  cat_,
4386  std::nullopt,
4389  executor_,
4390  translator);
4391  return {{input_descs,
4392  input_col_descs,
4393  {},
4394  {rewritten_qual ? rewritten_qual : qual},
4395  {},
4396  {nullptr},
4397  target_exprs,
4398  nullptr,
4399  sort_info,
4400  0,
4401  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
4402  dag_info.extracted_dag,
4403  dag_info.hash_table_plan_dag},
4404  filter,
4406  nullptr};
4407 }
4408 
const size_t getGroupByCount() const
bool isAll() const
bool is_agg(const Analyzer::Expr *expr)
Analyzer::ExpressionPtr rewrite_array_elements(Analyzer::Expr const *expr)
std::vector< Analyzer::Expr * > target_exprs
SortField getCollation(const size_t i) const
const foreign_storage::ForeignTable * getForeignTable(const std::string &tableName) const
Definition: Catalog.cpp:1476
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:214
void collect_used_input_desc(std::vector< InputDescriptor > &input_descs, const Catalog_Namespace::Catalog &cat, std::unordered_set< std::shared_ptr< const InputColDescriptor >> &input_col_descs_unique, const RelAlgNode *ra_node, const std::unordered_set< const RexInput * > &source_used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
size_t getOffset() const
std::vector< int > ChunkKey
Definition: types.h:37
std::optional< std::function< void()> > post_execution_callback_
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:108
HOST DEVICE int get_size() const
Definition: sqltypes.h:333
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool g_enable_watchdog
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
std::string cat(Ts &&...args)
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
const Rex * getTargetExpr(const size_t i) const
AggregatedColRange computeColRangesCache()
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1223
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1142
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:102
Definition: sqltypes.h:48
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
size_t size() const override
int hll_size_for_rate(const int err_percent)
Definition: HyperLogLog.h:115
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:221
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const RexScalar * getFilterExpr() const
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:3174
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:222
tuple d
Definition: test_fsi.py:9
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
const RelAlgNode * body
ExecutorDeviceType
ErrorInfo getErrorDescription(const int32_t error_code)
void setForceNonInSituData()
Definition: RenderInfo.cpp:45
size_t getIndex() const
#define SPIMAP_GEO_PHYSICAL_INPUT(c, i)
Definition: Catalog.h:76
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
bool g_skip_intermediate_count
std::unique_ptr< const RexOperator > get_bitwise_equals_conjunction(const RexScalar *scalar)
RexUsedInputsVisitor(const Catalog_Namespace::Catalog &cat)
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:117
static ExtractedPlanDag extractQueryPlanDag(const RelAlgNode *node, const Catalog_Namespace::Catalog &catalog, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > &left_deep_tree_infos, const TemporaryTables &temporary_tables, Executor *executor, const RelAlgTranslator &rel_alg_translator)
const RexScalar * getOuterCondition(const size_t nesting_level) const
TableGenerations computeTableGenerations()
SQLTypeInfo get_nullable_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:946
std::shared_ptr< Analyzer::Expr > translateScalarRex(const RexScalar *rex) const
#define LOG(tag)
Definition: Logger.h:200
std::pair< int, int > ParallelismHint
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:92
size_t size() const override
std::vector< size_t > outer_fragment_indices
static SpeculativeTopNBlacklist speculative_topn_blacklist_
size_t get_scalar_sources_size(const RelCompound *compound)
RelAlgExecutionUnit exe_unit
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< unsigned > visitLeftDeepInnerJoin(const RelLeftDeepInnerJoin *left_deep_join_tree) const override
std::vector< std::shared_ptr< Analyzer::TargetEntry > > targets
Definition: RenderInfo.h:37
SQLOps
Definition: sqldefs.h:29
TemporaryTables temporary_tables_
size_t getNumRows() const
const std::list< Analyzer::OrderEntry > order_entries
PersistentStorageMgr * getPersistentStorageMgr() const
Definition: DataMgr.cpp:571
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const RexScalar * getCondition() const
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5240
std::string join(T const &container, std::string const &delim)
const std::vector< TargetMetaInfo > getTupleType() const
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources_for_update(const RA *ra_node, const RelAlgTranslator &translator, int32_t tableId, const Catalog_Namespace::Catalog &cat, const ColumnNameList &colNames, size_t starting_projection_column_idx)
bool get_is_null() const
Definition: Analyzer.h:334
Definition: sqldefs.h:38
const QueryPlan extracted_dag
std::vector< InputDescriptor > input_descs
std::shared_ptr< Analyzer::Var > var_ref(const Analyzer::Expr *expr, const Analyzer::Var::WhichRow which_row, const int varno)
Definition: Analyzer.h:1845
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
#define UNREACHABLE()
Definition: Logger.h:250
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
#define CHECK_GE(x, y)
Definition: Logger.h:219
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
static WindowProjectNodeContext * create(Executor *executor)
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:925
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
Definition: sqldefs.h:49
Definition: sqldefs.h:30
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
int64_t int_value_from_numbers_ptr(const SQLTypeInfo &type_info, const int8_t *data)
void prepare_string_dictionaries(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
static const int32_t ERR_GEOS
Definition: Execute.h:1148
SQLTypeInfo get_agg_type(const SQLAgg agg_kind, const Analyzer::Expr *arg_expr)
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:1144
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:76
std::shared_ptr< Analyzer::Expr > set_transient_dict(const std::shared_ptr< Analyzer::Expr > expr)
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:251
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:323
static void invalidateCaches()
int g_hll_precision_bits
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
const SQLTypeInfo & get_type_info() const
void checkForMatchingMetaInfoTypes() const
const ColumnDescriptor * getShardColumnMetadataForTable(const TableDescriptor *td) const
Definition: Catalog.cpp:4101
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1461
#define CHECK_GT(x, y)
Definition: Logger.h:218
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
std::vector< const RexScalar * > rex_to_conjunctive_form(const RexScalar *qual_expr)
bool is_count_distinct(const Analyzer::Expr *expr)
QueryStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::string to_string(char const *&&v)
double running_query_interrupt_freq
foreign_storage::ForeignStorageMgr * getForeignStorageMgr() const
void handleNop(RaExecutionDesc &ed)
#define LOG_IF(severity, condition)
Definition: Logger.h:296
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::shared_ptr< Analyzer::Expr > cast_dict_to_none(const std::shared_ptr< Analyzer::Expr > &input)
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:461
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
bool list_contains_expression(const QualsList &haystack, const std::shared_ptr< Analyzer::Expr > &needle)
size_t get_count_distinct_sub_bitmap_count(const size_t bitmap_sz_bits, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:1145
Definition: sqldefs.h:73
ExecutorType
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:202
size_t getColInputsSize() const
std::map< int, std::shared_ptr< ChunkMetadata >> ChunkMetadataMap
bool g_enable_interop
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
virtual T visit(const RexScalar *rex_scalar) const
Definition: RexVisitor.h:27
const size_t getScalarSourcesSize() const
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:1146
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:1143
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:228
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
unsigned getIndex() const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1134
size_t getOuterFragmentCount(const CompilationOptions &co, const ExecutionOptions &eo)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool g_from_table_reordering
Definition: Execute.cpp:83
CONSTEXPR DEVICE bool is_null(const T &value)
unsigned getId() const
Classes representing a parse tree.
int get_logical_size() const
Definition: sqltypes.h:334
bool isGeometry(TargetMetaInfo const &target_meta_info)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:222
ExecutorType executor_type
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1138
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
int count
const JoinQualsPerNestingLevel join_quals
static void reset(Executor *executor)
SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr &expr)
const TableDescriptor * get_shard_for_key(const TableDescriptor *td, const Catalog_Namespace::Catalog &cat, const Fragmenter_Namespace::InsertData &data)
std::vector< std::shared_ptr< RexInput > > synthesized_physical_inputs_owned
std::pair< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > > > get_input_desc_impl(const RA *ra_node, const std::unordered_set< const RexInput * > &used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
SortAlgorithm
void set_parallelism_hints(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
MergeType
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
A container for relational algebra descriptors defining the execution order for a relational algebra ...
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const Catalog_Namespace::Catalog & cat_
size_t g_big_group_threshold
Definition: Execute.cpp:102
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1140
bool is_extracted_dag_valid(ExtractedPlanDag &dag)
const std::shared_ptr< ResultSet > & getRows() const
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1141
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
bool g_bigint_count
size_t groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
const RelAlgNode * getInput(const size_t idx) const
Definition: sqldefs.h:37
Definition: sqldefs.h:75
void executePostExecutionCallback()
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
int getDatabaseId() const
Definition: Catalog.h:277
std::shared_ptr< Analyzer::Expr > build_logical_expression(const std::vector< std::shared_ptr< Analyzer::Expr >> &factors, const SQLOps sql_op)
bool is_metadata_placeholder(const ChunkMetadata &metadata)
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
size_t getNDVEstimation(const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
static const int32_t ERR_UNSUPPORTED_SELF_JOIN
Definition: Execute.h:1137
bool isSimple() const
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1522
specifies the content in-memory of a row in the column metadata table
bool get_is_distinct() const
Definition: Analyzer.h:1098
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1147
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
JoinType get_join_type(const RelAlgNode *ra)
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1135
size_t getTableFuncInputsSize() const
unsigned pending_query_interrupt_freq
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4119
const ColumnDescriptor * getMetadataForColumnBySpi(const int tableId, const size_t spi) const
Definition: Catalog.cpp:1626
std::shared_ptr< Analyzer::Expr > reverse_logical_distribution(const std::shared_ptr< Analyzer::Expr > &expr)
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
std::optional< size_t > getFilteredCountAll(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
Executor * getExecutor() const
std::string * stringval
Definition: sqltypes.h:214
int get_result_table_id() const
Definition: Analyzer.h:1638
static std::shared_ptr< Analyzer::Expr > normalize(const SQLOps optype, const SQLQualifier qual, std::shared_ptr< Analyzer::Expr > left_expr, std::shared_ptr< Analyzer::Expr > right_expr)
Definition: ParserNode.cpp:284
const std::vector< std::unique_ptr< const RexAgg > > & getAggExprs() const
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
bool g_enable_window_functions
Definition: Execute.cpp:103
bool isEmptyResult() const
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
const RexScalar * getProjectAt(const size_t idx) const
#define CHECK_LT(x, y)
Definition: Logger.h:216
Definition: sqltypes.h:51
bool hasInput(const RelAlgNode *needle) const
Definition: sqltypes.h:52
static RegisteredQueryHint defaults()
Definition: QueryHint.h:175
int32_t countRexLiteralArgs() const
Definition: sqldefs.h:69
ExecutionResult executeSimpleInsert(const Analyzer::Query &insert_query)
#define TRANSIENT_DICT_ID
Definition: sqltypes.h:253
string description
Definition: setup.py:45
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_used_inputs(const RelCompound *compound, const Catalog_Namespace::Catalog &cat)