OmniSciDB  94e8789169
RelAlgExecutor.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RelAlgExecutor.h"
19 #include "Parser/ParserNode.h"
34 #include "QueryEngine/RexVisitor.h"
37 #include "Shared/measure.h"
38 #include "Shared/misc.h"
39 #include "Shared/shard_key.h"
40 
41 #include <boost/algorithm/cxx11/any_of.hpp>
42 #include <boost/range/adaptor/reversed.hpp>
43 
44 #include <algorithm>
45 #include <functional>
46 #include <numeric>
47 
49 extern bool g_enable_bump_allocator;
50 bool g_enable_interop{false};
51 bool g_enable_union{false};
52 
53 namespace {
54 
55 bool node_is_aggregate(const RelAlgNode* ra) {
56  const auto compound = dynamic_cast<const RelCompound*>(ra);
57  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
58  return ((compound && compound->isAggregate()) || aggregate);
59 }
60 
61 std::unordered_set<PhysicalInput> get_physical_inputs(
62  const Catalog_Namespace::Catalog& cat,
63  const RelAlgNode* ra) {
64  auto phys_inputs = get_physical_inputs(ra);
65  std::unordered_set<PhysicalInput> phys_inputs2;
66  for (auto& phi : phys_inputs) {
67  phys_inputs2.insert(
68  PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
69  }
70  return phys_inputs2;
71 }
72 
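// Foreign storage initially writes placeholder chunk metadata with an
// inverted range (min > max), which real statistics can never produce; the
// check below keys off that sentinel.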
73 bool is_metadata_placeholder(const ChunkMetadata& metadata) {
74  // Only supported type for now
75  CHECK(metadata.sqlType.is_dict_encoded_type());
76  return metadata.chunkStats.min.intval > metadata.chunkStats.max.intval;
77 }
78 
79 void prepare_foreign_table_for_execution(const RelAlgNode& ra_node, int database_id) {
80  // Iterate through ra_node inputs for types that need to be loaded pre-execution
81  // If they do not have valid metadata, load them into CPU memory to generate
82  // the metadata and leave them ready to be used by the query
83  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(database_id);
84  CHECK(catalog);
85  // provide ForeignStorageMgr with all columns needed for this node
86  std::map<ChunkKey, std::vector<int>> columns_per_table;
87  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
88  int table_id = physical_input.table_id;
89  auto table = catalog->getMetadataForTable(table_id, false);
90  if (table && table->storageType == StorageType::FOREIGN_TABLE) {
91  int col_id = catalog->getColumnIdBySpi(table_id, physical_input.col_id);
92  columns_per_table[{database_id, table_id}].push_back(col_id);
93  }
94  }
95  if (columns_per_table.size() > 0) {
96  CHECK(catalog->getDataMgr().getPersistentStorageMgr()->getForeignStorageMgr() !=
97  nullptr);
98  catalog->getDataMgr()
99  .getPersistentStorageMgr()
100  ->getForeignStorageMgr()
101  ->setColumnHints(columns_per_table);
102  }
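// Second pass: for dictionary-encoded foreign table columns whose fragments
// still carry placeholder metadata, pull the chunk into CPU memory so real
// metadata exists before execution begins.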
103  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
104  int table_id = physical_input.table_id;
105  auto table = catalog->getMetadataForTable(table_id, false);
106  if (table && table->storageType == StorageType::FOREIGN_TABLE) {
107  int col_id = catalog->getColumnIdBySpi(table_id, physical_input.col_id);
108  const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
109  auto foreign_table = catalog->getForeignTable(table_id);
110  if (col_desc->columnType.is_dict_encoded_type()) {
111  CHECK(foreign_table->fragmenter != nullptr);
112  for (const auto& fragment :
113  foreign_table->fragmenter->getFragmentsForQuery().fragments) {
114  ChunkKey chunk_key = {database_id, table_id, col_id, fragment.fragmentId};
115  const ChunkMetadataMap& metadata_map = fragment.getChunkMetadataMap();
116  CHECK(metadata_map.find(col_id) != metadata_map.end());
117  if (is_metadata_placeholder(*(metadata_map.at(col_id)))) {
118  // When this goes out of scope it will stay in CPU cache but become
119  // evictable
120  std::shared_ptr<Chunk_NS::Chunk> chunk =
121  Chunk_NS::Chunk::getChunk(col_desc,
122  &(catalog->getDataMgr()),
123  chunk_key,
124  Data_Namespace::CPU_LEVEL,
125  0,
126  0,
127  0);
128  }
129  }
130  }
131  }
132  }
133 }
134 
135 } // namespace
136 
137 size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
138  const ExecutionOptions& eo) {
139  if (eo.find_push_down_candidates) {
140  return 0;
141  }
142 
143  if (eo.just_explain) {
144  return 0;
145  }
146 
147  CHECK(query_dag_);
148 
149  query_dag_->resetQueryExecutionState();
150  const auto& ra = query_dag_->getRootNode();
151 
152  mapd_shared_lock<mapd_shared_mutex> lock(executor_->execute_mutex_);
153  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
154  const auto phys_inputs = get_physical_inputs(cat_, &ra);
155  const auto phys_table_ids = get_physical_table_inputs(&ra);
156  executor_->setCatalog(&cat_);
157  executor_->setupCaching(phys_inputs, phys_table_ids);
158 
159  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
160  auto ed_seq = RaExecutionSequence(&ra);
161 
162  if (!getSubqueries().empty()) {
163  return 0;
164  }
165 
166  CHECK(!ed_seq.empty());
167  if (ed_seq.size() > 1) {
168  return 0;
169  }
170 
171  decltype(temporary_tables_)().swap(temporary_tables_);
172  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
173  executor_->catalog_ = &cat_;
174  executor_->temporary_tables_ = &temporary_tables_;
175 
177  auto exec_desc_ptr = ed_seq.getDescriptor(0);
178  CHECK(exec_desc_ptr);
179  auto& exec_desc = *exec_desc_ptr;
180  const auto body = exec_desc.getBody();
181  if (body->isNop()) {
182  return 0;
183  }
184 
185  const auto project = dynamic_cast<const RelProject*>(body);
186  if (project) {
187  auto work_unit =
188  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
189 
190  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
191  executor_);
192  }
193 
194  const auto compound = dynamic_cast<const RelCompound*>(body);
195  if (compound) {
196  if (compound->isDeleteViaSelect()) {
197  return 0;
198  } else if (compound->isUpdateViaSelect()) {
199  return 0;
200  } else {
201  if (compound->isAggregate()) {
202  return 0;
203  }
204 
205  const auto work_unit =
206  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
207 
208  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
209  executor_);
210  }
211  }
212 
213  return 0;
214 }
215 
216 ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
217  const ExecutionOptions& eo,
218  const bool just_explain_plan,
219  RenderInfo* render_info) {
220  CHECK(query_dag_);
221  auto timer = DEBUG_TIMER(__func__);
222  INJECT_TIMER(executeRelAlgQuery);
223 
224  auto run_query = [&](const CompilationOptions& co_in) {
225  auto execution_result =
226  executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);
227  if (post_execution_callback_) {
228  VLOG(1) << "Running post execution callback.";
229  (*post_execution_callback_)();
230  }
231  return execution_result;
232  };
233 
234  try {
235  return run_query(co);
236  } catch (const QueryMustRunOnCpu&) {
237  if (!g_allow_cpu_retry) {
238  throw;
239  }
240  }
241  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
242  auto co_cpu = CompilationOptions::makeCpuOnly(co);
243 
244  if (render_info) {
245  render_info->setForceNonInSituData();
246  }
247  return run_query(co_cpu);
248 }
249 
250 namespace {
251 
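// RAII holder for Executor::execute_mutex_: the unitary executor takes the
// lock exclusively, per-query executors take it shared, and keeping both
// members in one holder lets either flavor live for the duration of the query.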
252 struct ExecutorMutexHolder {
253  mapd_shared_lock<mapd_shared_mutex> shared_lock;
254  mapd_unique_lock<mapd_shared_mutex> unique_lock;
255 };
256 
257 } // namespace
258 
259 ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
260  const ExecutionOptions& eo,
261  const bool just_explain_plan,
262  RenderInfo* render_info) {
263  INJECT_TIMER(executeRelAlgQueryNoRetry);
264  auto timer = DEBUG_TIMER(__func__);
265  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
266 
267  query_dag_->resetQueryExecutionState();
268  const auto& ra = query_dag_->getRootNode();
269 
270  // capture the lock acquisition time
271  auto clock_begin = timer_start();
272  if (g_enable_dynamic_watchdog) {
273  executor_->resetInterrupt();
274  }
275  std::string query_session = "";
276  std::string query_str = "N/A";
277  bool acquire_spin_lock = false;
278  auto validate_or_explain_query =
279  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
280  ScopeGuard clearRuntimeInterruptStatus = [this,
281  &query_session,
282  &render_info,
283  &eo,
284  &acquire_spin_lock,
285  &validate_or_explain_query] {
286  // reset the runtime query interrupt status after the end of query execution
287  if (!render_info && !query_session.empty() && eo.allow_runtime_query_interrupt &&
288  !validate_or_explain_query) {
289  executor_->clearQuerySessionStatus(
290  query_session, query_state_->getQuerySubmittedTime(), acquire_spin_lock);
291  }
292  };
293 
294  if (!render_info && eo.allow_runtime_query_interrupt && !validate_or_explain_query) {
295  // if we reach here, the current query, which was waiting for an idle executor
296  // in the dispatch queue, is now scheduled to a specific executor
297  // (not UNITARY_EXECUTOR),
298  // so we update the query session's status with the executor that takes this query
299  std::tie(query_session, query_str) =
300  executor_->attachExecutorToQuerySession(query_state_);
301 
302  // For now we can run only a single query at a time, so if another query is
303  // already being executed by a different executor (i.e., when the parallel
304  // executor is turned on), we check whether this query can run via the spinlock.
305  // If we fail to acquire the lock, we sleep for {g_pending_query_interrupt_freq} ms
306  // and then try to get the lock again (and again and again...)
307  if (!query_session.empty()) {
308  while (executor_->execute_spin_lock_.test_and_set(std::memory_order_acquire)) {
309  try {
310  executor_->checkPendingQueryStatus(query_session);
311  } catch (QueryExecutionError& e) {
312  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
313  throw std::runtime_error(
314  "Query execution has been interrupted (pending query)");
315  }
316  throw e;
317  } catch (...) {
318  throw std::runtime_error("Checking pending query status failed: unknown error");
319  }
320  // we failed to acquire the lock, so sleep before trying again...
321  std::this_thread::sleep_for(
322  std::chrono::milliseconds(g_pending_query_interrupt_freq));
323  }
324  }
325  }
326  auto acquire_execute_mutex =
327  [&acquire_spin_lock](Executor* executor) -> ExecutorMutexHolder {
328  ExecutorMutexHolder ret;
329  if (executor->executor_id_ == Executor::UNITARY_EXECUTOR_ID) {
330  // Only one unitary executor can run at a time
331  ret.unique_lock = mapd_unique_lock<mapd_shared_mutex>(executor->execute_mutex_);
332  } else {
333  ret.shared_lock = mapd_shared_lock<mapd_shared_mutex>(executor->execute_mutex_);
334  }
335  acquire_spin_lock = true;
336  return ret;
337  };
338  // now we hold the spinlock, meaning this query is ready to run on the executor,
339  // so acquire the executor lock here to make sure this executor holds all the
340  // necessary resources, and to protect them against any other executor
341  auto lock = acquire_execute_mutex(executor_);
342 
343  if (!render_info && !query_session.empty() && eo.allow_runtime_query_interrupt &&
344  !validate_or_explain_query) {
345  // check whether this query session has "already" been interrupted;
346  // this can happen when there is a very short gap between the interrupt
347  // request and this thread taking the execute lock.
348  // For instance, the session can fire multiple queries that wait on the
349  // spinlock while the user requests interruption; in that case we have to
350  // remove "all" queries initiated by this session, and we do that here
351  // without running the query
352  try {
353  executor_->checkPendingQueryStatus(query_session);
354  } catch (QueryExecutionError& e) {
355  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
356  throw std::runtime_error("Query execution has been interrupted (pending query)");
357  }
358  throw e;
359  } catch (...) {
360  throw std::runtime_error("Checking pending query status failed: unknown error");
361  }
362  // now the query is going to be executed, so update the status as "RUNNING"
363  executor_->updateQuerySessionStatus(query_state_,
364  QuerySessionStatus::QueryStatus::RUNNING);
365  }
366 
367  // Notify foreign tables to load prior to caching
368  prepare_foreign_table_for_execution(ra, cat_.getDatabaseId());
369 
370  int64_t queue_time_ms = timer_stop(clock_begin);
371  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
372  const auto phys_inputs = get_physical_inputs(cat_, &ra);
373  const auto phys_table_ids = get_physical_table_inputs(&ra);
374  executor_->setCatalog(&cat_);
375  executor_->setupCaching(phys_inputs, phys_table_ids);
376 
377  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
378  auto ed_seq = RaExecutionSequence(&ra);
379 
380  if (just_explain_plan) {
381  std::stringstream ss;
382  std::vector<const RelAlgNode*> nodes;
383  for (size_t i = 0; i < ed_seq.size(); i++) {
384  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
385  }
386  size_t ctr = nodes.size();
387  size_t tab_ctr = 0;
388  for (auto& body : boost::adaptors::reverse(nodes)) {
389  const auto index = ctr--;
390  const auto tabs = std::string(tab_ctr++, '\t');
391  CHECK(body);
392  ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
393  if (auto sort = dynamic_cast<const RelSort*>(body)) {
394  ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
395  }
396  if (dynamic_cast<const RelProject*>(body) ||
397  dynamic_cast<const RelCompound*>(body)) {
398  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
399  ss << tabs << " : " << join->toString() << "\n";
400  }
401  }
402  }
403  const auto& subqueries = getSubqueries();
404  if (!subqueries.empty()) {
405  ss << "Subqueries: "
406  << "\n";
407  for (const auto& subquery : subqueries) {
408  const auto ra = subquery->getRelAlg();
409  ss << "\t" << ra->toString() << "\n";
410  }
411  }
412  auto rs = std::make_shared<ResultSet>(ss.str());
413  return {rs, {}};
414  }
415 
416  if (render_info) {
417  // set render to be non-insitu in certain situations.
418  if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
419  ed_seq.size() > 1) {
420  // old logic
421  // disallow if more than one ED
422  render_info->setInSituDataIfUnset(false);
423  }
424  }
425 
426  if (eo.find_push_down_candidates) {
427  // this extra logic is mainly due to current limitations on multi-step queries
428  // and/or subqueries.
429  return executeRelAlgQueryWithFilterPushDown(
430  ed_seq, co, eo, render_info, queue_time_ms);
431  }
432  timer_setup.stop();
433 
434  // Dispatch the subqueries first
435  for (auto subquery : getSubqueries()) {
436  const auto subquery_ra = subquery->getRelAlg();
437  CHECK(subquery_ra);
438  if (subquery_ra->hasContextData()) {
439  continue;
440  }
441  // Execute the subquery and cache the result.
442  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
443  RaExecutionSequence subquery_seq(subquery_ra);
444  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
445  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
446  }
447  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
448 }
449 
450 AggregatedColRange RelAlgExecutor::computeColRangesCache() {
451  AggregatedColRange agg_col_range_cache;
452  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
453  return executor_->computeColRangesCache(phys_inputs);
454 }
455 
456 StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
457  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
458  return executor_->computeStringDictionaryGenerations(phys_inputs);
459 }
460 
461 TableGenerations RelAlgExecutor::computeTableGenerations() {
462  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
463  return executor_->computeTableGenerations(phys_table_ids);
464 }
465 
466 Executor* RelAlgExecutor::getExecutor() const {
467  return executor_;
468 }
469 
470 void RelAlgExecutor::cleanupPostExecution() {
471  CHECK(executor_);
472  executor_->row_set_mem_owner_ = nullptr;
473 }
474 
475 namespace {
476 
477 void check_sort_node_source_constraint(const RelSort* sort) {
478  CHECK_EQ(size_t(1), sort->inputCount());
479  const auto source = sort->getInput(0);
480  if (dynamic_cast<const RelSort*>(source)) {
481  throw std::runtime_error("Sort node not supported as input to another sort");
482  }
483 }
484 
485 } // namespace
486 
487 QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
488  const RaExecutionSequence& seq,
489  const size_t step_idx,
490  const CompilationOptions& co,
491  const ExecutionOptions& eo,
492  RenderInfo* render_info) {
493  INJECT_TIMER(executeRelAlgQueryStep);
494 
495  auto exe_desc_ptr = seq.getDescriptor(step_idx);
496  CHECK(exe_desc_ptr);
497  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
498 
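// Aggregate results computed on a leaf must be reduced across leaves, unless
// the aggregation is sharded (shard_count != 0), in which case per-shard
// results can simply be concatenated (Union) since rows for a group stay on
// one shard.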
499  size_t shard_count{0};
500  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
501  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
502  };
503 
504  if (sort) {
505  check_sort_node_source_constraint(sort);
506  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
507  shard_count = GroupByAndAggregate::shard_count_for_top_groups(
508  source_work_unit.exe_unit, *executor_->getCatalog());
509  if (!shard_count) {
510  // No point in sorting on the leaf, only execute the input to the sort node.
511  CHECK_EQ(size_t(1), sort->inputCount());
512  const auto source = sort->getInput(0);
513  if (sort->collationCount() || node_is_aggregate(source)) {
514  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
515  CHECK_EQ(temp_seq.size(), size_t(1));
516  ExecutionOptions eo_copy = {
517  eo.output_columnar_hint,
518  eo.allow_multifrag,
519  eo.just_explain,
520  eo.allow_loop_joins,
521  eo.with_watchdog,
522  eo.jit_debug,
523  eo.just_validate || sort->isEmptyResult(),
524  eo.with_dynamic_watchdog,
525  eo.dynamic_watchdog_time_limit,
526  eo.find_push_down_candidates,
527  eo.just_calcite_explain,
528  eo.gpu_input_mem_limit_percent,
529  eo.allow_runtime_query_interrupt,
530  eo.pending_query_interrupt_freq,
531  eo.executor_type,
532  };
533  // Use subseq to avoid clearing existing temporary tables
534  return {
535  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
536  merge_type(source),
537  source->getId(),
538  false};
539  }
540  }
541  }
542  QueryStepExecutionResult result{
543  executeRelAlgSubSeq(seq,
544  std::make_pair(step_idx, step_idx + 1),
545  co,
546  eo,
547  render_info,
548  queue_time_ms_),
549  merge_type(exe_desc_ptr->getBody()),
550  exe_desc_ptr->getBody()->getId(),
551  false};
552  if (post_execution_callback_) {
553  VLOG(1) << "Running post execution callback.";
554  (*post_execution_callback_)();
555  }
556  return result;
557 }
558 
559 void RelAlgExecutor::prepareLeafExecution(
560  const AggregatedColRange& agg_col_range,
561  const StringDictionaryGenerations& string_dictionary_generations,
562  const TableGenerations& table_generations) {
563  // capture the lock acquisition time
564  auto clock_begin = timer_start();
565  if (g_enable_dynamic_watchdog) {
566  executor_->resetInterrupt();
567  }
568  queue_time_ms_ = timer_stop(clock_begin);
569  executor_->row_set_mem_owner_ =
570  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize());
571  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
572  executor_->table_generations_ = table_generations;
573  executor_->agg_col_range_cache_ = agg_col_range;
574 }
575 
576 ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
577  const CompilationOptions& co,
578  const ExecutionOptions& eo,
579  RenderInfo* render_info,
580  const int64_t queue_time_ms,
581  const bool with_existing_temp_tables) {
582  INJECT_TIMER(executeRelAlgSeq);
583  auto timer = DEBUG_TIMER(__func__);
584  if (!with_existing_temp_tables) {
585  decltype(temporary_tables_)().swap(temporary_tables_);
586  }
587  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
588  executor_->catalog_ = &cat_;
589  executor_->temporary_tables_ = &temporary_tables_;
590 
591  time(&now_);
592  CHECK(!seq.empty());
593  const auto exec_desc_count = eo.just_explain ? size_t(1) : seq.size();
594 
595  for (size_t i = 0; i < exec_desc_count; i++) {
596  VLOG(1) << "Executing query step " << i;
597  // only render on the last step
598  try {
599  executeRelAlgStep(seq,
600  i,
601  co,
602  eo,
603  (i == exec_desc_count - 1) ? render_info : nullptr,
604  queue_time_ms);
605  } catch (const NativeExecutionError&) {
606  if (!g_enable_interop) {
607  throw;
608  }
609  auto eo_extern = eo;
610  eo_extern.executor_type = ::ExecutorType::Extern;
611  auto exec_desc_ptr = seq.getDescriptor(i);
612  const auto body = exec_desc_ptr->getBody();
613  const auto compound = dynamic_cast<const RelCompound*>(body);
614  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
615  LOG(INFO) << "Also failed to run the query using interoperability";
616  throw;
617  }
618  executeRelAlgStep(seq,
619  i,
620  co,
621  eo_extern,
622  (i == exec_desc_count - 1) ? render_info : nullptr,
623  queue_time_ms);
624  }
625  }
626 
627  return seq.getDescriptor(exec_desc_count - 1)->getResult();
628 }
629 
630 ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
631  const RaExecutionSequence& seq,
632  const std::pair<size_t, size_t> interval,
633  const CompilationOptions& co,
634  const ExecutionOptions& eo,
635  RenderInfo* render_info,
636  const int64_t queue_time_ms) {
637  INJECT_TIMER(executeRelAlgSubSeq);
638  executor_->catalog_ = &cat_;
639  executor_->temporary_tables_ = &temporary_tables_;
640 
641  time(&now_);
642  for (size_t i = interval.first; i < interval.second; i++) {
643  // only render on the last step
644  executeRelAlgStep(seq,
645  i,
646  co,
647  eo,
648  (i == interval.second - 1) ? render_info : nullptr,
649  queue_time_ms);
650  }
651 
652  return seq.getDescriptor(interval.second - 1)->getResult();
653 }
654 
655 void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
656  const size_t step_idx,
657  const CompilationOptions& co,
658  const ExecutionOptions& eo,
659  RenderInfo* render_info,
660  const int64_t queue_time_ms) {
661  INJECT_TIMER(executeRelAlgStep);
662  auto timer = DEBUG_TIMER(__func__);
663  WindowProjectNodeContext::reset(executor_);
664  auto exec_desc_ptr = seq.getDescriptor(step_idx);
665  CHECK(exec_desc_ptr);
666  auto& exec_desc = *exec_desc_ptr;
667  const auto body = exec_desc.getBody();
668  if (body->isNop()) {
669  handleNop(exec_desc);
670  return;
671  }
672  const ExecutionOptions eo_work_unit{
673  eo.output_columnar_hint,
674  eo.allow_multifrag,
675  eo.just_explain,
676  eo.allow_loop_joins,
677  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
678  eo.jit_debug,
679  eo.just_validate,
680  eo.with_dynamic_watchdog,
681  eo.dynamic_watchdog_time_limit,
682  eo.find_push_down_candidates,
683  eo.just_calcite_explain,
684  eo.gpu_input_mem_limit_percent,
685  eo.allow_runtime_query_interrupt,
686  eo.pending_query_interrupt_freq,
687  eo.executor_type,
688  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
689 
690  // Notify foreign tables to load prior to execution
691  prepare_foreign_table_for_execution(*body, cat_.getDatabaseId());
692 
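// Dispatch on the concrete relational node type. Each step's result is
// registered as a temporary table under the negated node id so subsequent
// steps (and the outer query) can read it like any other input table.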
693  const auto compound = dynamic_cast<const RelCompound*>(body);
694  if (compound) {
695  if (compound->isDeleteViaSelect()) {
696  executeDelete(compound, co, eo_work_unit, queue_time_ms);
697  } else if (compound->isUpdateViaSelect()) {
698  executeUpdate(compound, co, eo_work_unit, queue_time_ms);
699  } else {
700  exec_desc.setResult(
701  executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
702  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
703  << static_cast<int>(-compound->getId()) << ", ...)"
704  << " exec_desc.getResult().getDataPtr()->rowCount()="
705  << exec_desc.getResult().getDataPtr()->rowCount();
706  if (exec_desc.getResult().isFilterPushDownEnabled()) {
707  return;
708  }
709  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
710  }
711  return;
712  }
713  const auto project = dynamic_cast<const RelProject*>(body);
714  if (project) {
715  if (project->isDeleteViaSelect()) {
716  executeDelete(project, co, eo_work_unit, queue_time_ms);
717  } else if (project->isUpdateViaSelect()) {
718  executeUpdate(project, co, eo_work_unit, queue_time_ms);
719  } else {
720  std::optional<size_t> prev_count;
721  // Disabling the intermediate count optimization in distributed, as the previous
722  // execution descriptor will likely not hold the aggregated result.
723  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
724  auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
725  CHECK(prev_exec_desc);
726  RelAlgNode const* prev_body = prev_exec_desc->getBody();
727  // This optimization needs to be restricted in its application for UNION, which
728  // can have 2 input nodes in which neither should restrict the count of the other.
729  // However some non-UNION queries are measurably slower with this restriction, so
730  // it is only applied when g_enable_union is true.
731  bool const parent_check =
732  !g_enable_union || project->getInput(0)->getId() == prev_body->getId();
733  // If the previous node produced a reliable count, skip the pre-flight count
734  if (parent_check && (dynamic_cast<const RelCompound*>(prev_body) ||
735  dynamic_cast<const RelLogicalValues*>(prev_body))) {
736  const auto& prev_exe_result = prev_exec_desc->getResult();
737  const auto prev_result = prev_exe_result.getRows();
738  if (prev_result) {
739  prev_count = prev_result->rowCount();
740  VLOG(3) << "Setting output row count for projection node to previous node ("
741  << prev_exec_desc->getBody()->toString() << ") to " << *prev_count;
742  }
743  }
744  }
745  exec_desc.setResult(executeProject(
746  project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
747  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
748  << static_cast<int>(-project->getId()) << ", ...)"
749  << " exec_desc.getResult().getDataPtr()->rowCount()="
750  << exec_desc.getResult().getDataPtr()->rowCount();
751  if (exec_desc.getResult().isFilterPushDownEnabled()) {
752  return;
753  }
754  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
755  }
756  return;
757  }
758  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
759  if (aggregate) {
760  exec_desc.setResult(
761  executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
762  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
763  return;
764  }
765  const auto filter = dynamic_cast<const RelFilter*>(body);
766  if (filter) {
767  exec_desc.setResult(
768  executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
769  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
770  return;
771  }
772  const auto sort = dynamic_cast<const RelSort*>(body);
773  if (sort) {
774  exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
775  if (exec_desc.getResult().isFilterPushDownEnabled()) {
776  return;
777  }
778  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
779  return;
780  }
781  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
782  if (logical_values) {
783  exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
784  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
785  return;
786  }
787  const auto modify = dynamic_cast<const RelModify*>(body);
788  if (modify) {
789  exec_desc.setResult(executeModify(modify, eo_work_unit));
790  return;
791  }
792  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
793  if (logical_union) {
794  exec_desc.setResult(
795  executeUnion(logical_union, seq, co, eo_work_unit, render_info, queue_time_ms));
796  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
797  return;
798  }
799  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
800  if (table_func) {
801  exec_desc.setResult(
802  executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
803  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
804  return;
805  }
806  LOG(FATAL) << "Unhandled body type: " << body->toString();
807 }
808 
809 void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
810  // just set the result of the previous node as the result of no op
811  auto body = ed.getBody();
812  CHECK(dynamic_cast<const RelAggregate*>(body));
813  CHECK_EQ(size_t(1), body->inputCount());
814  const auto input = body->getInput(0);
815  body->setOutputMetainfo(input->getOutputMetainfo());
816  const auto it = temporary_tables_.find(-input->getId());
817  CHECK(it != temporary_tables_.end());
818  // set up temp table as it could be used by the outer query or next step
819  addTemporaryTable(-body->getId(), it->second);
820 
821  ed.setResult({it->second, input->getOutputMetainfo()});
822 }
823 
824 namespace {
825 
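// Visitor that collects every RexInput referenced by a rex tree. For geo
// columns on physical scans it also synthesizes RexInputs for the hidden
// physical columns backing the logical column, since those chunks must be
// fetched as well.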
826 class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
827  public:
828  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : RexVisitor(), cat_(cat) {}
829 
830  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
831  return synthesized_physical_inputs_owned;
832  }
833 
834  std::unordered_set<const RexInput*> visitInput(
835  const RexInput* rex_input) const override {
836  const auto input_ra = rex_input->getSourceNode();
837  CHECK(input_ra);
838  const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
839  if (scan_ra) {
840  const auto td = scan_ra->getTableDescriptor();
841  if (td) {
842  const auto col_id = rex_input->getIndex();
843  const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
844  if (cd && cd->columnType.get_physical_cols() > 0) {
845  CHECK(IS_GEO(cd->columnType.get_type()));
846  std::unordered_set<const RexInput*> synthesized_physical_inputs;
847  for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
848  auto physical_input =
849  new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
850  synthesized_physical_inputs_owned.emplace_back(physical_input);
851  synthesized_physical_inputs.insert(physical_input);
852  }
853  return synthesized_physical_inputs;
854  }
855  }
856  }
857  return {rex_input};
858  }
859 
860  protected:
861  std::unordered_set<const RexInput*> aggregateResult(
862  const std::unordered_set<const RexInput*>& aggregate,
863  const std::unordered_set<const RexInput*>& next_result) const override {
864  auto result = aggregate;
865  result.insert(next_result.begin(), next_result.end());
866  return result;
867  }
868 
869  private:
870  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
871  const Catalog_Namespace::Catalog& cat_;
872 };
873 
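// Returns the node whose inputs actually feed rows into ra_node's work unit:
// joins and table functions are their own sink, while a single-input node
// reads through a join that directly feeds it.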
874 const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
875  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
876  return table_func;
877  }
878  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
879  CHECK_EQ(size_t(2), join->inputCount());
880  return join;
881  }
882  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
883  CHECK_EQ(size_t(1), ra_node->inputCount());
884  }
885  auto only_src = ra_node->getInput(0);
886  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
887  dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
888  return is_join ? only_src : ra_node;
889 }
890 
891 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
892 get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
893  RexUsedInputsVisitor visitor(cat);
894  const auto filter_expr = compound->getFilterExpr();
895  std::unordered_set<const RexInput*> used_inputs =
896  filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
897  const auto sources_size = compound->getScalarSourcesSize();
898  for (size_t i = 0; i < sources_size; ++i) {
899  const auto source_inputs = visitor.visit(compound->getScalarSource(i));
900  used_inputs.insert(source_inputs.begin(), source_inputs.end());
901  }
902  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
903  return std::make_pair(used_inputs, used_inputs_owned);
904 }
905 
906 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
907 get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
908  CHECK_EQ(size_t(1), aggregate->inputCount());
909  std::unordered_set<const RexInput*> used_inputs;
910  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
911  const auto source = aggregate->getInput(0);
912  const auto& in_metainfo = source->getOutputMetainfo();
913  const auto group_count = aggregate->getGroupByCount();
914  CHECK_GE(in_metainfo.size(), group_count);
915  for (size_t i = 0; i < group_count; ++i) {
916  auto synthesized_used_input = new RexInput(source, i);
917  used_inputs_owned.emplace_back(synthesized_used_input);
918  used_inputs.insert(synthesized_used_input);
919  }
920  for (const auto& agg_expr : aggregate->getAggExprs()) {
921  for (size_t i = 0; i < agg_expr->size(); ++i) {
922  const auto operand_idx = agg_expr->getOperand(i);
923  CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
924  auto synthesized_used_input = new RexInput(source, operand_idx);
925  used_inputs_owned.emplace_back(synthesized_used_input);
926  used_inputs.insert(synthesized_used_input);
927  }
928  }
929  return std::make_pair(used_inputs, used_inputs_owned);
930 }
931 
932 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
933 get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
934  RexUsedInputsVisitor visitor(cat);
935  std::unordered_set<const RexInput*> used_inputs;
936  for (size_t i = 0; i < project->size(); ++i) {
937  const auto proj_inputs = visitor.visit(project->getProjectAt(i));
938  used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
939  }
940  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
941  return std::make_pair(used_inputs, used_inputs_owned);
942 }
943 
944 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
945 get_used_inputs(const RelTableFunction* table_func,
946  const Catalog_Namespace::Catalog& cat) {
947  RexUsedInputsVisitor visitor(cat);
948  std::unordered_set<const RexInput*> used_inputs;
949  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
950  const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
951  used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
952  }
953  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
954  return std::make_pair(used_inputs, used_inputs_owned);
955 }
956 
957 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
958 get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
959  std::unordered_set<const RexInput*> used_inputs;
960  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
961  const auto data_sink_node = get_data_sink(filter);
962  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
963  const auto source = data_sink_node->getInput(nest_level);
964  const auto scan_source = dynamic_cast<const RelScan*>(source);
965  if (scan_source) {
966  CHECK(source->getOutputMetainfo().empty());
967  for (size_t i = 0; i < scan_source->size(); ++i) {
968  auto synthesized_used_input = new RexInput(scan_source, i);
969  used_inputs_owned.emplace_back(synthesized_used_input);
970  used_inputs.insert(synthesized_used_input);
971  }
972  } else {
973  const auto& partial_in_metadata = source->getOutputMetainfo();
974  for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
975  auto synthesized_used_input = new RexInput(source, i);
976  used_inputs_owned.emplace_back(synthesized_used_input);
977  used_inputs.insert(synthesized_used_input);
978  }
979  }
980  }
981  return std::make_pair(used_inputs, used_inputs_owned);
982 }
983 
984 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
985 get_used_inputs(const RelLogicalUnion* logical_union, const Catalog_Namespace::Catalog&) {
986  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
987  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
988  used_inputs_owned.reserve(logical_union->inputCount());
989  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
990  auto const n_inputs = logical_union->inputCount();
991  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
992  auto input = logical_union->getInput(nest_level);
993  for (size_t i = 0; i < input->size(); ++i) {
994  used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
995  used_inputs.insert(used_inputs_owned.back().get());
996  }
997  }
998  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
999 }
1000 
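// Physical scans resolve to their catalog table id; any other node stands for
// an intermediate result, addressed by its negated node id to match the
// addTemporaryTable(-id, ...) convention used throughout this file.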
1001 int table_id_from_ra(const RelAlgNode* ra_node) {
1002  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
1003  if (scan_ra) {
1004  const auto td = scan_ra->getTableDescriptor();
1005  CHECK(td);
1006  return td->tableId;
1007  }
1008  return -ra_node->getId();
1009 }
1010 
1011 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
1012  const RelAlgNode* ra_node,
1013  const std::vector<size_t>& input_permutation) {
1014  const auto data_sink_node = get_data_sink(ra_node);
1015  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1016  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1017  const auto input_node_idx =
1018  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1019  const auto input_ra = data_sink_node->getInput(input_node_idx);
1020  // Having a non-zero mapped value (input_idx) results in the query being interpreted
1021  // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
1022  // mapped value (input_idx) which originates here. This would be incorrect for UNION.
1023  size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
1024  const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1025  CHECK(it_ok.second);
1026  LOG_IF(INFO, !input_permutation.empty())
1027  << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
1028  }
1029  return input_to_nest_level;
1030 }
1031 
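// Inputs referenced only by join conditions (the inner condition and any
// per-level outer conditions) do not appear in the projected expressions, so
// they are collected separately here.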
1032 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1033 get_join_source_used_inputs(const RelAlgNode* ra_node,
1034  const Catalog_Namespace::Catalog& cat) {
1035  const auto data_sink_node = get_data_sink(ra_node);
1036  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1037  CHECK_EQ(join->inputCount(), 2u);
1038  const auto condition = join->getCondition();
1039  RexUsedInputsVisitor visitor(cat);
1040  auto condition_inputs = visitor.visit(condition);
1041  std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1042  visitor.get_inputs_owned());
1043  return std::make_pair(condition_inputs, condition_inputs_owned);
1044  }
1045 
1046  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1047  CHECK_GE(left_deep_join->inputCount(), 2u);
1048  const auto condition = left_deep_join->getInnerCondition();
1049  RexUsedInputsVisitor visitor(cat);
1050  auto result = visitor.visit(condition);
1051  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1052  ++nesting_level) {
1053  const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1054  if (outer_condition) {
1055  const auto outer_result = visitor.visit(outer_condition);
1056  result.insert(outer_result.begin(), outer_result.end());
1057  }
1058  }
1059  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
1060  return std::make_pair(result, used_inputs_owned);
1061  }
1062 
1063  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1064  CHECK_GT(ra_node->inputCount(), 1u) << ra_node->toString();
1065  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1066  CHECK_GT(ra_node->inputCount(), 0u) << ra_node->toString();
1067  } else {
1068  CHECK_EQ(ra_node->inputCount(), 1u) << ra_node->toString();
1069  }
1070  return std::make_pair(std::unordered_set<const RexInput*>{},
1071  std::vector<std::shared_ptr<RexInput>>{});
1072 }
1073 
1074 void collect_used_input_desc(
1075  std::vector<InputDescriptor>& input_descs,
1076  const Catalog_Namespace::Catalog& cat,
1077  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1078  const RelAlgNode* ra_node,
1079  const std::unordered_set<const RexInput*>& source_used_inputs,
1080  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1081  VLOG(3) << "ra_node=" << ra_node->toString()
1082  << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
1083  << " source_used_inputs.size()=" << source_used_inputs.size();
1084  for (const auto used_input : source_used_inputs) {
1085  const auto input_ra = used_input->getSourceNode();
1086  const int table_id = table_id_from_ra(input_ra);
1087  const auto col_id = used_input->getIndex();
1088  auto it = input_to_nest_level.find(input_ra);
1089  if (it != input_to_nest_level.end()) {
1090  const int input_desc = it->second;
1091  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1092  dynamic_cast<const RelScan*>(input_ra)
1093  ? cat.getColumnIdBySpi(table_id, col_id + 1)
1094  : col_id,
1095  table_id,
1096  input_desc));
1097  } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1098  throw std::runtime_error("Bushy joins not supported");
1099  }
1100  }
1101 }
1102 
1103 template <class RA>
1104 std::pair<std::vector<InputDescriptor>,
1105  std::list<std::shared_ptr<const InputColDescriptor>>>
1106 get_input_desc_impl(const RA* ra_node,
1107  const std::unordered_set<const RexInput*>& used_inputs,
1108  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1109  const std::vector<size_t>& input_permutation,
1110  const Catalog_Namespace::Catalog& cat) {
1111  std::vector<InputDescriptor> input_descs;
1112  const auto data_sink_node = get_data_sink(ra_node);
1113  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1114  const auto input_node_idx =
1115  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1116  auto input_ra = data_sink_node->getInput(input_node_idx);
1117  const int table_id = table_id_from_ra(input_ra);
1118  input_descs.emplace_back(table_id, input_idx);
1119  }
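// Order input descriptors by nest level, and column descriptors by
// (nest level, column id, table id), so downstream code generation sees a
// stable, deterministic input ordering.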
1120  std::sort(input_descs.begin(),
1121  input_descs.end(),
1122  [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
1123  return lhs.getNestLevel() < rhs.getNestLevel();
1124  });
1125  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1126  collect_used_input_desc(input_descs,
1127  cat,
1128  input_col_descs_unique, // modified
1129  ra_node,
1130  used_inputs,
1131  input_to_nest_level);
1132  std::unordered_set<const RexInput*> join_source_used_inputs;
1133  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1134  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1135  get_join_source_used_inputs(ra_node, cat);
1136  collect_used_input_desc(input_descs,
1137  cat,
1138  input_col_descs_unique, // modified
1139  ra_node,
1140  join_source_used_inputs,
1141  input_to_nest_level);
1142  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1143  input_col_descs_unique.begin(), input_col_descs_unique.end());
1144 
1145  std::sort(input_col_descs.begin(),
1146  input_col_descs.end(),
1147  [](std::shared_ptr<const InputColDescriptor> const& lhs,
1148  std::shared_ptr<const InputColDescriptor> const& rhs) {
1149  return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1150  lhs->getColId(),
1151  lhs->getScanDesc().getTableId()) <
1152  std::make_tuple(rhs->getScanDesc().getNestLevel(),
1153  rhs->getColId(),
1154  rhs->getScanDesc().getTableId());
1155  });
1156  return {input_descs,
1157  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1158  input_col_descs.end())};
1159 }
1160 
1161 template <class RA>
1162 std::tuple<std::vector<InputDescriptor>,
1163  std::list<std::shared_ptr<const InputColDescriptor>>,
1164  std::vector<std::shared_ptr<RexInput>>>
1165 get_input_desc(const RA* ra_node,
1166  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1167  const std::vector<size_t>& input_permutation,
1168  const Catalog_Namespace::Catalog& cat) {
1169  std::unordered_set<const RexInput*> used_inputs;
1170  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1171  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
1172  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1173  auto input_desc_pair = get_input_desc_impl(
1174  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1175  return std::make_tuple(
1176  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1177 }
1178 
1179 size_t get_scalar_sources_size(const RelCompound* compound) {
1180  return compound->getScalarSourcesSize();
1181 }
1182 
1183 size_t get_scalar_sources_size(const RelProject* project) {
1184  return project->size();
1185 }
1186 
1187 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1188  return table_func->getTableFuncInputsSize();
1189 }
1190 
1191 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1192  return compound->getScalarSource(i);
1193 }
1194 
1195 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1196  return project->getProjectAt(i);
1197 }
1198 
1199 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1200  return table_func->getTableFuncInputAt(i);
1201 }
1202 
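// Wraps a none-encoded string expression in a cast to a transient dictionary
// (comp_param TRANSIENT_DICT_ID) so it can be grouped and compared by id;
// all other types pass through unchanged.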
1203 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1204  const std::shared_ptr<Analyzer::Expr> expr) {
1205  const auto& ti = expr->get_type_info();
1206  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1207  return expr;
1208  }
1209  auto transient_dict_ti = ti;
1210  transient_dict_ti.set_compression(kENCODING_DICT);
1211  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1212  transient_dict_ti.set_fixed_size();
1213  return expr->add_cast(transient_dict_ti);
1214 }
1215 
1216 void set_transient_dict_maybe(
1217  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1218  const std::shared_ptr<Analyzer::Expr>& expr) {
1219  try {
1220  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1221  } catch (...) {
1222  scalar_sources.push_back(fold_expr(expr.get()));
1223  }
1224 }
1225 
1226 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1227  const std::shared_ptr<Analyzer::Expr>& input) {
1228  const auto& input_ti = input->get_type_info();
1229  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1230  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1231  }
1232  return input;
1233 }
1234 
1235 template <class RA>
1236 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1237  const RA* ra_node,
1238  const RelAlgTranslator& translator,
1239  const ::ExecutorType executor_type) {
1240  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1241  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1242  VLOG(3) << "get_scalar_sources_size(" << ra_node->toString()
1243  << ") = " << scalar_sources_size;
1244  for (size_t i = 0; i < scalar_sources_size; ++i) {
1245  const auto scalar_rex = scalar_at(i, ra_node);
1246  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1247  // RexRefs are synthetic scalars we append at the end of the real ones
1248  // for the sake of taking memory ownership; no real work is needed here.
1249  continue;
1250  }
1251 
1252  const auto scalar_expr =
1253  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
1254  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1255  if (executor_type == ExecutorType::Native) {
1256  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1257  } else {
1258  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1259  }
1260  }
1261 
1262  return scalar_sources;
1263 }
1264 
1265 template <class RA>
1266 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1267  const RA* ra_node,
1268  const RelAlgTranslator& translator,
1269  int32_t tableId,
1270  const Catalog_Namespace::Catalog& cat,
1271  const ColumnNameList& colNames,
1272  size_t starting_projection_column_idx) {
1273  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1274  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1275  const auto scalar_rex = scalar_at(i, ra_node);
1276  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1277  // RexRefs are synthetic scalars we append at the end of the real ones
1278  // for the sake of taking memory ownership; no real work is needed here.
1279  continue;
1280  }
1281 
1282  std::shared_ptr<Analyzer::Expr> translated_expr;
1283  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1284  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
1285  tableId,
1286  cat,
1287  colNames[i - starting_projection_column_idx]);
1288  } else {
1289  translated_expr = translator.translateScalarRex(scalar_rex);
1290  }
1291  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1292  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1293  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1294  }
1295 
1296  return scalar_sources;
1297 }
1298 
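// For a non-aggregate compound the executor still expects a single nullptr
// group-by expression, its convention for "no GROUP BY".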
1299 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1300  const RelCompound* compound,
1301  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1302  if (!compound->isAggregate()) {
1303  return {nullptr};
1304  }
1305  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1306  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1307  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1308  }
1309  return groupby_exprs;
1310 }
1311 
1312 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1313  const RelAggregate* aggregate,
1314  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1315  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1316  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1317  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1318  }
1319  return groupby_exprs;
1320 }
1321 
1322 std::list<std::shared_ptr<Analyzer::Expr>> translate_quals(const RelCompound* compound,
1323  const RelAlgTranslator& translator) {
1324  const auto filter_rex = compound->getFilterExpr();
1325  const auto filter_expr =
1326  filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
1327  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1328  : std::list<std::shared_ptr<Analyzer::Expr>>{};
1329 }
1330 
1331 std::vector<Analyzer::Expr*> translate_targets(
1332  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1333  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1334  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1335  const RelCompound* compound,
1336  const RelAlgTranslator& translator,
1337  const ExecutorType executor_type) {
1338  std::vector<Analyzer::Expr*> target_exprs;
1339  for (size_t i = 0; i < compound->size(); ++i) {
1340  const auto target_rex = compound->getTargetExpr(i);
1341  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1342  std::shared_ptr<Analyzer::Expr> target_expr;
1343  if (target_rex_agg) {
1344  target_expr =
1345  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1346  } else {
1347  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1348  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1349  if (target_rex_ref) {
1350  const auto ref_idx = target_rex_ref->getIndex();
1351  CHECK_GE(ref_idx, size_t(1));
1352  CHECK_LE(ref_idx, groupby_exprs.size());
1353  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1354  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1355  } else {
1356  target_expr = translator.translateScalarRex(target_rex_scalar);
1357  auto rewritten_expr = rewrite_expr(target_expr.get());
1358  target_expr = fold_expr(rewritten_expr.get());
1359  if (executor_type == ExecutorType::Native) {
1360  try {
1361  target_expr = set_transient_dict(target_expr);
1362  } catch (...) {
1363  // noop
1364  }
1365  } else {
1366  target_expr = cast_dict_to_none(target_expr);
1367  }
1368  }
1369  }
1370  CHECK(target_expr);
1371  target_exprs_owned.push_back(target_expr);
1372  target_exprs.push_back(target_expr.get());
1373  }
1374  return target_exprs;
1375 }
1376 
1377 std::vector<Analyzer::Expr*> translate_targets(
1378  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1379  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1380  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1381  const RelAggregate* aggregate,
1382  const RelAlgTranslator& translator) {
1383  std::vector<Analyzer::Expr*> target_exprs;
1384  size_t group_key_idx = 1;
1385  for (const auto& groupby_expr : groupby_exprs) {
1386  auto target_expr =
1387  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1388  target_exprs_owned.push_back(target_expr);
1389  target_exprs.push_back(target_expr.get());
1390  }
1391 
1392  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1393  auto target_expr =
1394  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1395  CHECK(target_expr);
1396  target_expr = fold_expr(target_expr.get());
1397  target_exprs_owned.push_back(target_expr);
1398  target_exprs.push_back(target_expr.get());
1399  }
1400  return target_exprs;
1401 }
1402 
1403 bool is_count_distinct(const Analyzer::Expr* expr) {
1404  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1405  return agg_expr && agg_expr->get_is_distinct();
1406 }
1407 
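// Note: is_agg() deliberately matches only MIN/MAX/SUM/AVG; COUNT DISTINCT is
// handled by is_count_distinct() above, and get_logical_type_for_expr() below
// uses this to decide when the logical result type must be nullable.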
1408 bool is_agg(const Analyzer::Expr* expr) {
1409  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1410  if (agg_expr && agg_expr->get_contains_agg()) {
1411  auto agg_type = agg_expr->get_aggtype();
1412  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1413  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1414  return true;
1415  }
1416  }
1417  return false;
1418 }
1419 
1420 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
1421  if (is_count_distinct(&expr)) {
1422  return SQLTypeInfo(kBIGINT, false);
1423  } else if (is_agg(&expr)) {
1424  return get_nullable_logical_type_info(expr.get_type_info());
1425  }
1426  return get_logical_type_info(expr.get_type_info());
1427 }
1428 
1429 template <class RA>
1430 std::vector<TargetMetaInfo> get_targets_meta(
1431  const RA* ra_node,
1432  const std::vector<Analyzer::Expr*>& target_exprs) {
1433  std::vector<TargetMetaInfo> targets_meta;
1434  CHECK_EQ(ra_node->size(), target_exprs.size());
1435  for (size_t i = 0; i < ra_node->size(); ++i) {
1436  CHECK(target_exprs[i]);
1437  // TODO(alex): remove the count distinct type fixup.
1438  targets_meta.emplace_back(ra_node->getFieldName(i),
1439  get_logical_type_for_expr(*target_exprs[i]),
1440  target_exprs[i]->get_type_info());
1441  }
1442  return targets_meta;
1443 }
1444 
1445 template <>
1446 std::vector<TargetMetaInfo> get_targets_meta(
1447  const RelFilter* filter,
1448  const std::vector<Analyzer::Expr*>& target_exprs) {
1449  RelAlgNode const* input0 = filter->getInput(0);
1450  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1451  return get_targets_meta(input, target_exprs);
1452  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1453  return get_targets_meta(input, target_exprs);
1454  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1455  return get_targets_meta(input, target_exprs);
1456  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1457  return get_targets_meta(input, target_exprs);
1458  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1459  return get_targets_meta(input, target_exprs);
1460  }
1461  UNREACHABLE() << "Unhandled node type: " << input0->toString();
1462  return {};
1463 }
1464 
1465 } // namespace
1466 
1467 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
1468  const CompilationOptions& co_in,
1469  const ExecutionOptions& eo_in,
1470  const int64_t queue_time_ms) {
1471  CHECK(node);
1472  auto timer = DEBUG_TIMER(__func__);
1473 
1475 
1476  auto co = co_in;
1477  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1478  // encoded string updates
1479 
1480  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
1481  auto& work_unit,
1482  const bool is_aggregate) {
1483  dml_transaction_parameters_ =
1484  std::make_unique<UpdateTransactionParameters>(node->getModifiedTableDescriptor(),
1485  node->getTargetColumns(),
1486  node->getOutputMetainfo(),
1487  node->isVarlenUpdateRequired());
1488 
1489  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1490 
1491  auto execute_update_ra_exe_unit = [this, &co, &eo_in, &table_infos](
1492  const RelAlgExecutionUnit& ra_exe_unit,
1493  const bool is_aggregate) {
1494  CompilationOptions co_project = CompilationOptions::makeCpuOnly(co);
1495 
1496  auto eo = eo_in;
1497  if (dml_transaction_parameters_->tableIsTemporary()) {
1498  eo.output_columnar_hint = true;
1499  co_project.allow_lazy_fetch = false;
1500  co_project.filter_on_deleted_column =
1501  false; // project the entire delete column for columnar update
1502  }
1503 
1504  auto update_transaction_parameters =
1505  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
1506  CHECK(update_transaction_parameters);
1507  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
1508  executor_->executeUpdate(ra_exe_unit,
1509  table_infos,
1510  co_project,
1511  eo,
1512  cat_,
1513  executor_->row_set_mem_owner_,
1514  update_callback,
1515  is_aggregate);
1516  post_execution_callback_ = [this]() {
1517  dml_transaction_parameters_->finalizeTransaction();
1518  };
1519  };
1520 
1521  if (dml_transaction_parameters_->tableIsTemporary()) {
1522  // hold owned target exprs during execution if rewriting
1523  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1524  // rewrite temp table updates to generate the full column by moving the where
1525  // clause into a case expression; if such a rewrite is not possible, bail on
1526  // the update operation. Then build an expr for the update target.
1527  auto update_transaction_params =
1528  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
1529  CHECK(update_transaction_params);
1530  const auto td = update_transaction_params->getTableDescriptor();
1531  CHECK(td);
1532  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
1533  if (update_column_names.size() > 1) {
1534  throw std::runtime_error(
1535  "Multi-column update is not yet supported for temporary tables.");
1536  }
1537 
1538  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1539  CHECK(cd);
1540  auto projected_column_to_update =
1541  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1542  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1543  work_unit.exe_unit, projected_column_to_update);
1544  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1545  throw std::runtime_error(
1546  "Variable length updates not yet supported on temporary tables.");
1547  }
1548  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1549  } else {
1550  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1551  }
1552  };
1553 
1554  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1555  auto work_unit =
1556  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1557 
1558  execute_update_for_node(compound, work_unit, compound->isAggregate());
1559  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1560  auto work_unit =
1561  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1562 
1563  if (project->isSimple()) {
1564  CHECK_EQ(size_t(1), project->inputCount());
1565  const auto input_ra = project->getInput(0);
1566  if (dynamic_cast<const RelSort*>(input_ra)) {
1567  const auto& input_table =
1568  get_temporary_table(&temporary_tables_, -input_ra->getId());
1569  CHECK(input_table);
1570  work_unit.exe_unit.scan_limit = input_table->rowCount();
1571  }
1572  }
1573 
1574  execute_update_for_node(project, work_unit, false);
1575  } else {
1576  throw std::runtime_error("Unsupported parent node for update: " + node->toString());
1577  }
1578 }
1579 
1580 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
1581  const CompilationOptions& co,
1582  const ExecutionOptions& eo_in,
1583  const int64_t queue_time_ms) {
1584  CHECK(node);
1585  auto timer = DEBUG_TIMER(__func__);
1586 
1588 
1589  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1590  auto& work_unit,
1591  const bool is_aggregate) {
1592  auto* table_descriptor = node->getModifiedTableDescriptor();
1593  CHECK(table_descriptor);
1594  if (!table_descriptor->hasDeletedCol) {
1595  throw std::runtime_error(
1596  "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
1597  }
1598 
1599  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1600 
1601  auto execute_delete_ra_exe_unit =
1602  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1603  const bool is_aggregate) {
1604  dml_transaction_parameters_ = std::make_unique<DeleteTransactionParameters>(
1605  table_is_temporary(table_descriptor));
1606  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
1607  dml_transaction_parameters_.get());
1608  CHECK(delete_params);
1609  auto delete_callback = yieldDeleteCallback(*delete_params);
1610  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
1611 
1612  auto eo = eo_in;
1613  if (dml_transaction_parameters_->tableIsTemporary()) {
1614  eo.output_columnar_hint = true;
1615  co_delete.filter_on_deleted_column =
1616  false; // project the entire delete column for columnar update
1617  } else {
1618  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1619  }
1620 
1621  executor_->executeUpdate(exe_unit,
1622  table_infos,
1623  co_delete,
1624  eo,
1625  cat_,
1626  executor_->row_set_mem_owner_,
1627  delete_callback,
1628  is_aggregate);
1629  post_execution_callback_ = [this]() {
1630  dml_transaction_parameters_->finalizeTransaction();
1631  };
1632  };
1633 
1634  if (table_is_temporary(table_descriptor)) {
1635  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1636  auto cd = cat_.getDeletedColumn(table_descriptor);
1637  CHECK(cd);
1638  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1639  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1640  const auto rewritten_exe_unit =
1641  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1642  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1643  } else {
1644  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1645  }
1646  };
1647 
1648  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1649  const auto work_unit =
1650  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1651  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1652  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1653  auto work_unit =
1654  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1655  if (project->isSimple()) {
1656  CHECK_EQ(size_t(1), project->inputCount());
1657  const auto input_ra = project->getInput(0);
1658  if (dynamic_cast<const RelSort*>(input_ra)) {
1659  const auto& input_table =
1660  get_temporary_table(&temporary_tables_, -input_ra->getId());
1661  CHECK(input_table);
1662  work_unit.exe_unit.scan_limit = input_table->rowCount();
1663  }
1664  }
1665  execute_delete_for_node(project, work_unit, false);
1666  } else {
1667  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1668  }
1669 }
1670 
1671 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
1672  const CompilationOptions& co,
1673  const ExecutionOptions& eo,
1674  RenderInfo* render_info,
1675  const int64_t queue_time_ms) {
1676  auto timer = DEBUG_TIMER(__func__);
1677  const auto work_unit =
1678  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1679  CompilationOptions co_compound = co;
1680  return executeWorkUnit(work_unit,
1681  compound->getOutputMetainfo(),
1682  compound->isAggregate(),
1683  co_compound,
1684  eo,
1685  render_info,
1686  queue_time_ms);
1687 }
1688 
1689 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
1690  const CompilationOptions& co,
1691  const ExecutionOptions& eo,
1692  RenderInfo* render_info,
1693  const int64_t queue_time_ms) {
1694  auto timer = DEBUG_TIMER(__func__);
1695  const auto work_unit = createAggregateWorkUnit(
1696  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1697  return executeWorkUnit(work_unit,
1698  aggregate->getOutputMetainfo(),
1699  true,
1700  co,
1701  eo,
1702  render_info,
1703  queue_time_ms);
1704 }
1705 
1706 namespace {
1707 
1708 // Returns true iff the execution unit contains window functions.
1709 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
1710  return std::any_of(ra_exe_unit.target_exprs.begin(),
1711  ra_exe_unit.target_exprs.end(),
1712  [](const Analyzer::Expr* expr) {
1713  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1714  });
1715 }
1716 
1717 } // namespace
1718 
1719 ExecutionResult RelAlgExecutor::executeProject(
1720  const RelProject* project,
1721  const CompilationOptions& co,
1722  const ExecutionOptions& eo,
1723  RenderInfo* render_info,
1724  const int64_t queue_time_ms,
1725  const std::optional<size_t> previous_count) {
1726  auto timer = DEBUG_TIMER(__func__);
1727  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1728  CompilationOptions co_project = co;
1729  if (project->isSimple()) {
1730  CHECK_EQ(size_t(1), project->inputCount());
1731  const auto input_ra = project->getInput(0);
1732  if (dynamic_cast<const RelSort*>(input_ra)) {
1733  co_project.device_type = ExecutorDeviceType::CPU;
1734  const auto& input_table =
1735  get_temporary_table(&temporary_tables_, -input_ra->getId());
1736  CHECK(input_table);
1737  work_unit.exe_unit.scan_limit =
1738  std::min(input_table->getLimit(), input_table->rowCount());
1739  }
1740  }
1741  return executeWorkUnit(work_unit,
1742  project->getOutputMetainfo(),
1743  false,
1744  co_project,
1745  eo,
1746  render_info,
1747  queue_time_ms,
1748  previous_count);
1749 }
1750 
1751 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
1752  const CompilationOptions& co_in,
1753  const ExecutionOptions& eo,
1754  const int64_t queue_time_ms) {
1756  auto timer = DEBUG_TIMER(__func__);
1757 
1758  auto co = co_in;
1759 
1760  if (g_cluster) {
1761  throw std::runtime_error("Table functions not supported in distributed mode yet");
1762  }
1763  if (!g_enable_table_functions) {
1764  throw std::runtime_error("Table function support is disabled");
1765  }
1766  auto table_func_work_unit = createTableFunctionWorkUnit(
1767  table_func,
1768  eo.just_explain,
1769  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
1770  const auto body = table_func_work_unit.body;
1771  CHECK(body);
1772 
1773  const auto table_infos =
1774  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1775 
1776  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1777  co.device_type,
1778  QueryMemoryDescriptor(),
1779  nullptr,
1780  executor_->getCatalog(),
1781  executor_->blockSize(),
1782  executor_->gridSize()),
1783  {}};
1784 
1785  try {
1786  result = {executor_->executeTableFunction(
1787  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1788  body->getOutputMetainfo()};
1789  } catch (const QueryExecutionError& e) {
1790  handlePersistentError(e.getErrorCode());
1791  CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
1792  throw std::runtime_error("Table function ran out of memory during execution");
1793  }
1794  result.setQueueTime(queue_time_ms);
1795  return result;
1796 }
1797 
1798 namespace {
1799 
1800 // Creates a new expression which has the range table index set to 1. This is needed to
1801 // reuse the hash join construction helpers to generate a hash table for the window
1802 // function partition: create an equals expression with left and right sides identical
1803 // except for the range table index.
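// E.g. a partition key column `x` at range table index 0 becomes the qualifier
// `x = x'`, where `x'` is the same column at range table index 1, so the hash
// join machinery sees what looks like a self-join on the partition key.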
1804 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
1805  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
1806  if (tuple) {
1807  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
1808  for (const auto& element : tuple->getTuple()) {
1809  transformed_tuple.push_back(transform_to_inner(element.get()));
1810  }
1811  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
1812  }
1813  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
1814  if (!col) {
1815  throw std::runtime_error("Only columns supported in the window partition for now");
1816  }
1817  return makeExpr<Analyzer::ColumnVar>(
1818  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
1819 }
1820 
1821 } // namespace
1822 
1823 void RelAlgExecutor::computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
1824  const CompilationOptions& co,
1825  const ExecutionOptions& eo,
1826  ColumnCacheMap& column_cache_map,
1827  const int64_t queue_time_ms) {
1828  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1829  CHECK_EQ(query_infos.size(), size_t(1));
1830  if (query_infos.front().info.fragments.size() != 1) {
1831  throw std::runtime_error(
1832  "Only single fragment tables supported for window functions for now");
1833  }
1834  if (eo.executor_type == ::ExecutorType::Extern) {
1835  return;
1836  }
1837  query_infos.push_back(query_infos.front());
1838  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
1839  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1840  ++target_index) {
1841  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1842  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1843  if (!window_func) {
1844  continue;
1845  }
1846  // Always use baseline layout hash tables for now, make the expression a tuple.
1847  const auto& partition_keys = window_func->getPartitionKeys();
1848  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1849  if (partition_keys.size() > 1) {
1850  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1851  } else {
1852  if (partition_keys.empty()) {
1853  throw std::runtime_error(
1854  "Empty window function partitions are not supported yet");
1855  }
1856  CHECK_EQ(partition_keys.size(), size_t(1));
1857  partition_key_tuple = partition_keys.front();
1858  }
1859  // Creates a tautology equality with the partition expression on both sides.
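// (kBW_EQ is the NULL-aware form of equality, so rows with a NULL partition
// key still hash into a common partition instead of being filtered out.)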
1860  const auto partition_key_cond =
1861  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1862  kBW_EQ,
1863  kONE,
1864  partition_key_tuple,
1865  transform_to_inner(partition_key_tuple.get()));
1866  auto context = createWindowFunctionContext(window_func,
1867  partition_key_cond,
1868  ra_exe_unit,
1869  query_infos,
1870  co,
1871  column_cache_map,
1872  executor_->getRowSetMemoryOwner());
1873  context->compute();
1874  window_project_node_context->addWindowFunctionContext(std::move(context),
1875  target_index);
1876  }
1877 }
1878 
1879 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
1880  const Analyzer::WindowFunction* window_func,
1881  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
1882  const RelAlgExecutionUnit& ra_exe_unit,
1883  const std::vector<InputTableInfo>& query_infos,
1884  const CompilationOptions& co,
1885  ColumnCacheMap& column_cache_map,
1886  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1887  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
1888  ? Data_Namespace::MemoryLevel::GPU_LEVEL
1889  : Data_Namespace::MemoryLevel::CPU_LEVEL;
1890  const auto join_table_or_err =
1891  executor_->buildHashTableForQualifier(partition_key_cond,
1892  query_infos,
1893  memory_level,
1894  HashType::OneToMany,
1895  column_cache_map,
1896  ra_exe_unit.query_hint);
1897  if (!join_table_or_err.fail_reason.empty()) {
1898  throw std::runtime_error(join_table_or_err.fail_reason);
1899  }
1900  CHECK(join_table_or_err.hash_table->getHashType() == HashType::OneToMany);
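// A one-to-many hash table maps each distinct partition key to the set of row
// ids forming that partition, which is exactly the layout the window function
// context needs to iterate partitions.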
1901  const auto& order_keys = window_func->getOrderKeys();
1902  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
1903  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
1904  auto context = std::make_unique<WindowFunctionContext>(window_func,
1905  join_table_or_err.hash_table,
1906  elem_count,
1907  co.device_type,
1908  row_set_mem_owner);
1909  for (const auto& order_key : order_keys) {
1910  const auto order_col =
1911  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
1912  if (!order_col) {
1913  throw std::runtime_error("Only order by columns supported for now");
1914  }
1915  const int8_t* column;
1916  size_t join_col_elem_count;
1917  std::tie(column, join_col_elem_count) =
1918  ColumnFetcher::getOneColumnFragment(executor_,
1919  *order_col,
1920  query_infos.front().info.fragments.front(),
1921  memory_level,
1922  0,
1923  nullptr,
1924  chunks_owner,
1925  column_cache_map);
1926  CHECK_EQ(join_col_elem_count, elem_count);
1927  context->addOrderColumn(column, order_col.get(), chunks_owner);
1928  }
1929  return context;
1930 }
1931 
1932 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
1933  const CompilationOptions& co,
1934  const ExecutionOptions& eo,
1935  RenderInfo* render_info,
1936  const int64_t queue_time_ms) {
1937  auto timer = DEBUG_TIMER(__func__);
1938  const auto work_unit =
1939  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1940  return executeWorkUnit(
1941  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
1942 }
1943 
1944 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
1945  std::vector<TargetMetaInfo> const& rhs) {
1946  if (lhs.size() == rhs.size()) {
1947  for (size_t i = 0; i < lhs.size(); ++i) {
1948  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
1949  return false;
1950  }
1951  }
1952  return true;
1953  }
1954  return false;
1955 }
1956 
1957 bool isGeometry(TargetMetaInfo const& target_meta_info) {
1958  return target_meta_info.get_type_info().is_geometry();
1959 }
1960 
1961 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
1962  const RaExecutionSequence& seq,
1963  const CompilationOptions& co,
1964  const ExecutionOptions& eo,
1965  RenderInfo* render_info,
1966  const int64_t queue_time_ms) {
1967  auto timer = DEBUG_TIMER(__func__);
1968  if (!logical_union->isAll()) {
1969  throw std::runtime_error("UNION without ALL is not supported yet.");
1970  }
1971  // Will throw a std::runtime_error if types don't match.
1972  logical_union->checkForMatchingMetaInfoTypes();
1973  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
1974  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
1975  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
1976  }
1977  // Only Projections and Aggregates from a UNION are supported for now.
1978  query_dag_->eachNode([logical_union](RelAlgNode const* node) {
1979  if (node->hasInput(logical_union) &&
1980  !shared::dynamic_castable_to_any<RelProject, RelLogicalUnion, RelAggregate>(
1981  node)) {
1982  throw std::runtime_error("UNION ALL not yet supported in this context.");
1983  }
1984  });
1985 
1986  auto work_unit =
1987  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
1988  return executeWorkUnit(work_unit,
1989  logical_union->getOutputMetainfo(),
1990  false,
1991  CompilationOptions::makeCpuOnly(co),
1992  eo,
1993  render_info,
1994  queue_time_ms);
1995 }
1996 
1997 ExecutionResult RelAlgExecutor::executeLogicalValues(
1998  const RelLogicalValues* logical_values,
1999  const ExecutionOptions& eo) {
2000  auto timer = DEBUG_TIMER(__func__);
2001  if (eo.just_explain) {
2002  throw std::runtime_error("EXPLAIN not supported for LogicalValues");
2003  }
2004 
2005  QueryMemoryDescriptor query_mem_desc(executor_,
2006  logical_values->getNumRows(),
2007  QueryDescriptionType::Projection,
2008  /*is_table_function=*/false);
2009 
2010  auto tuple_type = logical_values->getTupleType();
2011  for (size_t i = 0; i < tuple_type.size(); ++i) {
2012  auto& target_meta_info = tuple_type[i];
2013  if (target_meta_info.get_type_info().is_varlen()) {
2014  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2015  }
2016  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2017  // replace w/ bigint
2018  tuple_type[i] =
2019  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2020  }
2021  query_mem_desc.addColSlotInfo(
2022  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2023  }
2024  logical_values->setOutputMetainfo(tuple_type);
2025 
2026  std::vector<TargetInfo> target_infos;
2027  for (const auto& tuple_type_component : tuple_type) {
2028  target_infos.emplace_back(TargetInfo{false,
2029  kCOUNT,
2030  tuple_type_component.get_type_info(),
2031  SQLTypeInfo(kNULLT, false),
2032  false,
2033  false});
2034  }
2035 
2036  std::shared_ptr<ResultSet> rs{
2037  ResultSetLogicalValuesBuilder{logical_values,
2038  target_infos,
2039  ExecutorDeviceType::CPU,
2040  query_mem_desc,
2041  executor_->getRowSetMemoryOwner(),
2042  executor_}
2043  .build()};
2044 
2045  return {rs, tuple_type};
2046 }
2047 
2048 namespace {
2049 
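// Encodes a single string constant through the column's dictionary and writes
// the resulting id into *col_data. Note the overflow handling below: e.g. for
// a dictionary column stored in 8 bits, an id larger than
// max_valid_int_value<uint8_t>() cannot be represented, so a NULL sentinel is
// stored and an error is logged rather than failing the insert.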
2050 template <class T>
2051 int64_t insert_one_dict_str(T* col_data,
2052  const std::string& columnName,
2053  const SQLTypeInfo& columnType,
2054  const Analyzer::Constant* col_cv,
2055  const Catalog_Namespace::Catalog& catalog) {
2056  if (col_cv->get_is_null()) {
2057  *col_data = inline_fixed_encoding_null_val(columnType);
2058  } else {
2059  const int dict_id = columnType.get_comp_param();
2060  const auto col_datum = col_cv->get_constval();
2061  const auto& str = *col_datum.stringval;
2062  const auto dd = catalog.getMetadataForDict(dict_id);
2063  CHECK(dd && dd->stringDict);
2064  int32_t str_id = dd->stringDict->getOrAdd(str);
2065  if (!dd->dictIsTemp) {
2066  const auto checkpoint_ok = dd->stringDict->checkpoint();
2067  if (!checkpoint_ok) {
2068  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2069  columnName);
2070  }
2071  }
2072  const bool invalid = str_id > max_valid_int_value<T>();
2073  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2074  if (invalid) {
2075  LOG(ERROR) << "Could not encode string: " << str
2076  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2077  << " bits. Will store NULL instead.";
2078  }
2079  str_id = inline_fixed_encoding_null_val(columnType);
2080  }
2081  *col_data = str_id;
2082  }
2083  return *col_data;
2084 }
2085 
2086 template <class T>
2087 int64_t insert_one_dict_str(T* col_data,
2088  const ColumnDescriptor* cd,
2089  const Analyzer::Constant* col_cv,
2090  const Catalog_Namespace::Catalog& catalog) {
2091  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2092 }
2093 
2094 } // namespace
2095 
2096 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2097  const ExecutionOptions& eo) {
2098  auto timer = DEBUG_TIMER(__func__);
2099  if (eo.just_explain) {
2100  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2101  }
2102 
2103  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2104  ExecutorDeviceType::CPU,
2105  QueryMemoryDescriptor(),
2106  executor_->getRowSetMemoryOwner(),
2107  executor_->getCatalog(),
2108  executor_->blockSize(),
2109  executor_->gridSize());
2110 
2111  std::vector<TargetMetaInfo> empty_targets;
2112  return {rs, empty_targets};
2113 }
2114 
2115 ExecutionResult RelAlgExecutor::executeSimpleInsert(const Analyzer::Query& query) {
2116  // Note: We currently obtain an executor for this method, but we do not need it.
2117  // Therefore, we skip the executor state setup in the regular execution path. In the
2118  // future, we will likely want to use the executor to evaluate expressions in the insert
2119  // statement.
2120 
2121  const auto& targets = query.get_targetlist();
2122  const int table_id = query.get_result_table_id();
2123  const auto& col_id_list = query.get_result_col_list();
2124 
2125  std::vector<const ColumnDescriptor*> col_descriptors;
2126  std::vector<int> col_ids;
2127  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2128  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2129  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2130 
2131  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2132  CHECK(table_descriptor);
2133  const auto shard_tables = cat_.getPhysicalTablesDescriptors(table_descriptor);
2134  const TableDescriptor* shard{nullptr};
2135 
2136  for (const int col_id : col_id_list) {
2137  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2138  const auto col_enc = cd->columnType.get_compression();
2139  if (cd->columnType.is_string()) {
2140  switch (col_enc) {
2141  case kENCODING_NONE: {
2142  auto it_ok =
2143  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2144  CHECK(it_ok.second);
2145  break;
2146  }
2147  case kENCODING_DICT: {
2148  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2149  CHECK(dd);
2150  const auto it_ok = col_buffers.emplace(
2151  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2152  CHECK(it_ok.second);
2153  break;
2154  }
2155  default:
2156  CHECK(false);
2157  }
2158  } else if (cd->columnType.is_geometry()) {
2159  auto it_ok =
2160  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2161  CHECK(it_ok.second);
2162  } else if (cd->columnType.is_array()) {
2163  auto it_ok =
2164  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2165  CHECK(it_ok.second);
2166  } else {
2167  const auto it_ok = col_buffers.emplace(
2168  col_id,
2169  std::unique_ptr<uint8_t[]>(
2170  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2171  // the buffer
2172  CHECK(it_ok.second);
2173  }
2174  col_descriptors.push_back(cd);
2175  col_ids.push_back(col_id);
2176  }
2177  size_t col_idx = 0;
2178  Fragmenter_Namespace::InsertData insert_data;
2179  insert_data.databaseId = cat_.getCurrentDB().dbId;
2180  insert_data.tableId = table_id;
2181  int64_t int_col_val{0};
2182  for (auto target_entry : targets) {
2183  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2184  if (!col_cv) {
2185  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2186  CHECK(col_cast);
2187  CHECK_EQ(kCAST, col_cast->get_optype());
2188  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2189  }
2190  CHECK(col_cv);
2191  const auto cd = col_descriptors[col_idx];
2192  auto col_datum = col_cv->get_constval();
2193  auto col_type = cd->columnType.get_type();
2194  uint8_t* col_data_bytes{nullptr};
2195  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2196  (!cd->columnType.is_string() ||
2197  cd->columnType.get_compression() == kENCODING_DICT)) {
2198  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2199  CHECK(col_data_bytes_it != col_buffers.end());
2200  col_data_bytes = col_data_bytes_it->second.get();
2201  }
2202  switch (col_type) {
2203  case kBOOLEAN: {
2204  auto col_data = col_data_bytes;
2205  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2206  : (col_datum.boolval ? 1 : 0);
2207  break;
2208  }
2209  case kTINYINT: {
2210  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2211  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2212  : col_datum.tinyintval;
2213  int_col_val = col_datum.tinyintval;
2214  break;
2215  }
2216  case kSMALLINT: {
2217  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2218  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2219  : col_datum.smallintval;
2220  int_col_val = col_datum.smallintval;
2221  break;
2222  }
2223  case kINT: {
2224  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2225  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2226  : col_datum.intval;
2227  int_col_val = col_datum.intval;
2228  break;
2229  }
2230  case kBIGINT:
2231  case kDECIMAL:
2232  case kNUMERIC: {
2233  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2234  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2235  : col_datum.bigintval;
2236  int_col_val = col_datum.bigintval;
2237  break;
2238  }
2239  case kFLOAT: {
2240  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2241  *col_data = col_datum.floatval;
2242  break;
2243  }
2244  case kDOUBLE: {
2245  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2246  *col_data = col_datum.doubleval;
2247  break;
2248  }
2249  case kTEXT:
2250  case kVARCHAR:
2251  case kCHAR: {
2252  switch (cd->columnType.get_compression()) {
2253  case kENCODING_NONE:
2254  str_col_buffers[col_ids[col_idx]].push_back(
2255  col_datum.stringval ? *col_datum.stringval : "");
2256  break;
2257  case kENCODING_DICT: {
2258  switch (cd->columnType.get_size()) {
2259  case 1:
2260  int_col_val = insert_one_dict_str(
2261  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2262  break;
2263  case 2:
2264  int_col_val = insert_one_dict_str(
2265  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2266  break;
2267  case 4:
2268  int_col_val = insert_one_dict_str(
2269  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2270  break;
2271  default:
2272  CHECK(false);
2273  }
2274  break;
2275  }
2276  default:
2277  CHECK(false);
2278  }
2279  break;
2280  }
2281  case kTIME:
2282  case kTIMESTAMP:
2283  case kDATE: {
2284  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2285  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2286  : col_datum.bigintval;
2287  break;
2288  }
2289  case kARRAY: {
2290  const auto is_null = col_cv->get_is_null();
2291  const auto size = cd->columnType.get_size();
2292  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2293  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2294  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2295  if (is_null && !is_point_coords) {
2296  if (size > 0) {
2297  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2298  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2299  throw std::runtime_error("Column " + cd->columnName +
2300  " doesn't accept NULL values");
2301  }
2302  int8_t* buf = (int8_t*)checked_malloc(size);
2303  put_null_array(static_cast<void*>(buf), elem_ti, "");
2304  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2305  p += elem_ti.get_size()) {
2306  put_null(static_cast<void*>(p), elem_ti, "");
2307  }
2308  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2309  } else {
2310  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2311  }
2312  break;
2313  }
2314  const auto l = col_cv->get_value_list();
2315  size_t len = l.size() * elem_ti.get_size();
2316  if (size > 0 && static_cast<size_t>(size) != len) {
2317  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2318  std::to_string(size / elem_ti.get_size()) +
2319  " values, " + "received " + std::to_string(l.size()));
2320  }
2321  if (elem_ti.is_string()) {
2322  CHECK(kENCODING_DICT == elem_ti.get_compression());
2323  CHECK(4 == elem_ti.get_size());
2324 
2325  int8_t* buf = (int8_t*)checked_malloc(len);
2326  int32_t* p = reinterpret_cast<int32_t*>(buf);
2327 
2328  int elemIndex = 0;
2329  for (auto& e : l) {
2330  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2331  CHECK(c);
2332 
2333  int_col_val = insert_one_dict_str(
2334  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2335 
2336  elemIndex++;
2337  }
2338  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2339 
2340  } else {
2341  int8_t* buf = (int8_t*)checked_malloc(len);
2342  int8_t* p = buf;
2343  for (auto& e : l) {
2344  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2345  CHECK(c);
2346  p = appendDatum(p, c->get_constval(), elem_ti);
2347  }
2348  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2349  }
2350  break;
2351  }
2352  case kPOINT:
2353  case kLINESTRING:
2354  case kPOLYGON:
2355  case kMULTIPOLYGON:
2356  str_col_buffers[col_ids[col_idx]].push_back(
2357  col_datum.stringval ? *col_datum.stringval : "");
2358  break;
2359  default:
2360  CHECK(false);
2361  }
2362  ++col_idx;
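// If the column just written is the shard key, pick the physical shard for
// this row, e.g. with 4 shards and a key value of 7, SHARD_FOR_KEY(7, 4)
// (a modulo-style mapping from Shared/shard_key.h) selects shard 3.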
2363  if (col_idx == static_cast<size_t>(table_descriptor->shardedColumnId)) {
2364  const auto shard_count = shard_tables.size();
2365  const size_t shard_idx = SHARD_FOR_KEY(int_col_val, shard_count);
2366  shard = shard_tables[shard_idx];
2367  }
2368  }
2369  for (const auto& kv : col_buffers) {
2370  insert_data.columnIds.push_back(kv.first);
2371  DataBlockPtr p;
2372  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2373  insert_data.data.push_back(p);
2374  }
2375  for (auto& kv : str_col_buffers) {
2376  insert_data.columnIds.push_back(kv.first);
2377  DataBlockPtr p;
2378  p.stringsPtr = &kv.second;
2379  insert_data.data.push_back(p);
2380  }
2381  for (auto& kv : arr_col_buffers) {
2382  insert_data.columnIds.push_back(kv.first);
2383  DataBlockPtr p;
2384  p.arraysPtr = &kv.second;
2385  insert_data.data.push_back(p);
2386  }
2387  insert_data.numRows = 1;
2388  if (shard) {
2389  shard->fragmenter->insertDataNoCheckpoint(insert_data);
2390  } else {
2391  table_descriptor->fragmenter->insertDataNoCheckpoint(insert_data);
2392  }
2393 
2394  // Ensure checkpoint happens across all shards, if not in distributed
2395  // mode (aggregator handles checkpointing in distributed mode)
2396  if (!g_cluster &&
2397  table_descriptor->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
2398  const_cast<Catalog_Namespace::Catalog&>(cat_).checkpointWithAutoRollback(table_id);
2399  }
2400 
2401  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2402  ExecutorDeviceType::CPU,
2403  QueryMemoryDescriptor(),
2404  executor_->getRowSetMemoryOwner(),
2405  nullptr,
2406  0,
2407  0);
2408  std::vector<TargetMetaInfo> empty_targets;
2409  return {rs, empty_targets};
2410 }
2411 
2412 namespace {
2413 
2414 // TODO(alex): Once we're fully migrated to the relational algebra model, change
2415 // the executor interface to use the collation directly and remove this conversion.
2416 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
2417  std::list<Analyzer::OrderEntry> result;
2418  for (size_t i = 0; i < sort->collationCount(); ++i) {
2419  const auto sort_field = sort->getCollation(i);
2420  result.emplace_back(sort_field.getField() + 1,
2421  sort_field.getSortDir() == SortDirection::Descending,
2422  sort_field.getNullsPosition() == NullSortedPosition::First);
2423  }
2424  return result;
2425 }
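// NB: RexSort collation fields are 0-based while Analyzer::OrderEntry::tle_no
// is 1-based, hence the getField() + 1 above.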
2426 
2427 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
2428  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
2429  if (aggregate) {
2430  return 0;
2431  }
2432  const auto compound = dynamic_cast<const RelCompound*>(ra);
2433  return (compound && compound->isAggregate()) ? 0 : limit;
2434 }
2435 
2436 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
2437  return !order_entries.empty() && order_entries.front().is_desc;
2438 }
2439 
2440 } // namespace
2441 
2442 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
2443  const CompilationOptions& co,
2444  const ExecutionOptions& eo,
2445  RenderInfo* render_info,
2446  const int64_t queue_time_ms) {
2447  auto timer = DEBUG_TIMER(__func__);
2449  const auto source = sort->getInput(0);
2450  const bool is_aggregate = node_is_aggregate(source);
2451  auto it = leaf_results_.find(sort->getId());
2452  if (it != leaf_results_.end()) {
2453  // Add any transient string literals to the string dictionary proxy on the aggregator
2454  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2456  source_work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2457  // Handle push-down for LIMIT for multi-node
2458  auto& aggregated_result = it->second;
2459  auto& result_rows = aggregated_result.rs;
2460  const size_t limit = sort->getLimit();
2461  const size_t offset = sort->getOffset();
2462  const auto order_entries = get_order_entries(sort);
2463  if (limit || offset) {
2464  if (!order_entries.empty()) {
2465  result_rows->sort(order_entries, limit + offset, executor_);
2466  }
2467  result_rows->dropFirstN(offset);
2468  if (limit) {
2469  result_rows->keepFirstN(limit);
2470  }
2471  }
2472  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2473  sort->setOutputMetainfo(aggregated_result.targets_meta);
2474  return result;
2475  }
2476 
2477  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2478  bool is_desc{false};
2479 
2480  auto execute_sort_query = [this,
2481  sort,
2482  &source,
2483  &is_aggregate,
2484  &eo,
2485  &co,
2486  render_info,
2487  queue_time_ms,
2488  &groupby_exprs,
2489  &is_desc]() -> ExecutionResult {
2490  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2491  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2492  ExecutionOptions eo_copy = {
2493  eo.output_columnar_hint,
2494  eo.allow_multifrag,
2495  eo.just_explain,
2496  eo.allow_loop_joins,
2497  eo.with_watchdog,
2498  eo.jit_debug,
2499  eo.just_validate || sort->isEmptyResult(),
2500  eo.with_dynamic_watchdog,
2501  eo.dynamic_watchdog_time_limit,
2502  eo.find_push_down_candidates,
2503  eo.just_calcite_explain,
2504  eo.gpu_input_mem_limit_percent,
2505  eo.allow_runtime_query_interrupt,
2506  eo.pending_query_interrupt_freq,
2507  eo.executor_type,
2508  };
2509 
2510  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2511  auto source_result = executeWorkUnit(source_work_unit,
2512  source->getOutputMetainfo(),
2513  is_aggregate,
2514  co,
2515  eo_copy,
2516  render_info,
2517  queue_time_ms);
2518  if (render_info && render_info->isPotentialInSituRender()) {
2519  return source_result;
2520  }
2521  if (source_result.isFilterPushDownEnabled()) {
2522  return source_result;
2523  }
2524  auto rows_to_sort = source_result.getRows();
2525  if (eo.just_explain) {
2526  return {rows_to_sort, {}};
2527  }
2528  const size_t limit = sort->getLimit();
2529  const size_t offset = sort->getOffset();
2530  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2531  !use_speculative_top_n(source_work_unit.exe_unit,
2532  rows_to_sort->getQueryMemDesc())) {
2533  const size_t top_n = limit == 0 ? 0 : limit + offset;
2534  rows_to_sort->sort(
2535  source_work_unit.exe_unit.sort_info.order_entries, top_n, executor_);
2536  }
2537  if (limit || offset) {
2538  if (g_cluster && sort->collationCount() == 0) {
2539  if (offset >= rows_to_sort->rowCount()) {
2540  rows_to_sort->dropFirstN(offset);
2541  } else {
2542  rows_to_sort->keepFirstN(limit + offset);
2543  }
2544  } else {
2545  rows_to_sort->dropFirstN(offset);
2546  if (limit) {
2547  rows_to_sort->keepFirstN(limit);
2548  }
2549  }
2550  }
2551  return {rows_to_sort, source_result.getTargetsMeta()};
2552  };
2553 
2554  try {
2555  return execute_sort_query();
2556  } catch (const SpeculativeTopNFailed& e) {
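// The speculative top-n path bet on a single group-by key; when that bet
// fails, blacklist the key/direction pair and re-run with the default sort.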
2557  CHECK_EQ(size_t(1), groupby_exprs.size());
2558  CHECK(groupby_exprs.front());
2559  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2560  return execute_sort_query();
2561  }
2562 }
2563 
2564 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
2565  const RelSort* sort,
2566  const ExecutionOptions& eo) {
2567  const auto source = sort->getInput(0);
2568  const size_t limit = sort->getLimit();
2569  const size_t offset = sort->getOffset();
2570  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2571  const size_t scan_total_limit =
2572  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2573  size_t max_groups_buffer_entry_guess{
2574  scan_total_limit ? scan_total_limit : max_groups_buffer_entry_default_guess};
2575  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
2576  const auto order_entries = get_order_entries(sort);
2577  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2578  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2579  const auto& source_exe_unit = source_work_unit.exe_unit;
2580 
2581  // we do not allow sorting geometry or array types
2582  for (auto order_entry : order_entries) {
2583  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
2584  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2585  const auto& ti = get_target_info(te, false);
2586  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2587  throw std::runtime_error(
2588  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2589  }
2590  }
2591 
2592  if (source_exe_unit.groupby_exprs.size() == 1) {
2593  if (!source_exe_unit.groupby_exprs.front()) {
2594  sort_algorithm = SortAlgorithm::StreamingTopN;
2595  } else {
2596  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2597  first_oe_is_desc(order_entries))) {
2598  sort_algorithm = SortAlgorithm::Default;
2599  }
2600  }
2601  }
2602 
2603  sort->setOutputMetainfo(source->getOutputMetainfo());
2604  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2605  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2606  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2607  std::move(source_exe_unit.input_col_descs),
2608  source_exe_unit.simple_quals,
2609  source_exe_unit.quals,
2610  source_exe_unit.join_quals,
2611  source_exe_unit.groupby_exprs,
2612  source_exe_unit.target_exprs,
2613  nullptr,
2614  {sort_info.order_entries, sort_algorithm, limit, offset},
2615  scan_total_limit,
2616  source_exe_unit.query_hint,
2617  source_exe_unit.use_bump_allocator,
2618  source_exe_unit.union_all,
2619  source_exe_unit.query_state},
2620  source,
2621  max_groups_buffer_entry_guess,
2622  std::move(source_work_unit.query_rewriter),
2623  source_work_unit.input_permutation,
2624  source_work_unit.left_deep_join_input_sizes};
2625 }
2626 
2627 namespace {
2628 
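// Cheap upper bound on the number of groups: the largest input table
// cardinality. Deliberately loose (cross joins and group by unnested arrays
// are not modeled), but sufficient to skip NDV estimation for small inputs.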
2635 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
2636  CHECK(!table_infos.empty());
2637  const auto& first_table = table_infos.front();
2638  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
2639  for (const auto& table_info : table_infos) {
2640  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
2641  max_num_groups = table_info.info.getNumTuplesUpperBound();
2642  }
2643  }
2644  return std::max(max_num_groups, size_t(1));
2645 }
2646 
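// A projection with no aggregates and no usable scan limit cannot size its
// output buffer up front; flagging it lets the caller pick a strategy such as
// a pre-flight filtered COUNT(*) or the bump allocator.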
2653 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
2654  for (const auto target_expr : ra_exe_unit.target_exprs) {
2655  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
2656  return false;
2657  }
2658  }
2659  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
2660  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
2661  return true;
2662  }
2663  return false;
2664 }
2665 
2666 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
2667  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
2668  ra_exe_unit.simple_quals.empty());
2669 }
2670 
2671 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
2672  const RelAlgExecutionUnit& ra_exe_unit_in,
2673  const std::vector<InputTableInfo>& table_infos,
2674  const Executor* executor,
2675  const ExecutorDeviceType device_type_in,
2676  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
2677  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
2678  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
2679  const auto target_expr = ra_exe_unit.target_exprs[i];
2680  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2681  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
2682  continue;
2683  }
2684  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
2685  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
2686  CHECK(arg);
2687  const auto& arg_ti = arg->get_type_info();
2688  // Avoid calling getExpressionRange for variable length types (string and array),
2689  // it'd trigger an assertion since that API expects to be called only for types
2690  // for which the notion of range is well-defined. A bit of a kludge, but the
2691  // logic to reject these types anyway is at lower levels in the stack and not
2692  // really worth pulling into a separate function for now.
2693  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
2694  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
2695  continue;
2696  }
2697  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
2698  if (arg_range.getType() != ExpressionRangeType::Integer) {
2699  continue;
2700  }
2701  // When running distributed, the threshold for using the precise implementation
2702  // must be consistent across all leaves, otherwise we could have a mix of precise
2703  // and approximate bitmaps and we cannot aggregate them.
2704  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
2705  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
2706  const auto sub_bitmap_count =
2707  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
2708  int64_t approx_bitmap_sz_bits{0};
2709  const auto error_rate =
2710  static_cast<Analyzer::AggExpr*>(target_expr)->get_error_rate();
2711  if (error_rate) {
2712  CHECK(error_rate->get_type_info().get_type() == kINT);
2713  CHECK_GE(error_rate->get_constval().intval, 1);
2714  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
2715  } else {
2716  approx_bitmap_sz_bits = g_hll_precision_bits;
2717  }
2718  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
2719  arg_range.getIntMin(),
2720  approx_bitmap_sz_bits,
2721  true,
2722  device_type,
2723  sub_bitmap_count};
2724  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
2725  arg_range.getIntMin(),
2726  bitmap_sz_bits,
2727  false,
2728  device_type,
2729  sub_bitmap_count};
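// If a precise bitmap would be no larger than the approximate one, the
// approximation buys nothing: swap in an exact COUNT(DISTINCT) for the same
// memory budget.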
2730  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
2731  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
2732  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
2733  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
2734  target_exprs_owned.push_back(precise_count_distinct);
2735  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
2736  }
2737  }
2738  return ra_exe_unit;
2739 }
2740 
2741 void build_render_targets(RenderInfo& render_info,
2742  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
2743  const std::vector<TargetMetaInfo>& targets_meta) {
2744  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
2745  render_info.targets.clear();
2746  for (size_t i = 0; i < targets_meta.size(); ++i) {
2747  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
2748  targets_meta[i].get_resname(),
2749  work_unit_target_exprs[i]->get_shared_ptr(),
2750  false));
2751  }
2752 }
2753 
2754 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
2755  const CompilationOptions& co,
2756  const ExecutionOptions& eo) {
2757  return g_enable_bump_allocator && co.device_type == ExecutorDeviceType::GPU &&
2758  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
2759 }
2760 
2761 } // namespace
2762 
2763 ExecutionResult RelAlgExecutor::executeWorkUnit(
2764  const RelAlgExecutor::WorkUnit& work_unit,
2765  const std::vector<TargetMetaInfo>& targets_meta,
2766  const bool is_agg,
2767  const CompilationOptions& co_in,
2768  const ExecutionOptions& eo,
2769  RenderInfo* render_info,
2770  const int64_t queue_time_ms,
2771  const std::optional<size_t> previous_count) {
2772  INJECT_TIMER(executeWorkUnit);
2773  auto timer = DEBUG_TIMER(__func__);
2774 
2775  auto co = co_in;
2776  ColumnCacheMap column_cache;
2777  if (is_window_execution_unit(work_unit.exe_unit)) {
2778  if (!g_enable_window_functions) {
2779  throw std::runtime_error("Window functions support is disabled");
2780  }
2780  }
2782  co.allow_lazy_fetch = false;
2783  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
2784  }
2785  if (!eo.just_explain && eo.find_push_down_candidates) {
2786  // find potential candidates:
2787  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
2788  if (!selected_filters.empty() || eo.just_calcite_explain) {
2789  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
2790  }
2791  }
2792  if (render_info && render_info->isPotentialInSituRender()) {
2793  co.allow_lazy_fetch = false;
2794  }
2795  const auto body = work_unit.body;
2796  CHECK(body);
2797  auto it = leaf_results_.find(body->getId());
2798  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
2799  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
2800  if (it != leaf_results_.end()) {
2802  work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2803  auto& aggregated_result = it->second;
2804  auto& result_rows = aggregated_result.rs;
2805  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2806  body->setOutputMetainfo(aggregated_result.targets_meta);
2807  if (render_info) {
2808  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2809  }
2810  return result;
2811  }
2812  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2813 
2814  auto ra_exe_unit = decide_approx_count_distinct_implementation(
2815  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
2816 
2817  // register query hint if query_dag_ is valid
2818  ra_exe_unit.query_hint =
2819  query_dag_ ? query_dag_->getQueryHints() : QueryHint::defaults();
2820 
2821  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2822  if (is_window_execution_unit(ra_exe_unit)) {
2823  CHECK_EQ(table_infos.size(), size_t(1));
2824  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
2825  max_groups_buffer_entry_guess =
2826  table_infos.front().info.fragments.front().getNumTuples();
2827  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
2828  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
2829  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
2830  ra_exe_unit.scan_limit = *previous_count;
2831  } else {
2832  // TODO(adb): enable bump allocator path for render queries
2833  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
2834  ra_exe_unit.scan_limit = 0;
2835  ra_exe_unit.use_bump_allocator = true;
2836  } else if (eo.executor_type == ::ExecutorType::Extern) {
2837  ra_exe_unit.scan_limit = 0;
2838  } else if (!eo.just_explain) {
2839  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
2840  if (filter_count_all) {
2841  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
2842  }
2843  }
2844  }
2845  }
2846  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2847  co.device_type,
2848  QueryMemoryDescriptor(),
2849  nullptr,
2850  executor_->getCatalog(),
2851  executor_->blockSize(),
2852  executor_->gridSize()),
2853  {}};
2854 
2855  auto execute_and_handle_errors =
2856  [&](const auto max_groups_buffer_entry_guess_in,
2857  const bool has_cardinality_estimation) -> ExecutionResult {
2858  // Note that the groups buffer entry guess may be modified during query execution.
2859  // Create a local copy so we can track those changes if we need to attempt a retry
2860  // due to OOM
2861  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
2862  try {
2863  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
2864  is_agg,
2865  table_infos,
2866  ra_exe_unit,
2867  co,
2868  eo,
2869  cat_,
2870  render_info,
2871  has_cardinality_estimation,
2872  column_cache),
2873  targets_meta};
2874  } catch (const QueryExecutionError& e) {
2875  handlePersistentError(e.getErrorCode());
2876  return handleOutOfMemoryRetry(
2877  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
2878  targets_meta,
2879  is_agg,
2880  co,
2881  eo,
2882  render_info,
2883  e.wasMultifragKernelLaunch(),
2884  queue_time_ms);
2885  }
2886  };
2887 
2888  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
2889  try {
2890  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2891  auto card = cached_cardinality.second;
2892  if (cached_cardinality.first && card >= 0) {
2893  result = execute_and_handle_errors(card, true);
2894  } else {
2895  result = execute_and_handle_errors(
2896  max_groups_buffer_entry_guess,
2897  groups_approx_upper_bound(table_infos) <= g_big_group_threshold);
2898  }
2899  } catch (const CardinalityEstimationRequired& e) {
2900  // check the cardinality cache
2901  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2902  auto card = cached_cardinality.second;
2903  if (cached_cardinality.first && card >= 0) {
2904  result = execute_and_handle_errors(card, true);
2905  } else {
2906  const auto estimated_groups_buffer_entry_guess =
2907  2 * std::min(groups_approx_upper_bound(table_infos),
2908  getNDVEstimation(work_unit, e.range(), is_agg, co, eo));
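// Twice the NDV estimate (capped at the input cardinality upper bound) leaves
// headroom for estimation error when sizing the group-by buffer.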
2909  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
2910  result = execute_and_handle_errors(estimated_groups_buffer_entry_guess, true);
2911  if (!(eo.just_validate || eo.just_explain)) {
2912  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
2913  }
2914  }
2915  }
2916 
2917  result.setQueueTime(queue_time_ms);
2918  if (render_info) {
2919  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2920  if (render_info->isPotentialInSituRender()) {
2921  // return an empty result (with the same queue time, and zero render time)
2922  return {std::make_shared<ResultSet>(
2923  queue_time_ms,
2924  0,
2925  executor_->row_set_mem_owner_
2926  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
2927  : nullptr),
2928  {}};
2929  }
2930  }
2931  return result;
2932 }
2933 
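// Runs the work unit's filters under a COUNT(*) to obtain an exact cardinality
// for sizing a projection's output buffer; returns std::nullopt on failure so
// the caller can fall back to other sizing strategies.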
2934 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
2935  const bool is_agg,
2936  const CompilationOptions& co,
2937  const ExecutionOptions& eo) {
2938  const auto count =
2939  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
2940  kCOUNT,
2941  nullptr,
2942  false,
2943  nullptr);
2944  const auto count_all_exe_unit =
2945  create_count_all_execution_unit(work_unit.exe_unit, count);
2946  size_t one{1};
2947  ResultSetPtr count_all_result;
2948  try {
2949  ColumnCacheMap column_cache;
2950  count_all_result =
2951  executor_->executeWorkUnit(one,
2952  is_agg,
2953  get_table_infos(work_unit.exe_unit, executor_),
2954  count_all_exe_unit,
2955  co,
2956  eo,
2957  cat_,
2958  nullptr,
2959  false,
2960  column_cache);
2961  } catch (const foreign_storage::ForeignStorageException& error) {
2962  throw error;
2963  } catch (const std::exception& e) {
2964  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
2965  return std::nullopt;
2966  }
2967  const auto count_row = count_all_result->getNextRow(false, false);
2968  CHECK_EQ(size_t(1), count_row.size());
2969  const auto& count_tv = count_row.front();
2970  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
2971  CHECK(count_scalar_tv);
2972  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
2973  CHECK(count_ptr);
2974  CHECK_GE(*count_ptr, 0);
2975  auto count_upper_bound = static_cast<size_t>(*count_ptr);
2976  return std::max(count_upper_bound, size_t(1));
2977 }
2978 
2979 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
2980  const auto& ra_exe_unit = work_unit.exe_unit;
2981  if (ra_exe_unit.input_descs.size() != 1) {
2982  return false;
2983  }
2984  const auto& table_desc = ra_exe_unit.input_descs.front();
2985  if (table_desc.getSourceType() != InputSourceType::TABLE) {
2986  return false;
2987  }
2988  const int table_id = table_desc.getTableId();
2989  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
2990  const auto comp_expr =
2991  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
2992  if (!comp_expr || comp_expr->get_optype() != kEQ) {
2993  return false;
2994  }
2995  const auto lhs = comp_expr->get_left_operand();
2996  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
2997  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
2998  return false;
2999  }
3000  const auto rhs = comp_expr->get_right_operand();
3001  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3002  if (!rhs_const) {
3003  return false;
3004  }
3005  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
3006  if (cd->isVirtualCol) {
3007  CHECK_EQ("rowid", cd->columnName);
3008  return true;
3009  }
3010  }
3011  return false;
3012 }
3013 
3014 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
3015  const RelAlgExecutor::WorkUnit& work_unit,
3016  const std::vector<TargetMetaInfo>& targets_meta,
3017  const bool is_agg,
3018  const CompilationOptions& co,
3019  const ExecutionOptions& eo,
3020  RenderInfo* render_info,
3021  const bool was_multifrag_kernel_launch,
3022  const int64_t queue_time_ms) {
3023  // Disable the bump allocator
3024  // Note that this will have basically the same effect as using the bump allocator for
3025  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
3026  // path and the bump allocator path for kernel per fragment execution.
3027  auto ra_exe_unit_in = work_unit.exe_unit;
3028  ra_exe_unit_in.use_bump_allocator = false;
3029 
3030  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3031  co.device_type,
3032  QueryMemoryDescriptor(),
3033  nullptr,
3034  executor_->getCatalog(),
3035  executor_->blockSize(),
3036  executor_->gridSize()),
3037  {}};
3038 
3039  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
3040  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3041  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
3042  false,
3043  false,
3044  eo.allow_loop_joins,
3045  eo.with_watchdog,
3046  eo.jit_debug,
3047  false,
3050  false,
3051  false,
3053  false,
3055  eo.executor_type,
3057 
3058  if (was_multifrag_kernel_launch) {
3059  try {
3060  // Attempt to retry using the kernel per fragment path. The smaller input size
3061  // required may allow the entire kernel to execute in GPU memory.
3062  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
3063  "kernels disabled.";
3064  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3065  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
3066  ColumnCacheMap column_cache;
3067  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3068  is_agg,
3069  table_infos,
3070  ra_exe_unit,
3071  co,
3072  eo_no_multifrag,
3073  cat_,
3074  nullptr,
3075  true,
3076  column_cache),
3077  targets_meta};
3078  result.setQueueTime(queue_time_ms);
3079  } catch (const QueryExecutionError& e) {
3081  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
3082  }
3083  }
3084 
3085  if (render_info) {
3086  render_info->setForceNonInSituData();
3087  }
3088 
3089  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
3090  // Only reset the group buffer entry guess if we ran out of slots, which
3091  // suggests a highly pathological input which prevented a good estimation
3092  // of the distinct tuple count.
3093  // For projection queries, this will force a per-fragment scan limit,
3094  // which is compatible with the CPU path.
3095  VLOG(1) << "Resetting max groups buffer entry guess.";
3096  max_groups_buffer_entry_guess = 0;
3097 
3098  int iteration_ctr = -1;
3099  while (true) {
3100  iteration_ctr++;
3101  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3102  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
3103  ColumnCacheMap column_cache;
3104  try {
3105  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3106  is_agg,
3107  table_infos,
3108  ra_exe_unit,
3109  co_cpu,
3110  eo_no_multifrag,
3111  cat_,
3112  nullptr,
3113  true,
3114  column_cache),
3115  targets_meta};
3116  } catch (const QueryExecutionError& e) {
3117  // Ran out of slots
3118  if (e.getErrorCode() < 0) {
3119  // Even the conservative guess failed; it should only happen when we group
3120  // by a huge cardinality array. Maybe we should throw an exception instead?
3121  // Such a heavy query is entirely capable of exhausting all the host memory.
3122  CHECK(max_groups_buffer_entry_guess);
3123  // Only allow two iterations of increasingly large entry guesses up to a maximum
3124  // of 512MB per column per kernel
3125  if (g_enable_watchdog || iteration_ctr > 1) {
3126  throw std::runtime_error("Query ran out of output slots in the result");
3127  }
3128  max_groups_buffer_entry_guess *= 2;
3129  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
3130  "groups buffer entry "
3131  "guess equal to "
3132  << max_groups_buffer_entry_guess;
3133  } else {
3134  handlePersistentError(e.getErrorCode());
3135  }
3136  continue;
3137  }
3138  result.setQueueTime(queue_time_ms);
3139  return result;
3140  }
3141  return result;
3142 }
3143 
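// GPU out-of-memory is recoverable when --allow-cpu-retry is enabled;
// every other error code is rethrown as a std::runtime_error carrying a
// human-readable message.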
3144 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
3145  LOG(ERROR) << "Query execution failed with error "
3146  << getErrorMessageFromCode(error_code);
3147  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
3148  // We ran out of GPU memory, this doesn't count as an error if the query is
3149  // allowed to continue on CPU because retry on CPU is explicitly allowed through
3150  // --allow-cpu-retry.
3151  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
3152  if (!g_allow_cpu_retry) {
3153  throw std::runtime_error(
3154  "Query ran out of GPU memory, unable to automatically retry on CPU");
3155  }
3156  return;
3157  }
3158  throw std::runtime_error(getErrorMessageFromCode(error_code));
3159 }
3160 
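// Negative codes are reserved for out-of-slots conditions in the output
// buffer; non-negative codes map to the Executor::ERR_* constants below.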
3161 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
3162  if (error_code < 0) {
3163  return "Ran out of slots in the query output buffer";
3164  }
3165  switch (error_code) {
3166  case Executor::ERR_DIV_BY_ZERO:
3167  return "Division by zero";
3168  case Executor::ERR_OUT_OF_GPU_MEM:
3169  return "Query couldn't keep the entire working set of columns in GPU memory";
3170  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
3171  return "Self joins not supported yet";
3172  case Executor::ERR_OUT_OF_CPU_MEM:
3173  return "Not enough host memory to execute the query";
3174  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
3175  return "Overflow or underflow";
3176  case Executor::ERR_OUT_OF_TIME:
3177  return "Query execution has exceeded the time limit";
3178  case Executor::ERR_INTERRUPTED:
3179  return "Query execution has been interrupted";
3180  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
3181  return "Columnar conversion not supported for variable length types";
3182  case Executor::ERR_TOO_MANY_LITERALS:
3183  return "Too many literals in the query";
3184  case Executor::ERR_STRING_CONST_IN_RESULTSET:
3185  return "NONE ENCODED String types are not supported as input result set.";
3186  case Executor::ERR_OUT_OF_RENDER_MEM:
3187  return "Not enough OpenGL memory to render the query results";
3188  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
3189  return "Streaming-Top-N not supported in Render Query";
3190  case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
3191  return "Multiple distinct values encountered";
3192  case Executor::ERR_GEOS:
3193  return "Geos call failure";
3194  }
3195  return "Other error: code " + std::to_string(error_code);
3196 }
3197 
3198 void RelAlgExecutor::executePostExecutionCallback() {
3199  if (post_execution_callback_) {
3200  VLOG(1) << "Running post execution callback.";
3201  (*post_execution_callback_)();
3202  }
3203 }
3204 
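// Dispatch work-unit creation on the concrete relational-algebra node
// type. Only Compound, Project, Aggregate and Filter nodes are expected
// here; any other node type is a logic error.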
3205 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
3206  const SortInfo& sort_info,
3207  const ExecutionOptions& eo) {
3208  const auto compound = dynamic_cast<const RelCompound*>(node);
3209  if (compound) {
3210  return createCompoundWorkUnit(compound, sort_info, eo);
3211  }
3212  const auto project = dynamic_cast<const RelProject*>(node);
3213  if (project) {
3214  return createProjectWorkUnit(project, sort_info, eo);
3215  }
3216  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3217  if (aggregate) {
3218  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3219  }
3220  const auto filter = dynamic_cast<const RelFilter*>(node);
3221  if (filter) {
3222  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3223  }
3224  LOG(FATAL) << "Unhandled node type: " << node->toString();
3225  return {};
3226 }
3227 
3228 namespace {
3229 
3230 JoinType get_join_type(const RelAlgNode* ra) {
3231  auto sink = get_data_sink(ra);
3232  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
3233  return join->getJoinType();
3234  }
3235  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
3236  return JoinType::INNER;
3237  }
3238 
3239  return JoinType::INVALID;
3240 }
3241 
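// Recognize the null-safe equality pattern
//   (lhs = rhs) OR (lhs IS NULL AND rhs IS NULL)
// (e.g. as generated for IS NOT DISTINCT FROM) and rewrite it to a single
// bitwise-equality (kBW_EQ) operator usable as a hash join qualifier.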
3242 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
3243  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3244  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
3245  return nullptr;
3246  }
3247  const auto equi_join_condition =
3248  dynamic_cast<const RexOperator*>(condition->getOperand(0));
3249  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
3250  return nullptr;
3251  }
3252  const auto both_are_null_condition =
3253  dynamic_cast<const RexOperator*>(condition->getOperand(1));
3254  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
3255  both_are_null_condition->size() != 2) {
3256  return nullptr;
3257  }
3258  const auto lhs_is_null =
3259  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
3260  const auto rhs_is_null =
3261  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
3262  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
3263  rhs_is_null->getOperator() != kISNULL) {
3264  return nullptr;
3265  }
3266  CHECK_EQ(size_t(1), lhs_is_null->size());
3267  CHECK_EQ(size_t(1), rhs_is_null->size());
3268  CHECK_EQ(size_t(2), equi_join_condition->size());
3269  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
3270  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
3271  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
3272  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
3273  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
3274  return nullptr;
3275  }
3276  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
3277  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
3278  RexDeepCopyVisitor deep_copy_visitor;
3279  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
3280  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
3281  eq_operands.emplace_back(lhs_op_copy.release());
3282  eq_operands.emplace_back(rhs_op_copy.release());
3283  return boost::make_unique<const RexOperator>(
3284  kBW_EQ, eq_operands, equi_join_condition->getType());
3285  }
3286  return nullptr;
3287 }
3288 
3289 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
3290  const RexScalar* scalar) {
3291  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3292  if (condition && condition->getOperator() == kAND) {
3293  CHECK_GE(condition->size(), size_t(2));
3294  auto acc = get_bitwise_equals(condition->getOperand(0));
3295  if (!acc) {
3296  return nullptr;
3297  }
3298  for (size_t i = 1; i < condition->size(); ++i) {
3299  std::vector<std::unique_ptr<const RexScalar>> and_operands;
3300  and_operands.emplace_back(std::move(acc));
3301  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
3302  acc =
3303  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
3304  }
3305  return acc;
3306  }
3307  return get_bitwise_equals(scalar);
3308 }
3309 
3310 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
3311  CHECK_GE(left_deep_join->inputCount(), size_t(2));
3312  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
3313  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
3314  ++nesting_level) {
3315  if (left_deep_join->getOuterCondition(nesting_level)) {
3316  join_types[nesting_level - 1] = JoinType::LEFT;
3317  }
3318  }
3319  return join_types;
3320 }
3321 
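// Reorder the join inputs according to the cost heuristic implemented by
// get_node_input_permutation and rebuild the input descriptors to match.
// Skipped in distributed mode and when any input table is replicated,
// since the aggregator does not have enough information to break ties.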
3322 template <class RA>
3323 std::vector<size_t> do_table_reordering(
3324  std::vector<InputDescriptor>& input_descs,
3325  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
3326  const JoinQualsPerNestingLevel& left_deep_join_quals,
3327  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3328  const RA* node,
3329  const std::vector<InputTableInfo>& query_infos,
3330  const Executor* executor) {
3331  if (g_cluster) {
3332  // Disable table reordering in distributed mode. The aggregator does not have enough
3333  // information to break ties
3334  return {};
3335  }
3336  const auto& cat = *executor->getCatalog();
3337  for (const auto& table_info : query_infos) {
3338  if (table_info.table_id < 0) {
3339  continue;
3340  }
3341  const auto td = cat.getMetadataForTable(table_info.table_id);
3342  CHECK(td);
3343  if (table_is_replicated(td)) {
3344  return {};
3345  }
3346  }
3347  const auto input_permutation =
3348  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
3349  input_to_nest_level = get_input_nest_levels(node, input_permutation);
3350  std::tie(input_descs, input_col_descs, std::ignore) =
3351  get_input_desc(node, input_to_nest_level, input_permutation, cat);
3352  return input_permutation;
3353 }
3354 
3355 std::vector<size_t> get_left_deep_join_input_sizes(
3356  const RelLeftDeepInnerJoin* left_deep_join) {
3357  std::vector<size_t> input_sizes;
3358  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
3359  const auto inputs = get_node_output(left_deep_join->getInput(i));
3360  input_sizes.push_back(inputs.size());
3361  }
3362  return input_sizes;
3363 }
3364 
3365 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
3366  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
3367  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
3368  for (const auto& qual : quals) {
3369  const auto rewritten_qual = rewrite_expr(qual.get());
3370  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
3371  }
3372  return rewritten_quals;
3373 }
3374 
3375 } // namespace
3376 
3377 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
3378  const RelCompound* compound,
3379  const SortInfo& sort_info,
3380  const ExecutionOptions& eo) {
3381  std::vector<InputDescriptor> input_descs;
3382  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3383  auto input_to_nest_level = get_input_nest_levels(compound, {});
3384  std::tie(input_descs, input_col_descs, std::ignore) =
3385  get_input_desc(compound, input_to_nest_level, {}, cat_);
3386  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
3387  const auto query_infos = get_table_infos(input_descs, executor_);
3388  CHECK_EQ(size_t(1), compound->inputCount());
3389  const auto left_deep_join =
3390  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3391  JoinQualsPerNestingLevel left_deep_join_quals;
3392  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3393  : std::vector<JoinType>{get_join_type(compound)};
3394  std::vector<size_t> input_permutation;
3395  std::vector<size_t> left_deep_join_input_sizes;
3396  if (left_deep_join) {
3397  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3398  left_deep_join_quals = translateLeftDeepJoinFilter(
3399  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3400  if (g_from_table_reordering &&
3401  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3402  join_types.end()) {
3403  input_permutation = do_table_reordering(input_descs,
3404  input_col_descs,
3405  left_deep_join_quals,
3406  input_to_nest_level,
3407  compound,
3408  query_infos,
3409  executor_);
3410  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3411  std::tie(input_descs, input_col_descs, std::ignore) =
3412  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3413  left_deep_join_quals = translateLeftDeepJoinFilter(
3414  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3415  }
3416  }
3417  RelAlgTranslator translator(cat_,
3418  query_state_,
3419  executor_,
3420  input_to_nest_level,
3421  join_types,
3422  now_,
3423  eo.just_explain);
3424  const auto scalar_sources =
3425  translate_scalar_sources(compound, translator, eo.executor_type);
3426  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3427  const auto quals_cf = translate_quals(compound, translator);
3428  const auto target_exprs = translate_targets(target_exprs_owned_,
3429  scalar_sources,
3430  groupby_exprs,
3431  compound,
3432  translator,
3433  eo.executor_type);
3434  CHECK_EQ(compound->size(), target_exprs.size());
3435  const RelAlgExecutionUnit exe_unit = {
3436  input_descs,
3437  input_col_descs,
3438  quals_cf.simple_quals,
3439  rewrite_quals(quals_cf.quals),
3440  left_deep_join_quals,
3441  groupby_exprs,
3442  target_exprs,
3443  nullptr,
3444  sort_info,
3445  0,
3446  query_dag_ ? query_dag_->getQueryHints() : QueryHint::defaults(),
3447  false,
3448  std::nullopt,
3449  query_state_};
3450  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3451  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3452  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3453  compound->setOutputMetainfo(targets_meta);
3454  return {rewritten_exe_unit,
3455  compound,
3456  max_groups_buffer_entry_default_guess,
3457  std::move(query_rewriter),
3458  input_permutation,
3459  left_deep_join_input_sizes};
3460 }
3461 
3462 namespace {
3463 
3464 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
3465  CHECK(qual_expr);
3466  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
3467  if (!bin_oper || bin_oper->getOperator() != kAND) {
3468  return {qual_expr};
3469  }
3470  CHECK_GE(bin_oper->size(), size_t(2));
3471  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
3472  for (size_t i = 1; i < bin_oper->size(); ++i) {
3473  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
3474  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
3475  }
3476  return lhs_cf;
3477 }
3478 
3479 std::shared_ptr<Analyzer::Expr> build_logical_expression(
3480  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
3481  const SQLOps sql_op) {
3482  CHECK(!factors.empty());
3483  auto acc = factors.front();
3484  for (size_t i = 1; i < factors.size(); ++i) {
3485  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
3486  }
3487  return acc;
3488 }
3489 
3490 template <class QualsList>
3491 bool list_contains_expression(const QualsList& haystack,
3492  const std::shared_ptr<Analyzer::Expr>& needle) {
3493  for (const auto& qual : haystack) {
3494  if (*qual == *needle) {
3495  return true;
3496  }
3497  }
3498  return false;
3499 }
3500 
3501 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
3502 // evaluations of `p` and allows use of the original form in joins if `p`
3503 // can be used for hash joins.
3504 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
3505  const std::shared_ptr<Analyzer::Expr>& expr) {
3506  const auto expr_terms = qual_to_disjunctive_form(expr);
3507  CHECK_GE(expr_terms.size(), size_t(1));
3508  const auto& first_term = expr_terms.front();
3509  const auto first_term_factors = qual_to_conjunctive_form(first_term);
3510  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
3511  // First, collect the conjunctive components common to all the disjunctive components.
3512  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
3513  for (const auto& first_term_factor : first_term_factors.quals) {
3514  bool is_common =
3515  expr_terms.size() > 1; // Only report common factors for disjunction.
3516  for (size_t i = 1; i < expr_terms.size(); ++i) {
3517  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
3518  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
3519  is_common = false;
3520  break;
3521  }
3522  }
3523  if (is_common) {
3524  common_factors.push_back(first_term_factor);
3525  }
3526  }
3527  if (common_factors.empty()) {
3528  return expr;
3529  }
3530  // Now that the common expressions are known, collect the remaining expressions.
3531  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
3532  for (const auto& term : expr_terms) {
3533  const auto term_cf = qual_to_conjunctive_form(term);
3534  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
3535  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
3536  for (const auto& qual : term_cf.quals) {
3537  if (!list_contains_expression(common_factors, qual)) {
3538  remaining_quals.push_back(qual);
3539  }
3540  }
3541  if (!remaining_quals.empty()) {
3542  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
3543  }
3544  }
3545  // Reconstruct the expression with the transformation applied.
3546  const auto common_expr = build_logical_expression(common_factors, kAND);
3547  if (remaining_terms.empty()) {
3548  return common_expr;
3549  }
3550  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
3551  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
3552 }
3553 
3554 } // namespace
3555 
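// Flatten a join condition into conjunctive-form qualifiers, first
// rewriting null-safe equality patterns to kBW_EQ so they stay usable for
// hash joins, then combining equi-join conditions.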
3556 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
3557  const RexScalar* join_condition,
3558  const std::vector<JoinType>& join_types,
3559  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3560  const bool just_explain) const {
3561  RelAlgTranslator translator(
3562  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
3563  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
3564  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
3565  for (const auto rex_condition_component : rex_condition_cf) {
3566  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
3567  const auto join_condition =
3568  reverse_logical_distribution(translator.translateScalarRex(
3569  bw_equals ? bw_equals.get() : rex_condition_component));
3570  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
3571  join_condition_quals.insert(join_condition_quals.end(),
3572  join_condition_cf.quals.begin(),
3573  join_condition_cf.quals.end());
3574  join_condition_quals.insert(join_condition_quals.end(),
3575  join_condition_cf.simple_quals.begin(),
3576  join_condition_cf.simple_quals.end());
3577  }
3578  return combine_equi_join_conditions(join_condition_quals);
3579 }
3580 
3581 // Translate left deep join filter and separate the conjunctive form qualifiers
3582 // per nesting level. The code generated for hash table lookups on each level
3583 // must dominate its uses in deeper nesting levels.
3584 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
3585  const RelLeftDeepInnerJoin* join,
3586  const std::vector<InputDescriptor>& input_descs,
3587  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3588  const bool just_explain) {
3589  const auto join_types = left_deep_join_types(join);
3590  const auto join_condition_quals = makeJoinQuals(
3591  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
3592  MaxRangeTableIndexVisitor rte_idx_visitor;
3593  JoinQualsPerNestingLevel result(input_descs.size() - 1);
3594  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
3595  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
3596  const auto outer_condition = join->getOuterCondition(rte_idx);
3597  if (outer_condition) {
3598  result[rte_idx - 1].quals =
3599  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
3600  CHECK_LE(rte_idx, join_types.size());
3601  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
3602  result[rte_idx - 1].type = JoinType::LEFT;
3603  continue;
3604  }
3605  for (const auto& qual : join_condition_quals) {
3606  if (visited_quals.count(qual)) {
3607  continue;
3608  }
3609  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
3610  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
3611  const auto it_ok = visited_quals.emplace(qual);
3612  CHECK(it_ok.second);
3613  result[rte_idx - 1].quals.push_back(qual);
3614  }
3615  }
3616  CHECK_LE(rte_idx, join_types.size());
3617  CHECK(join_types[rte_idx - 1] == JoinType::INNER);
3618  result[rte_idx - 1].type = JoinType::INNER;
3619  }
3620  return result;
3621 }
3622 
3623 namespace {
3624 
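// Synthesize one ColumnVar per column of the given input's output
// metadata. Column indices are shifted by one for scans, whose physical
// columns are 1-based; intermediate results are 0-based.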
3625 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
3626  const RelAlgNode* ra_node,
3627  const size_t nest_level,
3628  const std::vector<TargetMetaInfo>& in_metainfo,
3629  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3630  CHECK_LE(size_t(1), ra_node->inputCount());
3631  CHECK_GE(size_t(2), ra_node->inputCount());
3632  const auto input = ra_node->getInput(nest_level);
3633  const auto it_rte_idx = input_to_nest_level.find(input);
3634  CHECK(it_rte_idx != input_to_nest_level.end());
3635  const int rte_idx = it_rte_idx->second;
3636  const int table_id = table_id_from_ra(input);
3637  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
3638  const auto scan_ra = dynamic_cast<const RelScan*>(input);
3639  int input_idx = 0;
3640  for (const auto& input_meta : in_metainfo) {
3641  inputs.push_back(
3642  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
3643  table_id,
3644  scan_ra ? input_idx + 1 : input_idx,
3645  rte_idx));
3646  ++input_idx;
3647  }
3648  return inputs;
3649 }
3650 
3651 } // namespace
3652 
3653 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
3654  const RelAggregate* aggregate,
3655  const SortInfo& sort_info,
3656  const bool just_explain) {
3657  std::vector<InputDescriptor> input_descs;
3658  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3659  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3660  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
3661  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3662  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
3663  const auto join_type = get_join_type(aggregate);
3664 
3665  RelAlgTranslator translator(cat_,
3666  query_state_,
3667  executor_,
3668  input_to_nest_level,
3669  {join_type},
3670  now_,
3671  just_explain);
3672  CHECK_EQ(size_t(1), aggregate->inputCount());
3673  const auto source = aggregate->getInput(0);
3674  const auto& in_metainfo = source->getOutputMetainfo();
3675  const auto scalar_sources =
3676  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
3677  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
3678  const auto target_exprs = translate_targets(
3679  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
3680  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
3681  aggregate->setOutputMetainfo(targets_meta);
3682  return {RelAlgExecutionUnit{
3683  input_descs,
3684  input_col_descs,
3685  {},
3686  {},
3687  {},
3688  groupby_exprs,
3689  target_exprs,
3690  nullptr,
3691  sort_info,
3692  0,
3693  query_dag_ ? query_dag_->getQueryHints() : QueryHint::defaults(),
3694  false,
3695  std::nullopt,
3696  query_state_},
3697  aggregate,
3698  max_groups_buffer_entry_default_guess,
3699  nullptr};
3700 }
3701 
3702 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
3703  const RelProject* project,
3704  const SortInfo& sort_info,
3705  const ExecutionOptions& eo) {
3706  std::vector<InputDescriptor> input_descs;
3707  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3708  auto input_to_nest_level = get_input_nest_levels(project, {});
3709  std::tie(input_descs, input_col_descs, std::ignore) =
3710  get_input_desc(project, input_to_nest_level, {}, cat_);
3711  const auto query_infos = get_table_infos(input_descs, executor_);
3712 
3713  const auto left_deep_join =
3714  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
3715  JoinQualsPerNestingLevel left_deep_join_quals;
3716  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3717  : std::vector<JoinType>{get_join_type(project)};
3718  std::vector<size_t> input_permutation;
3719  std::vector<size_t> left_deep_join_input_sizes;
3720  if (left_deep_join) {
3721  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3722  const auto query_infos = get_table_infos(input_descs, executor_);
3723  left_deep_join_quals = translateLeftDeepJoinFilter(
3724  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3725  if (g_from_table_reordering) {
3726  input_permutation = do_table_reordering(input_descs,
3727  input_col_descs,
3728  left_deep_join_quals,
3729  input_to_nest_level,
3730  project,
3731  query_infos,
3732  executor_);
3733  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3734  std::tie(input_descs, input_col_descs, std::ignore) =
3735  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3736  left_deep_join_quals = translateLeftDeepJoinFilter(
3737  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3738  }
3739  }
3740 
3741  RelAlgTranslator translator(cat_,
3742  query_state_,
3743  executor_,
3744  input_to_nest_level,
3745  join_types,
3746  now_,
3747  eo.just_explain);
3748  const auto target_exprs_owned =
3749  translate_scalar_sources(project, translator, eo.executor_type);
3750  target_exprs_owned_.insert(
3751  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3752  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3753 
3754  const RelAlgExecutionUnit exe_unit = {
3755  input_descs,
3756  input_col_descs,
3757  {},
3758  {},
3759  left_deep_join_quals,
3760  {nullptr},
3761  target_exprs,
3762  nullptr,
3763  sort_info,
3764  0,
3765  query_dag_ ? query_dag_->getQueryHints() : QueryHint::defaults(),
3766  false,
3767  std::nullopt,
3768  query_state_};
3769  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3770  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3771  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3772  project->setOutputMetainfo(targets_meta);
3773  return {rewritten_exe_unit,
3774  project,
3775  max_groups_buffer_entry_default_guess,
3776  std::move(query_rewriter),
3777  input_permutation,
3778  left_deep_join_input_sizes};
3779 }
3780 
3781 namespace {
3782 
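// Synthesize target expressions for a UNION input, using the negated node
// id as a surrogate table id and one ColumnVar per output column at nest
// level 0.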
3783 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
3784  RelAlgNode const* input_node) {
3785  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
3786  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
3787  const int negative_node_id = -input_node->getId();
3788  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
3789  target_exprs.reserve(tmis.size());
3790  for (size_t i = 0; i < tmis.size(); ++i) {
3791  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
3792  tmis[i].get_type_info(), negative_node_id, i, 0));
3793  }
3794  return target_exprs;
3795 }
3796 
3797 } // namespace
3798 
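// UNION work units carry no qualifiers. The largest input row count is
// passed in the scan-limit slot so the output buffer can hold any single
// input, and isAll() distinguishes UNION ALL from de-duplicating UNION.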
3799 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
3800  const RelLogicalUnion* logical_union,
3801  const SortInfo& sort_info,
3802  const ExecutionOptions& eo) {
3803  std::vector<InputDescriptor> input_descs;
3804  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3805  // Map ra input ptr to index (0, 1).
3806  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
3807  std::tie(input_descs, input_col_descs, std::ignore) =
3808  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
3809  const auto query_infos = get_table_infos(input_descs, executor_);
3810  auto const max_num_tuples =
3811  std::accumulate(query_infos.cbegin(),
3812  query_infos.cend(),
3813  size_t(0),
3814  [](auto max, auto const& query_info) {
3815  return std::max(max, query_info.info.getNumTuples());
3816  });
3817 
3818  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
3819  for (auto& pair : input_to_nest_level) {
3820  VLOG(3) << " (" << pair.first->toString() << ", " << pair.second << ')';
3821  }
3822 
3823  RelAlgTranslator translator(
3824  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
3825 
3826  auto const input_exprs_owned = target_exprs_for_union(logical_union->getInput(0));
3827  CHECK(!input_exprs_owned.empty())
3828  << "No metainfo found for input node " << logical_union->getInput(0)->toString();
3829  VLOG(3) << "input_exprs_owned.size()=" << input_exprs_owned.size();
3830  for (auto& input_expr : input_exprs_owned) {
3831  VLOG(3) << " " << input_expr->toString();
3832  }
3833  target_exprs_owned_.insert(
3834  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3835  const auto target_exprs = get_exprs_not_owned(input_exprs_owned);
3836 
3837  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
3838  << " input_col_descs=" << shared::printContainer(input_col_descs)
3839  << " target_exprs.size()=" << target_exprs.size()
3840  << " max_num_tuples=" << max_num_tuples;
3841 
3842  const RelAlgExecutionUnit exe_unit = {
3843  input_descs,
3844  input_col_descs,
3845  {}, // quals_cf.simple_quals,
3846  {}, // rewrite_quals(quals_cf.quals),
3847  {},
3848  {nullptr},
3849  target_exprs,
3850  nullptr,
3851  sort_info,
3852  max_num_tuples,
3853  query_dag_ ? query_dag_->getQueryHints() : QueryHint::defaults(),
3854  false,
3855  logical_union->isAll(),
3856  query_state_};
3857  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3858  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3859 
3860  RelAlgNode const* input0 = logical_union->getInput(0);
3861  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
3862  logical_union->setOutputMetainfo(
3863  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3864  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
3865  logical_union->setOutputMetainfo(
3866  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3867  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
3868  logical_union->setOutputMetainfo(
3869  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3870  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
3871  logical_union->setOutputMetainfo(
3872  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3873  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
3874  logical_union->setOutputMetainfo(
3875  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3876  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
3877  logical_union->setOutputMetainfo(
3878  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3879  } else if (dynamic_cast<const RelSort*>(input0)) {
3880  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
3881  } else {
3882  throw QueryNotSupported("Unsupported input type: " + input0->toString());
3883  }
3884  VLOG(3) << "logical_union->getOutputMetainfo()="
3885  << shared::printContainer(logical_union->getOutputMetainfo())
3886  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
3887  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
3888 
3889  return {rewritten_exe_unit,
3890  logical_union,
3891  max_groups_buffer_entry_default_guess,
3892  std::move(query_rewriter)};
3893 }
3894 
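// Bind the table function by name and input expressions, then derive the
// output buffer sizing parameter: either a user-supplied literal argument
// or a constant baked into the function's signature.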
3895 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
3896  const RelTableFunction* table_func,
3897  const bool just_explain,
3898  const bool is_gpu) {
3899  std::vector<InputDescriptor> input_descs;
3900  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3901  auto input_to_nest_level = get_input_nest_levels(table_func, {});
3902  std::tie(input_descs, input_col_descs, std::ignore) =
3903  get_input_desc(table_func, input_to_nest_level, {}, cat_);
3904  const auto query_infos = get_table_infos(input_descs, executor_);
3905  RelAlgTranslator translator(
3906  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
3907  const auto input_exprs_owned =
3908  translate_scalar_sources(table_func, translator, ::ExecutorType::Native);
3909  target_exprs_owned_.insert(
3910  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3911  const auto input_exprs = get_exprs_not_owned(input_exprs_owned);
3912 
3913  const auto table_function_impl = [=]() {
3914  if (is_gpu) {
3915  try {
3916  return bind_table_function(
3917  table_func->getFunctionName(), input_exprs_owned, is_gpu);
3918  } catch (ExtensionFunctionBindingError& e) {
3919  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
3920  << " Redirecting " << table_func->getFunctionName()
3921  << " to run on CPU.";
3922  throw QueryMustRunOnCpu();
3923  }
3924  } else {
3925  try {
3926  return bind_table_function(
3927  table_func->getFunctionName(), input_exprs_owned, is_gpu);
3928  } catch (ExtensionFunctionBindingError& e) {
3929  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
3930  throw;
3931  }
3932  }
3933  }();
3934 
3935  size_t output_row_sizing_param = 0;
3936  if (table_function_impl.hasUserSpecifiedOutputSizeMultiplier() ||
3937  table_function_impl.hasUserSpecifiedOutputSizeConstant()) {
3938  const auto parameter_index = table_function_impl.getOutputRowSizeParameter();
3939  CHECK_GT(parameter_index, size_t(0));
3940  const auto parameter_expr = table_func->getTableFuncInputAt(parameter_index - 1);
3941  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
3942  if (!parameter_expr_literal) {
3943  throw std::runtime_error(
3944  "Provided output buffer sizing parameter is not a literal. Only literal "
3945  "values are supported with output buffer sizing configured table "
3946  "functions.");
3947  }
3948  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
3949  if (literal_val < 0) {
3950  throw std::runtime_error("Provided output sizing parameter " +
3951  std::to_string(literal_val) +
3952  " must be a positive integer.");
3953  }
3954  output_row_sizing_param = static_cast<size_t>(literal_val);
3955  } else if (table_function_impl.hasNonUserSpecifiedOutputSizeConstant()) {
3956  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
3957  } else {
3958  UNREACHABLE();
3959  }
3960 
3961  std::vector<Analyzer::ColumnVar*> input_col_exprs;
3962  size_t index = 0;
3963  for (auto input_expr : input_exprs) {
3964  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
3965  input_expr->set_type_info(table_function_impl.getInputSQLType(index));
3966  input_col_exprs.push_back(col_var);
3967  }
3968  index++;
3969  }
3970  CHECK_EQ(input_col_exprs.size(), table_func->getColInputsSize());
3971  std::vector<Analyzer::Expr*> table_func_outputs;
3972  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
3973  const auto ti = table_function_impl.getOutputSQLType(i);
3974  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
3975  table_func_outputs.push_back(target_exprs_owned_.back().get());
3976  }
3977  const TableFunctionExecutionUnit exe_unit = {
3978  input_descs,
3979  input_col_descs,
3980  input_exprs, // table function inputs
3981  input_col_exprs, // table function column inputs (duplicates w/ above)
3982  table_func_outputs, // table function projected exprs
3983  output_row_sizing_param, // output buffer sizing param
3984  table_function_impl};
3985  const auto targets_meta = get_targets_meta(table_func, exe_unit.target_exprs);
3986  table_func->setOutputMetainfo(targets_meta);
3987  return {exe_unit, table_func};
3988 }
3989 
3990 namespace {
3991 
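// Collect output metadata and input expressions for every source feeding
// the filter: scan sources are translated directly from the used
// RexInputs, while intermediate nodes reuse their existing metadata.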
3992 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
3993 get_inputs_meta(const RelFilter* filter,
3994  const RelAlgTranslator& translator,
3995  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
3996  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3997  std::vector<TargetMetaInfo> in_metainfo;
3998  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
3999  const auto data_sink_node = get_data_sink(filter);
4000  auto input_it = inputs_owned.begin();
4001  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
4002  const auto source = data_sink_node->getInput(nest_level);
4003  const auto scan_source = dynamic_cast<const RelScan*>(source);
4004  if (scan_source) {
4005  CHECK(source->getOutputMetainfo().empty());
4006  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
4007  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
4008  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
4009  }
4010  const auto source_metadata =
4011  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
4012  in_metainfo.insert(
4013  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4014  exprs_owned.insert(
4015  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4016  } else {
4017  const auto& source_metadata = source->getOutputMetainfo();
4018  input_it += source_metadata.size();
4019  in_metainfo.insert(
4020  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4021  const auto scalar_sources_owned = synthesize_inputs(
4022  data_sink_node, nest_level, source_metadata, input_to_nest_level);
4023  exprs_owned.insert(
4024  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4025  }
4026  }
4027  return std::make_pair(in_metainfo, exprs_owned);
4028 }
4029 
4030 } // namespace
4031 
4032 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
4033  const SortInfo& sort_info,
4034  const bool just_explain) {
4035  CHECK_EQ(size_t(1), filter->inputCount());
4036  std::vector<InputDescriptor> input_descs;
4037  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4038  std::vector<TargetMetaInfo> in_metainfo;
4039  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4040  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
4041 
4042  const auto input_to_nest_level = get_input_nest_levels(filter, {});
4043  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4044  get_input_desc(filter, input_to_nest_level, {}, cat_);
4045  const auto join_type = get_join_type(filter);
4046  RelAlgTranslator translator(cat_,
4047  query_state_,
4048  executor_,
4049  input_to_nest_level,
4050  {join_type},
4051  now_,
4052  just_explain);
4053  std::tie(in_metainfo, target_exprs_owned) =
4054  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
4055  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
4056  const auto qual = fold_expr(filter_expr.get());
4057  target_exprs_owned_.insert(
4058  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4059  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
4060  filter->setOutputMetainfo(in_metainfo);
4061  const auto rewritten_qual = rewrite_expr(qual.get());
4062  return {{input_descs,
4063  input_col_descs,
4064  {},
4065  {rewritten_qual ? rewritten_qual : qual},
4066  {},
4067  {nullptr},
4068  target_exprs,
4069  nullptr,
4070  sort_info,
4071  0,
4072  query_dag_ ? query_dag_->getQueryHints() : QueryHint::defaults()},
4073  filter,
4074  max_groups_buffer_entry_default_guess,
4075  nullptr};
4076 }
4077 
const size_t getGroupByCount() const
bool isAll() const
bool is_agg(const Analyzer::Expr *expr)
Analyzer::ExpressionPtr rewrite_array_elements(Analyzer::Expr const *expr)
std::vector< Analyzer::Expr * > target_exprs
SortField getCollation(const size_t i) const
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:205
void collect_used_input_desc(std::vector< InputDescriptor > &input_descs, const Catalog_Namespace::Catalog &cat, std::unordered_set< std::shared_ptr< const InputColDescriptor >> &input_col_descs_unique, const RelAlgNode *ra_node, const std::unordered_set< const RexInput * > &source_used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
size_t getOffset() const
std::vector< int > ChunkKey
Definition: types.h:37
std::optional< std::function< void()> > post_execution_callback_
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:108
HOST DEVICE int get_size() const
Definition: sqltypes.h:321
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool g_enable_watchdog
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
std::string cat(Ts &&...args)
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
const Rex * getTargetExpr(const size_t i) const
AggregatedColRange computeColRangesCache()
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1223
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1079
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
Definition: sqltypes.h:48
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
size_t size() const override
int hll_size_for_rate(const int err_percent)
Definition: HyperLogLog.h:115
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:218
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const RexScalar * getFilterExpr() const
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:3026
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:219
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
const RelAlgNode * body
ExecutorDeviceType
void setForceNonInSituData()
Definition: RenderInfo.cpp:45
size_t getIndex() const
bool is_metadata_placeholder(const ChunkMetadata &metadata)
#define SPIMAP_GEO_PHYSICAL_INPUT(c, i)
Definition: Catalog.h:75
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
bool g_skip_intermediate_count
std::unique_ptr< const RexOperator > get_bitwise_equals_conjunction(const RexScalar *scalar)
RexUsedInputsVisitor(const Catalog_Namespace::Catalog &cat)
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:113
const RexScalar * getOuterCondition(const size_t nesting_level) const
TableGenerations computeTableGenerations()
SQLTypeInfo get_nullable_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:914
std::shared_ptr< Analyzer::Expr > translateScalarRex(const RexScalar *rex) const
#define LOG(tag)
Definition: Logger.h:188
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:79
bool g_enable_union
size_t size() const override
static SpeculativeTopNBlacklist speculative_topn_blacklist_
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3791
size_t get_scalar_sources_size(const RelCompound *compound)
RelAlgExecutionUnit exe_unit
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< std::shared_ptr< Analyzer::TargetEntry > > targets
Definition: RenderInfo.h:37
SQLOps
Definition: sqldefs.h:29
TemporaryTables temporary_tables_
size_t getNumRows() const
static const size_t max_groups_buffer_entry_default_guess
const std::list< Analyzer::OrderEntry > order_entries
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const RexScalar * getCondition() const
std::string join(T const &container, std::string const &delim)
const std::vector< TargetMetaInfo > getTupleType() const
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources_for_update(const RA *ra_node, const RelAlgTranslator &translator, int32_t tableId, const Catalog_Namespace::Catalog &cat, const ColumnNameList &colNames, size_t starting_projection_column_idx)
bool get_is_null() const
Definition: Analyzer.h:334
Definition: sqldefs.h:38
std::vector< InputDescriptor > input_descs
std::shared_ptr< Analyzer::Var > var_ref(const Analyzer::Expr *expr, const Analyzer::Var::WhichRow which_row, const int varno)
Definition: Analyzer.h:1669
#define UNREACHABLE()
Definition: Logger.h:241
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
static WindowProjectNodeContext * create(Executor *executor)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:893
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
Definition: sqldefs.h:49
Definition: sqldefs.h:30
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
static const int32_t ERR_GEOS
Definition: Execute.h:1085
SQLTypeInfo get_agg_type(const SQLAgg agg_kind, const Analyzer::Expr *arg_expr)
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:1081
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
std::shared_ptr< Analyzer::Expr > set_transient_dict(const std::shared_ptr< Analyzer::Expr > expr)
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:259
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:311
static void invalidateCaches()
int g_hll_precision_bits
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
const SQLTypeInfo & get_type_info() const
void checkForMatchingMetaInfoTypes() const
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1455
#define CHECK_GT(x, y)
Definition: Logger.h:209
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
int32_t intval
Definition: sqltypes.h:205
std::vector< const RexScalar * > rex_to_conjunctive_form(const RexScalar *qual_expr)
bool is_count_distinct(const Analyzer::Expr *expr)
QueryStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::string to_string(char const *&&v)
ChunkStats chunkStats
Definition: ChunkMetadata.h:35
void handleNop(RaExecutionDesc &ed)
#define LOG_IF(severity, condition)
Definition: Logger.h:287
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::shared_ptr< Analyzer::Expr > cast_dict_to_none(const std::shared_ptr< Analyzer::Expr > &input)
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:461
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
bool list_contains_expression(const QualsList &haystack, const std::shared_ptr< Analyzer::Expr > &needle)
size_t get_count_distinct_sub_bitmap_count(const size_t bitmap_sz_bits, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:1082
Definition: sqldefs.h:73
ExecutorType
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:199
size_t getColInputsSize() const
std::map< int, std::shared_ptr< ChunkMetadata >> ChunkMetadataMap
bool g_enable_interop
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
virtual T visit(const RexScalar *rex_scalar) const
Definition: RexVisitor.h:27
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, int database_id)
const size_t getScalarSourcesSize() const
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:1083
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:1080
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:224
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
unsigned getIndex() const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1071
static SysCatalog & instance()
Definition: SysCatalog.h:286
const bool allow_multifrag
size_t getOuterFragmentCount(const CompilationOptions &co, const ExecutionOptions &eo)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
const bool find_push_down_candidates
bool g_from_table_reordering
Definition: Execute.cpp:84
const bool just_validate
CONSTEXPR DEVICE bool is_null(const T &value)
unsigned getId() const
Classes representing a parse tree.
bool isGeometry(TargetMetaInfo const &target_meta_info)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:221
const SortInfo sort_info
ExecutorType executor_type
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const bool with_dynamic_watchdog
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1075
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const JoinQualsPerNestingLevel join_quals
static void reset(Executor *executor)
SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr &expr)
std::vector< std::shared_ptr< RexInput > > synthesized_physical_inputs_owned
std::pair< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > > > get_input_desc_impl(const RA *ra_node, const std::unordered_set< const RexInput * > &used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
static QueryHint defaults()
Definition: QueryHint.h:74
SortAlgorithm
const double gpu_input_mem_limit_percent
MergeType
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
A container for relational algebra descriptors defining the execution order for a relational algebra ...
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const Catalog_Namespace::Catalog & cat_
size_t g_big_group_threshold
Definition: Execute.cpp:100
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1077
const std::shared_ptr< ResultSet > & getRows() const
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1078
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
bool g_bigint_count
size_t groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
const RelAlgNode * getInput(const size_t idx) const
Definition: sqldefs.h:37
Definition: sqldefs.h:75
void executePostExecutionCallback()
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
int getDatabaseId() const
Definition: Catalog.h:274
std::shared_ptr< Analyzer::Expr > build_logical_expression(const std::vector< std::shared_ptr< Analyzer::Expr >> &factors, const SQLOps sql_op)
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
bool is_dict_encoded_type() const
Definition: sqltypes.h:516
size_t getNDVEstimation(const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
static const int32_t ERR_UNSUPPORTED_SELF_JOIN
Definition: Execute.h:1074
bool isSimple() const
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1443
specifies the content in-memory of a row in the column metadata table
bool get_is_distinct() const
Definition: Analyzer.h:1098
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1084
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
JoinType get_join_type(const RelAlgNode *ra)
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1072
size_t getTableFuncInputsSize() const
const ColumnDescriptor * getMetadataForColumnBySpi(const int tableId, const size_t spi) const
Definition: Catalog.cpp:1547
std::shared_ptr< Analyzer::Expr > reverse_logical_distribution(const std::shared_ptr< Analyzer::Expr > &expr)
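Together with build_logical_expression() above and qual_to_disjunctive_form() below, this helper lets a filter be decomposed and recombined. A minimal sketch, assuming filter_expr is an existing std::shared_ptr<Analyzer::Expr>:

  // Sketch: split a qual into disjuncts, OR them back together, then let
  // reverse_logical_distribution() re-factor common terms.
  auto disjuncts = qual_to_disjunctive_form(filter_expr);
  auto rebuilt = build_logical_expression(disjuncts, kOR);
  auto simplified = reverse_logical_distribution(rebuilt);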
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
std::optional< size_t > getFilteredCountAll(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
Executor * getExecutor() const
std::string * stringval
Definition: sqltypes.h:211
int get_result_table_id() const
Definition: Analyzer.h:1626
static std::shared_ptr< Analyzer::Expr > normalize(const SQLOps optype, const SQLQualifier qual, std::shared_ptr< Analyzer::Expr > left_expr, std::shared_ptr< Analyzer::Expr > right_expr)
Definition: ParserNode.cpp:272
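normalize() canonicalizes a binary operation, coercing operand types as needed. A hedged usage sketch, assuming this is Parser::OperExpr::normalize and that lhs and rhs are existing Analyzer expressions:

  // Sketch: build a canonical equality predicate lhs = rhs; kEQ and kONE
  // come from sqldefs.h.
  auto eq_pred = Parser::OperExpr::normalize(kEQ, kONE, lhs, rhs);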
const std::vector< std::unique_ptr< const RexAgg > > & getAggExprs() const
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
bool g_enable_window_functions
Definition: Execute.cpp:101
bool isEmptyResult() const
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
const std::vector< size_t > outer_fragment_indices
const RexScalar * getProjectAt(const size_t idx) const
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
Definition: sqltypes.h:51
bool hasInput(const RelAlgNode *needle) const
Definition: sqltypes.h:52
Definition: sqldefs.h:69
ExecutionResult executeSimpleInsert(const Analyzer::Query &insert_query)
const table_functions::TableFunction bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs, const bool is_gpu)
#define TRANSIENT_DICT_ID
Definition: sqltypes.h:250
const bool just_calcite_explain
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_used_inputs(const RelCompound *compound, const Catalog_Namespace::Catalog &cat)
#define CHECK_LE(x, y)
Definition: Logger.h:208
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: sqltypes.h:922
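appendDatum() serializes a single Datum into a raw buffer and returns the advanced write pointer. A minimal sketch for a 32-bit integer value (the buffer size here is illustrative):

  // Sketch: write one 32-bit int into buf; appendDatum returns the
  // position just past the bytes it wrote.
  Datum d;
  d.intval = 42;
  int8_t buf[sizeof(int32_t)];
  int8_t* next = appendDatum(buf, d, SQLTypeInfo(kINT, /*notnull=*/false));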
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:319
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool table_is_temporary(const TableDescriptor *const td)
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:64
std::shared_ptr< const query_state::QueryState > query_state_
bool table_is_replicated(const TableDescriptor *td)
std::unordered_set< const RexInput * > visitInput(const RexInput *rex_input) const override
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_join_source_used_inputs(const RelAlgNode *ra_node, const Catalog_Namespace::Catalog &cat)
Basic constructors and methods of the row set interface.
Datum get_constval() const
Definition: Analyzer.h:335
Definition: sqldefs.h:76
const size_t getGroupByCount() const
std::unordered_set< const RexInput * > aggregateResult(const std::unordered_set< const RexInput * > &aggregate, const std::unordered_set< const RexInput * > &next_result) const override
const RelAlgNode & getRootRelAlgNode() const
size_t collationCount() const
const RelAlgNode * getSourceNode() const
bool isAggregate() const
size_t getLimit() const
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool isRowidLookup(const WorkUnit &work_unit)
bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
Definition: sqltypes.h:40
int table_id_from_ra(const RelAlgNode *ra_node)
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
static void handlePersistentError(const int32_t error_code)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:320
const RexScalar * getTableFuncInputAt(const size_t idx) const
bool compute_output_buffer_size(const RelAlgExecutionUnit &ra_exe_unit)
SQLAgg get_aggtype() const
Definition: Analyzer.h:1095
const size_t max_groups_buffer_entry_guess
std::string getFunctionName() const
std::list< std::shared_ptr< Analyzer::Expr > > quals
const bool allow_loop_joins
bool wasMultifragKernelLaunch() const
Definition: ErrorHandling.h:57
const unsigned pending_query_interrupt_freq
bool g_enable_bump_allocator
Definition: Execute.cpp:107
StringDictionaryGenerations computeStringDictionaryGenerations()
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
const RexScalar * getInnerCondition() const
virtual std::string toString() const =0
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
#define CHECK(condition)
Definition: Logger.h:197
bool is_geometry() const
Definition: sqltypes.h:490
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:313
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
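A hedged call sketch for the chunk fetch above; the enclosing class is assumed to be ColumnFetcher, and executor, hash_col and fragment stand for values obtained elsewhere:

  // Sketch: pin one column of one fragment in CPU memory; chunks_owner
  // keeps the chunk alive, column_cache memoizes columnarized results.
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  ColumnCacheMap column_cache;
  auto [col_buf, elem_count] =
      ColumnFetcher::getOneColumnFragment(executor,
                                          hash_col,
                                          fragment,
                                          Data_Namespace::CPU_LEVEL,
                                          /*device_id=*/0,
                                          /*device_allocator=*/nullptr,
                                          chunks_owner,
                                          column_cache);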
std::vector< std::shared_ptr< Analyzer::Expr > > qual_to_disjunctive_form(const std::shared_ptr< Analyzer::Expr > &qual_expr)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: sqldefs.h:31
const std::vector< std::shared_ptr< RexInput > > & get_inputs_owned() const
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
std::unique_ptr< RelAlgDagBuilder > query_dag_
const RelAlgNode * getBody() const
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
RANodeOutput get_node_output(const RelAlgNode *ra_node)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
std::unique_ptr< const RexOperator > get_bitwise_equals(const RexScalar *scalar)
Estimators to be used when precise cardinality isn't useful.
bool g_cluster
bool g_allow_cpu_retry
Definition: Execute.cpp:81
static std::string getErrorMessageFromCode(const int32_t error_code)
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
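The Fragmenter.h entries on this page describe the insert payload consumed by executeSimpleInsert(). A sketch of filling it, assuming td and cd are descriptors fetched from the catalog and that databaseId and numRows are members of Fragmenter_Namespace::InsertData:

  // Sketch: one-row insert payload for the fragment manager; per-column
  // payloads would then be pushed into insert_data.data (DataBlockPtr).
  Fragmenter_Namespace::InsertData insert_data;
  insert_data.databaseId = cat.getDatabaseId();
  insert_data.tableId = td->tableId;
  insert_data.columnIds = {cd->columnId};
  insert_data.numRows = 1;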
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
const Expr * get_left_operand() const
Definition: Analyzer.h:442
void cleanupPostExecution()
std::vector< std::string > ColumnNameList
std::list< std::shared_ptr< Analyzer::Expr > > makeJoinQuals(const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
const RexScalar * scalar_at(const size_t i, const RelCompound *compound)
Definition: sqltypes.h:44
std::vector< Analyzer::Expr * > target_exprs
SQLTypeInfo columnType
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
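printContainer() from Shared/misc.h is a logging convenience for streaming any iterable. A sketch, assuming the helper lives in namespace shared as in misc.h:

  // Sketch: stream a container through the VLOG macro listed below.
  std::vector<int> table_ids{1, 2, 3};
  VLOG(1) << "input tables: " << shared::printContainer(table_ids);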
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
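A minimal catalog-lookup sketch tying this accessor to getMetadataForColumn() above; "flights" and "carrier" are made-up names:

  // Sketch: resolve a table by name, then one of its columns; both
  // lookups return nullptr when the object does not exist.
  const TableDescriptor* td = cat.getMetadataForTable("flights");
  if (td) {
    const ColumnDescriptor* cd =
        cat.getMetadataForColumn(td->tableId, "carrier");
    CHECK(cd);
  }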
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
bool is_string() const
Definition: sqltypes.h:478
const size_t inputCount() const
const bool allow_runtime_query_interrupt
int8_t * numbersPtr
Definition: sqltypes.h:217
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
static constexpr char const * FOREIGN_TABLE
RelAlgExecutionUnit decide_approx_count_distinct_implementation(const RelAlgExecutionUnit &ra_exe_unit_in, const std::vector< InputTableInfo > &table_infos, const Executor *executor, const ExecutorDeviceType device_type_in, std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned)
size_t get_frag_count_of_table(const int table_id, Executor *executor)
Definition: sqldefs.h:74
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
const std::list< int > & get_result_col_list() const
Definition: Analyzer.h:1627
bool sameTypeInfo(std::vector< TargetMetaInfo > const &lhs, std::vector< TargetMetaInfo > const &rhs)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:1451
const RelAlgNode * get_data_sink(const RelAlgNode *ra_node)
const unsigned dynamic_watchdog_time_limit
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
static const int32_t ERR_OUT_OF_CPU_MEM
Definition: Execute.h:1076
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:697
const TableDescriptor * getTableDescriptor() const
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
std::string columnName
Definition: sqldefs.h:72
const std::vector< std::shared_ptr< TargetEntry > > & get_targetlist() const
Definition: Analyzer.h:1610
static size_t getArenaBlockSize()
Definition: Execute.cpp:204
Executor * executor_
int getNestLevel() const
SQLTypeInfo sqlType
Definition: ChunkMetadata.h:32
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20
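The shard_key.h macro above routes a key to its shard. A sketch with hypothetical values:

  // Sketch: a key is mapped onto one of num_shards shards of a sharded
  // table; both inputs here are placeholders.
  const int64_t key = 12345;
  const size_t num_shards = 4;
  const size_t shard = SHARD_FOR_KEY(key, num_shards);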
std::list< std::shared_ptr< Analyzer::Expr > > combine_equi_join_conditions(const std::list< std::shared_ptr< Analyzer::Expr >> &join_quals)
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:242
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:42
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
const int getColumnIdBySpi(const int tableId, const size_t spi) const
Definition: Catalog.cpp:1542
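SPI is the sequential physical index of a column; this accessor and getMetadataForColumnBySpi() above translate it to the physical column id and descriptor. A sketch, with table_id and spi assumed:

  // Sketch: map a SPI to the physical column id, then fetch the
  // corresponding column descriptor.
  const int col_id = cat.getColumnIdBySpi(table_id, spi);
  const ColumnDescriptor* cd = cat.getMetadataForColumnBySpi(table_id, spi);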
const bool with_watchdog
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:187
bool g_enable_table_functions
Definition: Execute.cpp:102
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
static std::shared_ptr< Analyzer::Expr > translateAggregateRex(const RexAgg *rex, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
void prepareLeafExecution(const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
static const Executo