OmniSciDB  340b00dbf6
RelAlgExecutor.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RelAlgExecutor.h"
18 #include "Parser/ParserNode.h"
33 #include "QueryEngine/RexVisitor.h"
36 #include "Shared/measure.h"
37 #include "Shared/misc.h"
38 #include "Shared/shard_key.h"
39 
40 #include <boost/algorithm/cxx11/any_of.hpp>
41 #include <boost/range/adaptor/reversed.hpp>
42 
43 #include <algorithm>
44 #include <functional>
45 #include <numeric>
46 
48 extern bool g_enable_bump_allocator;
49 bool g_enable_interop{false};
50 bool g_enable_union{false};
51 
52 namespace {
53 
54 bool node_is_aggregate(const RelAlgNode* ra) {
55  const auto compound = dynamic_cast<const RelCompound*>(ra);
56  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
57  return ((compound && compound->isAggregate()) || aggregate);
58 }
59 
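// Overload that remaps each physical input's column id from its schema
// position index (SPI) to the catalog's actual column id via
// getColumnIdBySpi().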
60 std::unordered_set<PhysicalInput> get_physical_inputs(
61  const Catalog_Namespace::Catalog& cat,
62  const RelAlgNode* ra) {
63  auto phys_inputs = get_physical_inputs(ra);
64  std::unordered_set<PhysicalInput> phys_inputs2;
65  for (auto& phi : phys_inputs) {
66  phys_inputs2.insert(
67  PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
68  }
69  return phys_inputs2;
70 }
71 
72 bool is_metadata_placeholder(const ChunkMetadata& metadata) {
73  // Only supported type for now
74  CHECK(metadata.sqlType.is_dict_encoded_type());
75  return metadata.chunkStats.min.intval > metadata.chunkStats.max.intval;
76 }
77 
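// Pre-execution pass over the query's physical inputs: for dict-encoded
// columns of foreign tables, any fragment whose chunk metadata is still a
// placeholder is fetched into CPU memory so that real metadata exists
// before the query runs.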
78 void prepare_foreign_table_for_execution(
79  const RelAlgNode& ra_node,
80  int database_id,
81  const std::shared_ptr<const query_state::QueryState>& query_state_) {
82  // Iterate through ra_node inputs for types that need to be loaded pre-execution
83  // If they do not have valid metadata, load them into CPU memory to generate
84  // the metadata and leave them ready to be used by the query
85  auto catalog = Catalog_Namespace::Catalog::checkedGet(database_id);
86  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
87  int table_id = physical_input.table_id;
88  auto table = catalog->getMetadataForTable(table_id, false);
89  if (table && table->storageType == StorageType::FOREIGN_TABLE) {
90  int col_id = catalog->getColumnIdBySpi(table_id, physical_input.col_id);
91  const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
92  auto foreign_table = catalog->getForeignTable(table_id);
93  if (foreign_table->foreign_server->data_wrapper_type ==
94  foreign_storage::DataWrapperType::CSV &&
95  col_desc->columnType.is_dict_encoded_type()) {
96  CHECK(foreign_table->fragmenter != nullptr);
97  for (const auto& fragment :
98  foreign_table->fragmenter->getFragmentsForQuery().fragments) {
99  ChunkKey chunk_key = {database_id, table_id, col_id, fragment.fragmentId};
100  const ChunkMetadataMap& metadata_map = fragment.getChunkMetadataMap();
101  CHECK(metadata_map.find(col_id) != metadata_map.end());
102  if (is_metadata_placeholder(*(metadata_map.at(col_id)))) {
103  // When this goes out of scope it will stay in CPU cache but become
104  // evictable
105  std::shared_ptr<Chunk_NS::Chunk> chunk =
106  Chunk_NS::Chunk::getChunk(col_desc,
107  &(catalog->getDataMgr()),
108  chunk_key,
109  Data_Namespace::MemoryLevel::CPU_LEVEL,
110  0,
111  0,
112  0);
113  }
114  }
115  }
116  }
117  }
118 }
119 
120 } // namespace
121 
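// Computes how many outer-table fragments a simple one-step projection or
// non-aggregate compound query spans; 0 disables per-fragment dispatch
// (explains, push-down candidates, subqueries, multi-step sequences and
// aggregates all return 0).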
122 size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
123  const ExecutionOptions& eo) {
124  if (eo.find_push_down_candidates) {
125  return 0;
126  }
127 
128  if (eo.just_explain) {
129  return 0;
130  }
131 
132  CHECK(query_dag_);
133 
134  query_dag_->resetQueryExecutionState();
135  const auto& ra = query_dag_->getRootNode();
136 
137  mapd_shared_lock<mapd_shared_mutex> lock(executor_->execute_mutex_);
138  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
139  const auto phys_inputs = get_physical_inputs(cat_, &ra);
140  const auto phys_table_ids = get_physical_table_inputs(&ra);
141  executor_->setCatalog(&cat_);
142  executor_->setupCaching(phys_inputs, phys_table_ids);
143 
144  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
145  auto ed_seq = RaExecutionSequence(&ra);
146 
147  if (!getSubqueries().empty()) {
148  return 0;
149  }
150 
151  CHECK(!ed_seq.empty());
152  if (ed_seq.size() > 1) {
153  return 0;
154  }
155 
156  decltype(temporary_tables_)().swap(temporary_tables_);
157  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
158  executor_->catalog_ = &cat_;
159  executor_->temporary_tables_ = &temporary_tables_;
160 
162  auto exec_desc_ptr = ed_seq.getDescriptor(0);
163  CHECK(exec_desc_ptr);
164  auto& exec_desc = *exec_desc_ptr;
165  const auto body = exec_desc.getBody();
166  if (body->isNop()) {
167  return 0;
168  }
169 
170  const auto project = dynamic_cast<const RelProject*>(body);
171  if (project) {
172  auto work_unit =
173  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
174 
175  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
176  executor_);
177  }
178 
179  const auto compound = dynamic_cast<const RelCompound*>(body);
180  if (compound) {
181  if (compound->isDeleteViaSelect()) {
182  return 0;
183  } else if (compound->isUpdateViaSelect()) {
184  return 0;
185  } else {
186  if (compound->isAggregate()) {
187  return 0;
188  }
189 
190  const auto work_unit =
191  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
192 
193  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
194  executor_);
195  }
196  }
197 
198  return 0;
199 }
200 
201 ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
202  const ExecutionOptions& eo,
203  const bool just_explain_plan,
204  RenderInfo* render_info) {
205  CHECK(query_dag_);
206  auto timer = DEBUG_TIMER(__func__);
207  INJECT_TIMER(executeRelAlgQuery);
208 
209  try {
210  return executeRelAlgQueryNoRetry(co, eo, just_explain_plan, render_info);
211  } catch (const QueryMustRunOnCpu&) {
212  if (!g_allow_cpu_retry) {
213  throw;
214  }
215  }
216  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
217  auto co_cpu = CompilationOptions::makeCpuOnly(co);
218 
219  if (render_info) {
220  render_info->setForceNonInSituData();
221  }
222  return executeRelAlgQueryNoRetry(co_cpu, eo, just_explain_plan, render_info);
223 }
224 
225 namespace {
226 
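// Holds whichever lock flavor applies to Executor::execute_mutex_: the
// unitary executor takes a unique lock, other executors share the mutex
// (see the acquire_execute_mutex lambda below). Default-constructed locks
// remain disengaged.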
227 struct ExecutorMutexHolder {
228  mapd_shared_lock<mapd_shared_mutex> shared_lock;
229  mapd_unique_lock<mapd_shared_mutex> unique_lock;
230 };
231 
232 } // namespace
233 
234 ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
235  const ExecutionOptions& eo,
236  const bool just_explain_plan,
237  RenderInfo* render_info) {
238  INJECT_TIMER(executeRelAlgQueryNoRetry);
239  auto timer = DEBUG_TIMER(__func__);
240  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
241 
242  query_dag_->resetQueryExecutionState();
243  const auto& ra = query_dag_->getRootNode();
244 
245  // capture the lock acquisition time
246  auto clock_begin = timer_start();
247  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
248  executor_->resetInterrupt();
249  }
250  std::string query_session = "";
251  std::string query_str = "N/A";
252  if (g_enable_runtime_query_interrupt) {
253  // a query execution request without a session id can happen, e.g., a test query;
254  // if so, fall back to the original approach: a runtime query interrupt
255  // without per-session management (similar to the dynamic watchdog)
256  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
257  executor_->executor_session_mutex_);
258  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
259  query_session = query_state_->getConstSessionInfo()->get_session_id();
260  query_str = query_state_->getQueryStr();
261  } else if (executor_->getCurrentQuerySession(session_read_lock) != query_session) {
262  query_session = executor_->getCurrentQuerySession(session_read_lock);
263  }
264 
265  session_read_lock.unlock();
266  if (query_session != "") {
267  // if session is valid, then we allow per-session runtime query interrupt
268  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
269  executor_->executor_session_mutex_);
270  executor_->addToQuerySessionList(query_session, query_str, session_write_lock);
271  session_write_lock.unlock();
272  // hybrid spinlock: if it fails to acquire the lock, it sleeps for
273  // {g_pending_query_interrupt_freq} milliseconds before retrying.
274  while (executor_->execute_spin_lock_.test_and_set(std::memory_order_acquire)) {
275  // failed to get the spinlock: check whether query is interrupted
276  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
277  executor_->executor_session_mutex_);
278  bool isQueryInterrupted =
279  executor_->checkIsQuerySessionInterrupted(query_session, session_read_lock);
280  session_read_lock.unlock();
281  if (isQueryInterrupted) {
282  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
283  executor_->executor_session_mutex_);
284  executor_->removeFromQuerySessionList(query_session, session_write_lock);
285  session_write_lock.unlock();
286  throw std::runtime_error(
287  "Query execution has been interrupted (pending query)");
288  }
289  // failed to acquire the lock; sleep before retrying
290  std::this_thread::sleep_for(
291  std::chrono::milliseconds(g_pending_query_interrupt_freq));
292  };
293  }
294  // currently, atomic_flag does not provide a way to query its status,
295  // i.e., spinlock.is_locked(), so we additionally lock the execute_mutex_
296  // right after acquiring the spinlock so that other parts of the code can
297  // tell whether a query is running on the executor
298  }
299  auto acquire_execute_mutex = [](Executor* executor) -> ExecutorMutexHolder {
300  ExecutorMutexHolder ret;
301  if (executor->executor_id_ == Executor::UNITARY_EXECUTOR_ID) {
302  // Only one unitary executor can run at a time
303  ret.unique_lock = mapd_unique_lock<mapd_shared_mutex>(executor->execute_mutex_);
304  } else {
305  ret.shared_lock = mapd_shared_lock<mapd_shared_mutex>(executor->execute_mutex_);
306  }
307  return ret;
308  };
309  auto lock = acquire_execute_mutex(executor_);
310  ScopeGuard clearRuntimeInterruptStatus = [this] {
311  // reset the runtime query interrupt status
312  if (g_enable_runtime_query_interrupt) {
313  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
314  executor_->executor_session_mutex_);
315  std::string curSession = executor_->getCurrentQuerySession(session_read_lock);
316  session_read_lock.unlock();
317  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
318  executor_->executor_session_mutex_);
319  executor_->removeFromQuerySessionList(curSession, session_write_lock);
320  executor_->invalidateRunningQuerySession(session_write_lock);
321  executor_->execute_spin_lock_.clear(std::memory_order_release);
322  session_write_lock.unlock();
323  executor_->resetInterrupt();
324  VLOG(1) << "RESET runtime query interrupt status of Executor " << this;
325  }
326  };
327 
328  if (query_session != "") {
329  // check whether this query session is already interrupted;
330  // this happens when there is a very short gap between being interrupted and
331  // taking the execute lock. if so, we interrupt the query session and
332  // remove it from the running session list
333  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
334  executor_->executor_session_mutex_);
335  bool isAlreadyInterrupted =
336  executor_->checkIsQuerySessionInterrupted(query_session, session_read_lock);
337  session_read_lock.unlock();
338  if (isAlreadyInterrupted) {
339  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
340  executor_->executor_session_mutex_);
341  executor_->removeFromQuerySessionList(query_session, session_write_lock);
342  session_write_lock.unlock();
343  throw std::runtime_error("Query execution has been interrupted");
344  }
345 
346  // make sure to set the running session ID
347  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
348  executor_->executor_session_mutex_);
349  executor_->invalidateRunningQuerySession(session_write_lock);
350  executor_->setCurrentQuerySession(query_session, session_write_lock);
351  session_write_lock.unlock();
352  }
353 
354  // Notify foreign tables to load prior to caching
355  prepare_foreign_table_for_execution(ra, cat_.getDatabaseId(), query_state_);
356 
357  int64_t queue_time_ms = timer_stop(clock_begin);
358  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
359  const auto phys_inputs = get_physical_inputs(cat_, &ra);
360  const auto phys_table_ids = get_physical_table_inputs(&ra);
361  executor_->setCatalog(&cat_);
362  executor_->setupCaching(phys_inputs, phys_table_ids);
363 
364  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
365  auto ed_seq = RaExecutionSequence(&ra);
366 
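// With `just_explain_plan`, nothing executes: the sequence is printed last
// step first, indenting one level per step, roughly (illustrative shape):
//   2 : RelProject(...)
//       1 : RelCompound(...)
// with sort inputs, join sources and subquery trees appended where present.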
367  if (just_explain_plan) {
368  std::stringstream ss;
369  std::vector<const RelAlgNode*> nodes;
370  for (size_t i = 0; i < ed_seq.size(); i++) {
371  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
372  }
373  size_t ctr = nodes.size();
374  size_t tab_ctr = 0;
375  for (auto& body : boost::adaptors::reverse(nodes)) {
376  const auto index = ctr--;
377  const auto tabs = std::string(tab_ctr++, '\t');
378  CHECK(body);
379  ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
380  if (auto sort = dynamic_cast<const RelSort*>(body)) {
381  ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
382  }
383  if (dynamic_cast<const RelProject*>(body) ||
384  dynamic_cast<const RelCompound*>(body)) {
385  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
386  ss << tabs << " : " << join->toString() << "\n";
387  }
388  }
389  }
390  const auto& subqueries = getSubqueries();
391  if (!subqueries.empty()) {
392  ss << "Subqueries: "
393  << "\n";
394  for (const auto& subquery : subqueries) {
395  const auto ra = subquery->getRelAlg();
396  ss << "\t" << ra->toString() << "\n";
397  }
398  }
399  auto rs = std::make_shared<ResultSet>(ss.str());
400  return {rs, {}};
401  }
402 
403  if (render_info) {
404  // set render to be non-insitu in certain situations.
405  if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
406  ed_seq.size() > 1) {
407  // old logic
408  // disallow if more than one ED
409  render_info->setInSituDataIfUnset(false);
410  }
411  }
412 
413  if (eo.find_push_down_candidates) {
414  // this extra logic is mainly due to current limitations on multi-step queries
415  // and/or subqueries.
416  return executeRelAlgQueryWithFilterPushDown(
417  ed_seq, co, eo, render_info, queue_time_ms);
418  }
419  timer_setup.stop();
420 
421  // Dispatch the subqueries first
422  for (auto subquery : getSubqueries()) {
423  const auto subquery_ra = subquery->getRelAlg();
424  CHECK(subquery_ra);
425  if (subquery_ra->hasContextData()) {
426  continue;
427  }
428  // Execute the subquery and cache the result.
429  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
430  RaExecutionSequence subquery_seq(subquery_ra);
431  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
432  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
433  }
434  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
435 }
436 
437 AggregatedColRange RelAlgExecutor::computeColRangesCache() {
438  AggregatedColRange agg_col_range_cache;
439  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
440  return executor_->computeColRangesCache(phys_inputs);
441 }
442 
443 StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
444  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
445  return executor_->computeStringDictionaryGenerations(phys_inputs);
446 }
447 
448 TableGenerations RelAlgExecutor::computeTableGenerations() {
449  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
450  return executor_->computeTableGenerations(phys_table_ids);
451 }
452 
453 Executor* RelAlgExecutor::getExecutor() const {
454  return executor_;
455 }
456 
457 void RelAlgExecutor::cleanupPostExecution() {
458  CHECK(executor_);
459  executor_->row_set_mem_owner_ = nullptr;
460  executor_->lit_str_dict_proxy_ = nullptr;
461 }
462 
463 namespace {
464 
465 inline void check_sort_node_source_constraint(const RelSort* sort) {
466  CHECK_EQ(size_t(1), sort->inputCount());
467  const auto source = sort->getInput(0);
468  if (dynamic_cast<const RelSort*>(source)) {
469  throw std::runtime_error("Sort node not supported as input to another sort");
470  }
471 }
472 
473 } // namespace
474 
475 QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
476  const RaExecutionSequence& seq,
477  const size_t step_idx,
478  const CompilationOptions& co,
479  const ExecutionOptions& eo,
480  RenderInfo* render_info) {
481  INJECT_TIMER(executeRelAlgQueryStep);
482  auto exe_desc_ptr = seq.getDescriptor(step_idx);
483  CHECK(exe_desc_ptr);
484  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
485 
486  size_t shard_count{0};
487  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
488  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
489  };
490 
491  if (sort) {
492  check_sort_node_source_constraint(sort);
493  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
494  shard_count = GroupByAndAggregate::shard_count_for_top_groups(
495  source_work_unit.exe_unit, *executor_->getCatalog());
496  if (!shard_count) {
497  // No point in sorting on the leaf, only execute the input to the sort node.
498  CHECK_EQ(size_t(1), sort->inputCount());
499  const auto source = sort->getInput(0);
500  if (sort->collationCount() || node_is_aggregate(source)) {
501  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
502  CHECK_EQ(temp_seq.size(), size_t(1));
503  ExecutionOptions eo_copy = {
504  eo.output_columnar_hint,
505  eo.allow_multifrag,
506  eo.just_explain,
507  eo.allow_loop_joins,
508  eo.with_watchdog,
509  eo.jit_debug,
510  eo.just_validate || sort->isEmptyResult(),
511  eo.with_dynamic_watchdog,
512  eo.dynamic_watchdog_time_limit,
513  eo.find_push_down_candidates,
514  eo.just_calcite_explain,
515  eo.gpu_input_mem_limit_percent,
516  eo.allow_runtime_query_interrupt,
517  eo.pending_query_interrupt_freq,
518  eo.executor_type,
519  };
520  // Use subseq to avoid clearing existing temporary tables
521  return {
522  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
523  merge_type(source),
524  source->getId(),
525  false};
526  }
527  }
528  }
529  return {executeRelAlgSubSeq(seq,
530  std::make_pair(step_idx, step_idx + 1),
531  co,
532  eo,
533  render_info,
534  queue_time_ms_),
535  merge_type(exe_desc_ptr->getBody()),
536  exe_desc_ptr->getBody()->getId(),
537  false};
538 }
539 
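// Seeds this executor with externally computed caches (in distributed mode
// these come from the aggregator) so that a leaf runs its step against
// consistent column ranges, string dictionary generations and table
// generations.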
540 void RelAlgExecutor::prepareLeafExecution(
541  const AggregatedColRange& agg_col_range,
542  const StringDictionaryGenerations& string_dictionary_generations,
543  const TableGenerations& table_generations) {
544  // capture the lock acquisition time
545  auto clock_begin = timer_start();
546  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
547  executor_->resetInterrupt();
548  }
549  queue_time_ms_ = timer_stop(clock_begin);
550  executor_->row_set_mem_owner_ =
551  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize());
552  executor_->table_generations_ = table_generations;
553  executor_->agg_col_range_cache_ = agg_col_range;
554  executor_->string_dictionary_generations_ = string_dictionary_generations;
555 }
556 
557 ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
558  const CompilationOptions& co,
559  const ExecutionOptions& eo,
560  RenderInfo* render_info,
561  const int64_t queue_time_ms,
562  const bool with_existing_temp_tables) {
563  INJECT_TIMER(executeRelAlgSeq);
564  auto timer = DEBUG_TIMER(__func__);
565  if (!with_existing_temp_tables) {
566  decltype(temporary_tables_)().swap(temporary_tables_);
567  }
568  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
569  executor_->catalog_ = &cat_;
570  executor_->temporary_tables_ = &temporary_tables_;
571 
572  time(&now_);
573  CHECK(!seq.empty());
574  const auto exec_desc_count = eo.just_explain ? size_t(1) : seq.size();
575 
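// Execute the descriptors in order. A NativeExecutionError is retried once
// with ExecutorType::Extern when interop is enabled, but only for steps the
// external executor can handle (group-by and aggregate steps rethrow).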
576  for (size_t i = 0; i < exec_desc_count; i++) {
577  VLOG(1) << "Executing query step " << i;
578  // only render on the last step
579  try {
580  executeRelAlgStep(seq,
581  i,
582  co,
583  eo,
584  (i == exec_desc_count - 1) ? render_info : nullptr,
585  queue_time_ms);
586  } catch (const NativeExecutionError&) {
587  if (!g_enable_interop) {
588  throw;
589  }
590  auto eo_extern = eo;
591  eo_extern.executor_type = ::ExecutorType::Extern;
592  auto exec_desc_ptr = seq.getDescriptor(i);
593  const auto body = exec_desc_ptr->getBody();
594  const auto compound = dynamic_cast<const RelCompound*>(body);
595  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
596  LOG(INFO) << "Also failed to run the query using interoperability";
597  throw;
598  }
599  executeRelAlgStep(seq,
600  i,
601  co,
602  eo_extern,
603  (i == exec_desc_count - 1) ? render_info : nullptr,
604  queue_time_ms);
605  }
606  }
607 
608  return seq.getDescriptor(exec_desc_count - 1)->getResult();
609 }
610 
611 ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
612  const RaExecutionSequence& seq,
613  const std::pair<size_t, size_t> interval,
614  const CompilationOptions& co,
615  const ExecutionOptions& eo,
616  RenderInfo* render_info,
617  const int64_t queue_time_ms) {
618  INJECT_TIMER(executeRelAlgSubSeq);
619  executor_->catalog_ = &cat_;
620  executor_->temporary_tables_ = &temporary_tables_;
621 
622  time(&now_);
623  for (size_t i = interval.first; i < interval.second; i++) {
624  // only render on the last step
625  executeRelAlgStep(seq,
626  i,
627  co,
628  eo,
629  (i == interval.second - 1) ? render_info : nullptr,
630  queue_time_ms);
631  }
632 
633  return seq.getDescriptor(interval.second - 1)->getResult();
634 }
635 
636 void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
637  const size_t step_idx,
638  const CompilationOptions& co,
639  const ExecutionOptions& eo,
640  RenderInfo* render_info,
641  const int64_t queue_time_ms) {
642  INJECT_TIMER(executeRelAlgStep);
643  auto timer = DEBUG_TIMER(__func__);
645  auto exec_desc_ptr = seq.getDescriptor(step_idx);
646  CHECK(exec_desc_ptr);
647  auto& exec_desc = *exec_desc_ptr;
648  const auto body = exec_desc.getBody();
649  if (body->isNop()) {
650  handleNop(exec_desc);
651  return;
652  }
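// Per-step execution options: the watchdog stays on only for the first step
// or for projections, and outer fragment indices apply to the first step only.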
653  const ExecutionOptions eo_work_unit{
654  eo.output_columnar_hint,
655  eo.allow_multifrag,
656  eo.just_explain,
657  eo.allow_loop_joins,
658  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
659  eo.jit_debug,
660  eo.just_validate,
661  eo.with_dynamic_watchdog,
662  eo.dynamic_watchdog_time_limit,
663  eo.find_push_down_candidates,
664  eo.just_calcite_explain,
665  eo.gpu_input_mem_limit_percent,
666  eo.allow_runtime_query_interrupt,
667  eo.pending_query_interrupt_freq,
668  eo.executor_type,
669  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
670 
671  // Notify foreign tables to load prior to execution
672  prepare_foreign_table_for_execution(*body, cat_.getDatabaseId(), query_state_);
673 
674  const auto compound = dynamic_cast<const RelCompound*>(body);
675  if (compound) {
676  if (compound->isDeleteViaSelect()) {
677  executeDelete(compound, co, eo_work_unit, queue_time_ms);
678  } else if (compound->isUpdateViaSelect()) {
679  executeUpdate(compound, co, eo_work_unit, queue_time_ms);
680  } else {
681  exec_desc.setResult(
682  executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
683  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
684  << static_cast<int>(-compound->getId()) << ", ...)"
685  << " exec_desc.getResult().getDataPtr()->rowCount()="
686  << exec_desc.getResult().getDataPtr()->rowCount();
687  if (exec_desc.getResult().isFilterPushDownEnabled()) {
688  return;
689  }
690  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
691  }
692  return;
693  }
694  const auto project = dynamic_cast<const RelProject*>(body);
695  if (project) {
696  if (project->isDeleteViaSelect()) {
697  executeDelete(project, co, eo_work_unit, queue_time_ms);
698  } else if (project->isUpdateViaSelect()) {
699  executeUpdate(project, co, eo_work_unit, queue_time_ms);
700  } else {
701  std::optional<size_t> prev_count;
702  // Disabling the intermediate count optimization in distributed, as the previous
703  // execution descriptor will likely not hold the aggregated result.
704  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
705  auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
706  CHECK(prev_exec_desc);
707  RelAlgNode const* prev_body = prev_exec_desc->getBody();
708  // This optimization needs to be restricted in its application for UNION, which
709  // can have 2 input nodes in which neither should restrict the count of the other.
710  // However some non-UNION queries are measurably slower with this restriction, so
711  // it is only applied when g_enable_union is true.
712  bool const parent_check =
713  !g_enable_union || project->getInput(0)->getId() == prev_body->getId();
714  // If the previous node produced a reliable count, skip the pre-flight count
715  if (parent_check && (dynamic_cast<const RelCompound*>(prev_body) ||
716  dynamic_cast<const RelLogicalValues*>(prev_body))) {
717  const auto& prev_exe_result = prev_exec_desc->getResult();
718  const auto prev_result = prev_exe_result.getRows();
719  if (prev_result) {
720  prev_count = prev_result->rowCount();
721  VLOG(3) << "Setting output row count for projection node to previous node ("
722  << prev_exec_desc->getBody()->toString() << ") to " << *prev_count;
723  }
724  }
725  }
726  exec_desc.setResult(executeProject(
727  project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
728  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
729  << static_cast<int>(-project->getId()) << ", ...)"
730  << " exec_desc.getResult().getDataPtr()->rowCount()="
731  << exec_desc.getResult().getDataPtr()->rowCount();
732  if (exec_desc.getResult().isFilterPushDownEnabled()) {
733  return;
734  }
735  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
736  }
737  return;
738  }
739  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
740  if (aggregate) {
741  exec_desc.setResult(
742  executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
743  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
744  return;
745  }
746  const auto filter = dynamic_cast<const RelFilter*>(body);
747  if (filter) {
748  exec_desc.setResult(
749  executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
750  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
751  return;
752  }
753  const auto sort = dynamic_cast<const RelSort*>(body);
754  if (sort) {
755  exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
756  if (exec_desc.getResult().isFilterPushDownEnabled()) {
757  return;
758  }
759  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
760  return;
761  }
762  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
763  if (logical_values) {
764  exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
765  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
766  return;
767  }
768  const auto modify = dynamic_cast<const RelModify*>(body);
769  if (modify) {
770  exec_desc.setResult(executeModify(modify, eo_work_unit));
771  return;
772  }
773  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
774  if (logical_union) {
775  exec_desc.setResult(
776  executeUnion(logical_union, seq, co, eo_work_unit, render_info, queue_time_ms));
777  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
778  return;
779  }
780  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
781  if (table_func) {
782  exec_desc.setResult(
783  executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
784  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
785  return;
786  }
787  LOG(FATAL) << "Unhandled body type: " << body->toString();
788 }
789 
790 void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
791  // just set the result of the previous node as the result of the no-op
792  auto body = ed.getBody();
793  CHECK(dynamic_cast<const RelAggregate*>(body));
794  CHECK_EQ(size_t(1), body->inputCount());
795  const auto input = body->getInput(0);
796  body->setOutputMetainfo(input->getOutputMetainfo());
797  const auto it = temporary_tables_.find(-input->getId());
798  CHECK(it != temporary_tables_.end());
799  // set up temp table as it could be used by the outer query or next step
800  addTemporaryTable(-body->getId(), it->second);
801 
802  ed.setResult({it->second, input->getOutputMetainfo()});
803 }
804 
805 namespace {
806 
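// Collects every RexInput referenced by an expression tree. For geo columns
// scanned from a physical table it also synthesizes inputs for the hidden
// physical columns (via SPIMAP_GEO_PHYSICAL_INPUT) and keeps ownership of
// the synthesized nodes.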
807 class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
808  public:
809  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : cat_(cat) {}
810 
811  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
812  return synthesized_physical_inputs_owned;
813  }
814 
815  std::unordered_set<const RexInput*> visitInput(
816  const RexInput* rex_input) const override {
817  const auto input_ra = rex_input->getSourceNode();
818  CHECK(input_ra);
819  const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
820  if (scan_ra) {
821  const auto td = scan_ra->getTableDescriptor();
822  if (td) {
823  const auto col_id = rex_input->getIndex();
824  const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
825  if (cd && cd->columnType.get_physical_cols() > 0) {
826  CHECK(IS_GEO(cd->columnType.get_type()));
827  std::unordered_set<const RexInput*> synthesized_physical_inputs;
828  for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
829  auto physical_input =
830  new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
831  synthesized_physical_inputs_owned.emplace_back(physical_input);
832  synthesized_physical_inputs.insert(physical_input);
833  }
834  return synthesized_physical_inputs;
835  }
836  }
837  }
838  return {rex_input};
839  }
840 
841  protected:
842  std::unordered_set<const RexInput*> aggregateResult(
843  const std::unordered_set<const RexInput*>& aggregate,
844  const std::unordered_set<const RexInput*>& next_result) const override {
845  auto result = aggregate;
846  result.insert(next_result.begin(), next_result.end());
847  return result;
848  }
849 
850  private:
851  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
852  const Catalog_Namespace::Catalog& cat_;
853 };
854 
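// The "data sink" of a node is the node that actually consumes the inputs:
// joins (and left-deep join trees) absorb their sources, so input
// bookkeeping is done against the join rather than the node itself.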
855 const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
856  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
857  return table_func;
858  }
859  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
860  CHECK_EQ(size_t(2), join->inputCount());
861  return join;
862  }
863  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
864  CHECK_EQ(size_t(1), ra_node->inputCount());
865  }
866  auto only_src = ra_node->getInput(0);
867  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
868  dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
869  return is_join ? only_src : ra_node;
870 }
871 
872 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
873 get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
874  RexUsedInputsVisitor visitor(cat);
875  const auto filter_expr = compound->getFilterExpr();
876  std::unordered_set<const RexInput*> used_inputs =
877  filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
878  const auto sources_size = compound->getScalarSourcesSize();
879  for (size_t i = 0; i < sources_size; ++i) {
880  const auto source_inputs = visitor.visit(compound->getScalarSource(i));
881  used_inputs.insert(source_inputs.begin(), source_inputs.end());
882  }
883  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
884  return std::make_pair(used_inputs, used_inputs_owned);
885 }
886 
887 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
888 get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
889  CHECK_EQ(size_t(1), aggregate->inputCount());
890  std::unordered_set<const RexInput*> used_inputs;
891  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
892  const auto source = aggregate->getInput(0);
893  const auto& in_metainfo = source->getOutputMetainfo();
894  const auto group_count = aggregate->getGroupByCount();
895  CHECK_GE(in_metainfo.size(), group_count);
896  for (size_t i = 0; i < group_count; ++i) {
897  auto synthesized_used_input = new RexInput(source, i);
898  used_inputs_owned.emplace_back(synthesized_used_input);
899  used_inputs.insert(synthesized_used_input);
900  }
901  for (const auto& agg_expr : aggregate->getAggExprs()) {
902  for (size_t i = 0; i < agg_expr->size(); ++i) {
903  const auto operand_idx = agg_expr->getOperand(i);
904  CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
905  auto synthesized_used_input = new RexInput(source, operand_idx);
906  used_inputs_owned.emplace_back(synthesized_used_input);
907  used_inputs.insert(synthesized_used_input);
908  }
909  }
910  return std::make_pair(used_inputs, used_inputs_owned);
911 }
912 
913 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
914 get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
915  RexUsedInputsVisitor visitor(cat);
916  std::unordered_set<const RexInput*> used_inputs;
917  for (size_t i = 0; i < project->size(); ++i) {
918  const auto proj_inputs = visitor.visit(project->getProjectAt(i));
919  used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
920  }
921  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
922  return std::make_pair(used_inputs, used_inputs_owned);
923 }
924 
925 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
926 get_used_inputs(const RelTableFunction* table_func,
927  const Catalog_Namespace::Catalog& cat) {
928  RexUsedInputsVisitor visitor(cat);
929  std::unordered_set<const RexInput*> used_inputs;
930  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
931  const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
932  used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
933  }
934  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
935  return std::make_pair(used_inputs, used_inputs_owned);
936 }
937 
938 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
939 get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
940  std::unordered_set<const RexInput*> used_inputs;
941  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
942  const auto data_sink_node = get_data_sink(filter);
943  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
944  const auto source = data_sink_node->getInput(nest_level);
945  const auto scan_source = dynamic_cast<const RelScan*>(source);
946  if (scan_source) {
947  CHECK(source->getOutputMetainfo().empty());
948  for (size_t i = 0; i < scan_source->size(); ++i) {
949  auto synthesized_used_input = new RexInput(scan_source, i);
950  used_inputs_owned.emplace_back(synthesized_used_input);
951  used_inputs.insert(synthesized_used_input);
952  }
953  } else {
954  const auto& partial_in_metadata = source->getOutputMetainfo();
955  for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
956  auto synthesized_used_input = new RexInput(source, i);
957  used_inputs_owned.emplace_back(synthesized_used_input);
958  used_inputs.insert(synthesized_used_input);
959  }
960  }
961  }
962  return std::make_pair(used_inputs, used_inputs_owned);
963 }
964 
965 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
966 get_used_inputs(const RelLogicalUnion* logical_union, const Catalog_Namespace::Catalog&) {
967  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
968  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
969  used_inputs_owned.reserve(logical_union->inputCount());
970  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
971  auto const n_inputs = logical_union->inputCount();
972  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
973  auto input = logical_union->getInput(nest_level);
974  for (size_t i = 0; i < input->size(); ++i) {
975  used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
976  used_inputs.insert(used_inputs_owned.back().get());
977  }
978  }
979  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
980 }
981 
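// Scans resolve to their physical table id; any other node is keyed by the
// negative of its node id, the convention used for temporary tables
// (cf. addTemporaryTable(-node->getId(), ...) in executeRelAlgStep).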
982 int table_id_from_ra(const RelAlgNode* ra_node) {
983  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
984  if (scan_ra) {
985  const auto td = scan_ra->getTableDescriptor();
986  CHECK(td);
987  return td->tableId;
988  }
989  return -ra_node->getId();
990 }
991 
992 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
993  const RelAlgNode* ra_node,
994  const std::vector<size_t>& input_permutation) {
995  const auto data_sink_node = get_data_sink(ra_node);
996  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
997  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
998  const auto input_node_idx =
999  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1000  const auto input_ra = data_sink_node->getInput(input_node_idx);
1001  // Having a non-zero mapped value (input_idx) results in the query being interpreted
1002  // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
1003  // mapped value (input_idx) which originates here. This would be incorrect for UNION.
1004  size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
1005  const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1006  CHECK(it_ok.second);
1007  LOG_IF(INFO, !input_permutation.empty())
1008  << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
1009  }
1010  return input_to_nest_level;
1011 }
1012 
1013 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1014 get_join_source_used_inputs(const RelAlgNode* ra_node,
1015  const Catalog_Namespace::Catalog& cat) {
1016  const auto data_sink_node = get_data_sink(ra_node);
1017  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1018  CHECK_EQ(join->inputCount(), 2u);
1019  const auto condition = join->getCondition();
1020  RexUsedInputsVisitor visitor(cat);
1021  auto condition_inputs = visitor.visit(condition);
1022  std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1023  visitor.get_inputs_owned());
1024  return std::make_pair(condition_inputs, condition_inputs_owned);
1025  }
1026 
1027  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1028  CHECK_GE(left_deep_join->inputCount(), 2u);
1029  const auto condition = left_deep_join->getInnerCondition();
1030  RexUsedInputsVisitor visitor(cat);
1031  auto result = visitor.visit(condition);
1032  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1033  ++nesting_level) {
1034  const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1035  if (outer_condition) {
1036  const auto outer_result = visitor.visit(outer_condition);
1037  result.insert(outer_result.begin(), outer_result.end());
1038  }
1039  }
1040  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
1041  return std::make_pair(result, used_inputs_owned);
1042  }
1043 
1044  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1045  CHECK_GT(ra_node->inputCount(), 1u) << ra_node->toString();
1046  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1047  CHECK_GT(ra_node->inputCount(), 0u) << ra_node->toString();
1048  } else {
1049  CHECK_EQ(ra_node->inputCount(), 1u) << ra_node->toString();
1050  }
1051  return std::make_pair(std::unordered_set<const RexInput*>{},
1052  std::vector<std::shared_ptr<RexInput>>{});
1053 }
1054 
1055 void collect_used_input_desc(
1056  std::vector<InputDescriptor>& input_descs,
1057  const Catalog_Namespace::Catalog& cat,
1058  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1059  const RelAlgNode* ra_node,
1060  const std::unordered_set<const RexInput*>& source_used_inputs,
1061  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1062  VLOG(3) << "ra_node=" << ra_node->toString()
1063  << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
1064  << " source_used_inputs.size()=" << source_used_inputs.size();
1065  for (const auto used_input : source_used_inputs) {
1066  const auto input_ra = used_input->getSourceNode();
1067  const int table_id = table_id_from_ra(input_ra);
1068  const auto col_id = used_input->getIndex();
1069  auto it = input_to_nest_level.find(input_ra);
1070  if (it != input_to_nest_level.end()) {
1071  const int input_desc = it->second;
1072  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1073  dynamic_cast<const RelScan*>(input_ra)
1074  ? cat.getColumnIdBySpi(table_id, col_id + 1)
1075  : col_id,
1076  table_id,
1077  input_desc));
1078  } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1079  throw std::runtime_error("Bushy joins not supported");
1080  }
1081  }
1082 }
1083 
1084 template <class RA>
1085 std::pair<std::vector<InputDescriptor>,
1086  std::list<std::shared_ptr<const InputColDescriptor>>>
1087 get_input_desc_impl(const RA* ra_node,
1088  const std::unordered_set<const RexInput*>& used_inputs,
1089  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1090  const std::vector<size_t>& input_permutation,
1091  const Catalog_Namespace::Catalog& cat) {
1092  std::vector<InputDescriptor> input_descs;
1093  const auto data_sink_node = get_data_sink(ra_node);
1094  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1095  const auto input_node_idx =
1096  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1097  auto input_ra = data_sink_node->getInput(input_node_idx);
1098  const int table_id = table_id_from_ra(input_ra);
1099  input_descs.emplace_back(table_id, input_idx);
1100  }
1101  std::sort(input_descs.begin(),
1102  input_descs.end(),
1103  [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
1104  return lhs.getNestLevel() < rhs.getNestLevel();
1105  });
1106  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1107  collect_used_input_desc(input_descs,
1108  cat,
1109  input_col_descs_unique, // modified
1110  ra_node,
1111  used_inputs,
1112  input_to_nest_level);
1113  std::unordered_set<const RexInput*> join_source_used_inputs;
1114  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1115  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1116  get_join_source_used_inputs(ra_node, cat);
1117  collect_used_input_desc(input_descs,
1118  cat,
1119  input_col_descs_unique, // modified
1120  ra_node,
1121  join_source_used_inputs,
1122  input_to_nest_level);
1123  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1124  input_col_descs_unique.begin(), input_col_descs_unique.end());
1125 
1126  std::sort(input_col_descs.begin(),
1127  input_col_descs.end(),
1128  [](std::shared_ptr<const InputColDescriptor> const& lhs,
1129  std::shared_ptr<const InputColDescriptor> const& rhs) {
1130  return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1131  lhs->getColId(),
1132  lhs->getScanDesc().getTableId()) <
1133  std::make_tuple(rhs->getScanDesc().getNestLevel(),
1134  rhs->getColId(),
1135  rhs->getScanDesc().getTableId());
1136  });
1137  return {input_descs,
1138  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1139  input_col_descs.end())};
1140 }
1141 
1142 template <class RA>
1143 std::tuple<std::vector<InputDescriptor>,
1144  std::list<std::shared_ptr<const InputColDescriptor>>,
1145  std::vector<std::shared_ptr<RexInput>>>
1146 get_input_desc(const RA* ra_node,
1147  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1148  const std::vector<size_t>& input_permutation,
1149  const Catalog_Namespace::Catalog& cat) {
1150  std::unordered_set<const RexInput*> used_inputs;
1151  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1152  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
1153  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1154  auto input_desc_pair = get_input_desc_impl(
1155  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1156  return std::make_tuple(
1157  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1158 }
1159 
1160 size_t get_scalar_sources_size(const RelCompound* compound) {
1161  return compound->getScalarSourcesSize();
1162 }
1163 
1164 size_t get_scalar_sources_size(const RelProject* project) {
1165  return project->size();
1166 }
1167 
1168 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1169  return table_func->getTableFuncInputsSize();
1170 }
1171 
1172 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1173  return compound->getScalarSource(i);
1174 }
1175 
1176 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1177  return project->getProjectAt(i);
1178 }
1179 
1180 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1181  return table_func->getTableFuncInputAt(i);
1182 }
1183 
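// Gives a none-encoded string expression a transient dictionary
// (comp_param = TRANSIENT_DICT_ID) so downstream operations such as group-by
// can work on integer ids; other expressions pass through unchanged.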
1184 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1185  const std::shared_ptr<Analyzer::Expr> expr) {
1186  const auto& ti = expr->get_type_info();
1187  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1188  return expr;
1189  }
1190  auto transient_dict_ti = ti;
1191  transient_dict_ti.set_compression(kENCODING_DICT);
1192  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1193  transient_dict_ti.set_fixed_size();
1194  return expr->add_cast(transient_dict_ti);
1195 }
1196 
1197 void set_transient_dict_maybe(
1198  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1199  const std::shared_ptr<Analyzer::Expr>& expr) {
1200  try {
1201  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1202  } catch (...) {
1203  scalar_sources.push_back(fold_expr(expr.get()));
1204  }
1205 }
1206 
1207 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1208  const std::shared_ptr<Analyzer::Expr>& input) {
1209  const auto& input_ti = input->get_type_info();
1210  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1211  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1212  }
1213  return input;
1214 }
1215 
1216 template <class RA>
1217 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1218  const RA* ra_node,
1219  const RelAlgTranslator& translator,
1220  const ::ExecutorType executor_type) {
1221  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1222  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1223  VLOG(3) << "get_scalar_sources_size(" << ra_node->toString()
1224  << ") = " << scalar_sources_size;
1225  for (size_t i = 0; i < scalar_sources_size; ++i) {
1226  const auto scalar_rex = scalar_at(i, ra_node);
1227  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1228  // RexRefs are synthetic scalars appended at the end of the real ones,
1229  // for the sake of taking memory ownership; no real work is needed here.
1230  continue;
1231  }
1232 
1233  const auto scalar_expr =
1234  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
1235  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1236  if (executor_type == ExecutorType::Native) {
1237  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1238  } else {
1239  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1240  }
1241  }
1242 
1243  return scalar_sources;
1244 }
1245 
1246 template <class RA>
1247 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1248  const RA* ra_node,
1249  const RelAlgTranslator& translator,
1250  int32_t tableId,
1251  const Catalog_Namespace::Catalog& cat,
1252  const ColumnNameList& colNames,
1253  size_t starting_projection_column_idx) {
1254  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1255  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1256  const auto scalar_rex = scalar_at(i, ra_node);
1257  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1258  // RexRefs are synthetic scalars appended at the end of the real ones,
1259  // for the sake of taking memory ownership; no real work is needed here.
1260  continue;
1261  }
1262 
1263  std::shared_ptr<Analyzer::Expr> translated_expr;
1264  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1265  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
1266  tableId,
1267  cat,
1268  colNames[i - starting_projection_column_idx]);
1269  } else {
1270  translated_expr = translator.translateScalarRex(scalar_rex);
1271  }
1272  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1273  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1274  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1275  }
1276 
1277  return scalar_sources;
1278 }
1279 
1280 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1281  const RelCompound* compound,
1282  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1283  if (!compound->isAggregate()) {
1284  return {nullptr};
1285  }
1286  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1287  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1288  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1289  }
1290  return groupby_exprs;
1291 }
1292 
1293 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1294  const RelAggregate* aggregate,
1295  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1296  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1297  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1298  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1299  }
1300  return groupby_exprs;
1301 }
1302 
1303 QualsConjunctiveForm translate_quals(const RelCompound* compound,
1304  const RelAlgTranslator& translator) {
1305  const auto filter_rex = compound->getFilterExpr();
1306  const auto filter_expr =
1307  filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
1308  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1309  : QualsConjunctiveForm{};
1310 }
1311 
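// Translates a compound node's targets: RexRef targets point back into the
// group-by list and become kGROUPBY Var expressions, so the already
// translated group-by keys are reused rather than re-evaluated.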
1312 std::vector<Analyzer::Expr*> translate_targets(
1313  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1314  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1315  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1316  const RelCompound* compound,
1317  const RelAlgTranslator& translator,
1318  const ExecutorType executor_type) {
1319  std::vector<Analyzer::Expr*> target_exprs;
1320  for (size_t i = 0; i < compound->size(); ++i) {
1321  const auto target_rex = compound->getTargetExpr(i);
1322  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1323  std::shared_ptr<Analyzer::Expr> target_expr;
1324  if (target_rex_agg) {
1325  target_expr =
1326  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1327  } else {
1328  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1329  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1330  if (target_rex_ref) {
1331  const auto ref_idx = target_rex_ref->getIndex();
1332  CHECK_GE(ref_idx, size_t(1));
1333  CHECK_LE(ref_idx, groupby_exprs.size());
1334  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1335  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1336  } else {
1337  target_expr = translator.translateScalarRex(target_rex_scalar);
1338  auto rewritten_expr = rewrite_expr(target_expr.get());
1339  target_expr = fold_expr(rewritten_expr.get());
1340  if (executor_type == ExecutorType::Native) {
1341  try {
1342  target_expr = set_transient_dict(target_expr);
1343  } catch (...) {
1344  // noop
1345  }
1346  } else {
1347  target_expr = cast_dict_to_none(target_expr);
1348  }
1349  }
1350  }
1351  CHECK(target_expr);
1352  target_exprs_owned.push_back(target_expr);
1353  target_exprs.push_back(target_expr.get());
1354  }
1355  return target_exprs;
1356 }
1357 
1358 std::vector<Analyzer::Expr*> translate_targets(
1359  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1360  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1361  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1362  const RelAggregate* aggregate,
1363  const RelAlgTranslator& translator) {
1364  std::vector<Analyzer::Expr*> target_exprs;
1365  size_t group_key_idx = 1;
1366  for (const auto& groupby_expr : groupby_exprs) {
1367  auto target_expr =
1368  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1369  target_exprs_owned.push_back(target_expr);
1370  target_exprs.push_back(target_expr.get());
1371  }
1372 
1373  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1374  auto target_expr =
1375  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1376  CHECK(target_expr);
1377  target_expr = fold_expr(target_expr.get());
1378  target_exprs_owned.push_back(target_expr);
1379  target_exprs.push_back(target_expr.get());
1380  }
1381  return target_exprs;
1382 }
1383 
1384 bool is_count_distinct(const Analyzer::Expr* expr) {
1385  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1386  return agg_expr && agg_expr->get_is_distinct();
1387 }
1388 
1389 bool is_agg(const Analyzer::Expr* expr) {
1390  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1391  if (agg_expr && agg_expr->get_contains_agg()) {
1392  auto agg_type = agg_expr->get_aggtype();
1393  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1394  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1395  return true;
1396  }
1397  }
1398  return false;
1399 }
1400 
1401 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
1402  if (is_count_distinct(&expr)) {
1403  return SQLTypeInfo(kBIGINT, false);
1404  } else if (is_agg(&expr)) {
1405  return get_nullable_logical_type_info(expr.get_type_info());
1406  }
1407  return get_logical_type_info(expr.get_type_info());
1408 }
1409 
1410 template <class RA>
1411 std::vector<TargetMetaInfo> get_targets_meta(
1412  const RA* ra_node,
1413  const std::vector<Analyzer::Expr*>& target_exprs) {
1414  std::vector<TargetMetaInfo> targets_meta;
1415  CHECK_EQ(ra_node->size(), target_exprs.size());
1416  for (size_t i = 0; i < ra_node->size(); ++i) {
1417  CHECK(target_exprs[i]);
1418  // TODO(alex): remove the count distinct type fixup.
1419  targets_meta.emplace_back(ra_node->getFieldName(i),
1420  get_logical_type_for_expr(*target_exprs[i]),
1421  target_exprs[i]->get_type_info());
1422  }
1423  return targets_meta;
1424 }
1425 
1426 template <>
1427 std::vector<TargetMetaInfo> get_targets_meta(
1428  const RelFilter* filter,
1429  const std::vector<Analyzer::Expr*>& target_exprs) {
1430  RelAlgNode const* input0 = filter->getInput(0);
1431  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1432  return get_targets_meta(input, target_exprs);
1433  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1434  return get_targets_meta(input, target_exprs);
1435  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1436  return get_targets_meta(input, target_exprs);
1437  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1438  return get_targets_meta(input, target_exprs);
1439  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1440  return get_targets_meta(input, target_exprs);
1441  }
1442  UNREACHABLE() << "Unhandled node type: " << input0->toString();
1443  return {};
1444 }
1445 
1446 } // namespace
1447 
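// Updates run through the same work-unit machinery as selects: the node is
// compiled as a projection and a per-row update callback applies the result.
// Temporary-table updates are first rewritten into a full-column columnar
// projection (see QueryRewriter::rewriteColumnarUpdate below).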
1448 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
1449  const CompilationOptions& co_in,
1450  const ExecutionOptions& eo_in,
1451  const int64_t queue_time_ms) {
1452  CHECK(node);
1453  auto timer = DEBUG_TIMER(__func__);
1454 
1456 
1457  auto co = co_in;
1458  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1459  // encoded string updates
1460 
1461  auto execute_update_for_node =
1462  [this, &co, &eo_in](const auto node, auto& work_unit, const bool is_aggregate) {
1463  UpdateTransactionParameters update_params(node->getModifiedTableDescriptor(),
1464  node->getTargetColumns(),
1465  node->getOutputMetainfo(),
1466  node->isVarlenUpdateRequired());
1467 
1468  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1469 
1470  auto execute_update_ra_exe_unit =
1471  [this, &co, &eo_in, &table_infos, &update_params](
1472  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
1473  CompilationOptions co_project = co;
1474 
1475  auto eo = eo_in;
1476  if (update_params.tableIsTemporary()) {
1477  eo.output_columnar_hint = true;
1478  co_project.allow_lazy_fetch = false;
1479  co_project.filter_on_deleted_column =
1480  false; // project the entire delete column for columnar update
1481  }
1482 
1483  auto update_callback = yieldUpdateCallback(update_params);
1484  executor_->executeUpdate(ra_exe_unit,
1485  table_infos,
1486  co_project,
1487  eo,
1488  cat_,
1489  executor_->row_set_mem_owner_,
1490  update_callback,
1491  is_aggregate);
1492  update_params.finalizeTransaction();
1493  };
1494 
1495  if (update_params.tableIsTemporary()) {
1496  // hold owned target exprs during execution if rewriting
1497  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1498  // rewrite temp table updates to generate the full column by moving the
1499  // where clause into a case statement; if such a rewrite is not possible,
1500  // bail on the update operation, then build an expr for the update target
1501  const auto td = update_params.getTableDescriptor();
1502  CHECK(td);
1503  const auto update_column_names = update_params.getUpdateColumnNames();
1504  if (update_column_names.size() > 1) {
1505  throw std::runtime_error(
1506  "Multi-column update is not yet supported for temporary tables.");
1507  }
1508 
1509  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1510  CHECK(cd);
1511  auto projected_column_to_update =
1512  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1513  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1514  work_unit.exe_unit, projected_column_to_update);
1515  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1516  throw std::runtime_error(
1517  "Variable length updates not yet supported on temporary tables.");
1518  }
1519  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1520  } else {
1521  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1522  }
1523  };
1524 
1525  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1526  auto work_unit =
1527  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1528 
1529  execute_update_for_node(compound, work_unit, compound->isAggregate());
1530  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1531  auto work_unit =
1532  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1533 
1534  if (project->isSimple()) {
1535  CHECK_EQ(size_t(1), project->inputCount());
1536  const auto input_ra = project->getInput(0);
1537  if (dynamic_cast<const RelSort*>(input_ra)) {
1538  const auto& input_table =
1539  get_temporary_table(&temporary_tables_, -input_ra->getId());
1540  CHECK(input_table);
1541  work_unit.exe_unit.scan_limit = input_table->rowCount();
1542  }
1543  }
1544 
1545  execute_update_for_node(project, work_unit, false);
1546  } else {
1547  throw std::runtime_error("Unsupported parent node for update: " + node->toString());
1548  }
1549 }
1550 
1551 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
1552  const CompilationOptions& co,
1553  const ExecutionOptions& eo_in,
1554  const int64_t queue_time_ms) {
1555  CHECK(node);
1556  auto timer = DEBUG_TIMER(__func__);
1557 
1558  DeleteTriggeredCacheInvalidator::invalidateCaches();
1559 
1560  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1561  auto& work_unit,
1562  const bool is_aggregate) {
1563  auto* table_descriptor = node->getModifiedTableDescriptor();
1564  CHECK(table_descriptor);
1565  if (!table_descriptor->hasDeletedCol) {
1566  throw std::runtime_error(
1567  "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
1568  }
1569 
1570  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1571 
1572  auto execute_delete_ra_exe_unit =
1573  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1574  const bool is_aggregate) {
1575  DeleteTransactionParameters delete_params(table_is_temporary(table_descriptor));
1576  auto delete_callback = yieldDeleteCallback(delete_params);
1577  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
1578 
1579  auto eo = eo_in;
1580  if (delete_params.tableIsTemporary()) {
1581  eo.output_columnar_hint = true;
1582  co_delete.filter_on_deleted_column =
1583  false; // project the entire delete column for columnar delete
1584  } else {
1585  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1586  }
1587 
1588  executor_->executeUpdate(exe_unit,
1589  table_infos,
1590  co_delete,
1591  eo,
1592  cat_,
1593  executor_->row_set_mem_owner_,
1594  delete_callback,
1595  is_aggregate);
1596  delete_params.finalizeTransaction();
1597  };
1598 
1599  if (table_is_temporary(table_descriptor)) {
1600  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1601  auto cd = cat_.getDeletedColumn(table_descriptor);
1602  CHECK(cd);
1603  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1604  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1605  const auto rewritten_exe_unit =
1606  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1607  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1608  } else {
1609  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1610  }
1611  };
1612 
1613  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1614  const auto work_unit =
1615  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1616  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1617  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1618  auto work_unit =
1619  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1620  if (project->isSimple()) {
1621  CHECK_EQ(size_t(1), project->inputCount());
1622  const auto input_ra = project->getInput(0);
1623  if (dynamic_cast<const RelSort*>(input_ra)) {
1624  const auto& input_table =
1625  get_temporary_table(&temporary_tables_, -input_ra->getId());
1626  CHECK(input_table);
1627  work_unit.exe_unit.scan_limit = input_table->rowCount();
1628  }
1629  }
1630  execute_delete_for_node(project, work_unit, false);
1631  } else {
1632  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1633  }
1634 }
1635 
1636 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
1637  const CompilationOptions& co,
1638  const ExecutionOptions& eo,
1639  RenderInfo* render_info,
1640  const int64_t queue_time_ms) {
1641  auto timer = DEBUG_TIMER(__func__);
1642  const auto work_unit =
1643  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1644  CompilationOptions co_compound = co;
1645  return executeWorkUnit(work_unit,
1646  compound->getOutputMetainfo(),
1647  compound->isAggregate(),
1648  co_compound,
1649  eo,
1650  render_info,
1651  queue_time_ms);
1652 }
1653 
1654 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
1655  const CompilationOptions& co,
1656  const ExecutionOptions& eo,
1657  RenderInfo* render_info,
1658  const int64_t queue_time_ms) {
1659  auto timer = DEBUG_TIMER(__func__);
1660  const auto work_unit = createAggregateWorkUnit(
1661  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1662  return executeWorkUnit(work_unit,
1663  aggregate->getOutputMetainfo(),
1664  true,
1665  co,
1666  eo,
1667  render_info,
1668  queue_time_ms);
1669 }
1670 
1671 namespace {
1672 
1673 // Returns true iff the execution unit contains window functions.
1674 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
1675  return std::any_of(ra_exe_unit.target_exprs.begin(),
1676  ra_exe_unit.target_exprs.end(),
1677  [](const Analyzer::Expr* expr) {
1678  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1679  });
1680 }
1681 
1682 } // namespace
1683 
1684 ExecutionResult RelAlgExecutor::executeProject(
1685  const RelProject* project,
1686  const CompilationOptions& co,
1687  const ExecutionOptions& eo,
1688  RenderInfo* render_info,
1689  const int64_t queue_time_ms,
1690  const std::optional<size_t> previous_count) {
1691  auto timer = DEBUG_TIMER(__func__);
1692  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1693  CompilationOptions co_project = co;
1694  if (project->isSimple()) {
1695  CHECK_EQ(size_t(1), project->inputCount());
1696  const auto input_ra = project->getInput(0);
1697  if (dynamic_cast<const RelSort*>(input_ra)) {
1698  co_project.device_type = ExecutorDeviceType::CPU;
1699  const auto& input_table =
1700  get_temporary_table(&temporary_tables_, -input_ra->getId());
1701  CHECK(input_table);
1702  work_unit.exe_unit.scan_limit =
1703  std::min(input_table->getLimit(), input_table->rowCount());
1704  }
1705  }
1706  return executeWorkUnit(work_unit,
1707  project->getOutputMetainfo(),
1708  false,
1709  co_project,
1710  eo,
1711  render_info,
1712  queue_time_ms,
1713  previous_count);
1714 }
1715 
1716 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
1717  const CompilationOptions& co_in,
1718  const ExecutionOptions& eo,
1719  const int64_t queue_time_ms) {
1720  INJECT_TIMER(executeTableFunction);
1721  auto timer = DEBUG_TIMER(__func__);
1722 
1723  auto co = co_in;
1724 
1725  if (g_cluster) {
1726  throw std::runtime_error("Table functions not supported in distributed mode yet");
1727  }
1728  if (!g_enable_table_functions) {
1729  throw std::runtime_error("Table function support is disabled");
1730  }
1731  auto table_func_work_unit = createTableFunctionWorkUnit(
1732  table_func,
1733  eo.just_explain,
1734  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
1735  const auto body = table_func_work_unit.body;
1736  CHECK(body);
1737 
1738  const auto table_infos =
1739  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1740 
1741  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1742  co.device_type,
1743  QueryMemoryDescriptor(),
1744  nullptr,
1745  executor_),
1746  {}};
1747 
1748  try {
1749  result = {executor_->executeTableFunction(
1750  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1751  body->getOutputMetainfo()};
1752  } catch (const QueryExecutionError& e) {
1753  handlePersistentError(e.getErrorCode());
1755  throw std::runtime_error("Table function ran out of memory during execution");
1756  }
1757  result.setQueueTime(queue_time_ms);
1758  return result;
1759 }
1760 
1761 namespace {
1762 
1763 // Creates a new expression which has the range table index set to 1. This is needed to
1764 // reuse the hash join construction helpers to generate a hash table for the window
1765 // function partition: create an equals expression with left and right sides identical
1766 // except for the range table index.
1767 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
1768  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
1769  if (tuple) {
1770  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
1771  for (const auto& element : tuple->getTuple()) {
1772  transformed_tuple.push_back(transform_to_inner(element.get()));
1773  }
1774  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
1775  }
1776  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
1777  if (!col) {
1778  throw std::runtime_error("Only columns supported in the window partition for now");
1779  }
1780  return makeExpr<Analyzer::ColumnVar>(
1781  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
1782 }
1783 
1784 } // namespace
1785 
1786 void RelAlgExecutor::computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
1787  const CompilationOptions& co,
1788  const ExecutionOptions& eo,
1789  ColumnCacheMap& column_cache_map,
1790  const int64_t queue_time_ms) {
1791  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1792  CHECK_EQ(query_infos.size(), size_t(1));
1793  if (query_infos.front().info.fragments.size() != 1) {
1794  throw std::runtime_error(
1795  "Only single fragment tables supported for window functions for now");
1796  }
1797  if (eo.executor_type == ::ExecutorType::Extern) {
1798  return;
1799  }
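  // The partition hash table is built with the join machinery as a self-join of the
  // input table, so duplicate the single input's info to supply both join sides.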
1800  query_infos.push_back(query_infos.front());
1801  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
1802  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1803  ++target_index) {
1804  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1805  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1806  if (!window_func) {
1807  continue;
1808  }
1809  // Always use baseline layout hash tables for now, make the expression a tuple.
1810  const auto& partition_keys = window_func->getPartitionKeys();
1811  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1812  if (partition_keys.size() > 1) {
1813  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1814  } else {
1815  if (partition_keys.empty()) {
1816  throw std::runtime_error(
1817  "Empty window function partitions are not supported yet");
1818  }
1819  CHECK_EQ(partition_keys.size(), size_t(1));
1820  partition_key_tuple = partition_keys.front();
1821  }
1822  // Creates a tautology equality with the partition expression on both sides.
1823  const auto partition_key_cond =
1824  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1825  kBW_EQ,
1826  kONE,
1827  partition_key_tuple,
1828  transform_to_inner(partition_key_tuple.get()));
1829  auto context = createWindowFunctionContext(window_func,
1830  partition_key_cond,
1831  ra_exe_unit,
1832  query_infos,
1833  co,
1834  column_cache_map,
1835  executor_->getRowSetMemoryOwner());
1836  context->compute();
1837  window_project_node_context->addWindowFunctionContext(std::move(context),
1838  target_index);
1839  }
1840 }
1841 
1842 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
1843  const Analyzer::WindowFunction* window_func,
1844  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
1845  const RelAlgExecutionUnit& ra_exe_unit,
1846  const std::vector<InputTableInfo>& query_infos,
1847  const CompilationOptions& co,
1848  ColumnCacheMap& column_cache_map,
1849  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1850  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
1851  ? MemoryLevel::GPU_LEVEL
1852  : MemoryLevel::CPU_LEVEL;
1853  const auto join_table_or_err =
1854  executor_->buildHashTableForQualifier(partition_key_cond,
1855  query_infos,
1856  memory_level,
1857  JoinHashTableInterface::HashType::OneToMany,
1858  column_cache_map);
1859  if (!join_table_or_err.fail_reason.empty()) {
1860  throw std::runtime_error(join_table_or_err.fail_reason);
1861  }
1862  CHECK(join_table_or_err.hash_table->getHashType() ==
1863  JoinHashTableInterface::HashType::OneToMany);
1864  const auto& order_keys = window_func->getOrderKeys();
1865  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
1866  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
1867  auto context = std::make_unique<WindowFunctionContext>(window_func,
1868  join_table_or_err.hash_table,
1869  elem_count,
1870  co.device_type,
1871  row_set_mem_owner);
1872  for (const auto& order_key : order_keys) {
1873  const auto order_col =
1874  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
1875  if (!order_col) {
1876  throw std::runtime_error("Only order by columns supported for now");
1877  }
1878  const int8_t* column;
1879  size_t join_col_elem_count;
1880  std::tie(column, join_col_elem_count) =
1881  ColumnFetcher::getOneColumnFragment(executor_,
1882  *order_col,
1883  query_infos.front().info.fragments.front(),
1884  memory_level,
1885  0,
1886  nullptr,
1887  chunks_owner,
1888  column_cache_map);
1889  CHECK_EQ(join_col_elem_count, elem_count);
1890  context->addOrderColumn(column, order_col.get(), chunks_owner);
1891  }
1892  return context;
1893 }
1894 
1895 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
1896  const CompilationOptions& co,
1897  const ExecutionOptions& eo,
1898  RenderInfo* render_info,
1899  const int64_t queue_time_ms) {
1900  auto timer = DEBUG_TIMER(__func__);
1901  const auto work_unit =
1902  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1903  return executeWorkUnit(
1904  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
1905 }
1906 
1907 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
1908  std::vector<TargetMetaInfo> const& rhs) {
1909  if (lhs.size() == rhs.size()) {
1910  for (size_t i = 0; i < lhs.size(); ++i) {
1911  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
1912  return false;
1913  }
1914  }
1915  return true;
1916  }
1917  return false;
1918 }
1919 
1920 bool isGeometry(TargetMetaInfo const& target_meta_info) {
1921  return target_meta_info.get_type_info().is_geometry();
1922 }
1923 
1924 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
1925  const RaExecutionSequence& seq,
1926  const CompilationOptions& co,
1927  const ExecutionOptions& eo,
1928  RenderInfo* render_info,
1929  const int64_t queue_time_ms) {
1930  auto timer = DEBUG_TIMER(__func__);
1931  if (!logical_union->isAll()) {
1932  throw std::runtime_error("UNION without ALL is not supported yet.");
1933  }
1934  // Will throw a std::runtime_error if types don't match.
1935  logical_union->checkForMatchingMetaInfoTypes();
1936  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
1937  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
1938  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
1939  }
1940  // Only Projections, Aggregates, and other Unions are supported on top of a UNION for now.
1941  query_dag_->eachNode([logical_union](RelAlgNode const* node) {
1942  if (node->hasInput(logical_union) &&
1943  !shared::dynamic_castable_to_any<RelProject, RelLogicalUnion, RelAggregate>(
1944  node)) {
1945  throw std::runtime_error("UNION ALL not yet supported in this context.");
1946  }
1947  });
1948 
1949  auto work_unit =
1950  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
1951  return executeWorkUnit(work_unit,
1952  logical_union->getOutputMetainfo(),
1953  false,
1954  CompilationOptions::makeCpuOnly(co),
1955  eo,
1956  render_info,
1957  queue_time_ms);
1958 }
1959 
1960 ExecutionResult RelAlgExecutor::executeLogicalValues(
1961  const RelLogicalValues* logical_values,
1962  const ExecutionOptions& eo) {
1963  auto timer = DEBUG_TIMER(__func__);
1964  if (eo.just_explain) {
1965  throw std::runtime_error("EXPLAIN not supported for LogicalValues");
1966  }
1967 
1968  QueryMemoryDescriptor query_mem_desc(executor_,
1969  logical_values->getNumRows(),
1970  QueryDescriptionType::Projection,
1971  /*is_table_function=*/false);
1972 
1973  auto tuple_type = logical_values->getTupleType();
1974  for (size_t i = 0; i < tuple_type.size(); ++i) {
1975  auto& target_meta_info = tuple_type[i];
1976  if (target_meta_info.get_type_info().is_varlen()) {
1977  throw std::runtime_error("Variable length types not supported in VALUES yet.");
1978  }
1979  if (target_meta_info.get_type_info().get_type() == kNULLT) {
1980  // replace w/ bigint
1981  tuple_type[i] =
1982  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
1983  }
1984  query_mem_desc.addColSlotInfo(
1985  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
1986  }
1987  logical_values->setOutputMetainfo(tuple_type);
1988 
1989  std::vector<TargetInfo> target_infos;
1990  for (const auto& tuple_type_component : tuple_type) {
1991  target_infos.emplace_back(TargetInfo{false,
1992  kCOUNT,
1993  tuple_type_component.get_type_info(),
1994  SQLTypeInfo(kNULLT, false),
1995  false,
1996  false});
1997  }
1998 
1999  std::shared_ptr<ResultSet> rs{
2000  ResultSetLogicalValuesBuilder{logical_values,
2001  target_infos,
2002  ExecutorDeviceType::CPU,
2003  query_mem_desc,
2004  executor_->getRowSetMemoryOwner(),
2005  executor_}
2006  .build()};
2007 
2008  return {rs, tuple_type};
2009 }
2010 
2011 namespace {
2012 
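// Encodes one string constant into a dict-encoded column buffer of width T: the string
// is added to the dictionary, and ids that do not fit in T are logged and stored as NULL.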
2013 template <class T>
2014 int64_t insert_one_dict_str(T* col_data,
2015  const std::string& columnName,
2016  const SQLTypeInfo& columnType,
2017  const Analyzer::Constant* col_cv,
2018  const Catalog_Namespace::Catalog& catalog) {
2019  if (col_cv->get_is_null()) {
2020  *col_data = inline_fixed_encoding_null_val(columnType);
2021  } else {
2022  const int dict_id = columnType.get_comp_param();
2023  const auto col_datum = col_cv->get_constval();
2024  const auto& str = *col_datum.stringval;
2025  const auto dd = catalog.getMetadataForDict(dict_id);
2026  CHECK(dd && dd->stringDict);
2027  int32_t str_id = dd->stringDict->getOrAdd(str);
2028  if (!dd->dictIsTemp) {
2029  const auto checkpoint_ok = dd->stringDict->checkpoint();
2030  if (!checkpoint_ok) {
2031  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2032  columnName);
2033  }
2034  }
2035  const bool invalid = str_id > max_valid_int_value<T>();
2036  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2037  if (invalid) {
2038  LOG(ERROR) << "Could not encode string: " << str
2039  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2040  << " bits. Will store NULL instead.";
2041  }
2042  str_id = inline_fixed_encoding_null_val(columnType);
2043  }
2044  *col_data = str_id;
2045  }
2046  return *col_data;
2047 }
2048 
2049 template <class T>
2050 int64_t insert_one_dict_str(T* col_data,
2051  const ColumnDescriptor* cd,
2052  const Analyzer::Constant* col_cv,
2053  const Catalog_Namespace::Catalog& catalog) {
2054  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2055 }
2056 
2057 } // namespace
2058 
2059 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2060  const ExecutionOptions& eo) {
2061  auto timer = DEBUG_TIMER(__func__);
2062  if (eo.just_explain) {
2063  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2064  }
2065 
2066  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2067  ExecutorDeviceType::CPU,
2068  QueryMemoryDescriptor(),
2069  executor_->getRowSetMemoryOwner(),
2070  executor_);
2071 
2072  std::vector<TargetMetaInfo> empty_targets;
2073  return {rs, empty_targets};
2074 }
2075 
2076 ExecutionResult RelAlgExecutor::executeSimpleInsert(const Analyzer::Query& query) {
2077  // Note: We currently obtain an executor for this method, but we do not need it.
2078  // Therefore, we skip the executor state setup in the regular execution path. In the
2079  // future, we will likely want to use the executor to evaluate expressions in the insert
2080  // statement.
2081 
2082  const auto& targets = query.get_targetlist();
2083  const int table_id = query.get_result_table_id();
2084  const auto& col_id_list = query.get_result_col_list();
2085 
2086  std::vector<const ColumnDescriptor*> col_descriptors;
2087  std::vector<int> col_ids;
2088  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2089  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2090  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2091 
2092  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2093  CHECK(table_descriptor);
2094  const auto shard_tables = cat_.getPhysicalTablesDescriptors(table_descriptor);
2095  const TableDescriptor* shard{nullptr};
2096 
2097  for (const int col_id : col_id_list) {
2098  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2099  const auto col_enc = cd->columnType.get_compression();
2100  if (cd->columnType.is_string()) {
2101  switch (col_enc) {
2102  case kENCODING_NONE: {
2103  auto it_ok =
2104  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2105  CHECK(it_ok.second);
2106  break;
2107  }
2108  case kENCODING_DICT: {
2109  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2110  CHECK(dd);
2111  const auto it_ok = col_buffers.emplace(
2112  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2113  CHECK(it_ok.second);
2114  break;
2115  }
2116  default:
2117  CHECK(false);
2118  }
2119  } else if (cd->columnType.is_geometry()) {
2120  auto it_ok =
2121  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2122  CHECK(it_ok.second);
2123  } else if (cd->columnType.is_array()) {
2124  auto it_ok =
2125  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2126  CHECK(it_ok.second);
2127  } else {
2128  const auto it_ok = col_buffers.emplace(
2129  col_id,
2130  std::unique_ptr<uint8_t[]>(
2131  new uint8_t[cd->columnType.get_logical_size()]())); // the trailing ()
2132  // zero-initializes the buffer
2133  CHECK(it_ok.second);
2134  }
2135  col_descriptors.push_back(cd);
2136  col_ids.push_back(col_id);
2137  }
2138  size_t col_idx = 0;
2139  Fragmenter_Namespace::InsertData insert_data;
2140  insert_data.databaseId = cat_.getCurrentDB().dbId;
2141  insert_data.tableId = table_id;
2142  int64_t int_col_val{0};
2143  for (auto target_entry : targets) {
2144  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2145  if (!col_cv) {
2146  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2147  CHECK(col_cast);
2148  CHECK_EQ(kCAST, col_cast->get_optype());
2149  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2150  }
2151  CHECK(col_cv);
2152  const auto cd = col_descriptors[col_idx];
2153  auto col_datum = col_cv->get_constval();
2154  auto col_type = cd->columnType.get_type();
2155  uint8_t* col_data_bytes{nullptr};
2156  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2157  (!cd->columnType.is_string() ||
2158  cd->columnType.get_compression() == kENCODING_DICT)) {
2159  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2160  CHECK(col_data_bytes_it != col_buffers.end());
2161  col_data_bytes = col_data_bytes_it->second.get();
2162  }
2163  switch (col_type) {
2164  case kBOOLEAN: {
2165  auto col_data = col_data_bytes;
2166  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2167  : (col_datum.boolval ? 1 : 0);
2168  break;
2169  }
2170  case kTINYINT: {
2171  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2172  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2173  : col_datum.tinyintval;
2174  int_col_val = col_datum.tinyintval;
2175  break;
2176  }
2177  case kSMALLINT: {
2178  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2179  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2180  : col_datum.smallintval;
2181  int_col_val = col_datum.smallintval;
2182  break;
2183  }
2184  case kINT: {
2185  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2186  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2187  : col_datum.intval;
2188  int_col_val = col_datum.intval;
2189  break;
2190  }
2191  case kBIGINT:
2192  case kDECIMAL:
2193  case kNUMERIC: {
2194  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2195  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2196  : col_datum.bigintval;
2197  int_col_val = col_datum.bigintval;
2198  break;
2199  }
2200  case kFLOAT: {
2201  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2202  *col_data = col_datum.floatval;
2203  break;
2204  }
2205  case kDOUBLE: {
2206  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2207  *col_data = col_datum.doubleval;
2208  break;
2209  }
2210  case kTEXT:
2211  case kVARCHAR:
2212  case kCHAR: {
2213  switch (cd->columnType.get_compression()) {
2214  case kENCODING_NONE:
2215  str_col_buffers[col_ids[col_idx]].push_back(
2216  col_datum.stringval ? *col_datum.stringval : "");
2217  break;
2218  case kENCODING_DICT: {
2219  switch (cd->columnType.get_size()) {
2220  case 1:
2221  int_col_val = insert_one_dict_str(
2222  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2223  break;
2224  case 2:
2225  int_col_val = insert_one_dict_str(
2226  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2227  break;
2228  case 4:
2229  int_col_val = insert_one_dict_str(
2230  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2231  break;
2232  default:
2233  CHECK(false);
2234  }
2235  break;
2236  }
2237  default:
2238  CHECK(false);
2239  }
2240  break;
2241  }
2242  case kTIME:
2243  case kTIMESTAMP:
2244  case kDATE: {
2245  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2246  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2247  : col_datum.bigintval;
2248  break;
2249  }
2250  case kARRAY: {
2251  const auto is_null = col_cv->get_is_null();
2252  const auto size = cd->columnType.get_size();
2253  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2254  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2255  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2256  if (is_null && !is_point_coords) {
2257  if (size > 0) {
2258  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2259  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2260  throw std::runtime_error("Column " + cd->columnName +
2261  " doesn't accept NULL values");
2262  }
2263  int8_t* buf = (int8_t*)checked_malloc(size);
2264  put_null_array(static_cast<void*>(buf), elem_ti, "");
2265  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2266  p += elem_ti.get_size()) {
2267  put_null(static_cast<void*>(p), elem_ti, "");
2268  }
2269  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2270  } else {
2271  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2272  }
2273  break;
2274  }
2275  const auto l = col_cv->get_value_list();
2276  size_t len = l.size() * elem_ti.get_size();
2277  if (size > 0 && static_cast<size_t>(size) != len) {
2278  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2279  std::to_string(size / elem_ti.get_size()) +
2280  " values, " + "received " + std::to_string(l.size()));
2281  }
2282  if (elem_ti.is_string()) {
2283  CHECK(kENCODING_DICT == elem_ti.get_compression());
2284  CHECK(4 == elem_ti.get_size());
2285 
2286  int8_t* buf = (int8_t*)checked_malloc(len);
2287  int32_t* p = reinterpret_cast<int32_t*>(buf);
2288 
2289  int elemIndex = 0;
2290  for (auto& e : l) {
2291  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2292  CHECK(c);
2293 
2294  int_col_val = insert_one_dict_str(
2295  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2296 
2297  elemIndex++;
2298  }
2299  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2300 
2301  } else {
2302  int8_t* buf = (int8_t*)checked_malloc(len);
2303  int8_t* p = buf;
2304  for (auto& e : l) {
2305  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2306  CHECK(c);
2307  p = appendDatum(p, c->get_constval(), elem_ti);
2308  }
2309  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2310  }
2311  break;
2312  }
2313  case kPOINT:
2314  case kLINESTRING:
2315  case kPOLYGON:
2316  case kMULTIPOLYGON:
2317  str_col_buffers[col_ids[col_idx]].push_back(
2318  col_datum.stringval ? *col_datum.stringval : "");
2319  break;
2320  default:
2321  CHECK(false);
2322  }
2323  ++col_idx;
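  // If the column just written is the shard key, use its value to pick the physical
  // shard that receives this row.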
2324  if (col_idx == static_cast<size_t>(table_descriptor->shardedColumnId)) {
2325  const auto shard_count = shard_tables.size();
2326  const size_t shard_idx = SHARD_FOR_KEY(int_col_val, shard_count);
2327  shard = shard_tables[shard_idx];
2328  }
2329  }
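  // Wrap each populated buffer in a DataBlockPtr so the fragmenter can ingest the
  // fixed-width, string, and array columns as a single one-row insert.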
2330  for (const auto& kv : col_buffers) {
2331  insert_data.columnIds.push_back(kv.first);
2332  DataBlockPtr p;
2333  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2334  insert_data.data.push_back(p);
2335  }
2336  for (auto& kv : str_col_buffers) {
2337  insert_data.columnIds.push_back(kv.first);
2338  DataBlockPtr p;
2339  p.stringsPtr = &kv.second;
2340  insert_data.data.push_back(p);
2341  }
2342  for (auto& kv : arr_col_buffers) {
2343  insert_data.columnIds.push_back(kv.first);
2344  DataBlockPtr p;
2345  p.arraysPtr = &kv.second;
2346  insert_data.data.push_back(p);
2347  }
2348  insert_data.numRows = 1;
2349  if (shard) {
2350  shard->fragmenter->insertDataNoCheckpoint(insert_data);
2351  } else {
2352  table_descriptor->fragmenter->insertDataNoCheckpoint(insert_data);
2353  }
2354 
2355  // Ensure checkpoint happens across all shards, if not in distributed
2356  // mode (aggregator handles checkpointing in distributed mode)
2357  if (!g_cluster &&
2358  table_descriptor->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
2359  const_cast<Catalog_Namespace::Catalog&>(cat_).checkpointWithAutoRollback(table_id);
2360  }
2361 
2362  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2363  ExecutorDeviceType::CPU,
2364  QueryMemoryDescriptor(),
2365  executor_->getRowSetMemoryOwner(),
2366  executor_);
2367  std::vector<TargetMetaInfo> empty_targets;
2368  return {rs, empty_targets};
2369 }
2370 
2371 namespace {
2372 
2373 // TODO(alex): Once we're fully migrated to the relational algebra model, change
2374 // the executor interface to use the collation directly and remove this conversion.
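// For example, a collation on the second field, descending with nulls first, becomes
// Analyzer::OrderEntry{2, /*is_desc=*/true, /*nulls_first=*/true}; tle_no is 1-based.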
2375 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
2376  std::list<Analyzer::OrderEntry> result;
2377  for (size_t i = 0; i < sort->collationCount(); ++i) {
2378  const auto sort_field = sort->getCollation(i);
2379  result.emplace_back(sort_field.getField() + 1,
2380  sort_field.getSortDir() == SortDirection::Descending,
2381  sort_field.getNullsPosition() == NullSortedPosition::First);
2382  }
2383  return result;
2384 }
2385 
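// A LIMIT can only be used as a scan limit when the node does not aggregate, since for
// aggregates the number of rows scanned does not bound the number of output groups.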
2386 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
2387  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
2388  if (aggregate) {
2389  return 0;
2390  }
2391  const auto compound = dynamic_cast<const RelCompound*>(ra);
2392  return (compound && compound->isAggregate()) ? 0 : limit;
2393 }
2394 
2395 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
2396  return !order_entries.empty() && order_entries.front().is_desc;
2397 }
2398 
2399 } // namespace
2400 
2401 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
2402  const CompilationOptions& co,
2403  const ExecutionOptions& eo,
2404  RenderInfo* render_info,
2405  const int64_t queue_time_ms) {
2406  auto timer = DEBUG_TIMER(__func__);
2408  const auto source = sort->getInput(0);
2409  const bool is_aggregate = node_is_aggregate(source);
2410  auto it = leaf_results_.find(sort->getId());
2411  if (it != leaf_results_.end()) {
2412  // Add any transient string literals to the string dictionary proxy on the aggregator
2413  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2414  add_transient_string_literals(
2415  source_work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2416  // Handle push-down of the LIMIT for multi-node queries
2417  auto& aggregated_result = it->second;
2418  auto& result_rows = aggregated_result.rs;
2419  const size_t limit = sort->getLimit();
2420  const size_t offset = sort->getOffset();
2421  const auto order_entries = get_order_entries(sort);
2422  if (limit || offset) {
2423  if (!order_entries.empty()) {
2424  result_rows->sort(order_entries, limit + offset);
2425  }
2426  result_rows->dropFirstN(offset);
2427  if (limit) {
2428  result_rows->keepFirstN(limit);
2429  }
2430  }
2431  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2432  sort->setOutputMetainfo(aggregated_result.targets_meta);
2433  return result;
2434  }
2435 
2436  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2437  bool is_desc{false};
2438 
2439  auto execute_sort_query = [this,
2440  sort,
2441  &source,
2442  &is_aggregate,
2443  &eo,
2444  &co,
2445  render_info,
2446  queue_time_ms,
2447  &groupby_exprs,
2448  &is_desc]() -> ExecutionResult {
2449  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2450  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2451  ExecutionOptions eo_copy = {
2452  eo.output_columnar_hint,
2453  eo.allow_multifrag,
2454  eo.just_explain,
2455  eo.allow_loop_joins,
2456  eo.with_watchdog,
2457  eo.jit_debug,
2458  eo.just_validate || sort->isEmptyResult(),
2459  eo.with_dynamic_watchdog,
2460  eo.dynamic_watchdog_time_limit,
2461  eo.find_push_down_candidates,
2462  eo.just_calcite_explain,
2463  eo.gpu_input_mem_limit_percent,
2464  eo.allow_runtime_query_interrupt,
2465  eo.pending_query_interrupt_freq,
2466  eo.executor_type,
2467  };
2468 
2469  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2470  auto source_result = executeWorkUnit(source_work_unit,
2471  source->getOutputMetainfo(),
2472  is_aggregate,
2473  co,
2474  eo_copy,
2475  render_info,
2476  queue_time_ms);
2477  if (render_info && render_info->isPotentialInSituRender()) {
2478  return source_result;
2479  }
2480  if (source_result.isFilterPushDownEnabled()) {
2481  return source_result;
2482  }
2483  auto rows_to_sort = source_result.getRows();
2484  if (eo.just_explain) {
2485  return {rows_to_sort, {}};
2486  }
2487  const size_t limit = sort->getLimit();
2488  const size_t offset = sort->getOffset();
2489  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2490  !use_speculative_top_n(source_work_unit.exe_unit,
2491  rows_to_sort->getQueryMemDesc())) {
2492  rows_to_sort->sort(source_work_unit.exe_unit.sort_info.order_entries,
2493  limit + offset);
2494  }
2495  if (limit || offset) {
2496  if (g_cluster && sort->collationCount() == 0) {
2497  if (offset >= rows_to_sort->rowCount()) {
2498  rows_to_sort->dropFirstN(offset);
2499  } else {
2500  rows_to_sort->keepFirstN(limit + offset);
2501  }
2502  } else {
2503  rows_to_sort->dropFirstN(offset);
2504  if (limit) {
2505  rows_to_sort->keepFirstN(limit);
2506  }
2507  }
2508  }
2509  return {rows_to_sort, source_result.getTargetsMeta()};
2510  };
2511 
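  // On a speculative top-n failure, blacklist the group-by expression so that the
  // retry below falls back to the default sort algorithm.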
2512  try {
2513  return execute_sort_query();
2514  } catch (const SpeculativeTopNFailed& e) {
2515  CHECK_EQ(size_t(1), groupby_exprs.size());
2516  CHECK(groupby_exprs.front());
2517  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2518  return execute_sort_query();
2519  }
2520 }
2521 
2522 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
2523  const RelSort* sort,
2524  const ExecutionOptions& eo) {
2525  const auto source = sort->getInput(0);
2526  const size_t limit = sort->getLimit();
2527  const size_t offset = sort->getOffset();
2528  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2529  const size_t scan_total_limit =
2530  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2531  size_t max_groups_buffer_entry_guess{
2532  scan_total_limit ? scan_total_limit : max_groups_buffer_entry_default_guess};
2533  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
2534  const auto order_entries = get_order_entries(sort);
2535  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2536  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2537  const auto& source_exe_unit = source_work_unit.exe_unit;
2538 
2539  // we do not allow sorting geometry or array types
2540  for (auto order_entry : order_entries) {
2541  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-based index
2542  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2543  const auto& ti = get_target_info(te, false);
2544  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2545  throw std::runtime_error(
2546  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2547  }
2548  }
2549 
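  // Choose the sort algorithm: a single empty group-by slot means no GROUP BY, which
  // can stream the top n rows; grouped queries keep the speculative top-n path unless
  // this group-by expression has already failed speculation and been blacklisted.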
2550  if (source_exe_unit.groupby_exprs.size() == 1) {
2551  if (!source_exe_unit.groupby_exprs.front()) {
2552  sort_algorithm = SortAlgorithm::StreamingTopN;
2553  } else {
2554  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2555  first_oe_is_desc(order_entries))) {
2556  sort_algorithm = SortAlgorithm::Default;
2557  }
2558  }
2559  }
2560 
2561  sort->setOutputMetainfo(source->getOutputMetainfo());
2562  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2563  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2564  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2565  std::move(source_exe_unit.input_col_descs),
2566  source_exe_unit.simple_quals,
2567  source_exe_unit.quals,
2568  source_exe_unit.join_quals,
2569  source_exe_unit.groupby_exprs,
2570  source_exe_unit.target_exprs,
2571  nullptr,
2572  {sort_info.order_entries, sort_algorithm, limit, offset},
2573  scan_total_limit,
2574  source_exe_unit.use_bump_allocator,
2575  source_exe_unit.union_all,
2576  source_exe_unit.query_state},
2577  source,
2578  max_groups_buffer_entry_guess,
2579  std::move(source_work_unit.query_rewriter),
2580  source_work_unit.input_permutation,
2581  source_work_unit.left_deep_join_input_sizes};
2582 }
2583 
2584 namespace {
2585 
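// Upper bound for the number of groups: the largest tuple-count upper bound across the
// input tables, and at least 1 so downstream buffer sizing never sees zero.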
2592 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
2593  CHECK(!table_infos.empty());
2594  const auto& first_table = table_infos.front();
2595  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
2596  for (const auto& table_info : table_infos) {
2597  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
2598  max_num_groups = table_info.info.getNumTuplesUpperBound();
2599  }
2600  }
2601  return std::max(max_num_groups, size_t(1));
2602 }
2603 
2610 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
2611  for (const auto target_expr : ra_exe_unit.target_exprs) {
2612  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
2613  return false;
2614  }
2615  }
2616  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
2617  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
2618  return true;
2619  }
2620  return false;
2621 }
2622 
2623 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
2624  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
2625  ra_exe_unit.simple_quals.empty());
2626 }
2627 
2628 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
2629  const RelAlgExecutionUnit& ra_exe_unit_in,
2630  const std::vector<InputTableInfo>& table_infos,
2631  const Executor* executor,
2632  const ExecutorDeviceType device_type_in,
2633  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
2634  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
2635  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
2636  const auto target_expr = ra_exe_unit.target_exprs[i];
2637  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2638  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
2639  continue;
2640  }
2641  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
2642  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
2643  CHECK(arg);
2644  const auto& arg_ti = arg->get_type_info();
2645  // Avoid calling getExpressionRange for variable length types (string and array),
2646  // it'd trigger an assertion since that API expects to be called only for types
2647  // for which the notion of range is well-defined. A bit of a kludge, but the
2648  // logic to reject these types anyway is at lower levels in the stack and not
2649  // really worth pulling into a separate function for now.
2650  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
2651  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
2652  continue;
2653  }
2654  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
2655  if (arg_range.getType() != ExpressionRangeType::Integer) {
2656  continue;
2657  }
2658  // When running distributed, the threshold for using the precise implementation
2659  // must be consistent across all leaves, otherwise we could have a mix of precise
2660  // and approximate bitmaps and we cannot aggregate them.
2661  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
2662  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
2663  const auto sub_bitmap_count =
2664  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
2665  int64_t approx_bitmap_sz_bits{0};
2666  const auto error_rate =
2667  static_cast<Analyzer::AggExpr*>(target_expr)->get_error_rate();
2668  if (error_rate) {
2669  CHECK(error_rate->get_type_info().get_type() == kINT);
2670  CHECK_GE(error_rate->get_constval().intval, 1);
2671  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
2672  } else {
2673  approx_bitmap_sz_bits = g_hll_precision_bits;
2674  }
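  // Size both implementations; if the approximate (HLL) bitmap would be at least as
  // large as the precise bitmap, substitute an exact COUNT(DISTINCT) for the target.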
2675  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
2676  arg_range.getIntMin(),
2677  approx_bitmap_sz_bits,
2678  true,
2679  device_type,
2680  sub_bitmap_count};
2681  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
2682  arg_range.getIntMin(),
2683  bitmap_sz_bits,
2684  false,
2685  device_type,
2686  sub_bitmap_count};
2687  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
2688  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
2689  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
2690  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
2691  target_exprs_owned.push_back(precise_count_distinct);
2692  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
2693  }
2694  }
2695  return ra_exe_unit;
2696 }
2697 
2698 void build_render_targets(RenderInfo& render_info,
2699  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
2700  const std::vector<TargetMetaInfo>& targets_meta) {
2701  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
2702  render_info.targets.clear();
2703  for (size_t i = 0; i < targets_meta.size(); ++i) {
2704  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
2705  targets_meta[i].get_resname(),
2706  work_unit_target_exprs[i]->get_shared_ptr(),
2707  false));
2708  }
2709 }
2710 
2711 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
2712  const CompilationOptions& co,
2713  const ExecutionOptions& eo) {
2714  return g_enable_bump_allocator && !g_cluster && co.device_type == ExecutorDeviceType::GPU &&
2715  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
2716 }
2717 
2718 } // namespace
2719 
2720 ExecutionResult RelAlgExecutor::executeWorkUnit(
2721  const RelAlgExecutor::WorkUnit& work_unit,
2722  const std::vector<TargetMetaInfo>& targets_meta,
2723  const bool is_agg,
2724  const CompilationOptions& co_in,
2725  const ExecutionOptions& eo,
2726  RenderInfo* render_info,
2727  const int64_t queue_time_ms,
2728  const std::optional<size_t> previous_count) {
2729  INJECT_TIMER(executeWorkUnit);
2730  auto timer = DEBUG_TIMER(__func__);
2731 
2732  auto co = co_in;
2733  ColumnCacheMap column_cache;
2734  if (is_window_execution_unit(work_unit.exe_unit)) {
2735  if (!g_enable_window_functions) {
2736  throw std::runtime_error("Window functions support is disabled");
2737  }
2738  co.device_type = ExecutorDeviceType::CPU;
2739  co.allow_lazy_fetch = false;
2740  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
2741  }
2742  if (!eo.just_explain && eo.find_push_down_candidates) {
2743  // find potential candidates:
2744  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
2745  if (!selected_filters.empty() || eo.just_calcite_explain) {
2746  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
2747  }
2748  }
2749  if (render_info && render_info->isPotentialInSituRender()) {
2750  co.allow_lazy_fetch = false;
2751  }
2752  const auto body = work_unit.body;
2753  CHECK(body);
2754  auto it = leaf_results_.find(body->getId());
2755  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
2756  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
2757  if (it != leaf_results_.end()) {
2758  add_transient_string_literals(
2759  work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2760  auto& aggregated_result = it->second;
2761  auto& result_rows = aggregated_result.rs;
2762  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2763  body->setOutputMetainfo(aggregated_result.targets_meta);
2764  if (render_info) {
2765  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2766  }
2767  return result;
2768  }
2769  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2770 
2771  auto ra_exe_unit = decide_approx_count_distinct_implementation(
2772  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
2773  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2774  if (is_window_execution_unit(ra_exe_unit)) {
2775  CHECK_EQ(table_infos.size(), size_t(1));
2776  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
2777  max_groups_buffer_entry_guess =
2778  table_infos.front().info.fragments.front().getNumTuples();
2779  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
2780  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
2781  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
2782  ra_exe_unit.scan_limit = *previous_count;
2783  } else {
2784  // TODO(adb): enable bump allocator path for render queries
2785  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
2786  ra_exe_unit.scan_limit = 0;
2787  ra_exe_unit.use_bump_allocator = true;
2788  } else if (eo.executor_type == ::ExecutorType::Extern) {
2789  ra_exe_unit.scan_limit = 0;
2790  } else if (!eo.just_explain) {
2791  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
2792  if (filter_count_all) {
2793  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
2794  }
2795  }
2796  }
2797  }
2798  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2799  co.device_type,
2800  QueryMemoryDescriptor(),
2801  nullptr,
2802  executor_),
2803  {}};
2804 
2805  auto execute_and_handle_errors =
2806  [&](const auto max_groups_buffer_entry_guess_in,
2807  const bool has_cardinality_estimation) -> ExecutionResult {
2808  // Note that the groups buffer entry guess may be modified during query execution.
2809  // Create a local copy so we can track those changes if we need to attempt a retry
2810  // due to OOM
2811  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
2812  try {
2813  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
2814  is_agg,
2815  table_infos,
2816  ra_exe_unit,
2817  co,
2818  eo,
2819  cat_,
2820  render_info,
2821  has_cardinality_estimation,
2822  column_cache),
2823  targets_meta};
2824  } catch (const QueryExecutionError& e) {
2825  handlePersistentError(e.getErrorCode());
2826  return handleOutOfMemoryRetry(
2827  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
2828  targets_meta,
2829  is_agg,
2830  co,
2831  eo,
2832  render_info,
2833  e.wasMultifragKernelLaunch(),
2834  queue_time_ms);
2835  }
2836  };
2837 
2838  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
2839  try {
2840  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2841  auto card = cached_cardinality.second;
2842  if (cached_cardinality.first && card >= 0) {
2843  result = execute_and_handle_errors(card, true);
2844  } else {
2845  result = execute_and_handle_errors(
2846  max_groups_buffer_entry_guess,
2847  groups_approx_upper_bound(table_infos) <= g_big_group_threshold);
2848  }
2849  } catch (const CardinalityEstimationRequired& e) {
2850  // check the cardinality cache
2851  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2852  auto card = cached_cardinality.second;
2853  if (cached_cardinality.first && card >= 0) {
2854  result = execute_and_handle_errors(card, true);
2855  } else {
2856  const auto estimated_groups_buffer_entry_guess =
2857  2 * std::min(groups_approx_upper_bound(table_infos),
2858  getNDVEstimation(work_unit, e.range(), is_agg, co, eo));
2859  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
2860  result = execute_and_handle_errors(estimated_groups_buffer_entry_guess, true);
2861  if (!(eo.just_validate || eo.just_explain)) {
2862  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
2863  }
2864  }
2865  }
2866 
2867  result.setQueueTime(queue_time_ms);
2868  if (render_info) {
2869  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2870  if (render_info->isPotentialInSituRender()) {
2871  // return an empty result (with the same queue time, and zero render time)
2872  return {std::make_shared<ResultSet>(
2873  queue_time_ms,
2874  0,
2875  executor_->row_set_mem_owner_
2876  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
2877  : nullptr),
2878  {}};
2879  }
2880  }
2881  return result;
2882 }
2883 
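// Runs a pre-flight COUNT(*) constrained by the work unit's filters to size projection
// outputs exactly; returns std::nullopt if the pre-flight count itself fails.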
2884 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
2885  const bool is_agg,
2886  const CompilationOptions& co,
2887  const ExecutionOptions& eo) {
2888  const auto count =
2889  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
2890  kCOUNT,
2891  nullptr,
2892  false,
2893  nullptr);
2894  const auto count_all_exe_unit =
2895  create_count_all_execution_unit(work_unit.exe_unit, count);
2896  size_t one{1};
2897  ResultSetPtr count_all_result;
2898  try {
2899  ColumnCacheMap column_cache;
2900  count_all_result =
2901  executor_->executeWorkUnit(one,
2902  is_agg,
2903  get_table_infos(work_unit.exe_unit, executor_),
2904  count_all_exe_unit,
2905  co,
2906  eo,
2907  cat_,
2908  nullptr,
2909  false,
2910  column_cache);
2911  } catch (const std::exception& e) {
2912  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
2913  return std::nullopt;
2914  }
2915  const auto count_row = count_all_result->getNextRow(false, false);
2916  CHECK_EQ(size_t(1), count_row.size());
2917  const auto& count_tv = count_row.front();
2918  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
2919  CHECK(count_scalar_tv);
2920  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
2921  CHECK(count_ptr);
2922  CHECK_GE(*count_ptr, 0);
2923  auto count_upper_bound = static_cast<size_t>(*count_ptr);
2924  return std::max(count_upper_bound, size_t(1));
2925 }
2926 
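// True when the single input table is filtered by an equality against the virtual
// "rowid" column, so the output size is bounded and no pre-flight count is needed.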
2927 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
2928  const auto& ra_exe_unit = work_unit.exe_unit;
2929  if (ra_exe_unit.input_descs.size() != 1) {
2930  return false;
2931  }
2932  const auto& table_desc = ra_exe_unit.input_descs.front();
2933  if (table_desc.getSourceType() != InputSourceType::TABLE) {
2934  return false;
2935  }
2936  const int table_id = table_desc.getTableId();
2937  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
2938  const auto comp_expr =
2939  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
2940  if (!comp_expr || comp_expr->get_optype() != kEQ) {
2941  return false;
2942  }
2943  const auto lhs = comp_expr->get_left_operand();
2944  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
2945  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
2946  return false;
2947  }
2948  const auto rhs = comp_expr->get_right_operand();
2949  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
2950  if (!rhs_const) {
2951  return false;
2952  }
2953  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
2954  if (cd->isVirtualCol) {
2955  CHECK_EQ("rowid", cd->columnName);
2956  return true;
2957  }
2958  }
2959  return false;
2960 }
2961 
2962 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
2963  const RelAlgExecutor::WorkUnit& work_unit,
2964  const std::vector<TargetMetaInfo>& targets_meta,
2965  const bool is_agg,
2966  const CompilationOptions& co,
2967  const ExecutionOptions& eo,
2968  RenderInfo* render_info,
2969  const bool was_multifrag_kernel_launch,
2970  const int64_t queue_time_ms) {
2971  // Disable the bump allocator
2972  // Note that this will have basically the same effect as using the bump allocator for
2973  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
2974  // path and the bump allocator path for kernel per fragment execution.
2975  auto ra_exe_unit_in = work_unit.exe_unit;
2976  ra_exe_unit_in.use_bump_allocator = false;
2977 
2978  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2979  co.device_type,
2980  QueryMemoryDescriptor(),
2981  nullptr,
2982  executor_),
2983  {}};
2984 
2985  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
2986  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2987  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
2988  false,
2989  false,
2990  eo.allow_loop_joins,
2991  eo.with_watchdog,
2992  eo.jit_debug,
2993  false,
2994  eo.with_dynamic_watchdog,
2995  eo.dynamic_watchdog_time_limit,
2996  false,
2997  false,
2998  eo.gpu_input_mem_limit_percent,
2999  false,
3000  eo.pending_query_interrupt_freq,
3001  eo.executor_type,
3002  eo.outer_fragment_indices};
3003 
3004  if (was_multifrag_kernel_launch) {
3005  try {
3006  // Attempt to retry using the kernel per fragment path. The smaller input size
3007  // required may allow the entire kernel to execute in GPU memory.
3008  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
3009  "kernels disabled.";
3010  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3011  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
3012  ColumnCacheMap column_cache;
3013  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3014  is_agg,
3015  table_infos,
3016  ra_exe_unit,
3017  co,
3018  eo_no_multifrag,
3019  cat_,
3020  nullptr,
3021  true,
3022  column_cache),
3023  targets_meta};
3024  result.setQueueTime(queue_time_ms);
3025  } catch (const QueryExecutionError& e) {
3027  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
3028  }
3029  }
3030 
3031  if (render_info) {
3032  render_info->setForceNonInSituData();
3033  }
3034 
3035  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
3036  // Only reset the group buffer entry guess if we ran out of slots, which
3037  // suggests a highly pathological input which prevented a good estimation of
3038  // distinct tuple count.
3039  // For projection queries, this will force a per-fragment scan limit, which is
3040  // compatible with the CPU path.
3041  VLOG(1) << "Resetting max groups buffer entry guess.";
3042  max_groups_buffer_entry_guess = 0;
3043 
3044  int iteration_ctr = -1;
3045  while (true) {
3046  iteration_ctr++;
3047  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3048  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
3049  ColumnCacheMap column_cache;
3050  try {
3051  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3052  is_agg,
3053  table_infos,
3054  ra_exe_unit,
3055  co_cpu,
3056  eo_no_multifrag,
3057  cat_,
3058  nullptr,
3059  true,
3060  column_cache),
3061  targets_meta};
3062  } catch (const QueryExecutionError& e) {
3063  // Ran out of slots
3064  if (e.getErrorCode() < 0) {
3065  // Even the conservative guess failed; it should only happen when we group
3066  // by a huge cardinality array. Maybe we should throw an exception instead?
3067  // Such a heavy query is entirely capable of exhausting all the host memory.
3068  CHECK(max_groups_buffer_entry_guess);
3069  // Only allow two iterations of increasingly large entry guesses up to a maximum
3070  // of 512MB per column per kernel
3071  if (g_enable_watchdog || iteration_ctr > 1) {
3072  throw std::runtime_error("Query ran out of output slots in the result");
3073  }
3074  max_groups_buffer_entry_guess *= 2;
3075  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
3076  "groups buffer entry "
3077  "guess equal to "
3078  << max_groups_buffer_entry_guess;
3079  } else {
3080  handlePersistentError(e.getErrorCode());
3081  }
3082  continue;
3083  }
3084  result.setQueueTime(queue_time_ms);
3085  return result;
3086  }
3087  return result;
3088 }
3089 
3090 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
3091  LOG(ERROR) << "Query execution failed with error "
3092  << getErrorMessageFromCode(error_code);
3093  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
3094  // Running out of GPU memory does not count as an error if the query is
3095  // allowed to continue on CPU, since retry on CPU is explicitly enabled
3096  // through --allow-cpu-retry.
3097  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
3098  if (!g_allow_cpu_retry) {
3099  throw std::runtime_error(
3100  "Query ran out of GPU memory, unable to automatically retry on CPU");
3101  }
3102  return;
3103  }
3104  throw std::runtime_error(getErrorMessageFromCode(error_code));
3105 }
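// Behavior sketch: only ERR_OUT_OF_GPU_MEM is survivable here, and only when
// --allow-cpu-retry (g_allow_cpu_retry) is set; every other code, e.g.
// handlePersistentError(Executor::ERR_DIV_BY_ZERO), surfaces as a
// std::runtime_error carrying the message mapped below ("Division by zero").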
3106 
3107 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
3108  if (error_code < 0) {
3109  return "Ran out of slots in the query output buffer";
3110  }
3111  switch (error_code) {
3113  return "Division by zero";
3115  return "Query couldn't keep the entire working set of columns in GPU memory";
3117  return "Self joins not supported yet";
3119  return "Not enough host memory to execute the query";
3121  return "Overflow or underflow";
3123  return "Query execution has exceeded the time limit";
3125  return "Query execution has been interrupted";
3127  return "Columnar conversion not supported for variable length types";
3129  return "Too many literals in the query";
3131  return "NONE ENCODED String types are not supported as input result set.";
3133  return "Not enough OpenGL memory to render the query results";
3135  return "Streaming-Top-N not supported in Render Query";
3137  return "Multiple distinct values encountered";
3138  case Executor::ERR_GEOS:
3139  return "Geos call failure";
3140  }
3141  return "Other error: code " + std::to_string(error_code);
3142 }
3143 
3144 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
3145  const SortInfo& sort_info,
3146  const ExecutionOptions& eo) {
3147  const auto compound = dynamic_cast<const RelCompound*>(node);
3148  if (compound) {
3149  return createCompoundWorkUnit(compound, sort_info, eo);
3150  }
3151  const auto project = dynamic_cast<const RelProject*>(node);
3152  if (project) {
3153  return createProjectWorkUnit(project, sort_info, eo);
3154  }
3155  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3156  if (aggregate) {
3157  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3158  }
3159  const auto filter = dynamic_cast<const RelFilter*>(node);
3160  if (filter) {
3161  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3162  }
3163  LOG(FATAL) << "Unhandled node type: " << node->toString();
3164  return {};
3165 }
3166 
3167 namespace {
3168 
3169 JoinType get_join_type(const RelAlgNode* ra) {
3170  auto sink = get_data_sink(ra);
3171  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
3172  return join->getJoinType();
3173  }
3174  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
3175  return JoinType::INNER;
3176  }
3177 
3178  return JoinType::INVALID;
3179 }
3180 
3181 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
3182  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3183  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
3184  return nullptr;
3185  }
3186  const auto equi_join_condition =
3187  dynamic_cast<const RexOperator*>(condition->getOperand(0));
3188  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
3189  return nullptr;
3190  }
3191  const auto both_are_null_condition =
3192  dynamic_cast<const RexOperator*>(condition->getOperand(1));
3193  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
3194  both_are_null_condition->size() != 2) {
3195  return nullptr;
3196  }
3197  const auto lhs_is_null =
3198  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
3199  const auto rhs_is_null =
3200  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
3201  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
3202  rhs_is_null->getOperator() != kISNULL) {
3203  return nullptr;
3204  }
3205  CHECK_EQ(size_t(1), lhs_is_null->size());
3206  CHECK_EQ(size_t(1), rhs_is_null->size());
3207  CHECK_EQ(size_t(2), equi_join_condition->size());
3208  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
3209  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
3210  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
3211  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
3212  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
3213  return nullptr;
3214  }
3215  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
3216  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
3217  RexDeepCopyVisitor deep_copy_visitor;
3218  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
3219  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
3220  eq_operands.emplace_back(lhs_op_copy.release());
3221  eq_operands.emplace_back(rhs_op_copy.release());
3222  return boost::make_unique<const RexOperator>(
3223  kBW_EQ, eq_operands, equi_join_condition->getType());
3224  }
3225  return nullptr;
3226 }
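// Worked example (illustrative SQL): a NULL-safe join predicate such as
//   t1.x = t2.x OR (t1.x IS NULL AND t2.x IS NULL)
// matches the OR/EQ/AND/ISNULL shape checked above and is rewritten into the
// single operator t1.x BW_EQ t2.x, which the equi-join machinery can use for
// hash joins; any deviation from that exact shape returns nullptr and leaves
// the condition untouched.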
3227 
3228 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
3229  const RexScalar* scalar) {
3230  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3231  if (condition && condition->getOperator() == kAND) {
3232  CHECK_GE(condition->size(), size_t(2));
3233  auto acc = get_bitwise_equals(condition->getOperand(0));
3234  if (!acc) {
3235  return nullptr;
3236  }
3237  for (size_t i = 1; i < condition->size(); ++i) {
3238  std::vector<std::unique_ptr<const RexScalar>> and_operands;
3239  and_operands.emplace_back(std::move(acc));
3240  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
3241  acc =
3242  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
3243  }
3244  return acc;
3245  }
3246  return get_bitwise_equals(scalar);
3247 }
3248 
3249 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
3250  CHECK_GE(left_deep_join->inputCount(), size_t(2));
3251  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
3252  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
3253  ++nesting_level) {
3254  if (left_deep_join->getOuterCondition(nesting_level)) {
3255  join_types[nesting_level - 1] = JoinType::LEFT;
3256  }
3257  }
3258  return join_types;
3259 }
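// Example: for A JOIN B ON ... LEFT JOIN C ON ... collapsed into one
// RelLeftDeepInnerJoin over inputs (A, B, C), only nesting level 2 carries an
// outer condition, so this returns {INNER, LEFT} -- one entry per nesting
// level starting at 1.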
3260 
3261 template <class RA>
3262 std::vector<size_t> do_table_reordering(
3263  std::vector<InputDescriptor>& input_descs,
3264  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
3265  const JoinQualsPerNestingLevel& left_deep_join_quals,
3266  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3267  const RA* node,
3268  const std::vector<InputTableInfo>& query_infos,
3269  const Executor* executor) {
3270  if (g_cluster) {
3271  // Disable table reordering in distributed mode. The aggregator does not have enough
3272  // information to break ties
3273  return {};
3274  }
3275  const auto& cat = *executor->getCatalog();
3276  for (const auto& table_info : query_infos) {
3277  if (table_info.table_id < 0) {
3278  continue;
3279  }
3280  const auto td = cat.getMetadataForTable(table_info.table_id);
3281  CHECK(td);
3282  if (table_is_replicated(td)) {
3283  return {};
3284  }
3285  }
3286  const auto input_permutation =
3287  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
3288  input_to_nest_level = get_input_nest_levels(node, input_permutation);
3289  std::tie(input_descs, input_col_descs, std::ignore) =
3290  get_input_desc(node, input_to_nest_level, input_permutation, cat);
3291  return input_permutation;
3292 }
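// Effect sketch (names illustrative): given inputs (big, small) joined by an
// equi-qual, get_node_input_permutation may return {1, 0}; nest levels and
// input descriptors are then rebuilt against that order so the join runs with
// the cheaper table arrangement. The empty-vector returns above mean "keep
// the original order" (distributed mode, or any replicated input table).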
3293 
3294 std::vector<size_t> get_left_deep_join_input_sizes(
3295  const RelLeftDeepInnerJoin* left_deep_join) {
3296  std::vector<size_t> input_sizes;
3297  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
3298  const auto inputs = get_node_output(left_deep_join->getInput(i));
3299  input_sizes.push_back(inputs.size());
3300  }
3301  return input_sizes;
3302 }
3303 
3304 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
3305  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
3306  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
3307  for (const auto& qual : quals) {
3308  const auto rewritten_qual = rewrite_expr(qual.get());
3309  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
3310  }
3311  return rewritten_quals;
3312 }
3313 
3314 } // namespace
3315 
3316 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
3317  const RelCompound* compound,
3318  const SortInfo& sort_info,
3319  const ExecutionOptions& eo) {
3320  std::vector<InputDescriptor> input_descs;
3321  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3322  auto input_to_nest_level = get_input_nest_levels(compound, {});
3323  std::tie(input_descs, input_col_descs, std::ignore) =
3324  get_input_desc(compound, input_to_nest_level, {}, cat_);
3325  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
3326  const auto query_infos = get_table_infos(input_descs, executor_);
3327  CHECK_EQ(size_t(1), compound->inputCount());
3328  const auto left_deep_join =
3329  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3330  JoinQualsPerNestingLevel left_deep_join_quals;
3331  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3332  : std::vector<JoinType>{get_join_type(compound)};
3333  std::vector<size_t> input_permutation;
3334  std::vector<size_t> left_deep_join_input_sizes;
3335  if (left_deep_join) {
3336  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3337  left_deep_join_quals = translateLeftDeepJoinFilter(
3338  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3339  if (g_from_table_reordering &&
3340  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3341  join_types.end()) {
3342  input_permutation = do_table_reordering(input_descs,
3343  input_col_descs,
3344  left_deep_join_quals,
3345  input_to_nest_level,
3346  compound,
3347  query_infos,
3348  executor_);
3349  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3350  std::tie(input_descs, input_col_descs, std::ignore) =
3351  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3352  left_deep_join_quals = translateLeftDeepJoinFilter(
3353  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3354  }
3355  }
3356  RelAlgTranslator translator(cat_,
3357  query_state_,
3358  executor_,
3359  input_to_nest_level,
3360  join_types,
3361  now_,
3362  eo.just_explain);
3363  const auto scalar_sources =
3364  translate_scalar_sources(compound, translator, eo.executor_type);
3365  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3366  const auto quals_cf = translate_quals(compound, translator);
3367  const auto target_exprs = translate_targets(target_exprs_owned_,
3368  scalar_sources,
3369  groupby_exprs,
3370  compound,
3371  translator,
3372  eo.executor_type);
3373  CHECK_EQ(compound->size(), target_exprs.size());
3374  const RelAlgExecutionUnit exe_unit = {input_descs,
3375  input_col_descs,
3376  quals_cf.simple_quals,
3377  rewrite_quals(quals_cf.quals),
3378  left_deep_join_quals,
3379  groupby_exprs,
3380  target_exprs,
3381  nullptr,
3382  sort_info,
3383  0,
3384  false,
3385  std::nullopt,
3386  query_state_};
3387  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3388  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3389  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3390  compound->setOutputMetainfo(targets_meta);
3391  return {rewritten_exe_unit,
3392  compound,
3393  max_groups_buffer_entry_default_guess,
3394  std::move(query_rewriter),
3395  input_permutation,
3396  left_deep_join_input_sizes};
3397 }
3398 
3399 namespace {
3400 
3401 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
3402  CHECK(qual_expr);
3403  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
3404  if (!bin_oper || bin_oper->getOperator() != kAND) {
3405  return {qual_expr};
3406  }
3407  CHECK_GE(bin_oper->size(), size_t(2));
3408  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
3409  for (size_t i = 1; i < bin_oper->size(); ++i) {
3410  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
3411  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
3412  }
3413  return lhs_cf;
3414 }
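// Example: the tree AND(a, AND(b, c), d) recursively flattens to the factor
// list {a, b, c, d}; a non-AND root such as OR(a, b) comes back unchanged as
// the single-element list {OR(a, b)}.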
3415 
3416 std::shared_ptr<Analyzer::Expr> build_logical_expression(
3417  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
3418  const SQLOps sql_op) {
3419  CHECK(!factors.empty());
3420  auto acc = factors.front();
3421  for (size_t i = 1; i < factors.size(); ++i) {
3422  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
3423  }
3424  return acc;
3425 }
3426 
3427 template <class QualsList>
3428 bool list_contains_expression(const QualsList& haystack,
3429  const std::shared_ptr<Analyzer::Expr>& needle) {
3430  for (const auto& qual : haystack) {
3431  if (*qual == *needle) {
3432  return true;
3433  }
3434  }
3435  return false;
3436 }
3437 
3438 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
3439 // evaluations of `p` and allows use of the original form in joins if `p`
3440 // can be used for hash joins.
3441 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
3442  const std::shared_ptr<Analyzer::Expr>& expr) {
3443  const auto expr_terms = qual_to_disjunctive_form(expr);
3444  CHECK_GE(expr_terms.size(), size_t(1));
3445  const auto& first_term = expr_terms.front();
3446  const auto first_term_factors = qual_to_conjunctive_form(first_term);
3447  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
3448  // First, collect the conjunctive components common to all the disjunctive components.
3449  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
3450  for (const auto& first_term_factor : first_term_factors.quals) {
3451  bool is_common =
3452  expr_terms.size() > 1; // Only report common factors for disjunction.
3453  for (size_t i = 1; i < expr_terms.size(); ++i) {
3454  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
3455  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
3456  is_common = false;
3457  break;
3458  }
3459  }
3460  if (is_common) {
3461  common_factors.push_back(first_term_factor);
3462  }
3463  }
3464  if (common_factors.empty()) {
3465  return expr;
3466  }
3467  // Now that the common expressions are known, collect the remaining expressions.
3468  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
3469  for (const auto& term : expr_terms) {
3470  const auto term_cf = qual_to_conjunctive_form(term);
3471  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
3472  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
3473  for (const auto& qual : term_cf.quals) {
3474  if (!list_contains_expression(common_factors, qual)) {
3475  remaining_quals.push_back(qual);
3476  }
3477  }
3478  if (!remaining_quals.empty()) {
3479  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
3480  }
3481  }
3482  // Reconstruct the expression with the transformation applied.
3483  const auto common_expr = build_logical_expression(common_factors, kAND);
3484  if (remaining_terms.empty()) {
3485  return common_expr;
3486  }
3487  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
3488  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
3489 }
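// Worked example (illustrative predicates): for
//   (x = 1 AND y > 2) OR (x = 1 AND z < 3)
// the disjunctive terms both contain the factor x = 1, so the expression is
// rebuilt as x = 1 AND (y > 2 OR z < 3). If the terms share no non-simple
// factor, common_factors stays empty and expr is returned as-is.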
3490 
3491 } // namespace
3492 
3493 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
3494  const RexScalar* join_condition,
3495  const std::vector<JoinType>& join_types,
3496  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3497  const bool just_explain) const {
3498  RelAlgTranslator translator(
3499  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
3500  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
3501  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
3502  for (const auto rex_condition_component : rex_condition_cf) {
3503  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
3504  const auto join_condition =
3505  reverse_logical_distribution(translator.translateScalarRex(
3506  bw_equals ? bw_equals.get() : rex_condition_component));
3507  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
3508  join_condition_quals.insert(join_condition_quals.end(),
3509  join_condition_cf.quals.begin(),
3510  join_condition_cf.quals.end());
3511  join_condition_quals.insert(join_condition_quals.end(),
3512  join_condition_cf.simple_quals.begin(),
3513  join_condition_cf.simple_quals.end());
3514  }
3515  return combine_equi_join_conditions(join_condition_quals);
3516 }
3517 
3518 // Translate left deep join filter and separate the conjunctive form qualifiers
3519 // per nesting level. The code generated for hash table lookups on each level
3520 // must dominate its uses in deeper nesting levels.
3521 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
3522  const RelLeftDeepInnerJoin* join,
3523  const std::vector<InputDescriptor>& input_descs,
3524  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3525  const bool just_explain) {
3526  const auto join_types = left_deep_join_types(join);
3527  const auto join_condition_quals = makeJoinQuals(
3528  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
3529  MaxRangeTableIndexVisitor rte_idx_visitor;
3530  JoinQualsPerNestingLevel result(input_descs.size() - 1);
3531  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
3532  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
3533  const auto outer_condition = join->getOuterCondition(rte_idx);
3534  if (outer_condition) {
3535  result[rte_idx - 1].quals =
3536  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
3537  CHECK_LE(rte_idx, join_types.size());
3538  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
3539  result[rte_idx - 1].type = JoinType::LEFT;
3540  continue;
3541  }
3542  for (const auto& qual : join_condition_quals) {
3543  if (visited_quals.count(qual)) {
3544  continue;
3545  }
3546  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
3547  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
3548  const auto it_ok = visited_quals.emplace(qual);
3549  CHECK(it_ok.second);
3550  result[rte_idx - 1].quals.push_back(qual);
3551  }
3552  }
3553  CHECK_LE(rte_idx, join_types.size());
3554  CHECK(join_types[rte_idx - 1] == JoinType::INNER);
3555  result[rte_idx - 1].type = JoinType::INNER;
3556  }
3557  return result;
3558 }
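// Assignment sketch: each inner qual lands at the shallowest nesting level
// that dominates every table it references (its maximum range-table index,
// from MaxRangeTableIndexVisitor). With inputs (A, B, C): a qual over A and B
// has max rte_idx 1 and goes to result[0]; a qual over A and C has max
// rte_idx 2 and goes to result[1].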
3559 
3560 namespace {
3561 
3562 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
3563  const RelAlgNode* ra_node,
3564  const size_t nest_level,
3565  const std::vector<TargetMetaInfo>& in_metainfo,
3566  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3567  CHECK_LE(size_t(1), ra_node->inputCount());
3568  CHECK_GE(size_t(2), ra_node->inputCount());
3569  const auto input = ra_node->getInput(nest_level);
3570  const auto it_rte_idx = input_to_nest_level.find(input);
3571  CHECK(it_rte_idx != input_to_nest_level.end());
3572  const int rte_idx = it_rte_idx->second;
3573  const int table_id = table_id_from_ra(input);
3574  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
3575  const auto scan_ra = dynamic_cast<const RelScan*>(input);
3576  int input_idx = 0;
3577  for (const auto& input_meta : in_metainfo) {
3578  inputs.push_back(
3579  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
3580  table_id,
3581  scan_ra ? input_idx + 1 : input_idx,
3582  rte_idx));
3583  ++input_idx;
3584  }
3585  return inputs;
3586 }
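// Note the column numbering above: physical scans use 1-based column ids
// (hence input_idx + 1 when the input is a RelScan), while synthesized
// inputs over intermediate results are 0-based.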
3587 
3588 } // namespace
3589 
3590 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
3591  const RelAggregate* aggregate,
3592  const SortInfo& sort_info,
3593  const bool just_explain) {
3594  std::vector<InputDescriptor> input_descs;
3595  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3596  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3597  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
3598  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3599  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
3600  const auto join_type = get_join_type(aggregate);
3601 
3602  RelAlgTranslator translator(cat_,
3603  query_state_,
3604  executor_,
3605  input_to_nest_level,
3606  {join_type},
3607  now_,
3608  just_explain);
3609  CHECK_EQ(size_t(1), aggregate->inputCount());
3610  const auto source = aggregate->getInput(0);
3611  const auto& in_metainfo = source->getOutputMetainfo();
3612  const auto scalar_sources =
3613  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
3614  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
3615  const auto target_exprs = translate_targets(
3616  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
3617  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
3618  aggregate->setOutputMetainfo(targets_meta);
3619  return {RelAlgExecutionUnit{input_descs,
3620  input_col_descs,
3621  {},
3622  {},
3623  {},
3624  groupby_exprs,
3625  target_exprs,
3626  nullptr,
3627  sort_info,
3628  0,
3629  false,
3630  std::nullopt,
3631  query_state_},
3632  aggregate,
3633  max_groups_buffer_entry_default_guess,
3634  nullptr};
3635 }
3636 
3637 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
3638  const RelProject* project,
3639  const SortInfo& sort_info,
3640  const ExecutionOptions& eo) {
3641  std::vector<InputDescriptor> input_descs;
3642  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3643  auto input_to_nest_level = get_input_nest_levels(project, {});
3644  std::tie(input_descs, input_col_descs, std::ignore) =
3645  get_input_desc(project, input_to_nest_level, {}, cat_);
3646  const auto query_infos = get_table_infos(input_descs, executor_);
3647 
3648  const auto left_deep_join =
3649  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
3650  JoinQualsPerNestingLevel left_deep_join_quals;
3651  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3652  : std::vector<JoinType>{get_join_type(project)};
3653  std::vector<size_t> input_permutation;
3654  std::vector<size_t> left_deep_join_input_sizes;
3655  if (left_deep_join) {
3656  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3657  const auto query_infos = get_table_infos(input_descs, executor_);
3658  left_deep_join_quals = translateLeftDeepJoinFilter(
3659  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3660  if (g_from_table_reordering) {
3661  input_permutation = do_table_reordering(input_descs,
3662  input_col_descs,
3663  left_deep_join_quals,
3664  input_to_nest_level,
3665  project,
3666  query_infos,
3667  executor_);
3668  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3669  std::tie(input_descs, input_col_descs, std::ignore) =
3670  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3671  left_deep_join_quals = translateLeftDeepJoinFilter(
3672  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3673  }
3674  }
3675 
3676  RelAlgTranslator translator(cat_,
3677  query_state_,
3678  executor_,
3679  input_to_nest_level,
3680  join_types,
3681  now_,
3682  eo.just_explain);
3683  const auto target_exprs_owned =
3684  translate_scalar_sources(project, translator, eo.executor_type);
3685  target_exprs_owned_.insert(
3686  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3687  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3688 
3689  const RelAlgExecutionUnit exe_unit = {input_descs,
3690  input_col_descs,
3691  {},
3692  {},
3693  left_deep_join_quals,
3694  {nullptr},
3695  target_exprs,
3696  nullptr,
3697  sort_info,
3698  0,
3699  false,
3700  std::nullopt,
3701  query_state_};
3702  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3703  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3704  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3705  project->setOutputMetainfo(targets_meta);
3706  return {rewritten_exe_unit,
3707  project,
3708  max_groups_buffer_entry_default_guess,
3709  std::move(query_rewriter),
3710  input_permutation,
3711  left_deep_join_input_sizes};
3712 }
3713 
3714 namespace {
3715 
3716 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
3717  RelAlgNode const* input_node) {
3718  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
3719  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
3720  const int negative_node_id = -input_node->getId();
3721  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
3722  target_exprs.reserve(tmis.size());
3723  for (size_t i = 0; i < tmis.size(); ++i) {
3724  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
3725  tmis[i].get_type_info(), negative_node_id, i, 0));
3726  }
3727  return target_exprs;
3728 }
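// Example: a union input node with id 3 producing (INT a, TEXT b) yields two
// ColumnVars on the synthetic table id -3 (negated node ids cannot collide
// with real catalog table ids), columns 0 and 1, at nest level 0.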
3729 
3730 } // namespace
3731 
3732 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
3733  const RelLogicalUnion* logical_union,
3734  const SortInfo& sort_info,
3735  const ExecutionOptions& eo) {
3736  std::vector<InputDescriptor> input_descs;
3737  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3738  // Map ra input ptr to index (0, 1).
3739  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
3740  std::tie(input_descs, input_col_descs, std::ignore) =
3741  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
3742  const auto query_infos = get_table_infos(input_descs, executor_);
3743  auto const max_num_tuples =
3744  std::accumulate(query_infos.cbegin(),
3745  query_infos.cend(),
3746  size_t(0),
3747  [](auto max, auto const& query_info) {
3748  return std::max(max, query_info.info.getNumTuples());
3749  });
3750 
3751  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
3752  for (auto& pair : input_to_nest_level) {
3753  VLOG(3) << " (" << pair.first->toString() << ", " << pair.second << ')';
3754  }
3755 
3756  RelAlgTranslator translator(
3757  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
3758 
3759  auto const input_exprs_owned = target_exprs_for_union(logical_union->getInput(0));
3760  CHECK(!input_exprs_owned.empty())
3761  << "No metainfo found for input node " << logical_union->getInput(0)->toString();
3762  VLOG(3) << "input_exprs_owned.size()=" << input_exprs_owned.size();
3763  for (auto& input_expr : input_exprs_owned) {
3764  VLOG(3) << " " << input_expr->toString();
3765  }
3766  target_exprs_owned_.insert(
3767  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3768  const auto target_exprs = get_exprs_not_owned(input_exprs_owned);
3769 
3770  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
3771  << " input_col_descs=" << shared::printContainer(input_col_descs)
3772  << " target_exprs.size()=" << target_exprs.size()
3773  << " max_num_tuples=" << max_num_tuples;
3774 
3775  const RelAlgExecutionUnit exe_unit = {input_descs,
3776  input_col_descs,
3777  {}, // quals_cf.simple_quals,
3778  {}, // rewrite_quals(quals_cf.quals),
3779  {},
3780  {nullptr},
3781  target_exprs,
3782  nullptr,
3783  sort_info,
3784  max_num_tuples,
3785  false,
3786  logical_union->isAll(),
3787  query_state_};
3788  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3789  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3790 
3791  RelAlgNode const* input0 = logical_union->getInput(0);
3792  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
3793  logical_union->setOutputMetainfo(
3794  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3795  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
3796  logical_union->setOutputMetainfo(
3797  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3798  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
3799  logical_union->setOutputMetainfo(
3800  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3801  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
3802  logical_union->setOutputMetainfo(
3803  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3804  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
3805  logical_union->setOutputMetainfo(
3806  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3807  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
3808  logical_union->setOutputMetainfo(
3809  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3810  } else if (dynamic_cast<const RelSort*>(input0)) {
3811  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
3812  } else {
3813  throw QueryNotSupported("Unsupported input type: " + input0->toString());
3814  }
3815  VLOG(3) << "logical_union->getOutputMetainfo()="
3816  << shared::printContainer(logical_union->getOutputMetainfo())
3817  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
3818  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
3819 
3820  return {rewritten_exe_unit,
3821  logical_union,
3822  max_groups_buffer_entry_default_guess,
3823  std::move(query_rewriter)};
3824 }
3825 
3826 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
3827  const RelTableFunction* table_func,
3828  const bool just_explain,
3829  const bool is_gpu) {
3830  std::vector<InputDescriptor> input_descs;
3831  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3832  auto input_to_nest_level = get_input_nest_levels(table_func, {});
3833  std::tie(input_descs, input_col_descs, std::ignore) =
3834  get_input_desc(table_func, input_to_nest_level, {}, cat_);
3835  const auto query_infos = get_table_infos(input_descs, executor_);
3836  RelAlgTranslator translator(
3837  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
3838  const auto input_exprs_owned =
3839  translate_scalar_sources(table_func, translator, ::ExecutorType::Native);
3840  target_exprs_owned_.insert(
3841  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3842  const auto input_exprs = get_exprs_not_owned(input_exprs_owned);
3843 
3844  const auto table_function_impl = [=]() {
3845  if (is_gpu) {
3846  try {
3847  return bind_table_function(
3848  table_func->getFunctionName(), input_exprs_owned, is_gpu);
3849  } catch (std::runtime_error& e) {
3850  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
3851  << " Redirecting " << table_func->getFunctionName()
3852  << " to run on CPU.";
3853  throw QueryMustRunOnCpu();
3854  }
3855  } else {
3856  return bind_table_function(
3857  table_func->getFunctionName(), input_exprs_owned, is_gpu);
3858  }
3859  }();
3860 
3861  size_t output_row_sizing_param = 0;
3862  if (table_function_impl.hasUserSpecifiedOutputSizeMultiplier() ||
3863  table_function_impl.hasUserSpecifiedOutputSizeConstant()) {
3864  const auto parameter_index = table_function_impl.getOutputRowSizeParameter();
3865  CHECK_GT(parameter_index, size_t(0));
3866  const auto parameter_expr = table_func->getTableFuncInputAt(parameter_index - 1);
3867  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
3868  if (!parameter_expr_literal) {
3869  throw std::runtime_error(
3870  "Provided output buffer sizing parameter is not a literal. Only literal "
3871  "values are supported with output buffer sizing configured table "
3872  "functions.");
3873  }
3874  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
3875  if (literal_val < 0) {
3876  throw std::runtime_error("Provided output sizing parameter " +
3877  std::to_string(literal_val) +
3878  " must be positive integer.");
3879  }
3880  output_row_sizing_param = static_cast<size_t>(literal_val);
3881  } else if (table_function_impl.hasNonUserSpecifiedOutputSizeConstant()) {
3882  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
3883  } else {
3884  UNREACHABLE();
3885  }
3886 
3887  std::vector<Analyzer::ColumnVar*> input_col_exprs;
3888  size_t index = 0;
3889  for (auto input_expr : input_exprs) {
3890  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
3891  input_expr->set_type_info(table_function_impl.getInputSQLType(index));
3892  input_col_exprs.push_back(col_var);
3893  }
3894  index++;
3895  }
3896  CHECK_EQ(input_col_exprs.size(), table_func->getColInputsSize());
3897  std::vector<Analyzer::Expr*> table_func_outputs;
3898  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
3899  const auto ti = table_function_impl.getOutputSQLType(i);
3900  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
3901  table_func_outputs.push_back(target_exprs_owned_.back().get());
3902  }
3903  const TableFunctionExecutionUnit exe_unit = {
3904  input_descs,
3905  input_col_descs,
3906  input_exprs, // table function inputs
3907  input_col_exprs, // table function column inputs (duplicates w/ above)
3908  table_func_outputs, // table function projected exprs
3909  output_row_sizing_param, // output buffer sizing param
3910  table_function_impl};
3911  const auto targets_meta = get_targets_meta(table_func, exe_unit.target_exprs);
3912  table_func->setOutputMetainfo(targets_meta);
3913  return {exe_unit, table_func};
3914 }
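// Sizing example (function name illustrative): for a UDTF declared with a
// user-specified row multiplier as its second argument, a call like
//   SELECT * FROM TABLE(my_udtf(CURSOR(SELECT x FROM t), 2));
// must pass the literal 2, and output_row_sizing_param becomes 2. Functions
// with a non-user-specified constant size take the constant directly from the
// implementation table, and a non-literal sizing argument raises the error
// above.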
3915 
3916 namespace {
3917 
3918 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
3919 get_inputs_meta(const RelFilter* filter,
3920  const RelAlgTranslator& translator,
3921  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
3922  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3923  std::vector<TargetMetaInfo> in_metainfo;
3924  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
3925  const auto data_sink_node = get_data_sink(filter);
3926  auto input_it = inputs_owned.begin();
3927  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
3928  const auto source = data_sink_node->getInput(nest_level);
3929  const auto scan_source = dynamic_cast<const RelScan*>(source);
3930  if (scan_source) {
3931  CHECK(source->getOutputMetainfo().empty());
3932  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
3933  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
3934  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
3935  }
3936  const auto source_metadata =
3937  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
3938  in_metainfo.insert(
3939  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3940  exprs_owned.insert(
3941  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3942  } else {
3943  const auto& source_metadata = source->getOutputMetainfo();
3944  input_it += source_metadata.size();
3945  in_metainfo.insert(
3946  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3947  const auto scalar_sources_owned = synthesize_inputs(
3948  data_sink_node, nest_level, source_metadata, input_to_nest_level);
3949  exprs_owned.insert(
3950  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3951  }
3952  }
3953  return std::make_pair(in_metainfo, exprs_owned);
3954 }
3955 
3956 } // namespace
3957 
3958 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
3959  const SortInfo& sort_info,
3960  const bool just_explain) {
3961  CHECK_EQ(size_t(1), filter->inputCount());
3962  std::vector<InputDescriptor> input_descs;
3963  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3964  std::vector<TargetMetaInfo> in_metainfo;
3965  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3966  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
3967 
3968  const auto input_to_nest_level = get_input_nest_levels(filter, {});
3969  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3970  get_input_desc(filter, input_to_nest_level, {}, cat_);
3971  const auto join_type = get_join_type(filter);
3972  RelAlgTranslator translator(cat_,
3973  query_state_,
3974  executor_,
3975  input_to_nest_level,
3976  {join_type},
3977  now_,
3978  just_explain);
3979  std::tie(in_metainfo, target_exprs_owned) =
3980  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
3981  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
3982  const auto qual = fold_expr(filter_expr.get());
3983  target_exprs_owned_.insert(
3984  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3985  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3986  filter->setOutputMetainfo(in_metainfo);
3987  const auto rewritten_qual = rewrite_expr(qual.get());
3988  return {{input_descs,
3989  input_col_descs,
3990  {},
3991  {rewritten_qual ? rewritten_qual : qual},
3992  {},
3993  {nullptr},
3994  target_exprs,
3995  nullptr,
3996  sort_info,
3997  0},
3998  filter,
3999  max_groups_buffer_entry_default_guess,
4000  nullptr};
4001 }
4002 
const size_t getGroupByCount() const
bool isAll() const
bool is_agg(const Analyzer::Expr *expr)
Analyzer::ExpressionPtr rewrite_array_elements(Analyzer::Expr const *expr)
std::vector< Analyzer::Expr * > target_exprs
SortField getCollation(const size_t i) const
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:205
void collect_used_input_desc(std::vector< InputDescriptor > &input_descs, const Catalog_Namespace::Catalog &cat, std::unordered_set< std::shared_ptr< const InputColDescriptor >> &input_col_descs_unique, const RelAlgNode *ra_node, const std::unordered_set< const RexInput * > &source_used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
size_t getOffset() const
std::vector< int > ChunkKey
Definition: types.h:37
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
JoinType
Definition: sqldefs.h:107
HOST DEVICE int get_size() const
Definition: sqltypes.h:340
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool g_enable_watchdog
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
std::string cat(Ts &&...args)
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
const Rex * getTargetExpr(const size_t i) const
AggregatedColRange computeColRangesCache()
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1191
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:993
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:97
Definition: sqltypes.h:51
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
size_t size() const override
int hll_size_for_rate(const int err_percent)
Definition: HyperLogLog.h:115
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:221
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const RexScalar * getFilterExpr() const
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:2889
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:222
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
const RelAlgNode * body
ExecutorDeviceType
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, int database_id, const std::shared_ptr< const query_state::QueryState > &query_state_)
void setForceNonInSituData()
Definition: RenderInfo.cpp:45
size_t getIndex() const
bool is_metadata_placeholder(const ChunkMetadata &metadata)
#define SPIMAP_GEO_PHYSICAL_INPUT(c, i)
Definition: Catalog.h:74
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
bool g_skip_intermediate_count
std::unique_ptr< const RexOperator > get_bitwise_equals_conjunction(const RexScalar *scalar)
RexUsedInputsVisitor(const Catalog_Namespace::Catalog &cat)
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:109
const RexScalar * getOuterCondition(const size_t nesting_level) const
TableGenerations computeTableGenerations()
SQLTypeInfo get_nullable_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:920
std::shared_ptr< Analyzer::Expr > translateScalarRex(const RexScalar *rex) const
#define LOG(tag)
Definition: Logger.h:188
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:79
bool g_enable_union
size_t size() const override
static SpeculativeTopNBlacklist speculative_topn_blacklist_
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3655
size_t get_scalar_sources_size(const RelCompound *compound)
RelAlgExecutionUnit exe_unit
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< std::shared_ptr< Analyzer::TargetEntry > > targets
Definition: RenderInfo.h:37
SQLOps
Definition: sqldefs.h:29
TemporaryTables temporary_tables_
size_t getNumRows() const
static const size_t max_groups_buffer_entry_default_guess
const std::list< Analyzer::OrderEntry > order_entries
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
bool setInSituDataIfUnset(const bool is_in_situ_data)
Definition: RenderInfo.cpp:98
const RexScalar * getCondition() const
std::string join(T const &container, std::string const &delim)
const std::vector< TargetMetaInfo > getTupleType() const
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources_for_update(const RA *ra_node, const RelAlgTranslator &translator, int32_t tableId, const Catalog_Namespace::Catalog &cat, const ColumnNameList &colNames, size_t starting_projection_column_idx)
bool get_is_null() const
Definition: Analyzer.h:334
Definition: sqldefs.h:38
std::vector< InputDescriptor > input_descs
std::shared_ptr< Analyzer::Var > var_ref(const Analyzer::Expr *expr, const Analyzer::Var::WhichRow which_row, const int varno)
Definition: Analyzer.h:1669
#define UNREACHABLE()
Definition: Logger.h:241
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
static WindowProjectNodeContext * create(Executor *executor)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:899
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
Definition: sqldefs.h:49
Definition: sqldefs.h:30
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
static const int32_t ERR_GEOS
Definition: Execute.h:999
SQLTypeInfo get_agg_type(const SQLAgg agg_kind, const Analyzer::Expr *arg_expr)
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:995
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:74
std::shared_ptr< Analyzer::Expr > set_transient_dict(const std::shared_ptr< Analyzer::Expr > expr)
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:226
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:330
static void invalidateCaches()
int g_hll_precision_bits
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
const SQLTypeInfo & get_type_info() const
void checkForMatchingMetaInfoTypes() const
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1455
#define CHECK_GT(x, y)
Definition: Logger.h:209
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
int32_t intval
Definition: sqltypes.h:208
std::vector< const RexScalar * > rex_to_conjunctive_form(const RexScalar *qual_expr)
bool is_count_distinct(const Analyzer::Expr *expr)
QueryStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::string to_string(char const *&&v)
ChunkStats chunkStats
Definition: ChunkMetadata.h:35
void handleNop(RaExecutionDesc &ed)
#define LOG_IF(severity, condition)
Definition: Logger.h:287
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::shared_ptr< Analyzer::Expr > cast_dict_to_none(const std::shared_ptr< Analyzer::Expr > &input)
bool disallow_in_situ_only_if_final_ED_is_aggregate
Definition: RenderInfo.h:41
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:409
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
bool list_contains_expression(const QualsList &haystack, const std::shared_ptr< Analyzer::Expr > &needle)
size_t get_count_distinct_sub_bitmap_count(const size_t bitmap_sz_bits, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:996
Definition: sqldefs.h:73
ExecutorType
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:202
size_t getColInputsSize() const
std::map< int, std::shared_ptr< ChunkMetadata >> ChunkMetadataMap
bool g_enable_interop
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
virtual T visit(const RexScalar *rex_scalar) const
Definition: RexVisitor.h:27
const size_t getScalarSourcesSize() const
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:997
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:994
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:191
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
unsigned getIndex() const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:985
const bool allow_multifrag
size_t getOuterFragmentCount(const CompilationOptions &co, const ExecutionOptions &eo)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
const bool find_push_down_candidates
bool g_from_table_reordering
Definition: Execute.cpp:81
const bool just_validate
unsigned getId() const
Classes representing a parse tree.
bool isGeometry(TargetMetaInfo const &target_meta_info)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:210
const SortInfo sort_info
ExecutorType executor_type
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const bool with_dynamic_watchdog
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:989
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const JoinQualsPerNestingLevel join_quals
static void reset(Executor *executor)
SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr &expr)
std::vector< std::shared_ptr< RexInput > > synthesized_physical_inputs_owned
std::pair< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > > > get_input_desc_impl(const RA *ra_node, const std::unordered_set< const RexInput * > &used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
SortAlgorithm
const double gpu_input_mem_limit_percent
MergeType
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
A container for relational algebra descriptors defining the execution order for a relational algebra ...
UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const Catalog_Namespace::Catalog & cat_
size_t g_big_group_threshold
Definition: Execute.cpp:96
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:991
const table_functions::TableFunction bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs)
const std::shared_ptr< ResultSet > & getRows() const
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:992
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
bool g_bigint_count
size_t groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
const RelAlgNode * getInput(const size_t idx) const
Definition: sqldefs.h:37
Definition: sqldefs.h:75
int getDatabaseId() const
Definition: Catalog.h:248
std::shared_ptr< Analyzer::Expr > build_logical_expression(const std::vector< std::shared_ptr< Analyzer::Expr >> &factors, const SQLOps sql_op)
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
bool is_dict_encoded_type() const
Definition: sqltypes.h:522
size_t getNDVEstimation(const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
static const int32_t ERR_UNSUPPORTED_SELF_JOIN
Definition: Execute.h:988
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3846
bool isSimple() const
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1460
specifies the content in-memory of a row in the column metadata table
bool get_is_distinct() const
Definition: Analyzer.h:1098
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:998
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
JoinType get_join_type(const RelAlgNode *ra)
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:986
size_t getTableFuncInputsSize() const
const ColumnDescriptor * getMetadataForColumnBySpi(const int tableId, const size_t spi) const
Definition: Catalog.cpp:1556
std::shared_ptr< Analyzer::Expr > reverse_logical_distribution(const std::shared_ptr< Analyzer::Expr > &expr)
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
std::optional< size_t > getFilteredCountAll(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
Executor * getExecutor() const
std::string * stringval
Definition: sqltypes.h:214
int get_result_table_id() const
Definition: Analyzer.h:1626
static std::shared_ptr< Analyzer::Expr > normalize(const SQLOps optype, const SQLQualifier qual, std::shared_ptr< Analyzer::Expr > left_expr, std::shared_ptr< Analyzer::Expr > right_expr)
Definition: ParserNode.cpp:276
const std::vector< std::unique_ptr< const RexAgg > > & getAggExprs() const
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
bool g_enable_window_functions
Definition: Execute.cpp:97
bool isEmptyResult() const
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
const std::vector< size_t > outer_fragment_indices
const RexScalar * getProjectAt(const size_t idx) const
bool hasInput(const RelAlgNode *needle) const
ExecutionResult executeSimpleInsert(const Analyzer::Query &insert_query)
#define TRANSIENT_DICT_ID
Definition: sqltypes.h:269
const bool just_calcite_explain
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_used_inputs(const RelCompound *compound, const Catalog_Namespace::Catalog &cat)
#define CHECK_LE(x, y)
Definition: Logger.h:208
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: sqltypes.h:950
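A minimal sketch of the appendDatum pattern, assuming the common buffer-building idiom: copy one fixed-width value into a raw buffer and return the advanced write cursor. The real function dispatches on the full SQLTypeInfo; this helper is illustrative only:

#include <cstddef>
#include <cstdint>
#include <cstring>

int8_t* append_fixed_width(int8_t* buf, const void* val, const size_t width) {
  std::memcpy(buf, val, width);   // raw copy of one fixed-width value
  return buf + width;             // caller keeps writing at the new cursor
}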
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:338
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool table_is_temporary(const TableDescriptor *const td)
bool is_null(const T &v, const SQLTypeInfo &t)
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:64
std::shared_ptr< const query_state::QueryState > query_state_
bool table_is_replicated(const TableDescriptor *td)
std::unordered_set< const RexInput * > visitInput(const RexInput *rex_input) const override
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_join_source_used_inputs(const RelAlgNode *ra_node, const Catalog_Namespace::Catalog &cat)
Basic constructors and methods of the row set interface.
Datum get_constval() const
Definition: Analyzer.h:335
const size_t getGroupByCount() const
std::unordered_set< const RexInput * > aggregateResult(const std::unordered_set< const RexInput * > &aggregate, const std::unordered_set< const RexInput * > &next_result) const override
const RelAlgNode & getRootRelAlgNode() const
size_t collationCount() const
const RelAlgNode * getSourceNode() const
bool isAggregate() const
size_t getLimit() const
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool isRowidLookup(const WorkUnit &work_unit)
bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
int table_id_from_ra(const RelAlgNode *ra_node)
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
static void handlePersistentError(const int32_t error_code)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:339
const RexScalar * getTableFuncInputAt(const size_t idx) const
bool compute_output_buffer_size(const RelAlgExecutionUnit &ra_exe_unit)
SQLAgg get_aggtype() const
Definition: Analyzer.h:1095
const size_t max_groups_buffer_entry_guess
std::string getFunctionName() const
std::list< std::shared_ptr< Analyzer::Expr > > quals
const bool allow_loop_joins
bool wasMultifragKernelLaunch() const
Definition: ErrorHandling.h:57
const unsigned pending_query_interrupt_freq
bool g_enable_bump_allocator
Definition: Execute.cpp:103
StringDictionaryGenerations computeStringDictionaryGenerations()
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
const RexScalar * getInnerCondition() const
virtual std::string toString() const =0
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
#define CHECK(condition)
Definition: Logger.h:197
bool is_geometry() const
Definition: sqltypes.h:499
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:313
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
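A hedged call sketch based only on the signature above, assuming the enclosing class is ColumnFetcher; every lowercase variable (executor, hash_col, fragment) is a hypothetical placeholder the caller must already hold, and chunks_owner keeps the fetched chunk alive while the returned pointer is in use:

std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;  // keeps chunk alive
ColumnCacheMap column_cache;
auto [col_buf, elem_count] = ColumnFetcher::getOneColumnFragment(
    executor,                      // hypothetical Executor*
    hash_col,                      // hypothetical Analyzer::ColumnVar
    fragment,                      // hypothetical FragmentInfo
    Data_Namespace::CPU_LEVEL,     // fetch into CPU memory
    /*device_id=*/0,
    /*device_allocator=*/nullptr,  // not needed on CPU
    chunks_owner,
    column_cache);
// col_buf now points at elem_count packed values of the column fragment.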
std::vector< std::shared_ptr< Analyzer::Expr > > qual_to_disjunctive_form(const std::shared_ptr< Analyzer::Expr > &qual_expr)
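A toy sketch of the core of a disjunctive-form split: recursively descend through every top-level OR and collect the factors. The Node type is hypothetical; the real routine returns shared Analyzer::Expr factors:

#include <memory>
#include <vector>

struct Node {                      // hypothetical expression node
  bool is_or = false;
  std::shared_ptr<Node> lhs, rhs;  // set only when is_or is true
};

void collect_disjuncts(const std::shared_ptr<Node>& n,
                       std::vector<std::shared_ptr<Node>>& out) {
  if (n->is_or) {                  // descend through every top-level OR
    collect_disjuncts(n->lhs, out);
    collect_disjuncts(n->rhs, out);
  } else {
    out.push_back(n);              // one factor of the disjunctive form
  }
}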
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
const std::vector< std::shared_ptr< RexInput > > & get_inputs_owned() const
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
std::unique_ptr< RelAlgDagBuilder > query_dag_
const RelAlgNode * getBody() const
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
RANodeOutput get_node_output(const RelAlgNode *ra_node)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
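A sketch of the usual fixed-width NULL convention this helper's name implies: reserve the minimum representable value of the encoded width as the sentinel. This mirrors the common convention, not necessarily every case the real switch handles:

#include <cstdint>
#include <limits>

int64_t null_sentinel_for_width(const int bytes) {
  switch (bytes) {
    case 1:  return std::numeric_limits<int8_t>::min();
    case 2:  return std::numeric_limits<int16_t>::min();
    case 4:  return std::numeric_limits<int32_t>::min();
    default: return std::numeric_limits<int64_t>::min();
  }
}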
std::unique_ptr< const RexOperator > get_bitwise_equals(const RexScalar *scalar)
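Reading the name hedgedly, get_bitwise_equals appears to detect the null-safe equality pattern "a = b OR (a IS NULL AND b IS NULL)". A scalar illustration of that semantics, with std::optional standing in for nullable values:

#include <optional>

bool bitwise_equals(const std::optional<int>& a, const std::optional<int>& b) {
  // "a = b OR (a IS NULL AND b IS NULL)": NULLs compare equal to each other.
  return (a && b && *a == *b) || (!a && !b);
}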
Estimators to be used when precise cardinality isn't useful.
bool g_cluster
bool g_allow_cpu_retry
Definition: Execute.cpp:78
static std::string getErrorMessageFromCode(const int32_t error_code)
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
const Expr * get_left_operand() const
Definition: Analyzer.h:442
void cleanupPostExecution()
std::vector< std::string > ColumnNameList
std::list< std::shared_ptr< Analyzer::Expr > > makeJoinQuals(const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
const RexScalar * scalar_at(const size_t i, const RelCompound *compound)
std::vector< Analyzer::Expr * > target_exprs
SQLTypeInfo columnType
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
bool is_string() const
Definition: sqltypes.h:487
const size_t inputCount() const
specifies the in-memory content of a row in the table metadata table
const bool allow_runtime_query_interrupt
int8_t * numbersPtr
Definition: sqltypes.h:220
static constexpr char const * FOREIGN_TABLE
RelAlgExecutionUnit decide_approx_count_distinct_implementation(const RelAlgExecutionUnit &ra_exe_unit_in, const std::vector< InputTableInfo > &table_infos, const Executor *executor, const ExecutorDeviceType device_type_in, std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned)
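A hedged sketch of the kind of choice such a routine makes: when the argument's value range is small, an exact per-value bitmap is cheap and precise; otherwise fall back to an approximate sketch such as HyperLogLog. The names and threshold parameter below are hypothetical:

#include <cstdint>

enum class CountDistinctImpl { Bitmap, HyperLogLog };

CountDistinctImpl choose_impl(const int64_t value_range,
                              const int64_t bitmap_threshold) {
  // Small ranges: one bit per possible value is cheap and exact.
  // Large ranges: approximate with a fixed-size HyperLogLog sketch.
  return value_range <= bitmap_threshold ? CountDistinctImpl::Bitmap
                                         : CountDistinctImpl::HyperLogLog;
}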
size_t get_frag_count_of_table(const int table_id, Executor *executor)
const std::list< int > & get_result_col_list() const
Definition: Analyzer.h:1627
bool sameTypeInfo(std::vector< TargetMetaInfo > const &lhs, std::vector< TargetMetaInfo > const &rhs)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:1451
const RelAlgNode * get_data_sink(const RelAlgNode *ra_node)
const unsigned dynamic_watchdog_time_limit
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
static const int32_t ERR_OUT_OF_CPU_MEM
Definition: Execute.h:990
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:703
const TableDescriptor * getTableDescriptor() const
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
std::string columnName
const std::vector< std::shared_ptr< TargetEntry > > & get_targetlist() const
Definition: Analyzer.h:1610
static size_t getArenaBlockSize()
Definition: Execute.cpp:197
Executor * executor_
int getNestLevel() const
SQLTypeInfo sqlType
Definition: ChunkMetadata.h:32
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20
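The macro's role, expressed as a function for illustration: route a row to the shard given by its shard key modulo the shard count (assuming, as the name suggests, simple modulo sharding; the authoritative body lives in shard_key.h):

#include <cstddef>
#include <cstdint>

size_t shard_for_key(const int64_t key, const size_t num_shards) {
  const int64_t non_negative = key < 0 ? -key : key;  // ignore the sign
  return static_cast<size_t>(non_negative) % num_shards;
}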
std::list< std::shared_ptr< Analyzer::Expr > > combine_equi_join_conditions(const std::list< std::shared_ptr< Analyzer::Expr >> &join_quals)
#define IS_GEO(T)
Definition: sqltypes.h:245
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:107
static constexpr char const * PARQUET
Definition: ForeignServer.h:36
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:42
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)
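A toy constant-folding sketch in the spirit of fold_expr: collapse operator nodes whose children are both literals. The Ast type and the two supported operators are hypothetical simplifications of the Analyzer::Expr folding:

#include <cstdint>
#include <memory>
#include <optional>

struct Ast {                        // hypothetical stand-in for Analyzer::Expr
  char op = 0;                      // 0 marks a literal leaf
  std::optional<int64_t> literal;   // set when op == 0
  std::shared_ptr<Ast> lhs, rhs;
};

std::shared_ptr<Ast> fold(const std::shared_ptr<Ast>& n) {
  if (n->op == 0) {
    return n;                       // literals fold to themselves
  }
  auto l = fold(n->lhs);
  auto r = fold(n->rhs);
  if (l->literal && r->literal) {   // both children constant: evaluate now
    const int64_t v = n->op == '+' ? *l->literal + *r->literal
                                   : *l->literal * *r->literal;
    return std::make_shared<Ast>(Ast{0, v, nullptr, nullptr});
  }
  return std::make_shared<Ast>(Ast{n->op, std::nullopt, l, r});
}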
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
const int getColumnIdBySpi(const int tableId, const size_t spi) const
Definition: Catalog.cpp:1551
const bool with_watchdog
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:154
bool g_enable_table_functions
Definition: Execute.cpp:98
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
static std::shared_ptr< Analyzer::Expr > translateAggregateRex(const RexAgg *rex, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
void prepareLeafExecution(const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:337
const RexScalar * getScalarSource(const size_t i) const
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
void set_transient_dict_maybe(std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::shared_ptr< Analyzer::Expr > &expr)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)