OmniSciDB 72180abbfe
RelAlgExecutor.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "RelAlgExecutor.h"
#include "Parser/ParserNode.h"
#include "QueryEngine/RexVisitor.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/shard_key.h"

#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/reversed.hpp>

#include <algorithm>
#include <functional>
#include <numeric>

extern bool g_enable_bump_allocator;
bool g_enable_interop{false};
bool g_enable_union{false};

namespace {

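// True if the node produces an aggregated result: either an explicit
// RelAggregate, or a RelCompound with aggregation.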
bool node_is_aggregate(const RelAlgNode* ra) {
  const auto compound = dynamic_cast<const RelCompound*>(ra);
  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
  return ((compound && compound->isAggregate()) || aggregate);
}

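// Overload of get_physical_inputs() that additionally remaps each column id
// through the catalog's SPI (sequential physical index) lookup.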
std::unordered_set<PhysicalInput> get_physical_inputs(
    const Catalog_Namespace::Catalog& cat,
    const RelAlgNode* ra) {
  auto phys_inputs = get_physical_inputs(ra);
  std::unordered_set<PhysicalInput> phys_inputs2;
  for (auto& phi : phys_inputs) {
    phys_inputs2.insert(
        PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
  }
  return phys_inputs2;
}

}  // namespace

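// Returns the fragment count of the outer table for a simple single-step
// projection or non-aggregate compound query, or 0 when the count cannot be
// determined up front (explain, push-down, subqueries, multi-step, DML).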
size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
                                             const ExecutionOptions& eo) {
  if (eo.find_push_down_candidates) {
    return 0;
  }

  if (eo.just_explain) {
    return 0;
  }

  CHECK(query_dag_);

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  mapd_shared_lock<mapd_shared_mutex> lock(executor_->execute_mutex_);
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  const auto phys_inputs = get_physical_inputs(cat_, &ra);
  const auto phys_table_ids = get_physical_table_inputs(&ra);
  executor_->setCatalog(&cat_);
  executor_->setupCaching(phys_inputs, phys_table_ids);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra);

  if (!getSubqueries().empty()) {
    return 0;
  }

  CHECK(!ed_seq.empty());
  if (ed_seq.size() > 1) {
    return 0;
  }

  decltype(temporary_tables_)().swap(temporary_tables_);
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  executor_->catalog_ = &cat_;
  executor_->temporary_tables_ = &temporary_tables_;

  auto exec_desc_ptr = ed_seq.getDescriptor(0);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    return 0;
  }

  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);

    return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                   executor_);
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      return 0;
    } else if (compound->isUpdateViaSelect()) {
      return 0;
    } else {
      if (compound->isAggregate()) {
        return 0;
      }

      const auto work_unit =
          createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);

      return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                     executor_);
    }
  }

  return 0;
}

ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
                                                   const ExecutionOptions& eo,
                                                   const bool just_explain_plan,
                                                   RenderInfo* render_info) {
  CHECK(query_dag_);
  auto timer = DEBUG_TIMER(__func__);

  try {
    return executeRelAlgQueryNoRetry(co, eo, just_explain_plan, render_info);
  } catch (const QueryMustRunOnCpu&) {
    if (!g_allow_cpu_retry) {
      throw;
    }
  }
  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
  auto co_cpu = CompilationOptions::makeCpuOnly(co);

  if (render_info) {
    render_info->setForceNonInSituData();
  }
  return executeRelAlgQueryNoRetry(co_cpu, eo, just_explain_plan, render_info);
}

namespace {

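// Holds either a shared or a unique lock on the executor's execute_mutex_,
// depending on which executor type acquires it (see acquire_execute_mutex).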
struct ExecutorMutexHolder {
  mapd_shared_lock<mapd_shared_mutex> shared_lock;
  mapd_unique_lock<mapd_shared_mutex> unique_lock;
};

}  // namespace

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
                                                          const ExecutionOptions& eo,
                                                          const bool just_explain_plan,
                                                          RenderInfo* render_info) {
  auto timer = DEBUG_TIMER(__func__);
  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_runtime_query_interrupt) {
    executor_->resetInterrupt();
  }

  std::string query_session = "";
  if (g_enable_runtime_query_interrupt) {
    // a query execution request without a session id can happen, e.g., a test
    // query; if so, fall back to the original behavior: a runtime query interrupt
    // without per-session management (similar to the dynamic watchdog)
    mapd_shared_lock<mapd_shared_mutex> session_read_lock(
        executor_->executor_session_mutex_);
    if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
      query_session = query_state_->getConstSessionInfo()->get_session_id();
    } else if (executor_->getCurrentQuerySession(session_read_lock) != query_session) {
      query_session = executor_->getCurrentQuerySession(session_read_lock);
    }
    session_read_lock.unlock();
    if (query_session != "") {
      // if the session is valid, allow per-session runtime query interrupt
      mapd_unique_lock<mapd_shared_mutex> session_write_lock(
          executor_->executor_session_mutex_);
      executor_->addToQuerySessionList(query_session, session_write_lock);
      session_write_lock.unlock();
      // hybrid spinlock: if it fails to acquire the lock, it sleeps for
      // {g_runtime_query_interrupt_frequency} milliseconds.
      while (executor_->execute_spin_lock_.test_and_set(std::memory_order_acquire)) {
        // failed to get the spinlock: check whether the query has been interrupted
        mapd_shared_lock<mapd_shared_mutex> session_read_lock(
            executor_->executor_session_mutex_);
        bool isQueryInterrupted =
            executor_->checkIsQuerySessionInterrupted(query_session, session_read_lock);
        session_read_lock.unlock();
        if (isQueryInterrupted) {
          mapd_unique_lock<mapd_shared_mutex> session_write_lock(
              executor_->executor_session_mutex_);
          executor_->removeFromQuerySessionList(query_session, session_write_lock);
          session_write_lock.unlock();
          VLOG(1) << "Kill the pending query session: " << query_session;
          throw std::runtime_error(
              "Query execution has been interrupted (pending query)");
        }
        // still failed to acquire the lock; sleep and retry
        std::this_thread::sleep_for(
            std::chrono::milliseconds(g_runtime_query_interrupt_frequency));
      };
    }
    // currently, atomic_flag does not provide a way to query its state
    // (i.e., spinlock.is_locked()), so we additionally lock the execute_mutex_
    // right after acquiring the spinlock so that other parts of the code can tell
    // whether there is a running query on the executor
  }
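  // The unitary (single-node) executor requires exclusive access to the
  // execute_mutex_; leaf executors in distributed mode may run concurrently.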
  auto acquire_execute_mutex = [](Executor* executor) -> ExecutorMutexHolder {
    ExecutorMutexHolder ret;
    if (executor->executor_id_ == Executor::UNITARY_EXECUTOR_ID) {
      // Only one unitary executor can run at a time
      ret.unique_lock = mapd_unique_lock<mapd_shared_mutex>(executor->execute_mutex_);
    } else {
      ret.shared_lock = mapd_shared_lock<mapd_shared_mutex>(executor->execute_mutex_);
    }
    return ret;
  };
  auto lock = acquire_execute_mutex(executor_);
  ScopeGuard clearRuntimeInterruptStatus = [this] {
    // reset the runtime query interrupt status
    if (g_enable_runtime_query_interrupt) {
      mapd_shared_lock<mapd_shared_mutex> session_read_lock(
          executor_->executor_session_mutex_);
      std::string curSession = executor_->getCurrentQuerySession(session_read_lock);
      session_read_lock.unlock();
      mapd_unique_lock<mapd_shared_mutex> session_write_lock(
          executor_->executor_session_mutex_);
      executor_->removeFromQuerySessionList(curSession, session_write_lock);
      executor_->invalidateQuerySession(session_write_lock);
      executor_->execute_spin_lock_.clear(std::memory_order_release);
      session_write_lock.unlock();
      executor_->resetInterrupt();
      VLOG(1) << "RESET runtime query interrupt status of Executor " << this;
    }
  };

  if (g_enable_runtime_query_interrupt) {
    // make sure to set the running session ID
    mapd_unique_lock<mapd_shared_mutex> session_write_lock(
        executor_->executor_session_mutex_);
    executor_->invalidateQuerySession(session_write_lock);
    executor_->setCurrentQuerySession(query_session, session_write_lock);
    session_write_lock.unlock();
  }

  int64_t queue_time_ms = timer_stop(clock_begin);
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  const auto phys_inputs = get_physical_inputs(cat_, &ra);
  const auto phys_table_ids = get_physical_table_inputs(&ra);
  executor_->setCatalog(&cat_);
  executor_->setupCaching(phys_inputs, phys_table_ids);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra);

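  // just_explain_plan: render the execution sequence as text, final step
  // first with earlier steps indented below it, instead of executing the query.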
  if (just_explain_plan) {
    std::stringstream ss;
    std::vector<const RelAlgNode*> nodes;
    for (size_t i = 0; i < ed_seq.size(); i++) {
      nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
    }
    size_t ctr = nodes.size();
    size_t tab_ctr = 0;
    for (auto& body : boost::adaptors::reverse(nodes)) {
      const auto index = ctr--;
      const auto tabs = std::string(tab_ctr++, '\t');
      CHECK(body);
      ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
      if (auto sort = dynamic_cast<const RelSort*>(body)) {
        ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
      }
      if (dynamic_cast<const RelProject*>(body) ||
          dynamic_cast<const RelCompound*>(body)) {
        if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
          ss << tabs << " : " << join->toString() << "\n";
        }
      }
    }
    auto rs = std::make_shared<ResultSet>(ss.str());
    return {rs, {}};
  }

  if (render_info) {
    // set render to be non-insitu in certain situations.
    if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
        ed_seq.size() > 1) {
      // old logic
      // disallow if more than one ED
      render_info->setInSituDataIfUnset(false);
    }
  }

  if (eo.find_push_down_candidates) {
    // this extra logic is mainly due to current limitations on multi-step queries
    // and/or subqueries.
    return executeRelAlgQueryWithFilterPushDown(
        ed_seq, co, eo, render_info, queue_time_ms);
  }
  timer_setup.stop();

  // Dispatch the subqueries first
  for (auto subquery : getSubqueries()) {
    const auto subquery_ra = subquery->getRelAlg();
    CHECK(subquery_ra);
    if (subquery_ra->hasContextData()) {
      continue;
    }
    // Execute the subquery and cache the result.
    RelAlgExecutor ra_executor(executor_, cat_, query_state_);
    RaExecutionSequence subquery_seq(subquery_ra);
    auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
    subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
  }
  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
}

AggregatedColRange RelAlgExecutor::computeColRangesCache() {
  AggregatedColRange agg_col_range_cache;
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeColRangesCache(phys_inputs);
}

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeStringDictionaryGenerations(phys_inputs);
}

TableGenerations RelAlgExecutor::computeTableGenerations() {
  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
  return executor_->computeTableGenerations(phys_table_ids);
}

Executor* RelAlgExecutor::getExecutor() const {
  return executor_;
}

void RelAlgExecutor::cleanupPostExecution() {
  CHECK(executor_);
  executor_->row_set_mem_owner_ = nullptr;
  executor_->lit_str_dict_proxy_ = nullptr;
}

namespace {

inline void check_sort_node_source_constraint(const RelSort* sort) {
  CHECK_EQ(size_t(1), sort->inputCount());
  const auto source = sort->getInput(0);
  if (dynamic_cast<const RelSort*>(source)) {
    throw std::runtime_error("Sort node not supported as input to another sort");
  }
}

}  // namespace

FirstStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
    const RaExecutionSequence& seq,
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryStep);
  auto exe_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exe_desc_ptr);
  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());

  size_t shard_count{0};
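  // Aggregate results are merged by reduction across leaves unless the input
  // is sharded; everything else is merged by union.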
  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
    return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
  };

  if (sort) {
    check_sort_node_source_constraint(sort);
    const auto source_work_unit = createSortInputWorkUnit(sort, eo);
    shard_count = GroupByAndAggregate::shard_count_for_top_groups(
        source_work_unit.exe_unit, *executor_->getCatalog());
    if (!shard_count) {
      // No point in sorting on the leaf, only execute the input to the sort node.
      CHECK_EQ(size_t(1), sort->inputCount());
      const auto source = sort->getInput(0);
      if (sort->collationCount() || node_is_aggregate(source)) {
        auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
        CHECK_EQ(temp_seq.size(), size_t(1));
        // Use subseq to avoid clearing existing temporary tables
        return {executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo, nullptr, 0),
                merge_type(source),
                source->getId(),
                false};
      }
    }
  }
  return {executeRelAlgSubSeq(seq,
                              std::make_pair(step_idx, step_idx + 1),
                              co,
                              eo,
                              render_info,
                              queue_time_ms),
          merge_type(exe_desc_ptr->getBody()),
          exe_desc_ptr->getBody()->getId(),
          false};
}

void RelAlgExecutor::prepareLeafExecution(
    const AggregatedColRange& agg_col_range,
    const StringDictionaryGenerations& string_dictionary_generations,
    const TableGenerations& table_generations) {
  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_runtime_query_interrupt) {
    executor_->resetInterrupt();
  }
  queue_time_ms_ = timer_stop(clock_begin);
  executor_->row_set_mem_owner_ =
      std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize());
  executor_->table_generations_ = table_generations;
  executor_->agg_col_range_cache_ = agg_col_range;
  executor_->string_dictionary_generations_ = string_dictionary_generations;
}

ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
                                                 const CompilationOptions& co,
                                                 const ExecutionOptions& eo,
                                                 RenderInfo* render_info,
                                                 const int64_t queue_time_ms,
                                                 const bool with_existing_temp_tables) {
  auto timer = DEBUG_TIMER(__func__);
  if (!with_existing_temp_tables) {
    decltype(temporary_tables_)().swap(temporary_tables_);
  }
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  executor_->catalog_ = &cat_;
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!seq.empty());
  const auto exec_desc_count = eo.just_explain ? size_t(1) : seq.size();

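  // Run each step in order; on a native codegen failure, optionally retry the
  // step with the external executor when interop is enabled (non-aggregate
  // steps only).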
  for (size_t i = 0; i < exec_desc_count; i++) {
    VLOG(1) << "Executing query step " << i;
    // only render on the last step
    try {
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo,
                        (i == exec_desc_count - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const NativeExecutionError&) {
      if (!g_enable_interop) {
        throw;
      }
      auto eo_extern = eo;
      eo_extern.executor_type = ::ExecutorType::Extern;
      auto exec_desc_ptr = seq.getDescriptor(i);
      const auto body = exec_desc_ptr->getBody();
      const auto compound = dynamic_cast<const RelCompound*>(body);
      if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
        LOG(INFO) << "Also failed to run the query using interoperability";
        throw;
      }
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo_extern,
                        (i == exec_desc_count - 1) ? render_info : nullptr,
                        queue_time_ms);
    }
  }

  return seq.getDescriptor(exec_desc_count - 1)->getResult();
}

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
    const RaExecutionSequence& seq,
    const std::pair<size_t, size_t> interval,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  executor_->catalog_ = &cat_;
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  for (size_t i = interval.first; i < interval.second; i++) {
    // only render on the last step
    executeRelAlgStep(seq,
                      i,
                      co,
                      eo,
                      (i == interval.second - 1) ? render_info : nullptr,
                      queue_time_ms);
  }

  return seq.getDescriptor(interval.second - 1)->getResult();
}

void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
                                       const size_t step_idx,
                                       const CompilationOptions& co,
                                       const ExecutionOptions& eo,
                                       RenderInfo* render_info,
                                       const int64_t queue_time_ms) {
  auto timer = DEBUG_TIMER(__func__);
  auto exec_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    handleNop(exec_desc);
    return;
  }
  const ExecutionOptions eo_work_unit{
      eo.output_columnar_hint,
      eo.allow_multifrag,
      eo.just_explain,
      eo.allow_loop_joins,
      eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
      eo.jit_debug,
      eo.just_validate,
      eo.with_dynamic_watchdog,
      eo.dynamic_watchdog_time_limit,
      eo.find_push_down_candidates,
      eo.just_calcite_explain,
      eo.gpu_input_mem_limit_percent,
      eo.allow_runtime_query_interrupt,
      eo.runtime_query_interrupt_frequency,
      eo.executor_type,
      step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      executeDelete(compound, co, eo_work_unit, queue_time_ms);
    } else if (compound->isUpdateViaSelect()) {
      executeUpdate(compound, co, eo_work_unit, queue_time_ms);
    } else {
      exec_desc.setResult(
          executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
      VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
              << static_cast<int>(-compound->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    if (project->isDeleteViaSelect()) {
      executeDelete(project, co, eo_work_unit, queue_time_ms);
    } else if (project->isUpdateViaSelect()) {
      executeUpdate(project, co, eo_work_unit, queue_time_ms);
    } else {
      ssize_t prev_count = -1;
      // Disabling the intermediate count optimization in distributed, as the previous
      // execution descriptor will likely not hold the aggregated result.
      if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
        auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
        CHECK(prev_exec_desc);
        RelAlgNode const* prev_body = prev_exec_desc->getBody();
        // This optimization needs to be restricted in its application for UNION, which
        // can have 2 input nodes in which neither should restrict the count of the
        // other. However some non-UNION queries are measurably slower with this
        // restriction, so it is only applied when g_enable_union is true.
        bool const parent_check =
            !g_enable_union || project->getInput(0)->getId() == prev_body->getId();
        // If the previous node produced a reliable count, skip the pre-flight count
        if (parent_check && (dynamic_cast<const RelCompound*>(prev_body) ||
                             dynamic_cast<const RelLogicalValues*>(prev_body))) {
          const auto& prev_exe_result = prev_exec_desc->getResult();
          const auto prev_result = prev_exe_result.getRows();
          if (prev_result) {
            prev_count = static_cast<ssize_t>(prev_result->rowCount());
          }
          VLOG(3) << "prev_exec_desc->getBody()->toString()="
                  << prev_exec_desc->getBody()->toString()
                  << " prev_count=" << prev_count;
        }
      }
      exec_desc.setResult(executeProject(
          project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
      VLOG(3) << "Returned from executeProject(), addTemporaryTable("
              << static_cast<int>(-project->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
  if (aggregate) {
    exec_desc.setResult(
        executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
    addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto filter = dynamic_cast<const RelFilter*>(body);
  if (filter) {
    exec_desc.setResult(
        executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
    addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto sort = dynamic_cast<const RelSort*>(body);
  if (sort) {
    exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
    if (exec_desc.getResult().isFilterPushDownEnabled()) {
      return;
    }
    addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
  if (logical_values) {
    exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
    addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto modify = dynamic_cast<const RelModify*>(body);
  if (modify) {
    exec_desc.setResult(executeModify(modify, eo_work_unit));
    return;
  }
  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
  if (logical_union) {
    exec_desc.setResult(
        executeUnion(logical_union, seq, co, eo_work_unit, render_info, queue_time_ms));
    addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
  if (table_func) {
    exec_desc.setResult(
        executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
    addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  LOG(FATAL) << "Unhandled body type: " << body->toString();
}

void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
  // just set the result of the previous node as the result of no op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up the temp table as it could be used by the outer query or the next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}

namespace {

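// Collects the RexInputs referenced by an expression. For geo columns read
// directly off a scan, inputs for the backing physical columns are synthesized.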
class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:
  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : RexVisitor(), cat_(cat) {}

  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
  const Catalog_Namespace::Catalog& cat_;
};

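// Returns the node whose inputs feed the execution unit: the join itself when
// the node is (or sits directly on top of) a join, otherwise the node itself.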
const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
    CHECK_EQ(size_t(2), join->inputCount());
    return join;
  }
  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_EQ(size_t(1), ra_node->inputCount());
  }
  auto only_src = ra_node->getInput(0);
  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
                       dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
  return is_join ? only_src : ra_node;
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelTableFunction* table_func,
                const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelLogicalUnion* logical_union, const Catalog_Namespace::Catalog&) {
  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  used_inputs_owned.reserve(logical_union->inputCount());
  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
  auto const n_inputs = logical_union->inputCount();
  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
    auto input = logical_union->getInput(nest_level);
    for (size_t i = 0; i < input->size(); ++i) {
      used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
      used_inputs.insert(used_inputs_owned.back().get());
    }
  }
  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
}

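// Physical scans are identified by their catalog table id; intermediate
// results use the negated node id, matching the temporary table convention.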
int table_id_from_ra(const RelAlgNode* ra_node) {
  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
  if (scan_ra) {
    const auto td = scan_ra->getTableDescriptor();
    CHECK(td);
    return td->tableId;
  }
  return -ra_node->getId();
}

std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
    const RelAlgNode* ra_node,
    const std::vector<size_t>& input_permutation) {
  const auto data_sink_node = get_data_sink(ra_node);
  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    const auto input_ra = data_sink_node->getInput(input_node_idx);
    // Having a non-zero mapped value (input_idx) results in the query being interpreted
    // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
    // mapped value (input_idx) which originates here. This would be incorrect for UNION.
    size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
    const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
    CHECK(it_ok.second);
    LOG_IF(INFO, !input_permutation.empty())
        << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
  }
  return input_to_nest_level;
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_join_source_used_inputs(const RelAlgNode* ra_node,
                            const Catalog_Namespace::Catalog& cat) {
  const auto data_sink_node = get_data_sink(ra_node);
  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
    CHECK_EQ(join->inputCount(), 2u);
    const auto condition = join->getCondition();
    RexUsedInputsVisitor visitor(cat);
    auto condition_inputs = visitor.visit(condition);
    std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
        visitor.get_inputs_owned());
    return std::make_pair(condition_inputs, condition_inputs_owned);
  }

  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
    CHECK_GE(left_deep_join->inputCount(), 2u);
    const auto condition = left_deep_join->getInnerCondition();
    RexUsedInputsVisitor visitor(cat);
    auto result = visitor.visit(condition);
    for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
         ++nesting_level) {
      const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
      if (outer_condition) {
        const auto outer_result = visitor.visit(outer_condition);
        result.insert(outer_result.begin(), outer_result.end());
      }
    }
    std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
    return std::make_pair(result, used_inputs_owned);
  }

  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_GT(ra_node->inputCount(), 1u) << ra_node->toString();
  } else {
    CHECK_EQ(ra_node->inputCount(), 1u) << ra_node->toString();
  }
  return std::make_pair(std::unordered_set<const RexInput*>{},
                        std::vector<std::shared_ptr<RexInput>>{});
}

void collect_used_input_desc(
    std::vector<InputDescriptor>& input_descs,
    const Catalog_Namespace::Catalog& cat,
    std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
    const RelAlgNode* ra_node,
    const std::unordered_set<const RexInput*>& source_used_inputs,
    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
  VLOG(3) << "ra_node=" << ra_node->toString()
          << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
          << " source_used_inputs.size()=" << source_used_inputs.size();
  for (const auto used_input : source_used_inputs) {
    const auto input_ra = used_input->getSourceNode();
    const int table_id = table_id_from_ra(input_ra);
    const auto col_id = used_input->getIndex();
    auto it = input_to_nest_level.find(input_ra);
    if (it != input_to_nest_level.end()) {
      const int input_desc = it->second;
      input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
          dynamic_cast<const RelScan*>(input_ra)
              ? cat.getColumnIdBySpi(table_id, col_id + 1)
              : col_id,
          table_id,
          input_desc));
    } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
      throw std::runtime_error("Bushy joins not supported");
    }
  }
}

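// Builds the input descriptors (sorted by nest level) plus a de-duplicated,
// deterministically ordered list of input column descriptors for a work unit.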
template <class RA>
std::pair<std::vector<InputDescriptor>,
          std::list<std::shared_ptr<const InputColDescriptor>>>
get_input_desc_impl(const RA* ra_node,
                    const std::unordered_set<const RexInput*>& used_inputs,
                    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
                    const std::vector<size_t>& input_permutation,
                    const Catalog_Namespace::Catalog& cat) {
  std::vector<InputDescriptor> input_descs;
  const auto data_sink_node = get_data_sink(ra_node);
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    auto input_ra = data_sink_node->getInput(input_node_idx);
    const int table_id = table_id_from_ra(input_ra);
    input_descs.emplace_back(table_id, input_idx);
  }
  std::sort(input_descs.begin(),
            input_descs.end(),
            [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
              return lhs.getNestLevel() < rhs.getNestLevel();
            });
  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          used_inputs,
                          input_to_nest_level);
  std::unordered_set<const RexInput*> join_source_used_inputs;
  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
      get_join_source_used_inputs(ra_node, cat);
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          join_source_used_inputs,
                          input_to_nest_level);
  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
      input_col_descs_unique.begin(), input_col_descs_unique.end());

  std::sort(input_col_descs.begin(),
            input_col_descs.end(),
            [](std::shared_ptr<const InputColDescriptor> const& lhs,
               std::shared_ptr<const InputColDescriptor> const& rhs) {
              return std::make_tuple(lhs->getScanDesc().getNestLevel(),
                                     lhs->getColId(),
                                     lhs->getScanDesc().getTableId()) <
                     std::make_tuple(rhs->getScanDesc().getNestLevel(),
                                     rhs->getColId(),
                                     rhs->getScanDesc().getTableId());
            });
  return {input_descs,
          std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
                                                               input_col_descs.end())};
}

template <class RA>
std::tuple<std::vector<InputDescriptor>,
           std::list<std::shared_ptr<const InputColDescriptor>>,
           std::vector<std::shared_ptr<RexInput>>>
get_input_desc(const RA* ra_node,
               const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
               const std::vector<size_t>& input_permutation,
               const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
  auto input_desc_pair = get_input_desc_impl(
      ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
  return std::make_tuple(
      input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
}

size_t get_scalar_sources_size(const RelCompound* compound) {
  return compound->getScalarSourcesSize();
}

size_t get_scalar_sources_size(const RelProject* project) {
  return project->size();
}

size_t get_scalar_sources_size(const RelTableFunction* table_func) {
  return table_func->getTableFuncInputsSize();
}

const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
  return compound->getScalarSource(i);
}

const RexScalar* scalar_at(const size_t i, const RelProject* project) {
  return project->getProjectAt(i);
}

const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
  return table_func->getTableFuncInputAt(i);
}

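// Casts a none-encoded string expression to a transient dictionary so that
// downstream group-by and sort can operate on fixed-size dictionary ids.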
std::shared_ptr<Analyzer::Expr> set_transient_dict(
    const std::shared_ptr<Analyzer::Expr> expr) {
  const auto& ti = expr->get_type_info();
  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
    return expr;
  }
  auto transient_dict_ti = ti;
  transient_dict_ti.set_compression(kENCODING_DICT);
  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
  transient_dict_ti.set_fixed_size();
  return expr->add_cast(transient_dict_ti);
}

void set_transient_dict_maybe(
    std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::shared_ptr<Analyzer::Expr>& expr) {
  try {
    scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
  } catch (...) {
    scalar_sources.push_back(fold_expr(expr.get()));
  }
}

std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
    const std::shared_ptr<Analyzer::Expr>& input) {
  const auto& input_ti = input->get_type_info();
  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
    return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
  }
  return input;
}

template <class RA>
std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
    const RA* ra_node,
    const RelAlgTranslator& translator,
    const ::ExecutorType executor_type) {
  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
  VLOG(3) << "get_scalar_sources_size(" << ra_node->toString()
          << ") = " << scalar_sources_size;
  for (size_t i = 0; i < scalar_sources_size; ++i) {
    const auto scalar_rex = scalar_at(i, ra_node);
    if (dynamic_cast<const RexRef*>(scalar_rex)) {
      // RexRefs are synthetic scalars we append at the end of the real ones
      // for the sake of taking memory ownership; no real work needed here.
      continue;
    }

    const auto scalar_expr =
        rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
    const auto rewritten_expr = rewrite_expr(scalar_expr.get());
    if (executor_type == ExecutorType::Native) {
      set_transient_dict_maybe(scalar_sources, rewritten_expr);
    } else {
      scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
    }
  }

  return scalar_sources;
}

template <class RA>
std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
    const RA* ra_node,
    const RelAlgTranslator& translator,
    int32_t tableId,
    const Catalog_Namespace::Catalog& cat,
    const ColumnNameList& colNames,
    size_t starting_projection_column_idx) {
  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
    const auto scalar_rex = scalar_at(i, ra_node);
    if (dynamic_cast<const RexRef*>(scalar_rex)) {
      // RexRefs are synthetic scalars we append at the end of the real ones
      // for the sake of taking memory ownership; no real work needed here.
      continue;
    }

    std::shared_ptr<Analyzer::Expr> translated_expr;
    if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
      translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
                                            tableId,
                                            cat,
                                            colNames[i - starting_projection_column_idx]);
    } else {
      translated_expr = translator.translateScalarRex(scalar_rex);
    }
    const auto scalar_expr = rewrite_array_elements(translated_expr.get());
    const auto rewritten_expr = rewrite_expr(scalar_expr.get());
    set_transient_dict_maybe(scalar_sources, rewritten_expr);
  }

  return scalar_sources;
}

std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
    const RelCompound* compound,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
  if (!compound->isAggregate()) {
    return {nullptr};
  }
  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
    groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
  }
  return groupby_exprs;
}

std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
    const RelAggregate* aggregate,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
    groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
  }
  return groupby_exprs;
}

QualsConjunctiveForm translate_quals(const RelCompound* compound,
                                     const RelAlgTranslator& translator) {
  const auto filter_rex = compound->getFilterExpr();
  const auto filter_expr =
      filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
                     : QualsConjunctiveForm{};
}

std::vector<Analyzer::Expr*> translate_targets(
    std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
    const RelCompound* compound,
    const RelAlgTranslator& translator,
    const ExecutorType executor_type) {
  std::vector<Analyzer::Expr*> target_exprs;
  for (size_t i = 0; i < compound->size(); ++i) {
    const auto target_rex = compound->getTargetExpr(i);
    const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
    std::shared_ptr<Analyzer::Expr> target_expr;
    if (target_rex_agg) {
      target_expr =
          RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
    } else {
      const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
      const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
      if (target_rex_ref) {
        const auto ref_idx = target_rex_ref->getIndex();
        CHECK_GE(ref_idx, size_t(1));
        CHECK_LE(ref_idx, groupby_exprs.size());
        const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
        target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
      } else {
        target_expr = translator.translateScalarRex(target_rex_scalar);
        auto rewritten_expr = rewrite_expr(target_expr.get());
        target_expr = fold_expr(rewritten_expr.get());
        if (executor_type == ExecutorType::Native) {
          try {
            target_expr = set_transient_dict(target_expr);
          } catch (...) {
            // noop
          }
        } else {
          target_expr = cast_dict_to_none(target_expr);
        }
      }
    }
    CHECK(target_expr);
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }
  return target_exprs;
}

std::vector<Analyzer::Expr*> translate_targets(
    std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
    const RelAggregate* aggregate,
    const RelAlgTranslator& translator) {
  std::vector<Analyzer::Expr*> target_exprs;
  size_t group_key_idx = 1;
  for (const auto& groupby_expr : groupby_exprs) {
    auto target_expr =
        var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }

  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
    auto target_expr =
        RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
    CHECK(target_expr);
    target_expr = fold_expr(target_expr.get());
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }
  return target_exprs;
}

bool is_count_distinct(const Analyzer::Expr* expr) {
  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
  return agg_expr && agg_expr->get_is_distinct();
}

bool is_agg(const Analyzer::Expr* expr) {
  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
  if (agg_expr && agg_expr->get_contains_agg()) {
    auto agg_type = agg_expr->get_aggtype();
    if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
        agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
      return true;
    }
  }
  return false;
}

inline SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
  if (is_count_distinct(&expr)) {
    return SQLTypeInfo(kBIGINT, false);
  } else if (is_agg(&expr)) {
    return get_nullable_logical_type_info(expr.get_type_info());
  }
  return get_logical_type_info(expr.get_type_info());
}

template <class RA>
std::vector<TargetMetaInfo> get_targets_meta(
    const RA* ra_node,
    const std::vector<Analyzer::Expr*>& target_exprs) {
  std::vector<TargetMetaInfo> targets_meta;
  for (size_t i = 0; i < ra_node->size(); ++i) {
    CHECK(target_exprs[i]);
    // TODO(alex): remove the count distinct type fixup.
    targets_meta.emplace_back(ra_node->getFieldName(i),
                              get_logical_type_for_expr(*target_exprs[i]),
                              target_exprs[i]->get_type_info());
  }
  return targets_meta;
}

template <>
std::vector<TargetMetaInfo> get_targets_meta(
    const RelFilter* filter,
    const std::vector<Analyzer::Expr*>& target_exprs) {
  RelAlgNode const* input0 = filter->getInput(0);
  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
    return get_targets_meta(input, target_exprs);
  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
    return get_targets_meta(input, target_exprs);
  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
    return get_targets_meta(input, target_exprs);
  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
    return get_targets_meta(input, target_exprs);
  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
    return get_targets_meta(input, target_exprs);
  }
  UNREACHABLE() << "Unhandled node type: " << input0->toString();
  return {};
}

}  // namespace

void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
                                   const CompilationOptions& co_in,
                                   const ExecutionOptions& eo_in,
                                   const int64_t queue_time_ms) {
  CHECK(node);
  auto timer = DEBUG_TIMER(__func__);

  auto co = co_in;
  co.hoist_literals = false;  // disable literal hoisting as it interferes with dict
                              // encoded string updates

  auto execute_update_for_node =
      [this, &co, &eo_in](const auto node, auto& work_unit, const bool is_aggregate) {
        UpdateTransactionParameters update_params(node->getModifiedTableDescriptor(),
                                                  node->getTargetColumns(),
                                                  node->getOutputMetainfo(),
                                                  node->isVarlenUpdateRequired());

        const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);

        auto execute_update_ra_exe_unit =
            [this, &co, &eo_in, &table_infos, &update_params](
                const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
              CompilationOptions co_project = co;

              auto eo = eo_in;
              if (update_params.tableIsTemporary()) {
                eo.output_columnar_hint = true;
                co_project.allow_lazy_fetch = false;
              }

              auto update_callback = yieldUpdateCallback(update_params);
              executor_->executeUpdate(ra_exe_unit,
                                       table_infos,
                                       co_project,
                                       eo,
                                       cat_,
                                       executor_->row_set_mem_owner_,
                                       update_callback,
                                       is_aggregate);
              update_params.finalizeTransaction();
            };

        if (update_params.tableIsTemporary()) {
          // hold owned target exprs during execution if rewriting
          auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
          // rewrite temp table updates to generate the full column by moving the where
          // clause into a case; if such a rewrite is not possible, bail on the update
          // operation. Then build an expr for the update target.
          const auto td = update_params.getTableDescriptor();
          CHECK(td);
          const auto update_column_names = update_params.getUpdateColumnNames();
          if (update_column_names.size() > 1) {
            throw std::runtime_error(
                "Multi-column update is not yet supported for temporary tables.");
          }

          auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
          CHECK(cd);
          auto projected_column_to_update =
              makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
          const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
              work_unit.exe_unit, projected_column_to_update);
          if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
            throw std::runtime_error(
                "Variable length updates not yet supported on temporary tables.");
          }
          execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
        } else {
          execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
        }
      };

  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
    auto work_unit =
        createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);

    execute_update_for_node(compound, work_unit, compound->isAggregate());
  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);

    if (project->isSimple()) {
      CHECK_EQ(size_t(1), project->inputCount());
      const auto input_ra = project->getInput(0);
      if (dynamic_cast<const RelSort*>(input_ra)) {
        const auto& input_table =
            get_temporary_table(&temporary_tables_, -input_ra->getId());
        CHECK(input_table);
        work_unit.exe_unit.scan_limit = input_table->rowCount();
      }
    }

    execute_update_for_node(project, work_unit, false);
  } else {
    throw std::runtime_error("Unsupported parent node for update: " + node->toString());
  }
}

void RelAlgExecutor::executeDelete(const RelAlgNode* node,
                                   const CompilationOptions& co,
                                   const ExecutionOptions& eo_in,
                                   const int64_t queue_time_ms) {
  CHECK(node);
  auto timer = DEBUG_TIMER(__func__);

  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
                                                     auto& work_unit,
                                                     const bool is_aggregate) {
    auto* table_descriptor = node->getModifiedTableDescriptor();
    CHECK(table_descriptor);
    if (!table_descriptor->hasDeletedCol) {
      throw std::runtime_error(
          "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
    }

    const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);

    auto execute_delete_ra_exe_unit =
        [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
                                                             const bool is_aggregate) {
          DeleteTransactionParameters delete_params(table_is_temporary(table_descriptor));
          auto delete_callback = yieldDeleteCallback(delete_params);
          CompilationOptions co_delete = co;

          auto eo = eo_in;
          if (delete_params.tableIsTemporary()) {
            eo.output_columnar_hint = true;
            co_delete.add_delete_column =
                false;  // project the entire delete column for columnar update
          } else {
            CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
          }

          executor_->executeUpdate(exe_unit,
                                   table_infos,
                                   co_delete,
                                   eo,
                                   cat_,
                                   executor_->row_set_mem_owner_,
                                   delete_callback,
                                   is_aggregate);
          delete_params.finalizeTransaction();
        };

    if (table_is_temporary(table_descriptor)) {
      auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
      auto cd = cat_.getDeletedColumn(table_descriptor);
      CHECK(cd);
      auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
          cd->columnType, table_descriptor->tableId, cd->columnId, 0);
      const auto rewritten_exe_unit =
          query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
      execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
    } else {
      execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
    }
  };

  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
    const auto work_unit =
        createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
    execute_delete_for_node(compound, work_unit, compound->isAggregate());
  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
    if (project->isSimple()) {
      CHECK_EQ(size_t(1), project->inputCount());
      const auto input_ra = project->getInput(0);
      if (dynamic_cast<const RelSort*>(input_ra)) {
        const auto& input_table =
            get_temporary_table(&temporary_tables_, -input_ra->getId());
        CHECK(input_table);
        work_unit.exe_unit.scan_limit = input_table->rowCount();
      }
    }
    execute_delete_for_node(project, work_unit, false);
  } else {
    throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
  }
}

1529 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
1530  const CompilationOptions& co,
1531  const ExecutionOptions& eo,
1532  RenderInfo* render_info,
1533  const int64_t queue_time_ms) {
1534  auto timer = DEBUG_TIMER(__func__);
1535  const auto work_unit =
1536  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1537  CompilationOptions co_compound = co;
1538  return executeWorkUnit(work_unit,
1539  compound->getOutputMetainfo(),
1540  compound->isAggregate(),
1541  co_compound,
1542  eo,
1543  render_info,
1544  queue_time_ms);
1545 }
1546 
1547 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
1548  const CompilationOptions& co,
1549  const ExecutionOptions& eo,
1550  RenderInfo* render_info,
1551  const int64_t queue_time_ms) {
1552  auto timer = DEBUG_TIMER(__func__);
1553  const auto work_unit = createAggregateWorkUnit(
1554  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1555  return executeWorkUnit(work_unit,
1556  aggregate->getOutputMetainfo(),
1557  true,
1558  co,
1559  eo,
1560  render_info,
1561  queue_time_ms);
1562 }
1563 
1564 namespace {
1565 
1566 // Returns true iff the execution unit contains window functions.
1567 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
1568  return std::any_of(ra_exe_unit.target_exprs.begin(),
1569  ra_exe_unit.target_exprs.end(),
1570  [](const Analyzer::Expr* expr) {
1571  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1572  });
1573 }
1574 
1575 } // namespace
1576 
1577 ExecutionResult RelAlgExecutor::executeProject(const RelProject* project,
1578  const CompilationOptions& co,
1579  const ExecutionOptions& eo,
1580  RenderInfo* render_info,
1581  const int64_t queue_time_ms,
1582  const ssize_t previous_count) {
1583  auto timer = DEBUG_TIMER(__func__);
1584  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1585  CompilationOptions co_project = co;
1586  if (project->isSimple()) {
1587  CHECK_EQ(size_t(1), project->inputCount());
1588  const auto input_ra = project->getInput(0);
1589  if (dynamic_cast<const RelSort*>(input_ra)) {
1590  co_project.device_type = ExecutorDeviceType::CPU;
1591  const auto& input_table =
1592  get_temporary_table(&temporary_tables_, -input_ra->getId());
1593  CHECK(input_table);
1594  work_unit.exe_unit.scan_limit =
1595  std::min(input_table->getLimit(), input_table->rowCount());
1596  }
1597  }
1598  return executeWorkUnit(work_unit,
1599  project->getOutputMetainfo(),
1600  false,
1601  co_project,
1602  eo,
1603  render_info,
1604  queue_time_ms,
1605  previous_count);
1606 }
1607 
1608 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
1609  const CompilationOptions& co_in,
1610  const ExecutionOptions& eo,
1611  const int64_t queue_time_ms) {
1613  auto timer = DEBUG_TIMER(__func__);
1614 
1615  auto co = co_in;
1616 
1617  if (g_cluster) {
1618  throw std::runtime_error("Table functions not supported in distributed mode yet");
1619  }
1620  if (!g_enable_table_functions) {
1621  throw std::runtime_error("Table function support is disabled");
1622  }
1623 
1624  auto table_func_work_unit = createTableFunctionWorkUnit(table_func, eo.just_explain);
1625  const auto body = table_func_work_unit.body;
1626  CHECK(body);
1627 
1628  const auto table_infos =
1629  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1630 
1631  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1632  co.device_type,
1633  QueryMemoryDescriptor(),
1634  nullptr,
1635  executor_),
1636  {}};
1637 
1638  try {
1639  result = {executor_->executeTableFunction(
1640  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1641  body->getOutputMetainfo()};
1642  } catch (const QueryExecutionError& e) {
1643  handlePersistentError(e.getErrorCode());
1644  CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
1645  throw std::runtime_error("Table function ran out of memory during execution");
1646  }
1647  result.setQueueTime(queue_time_ms);
1648  return result;
1649 }
1650 
1651 namespace {
1652 
1653 // Creates a new expression which has the range table index set to 1. This is needed to
1654 // reuse the hash join construction helpers to generate a hash table for the window
1655 // function partition: create an equals expression with left and right sides identical
1656 // except for the range table index.
1657 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
1658  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
1659  if (tuple) {
1660  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
1661  for (const auto& element : tuple->getTuple()) {
1662  transformed_tuple.push_back(transform_to_inner(element.get()));
1663  }
1664  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
1665  }
1666  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
1667  if (!col) {
1668  throw std::runtime_error("Only columns supported in the window partition for now");
1669  }
1670  return makeExpr<Analyzer::ColumnVar>(
1671  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
1672 }
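// Illustrative sketch (hypothetical single-key partition, names are examples): for
// PARTITION BY deptno, where deptno is a column at range table index 0, the rewrite
// yields the same column at index 1 and the caller below builds the tautology
// deptno = deptno':
//
//   auto outer = makeExpr<Analyzer::ColumnVar>(ti, table_id, column_id, 0);
//   auto inner = transform_to_inner(outer.get());  // same column, rte index 1
//   auto cond = makeExpr<Analyzer::BinOper>(kBOOLEAN, kBW_EQ, kONE, outer, inner);
//
// Each row then hash-matches exactly the rows of its own partition.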
1673 
1674 } // namespace
1675 
1677  const CompilationOptions& co,
1678  const ExecutionOptions& eo,
1679  ColumnCacheMap& column_cache_map,
1680  const int64_t queue_time_ms) {
1681  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1682  CHECK_EQ(query_infos.size(), size_t(1));
1683  if (query_infos.front().info.fragments.size() != 1) {
1684  throw std::runtime_error(
1685  "Only single fragment tables supported for window functions for now");
1686  }
1687  if (eo.executor_type == ::ExecutorType::Extern) {
1688  return;
1689  }
1690  query_infos.push_back(query_infos.front());
1691  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
1692  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1693  ++target_index) {
1694  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1695  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1696  if (!window_func) {
1697  continue;
1698  }
1699  // Always use baseline layout hash tables for now, make the expression a tuple.
1700  const auto& partition_keys = window_func->getPartitionKeys();
1701  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1702  if (partition_keys.size() > 1) {
1703  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1704  } else {
1705  if (partition_keys.empty()) {
1706  throw std::runtime_error(
1707  "Empty window function partitions are not supported yet");
1708  }
1709  CHECK_EQ(partition_keys.size(), size_t(1));
1710  partition_key_tuple = partition_keys.front();
1711  }
1712  // Creates a tautology equality with the partition expression on both sides.
1713  const auto partition_key_cond =
1714  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1715  kBW_EQ,
1716  kONE,
1717  partition_key_tuple,
1718  transform_to_inner(partition_key_tuple.get()));
1719  auto context = createWindowFunctionContext(window_func,
1720  partition_key_cond,
1721  ra_exe_unit,
1722  query_infos,
1723  co,
1724  column_cache_map,
1725  executor_->getRowSetMemoryOwner());
1726  context->compute();
1727  window_project_node_context->addWindowFunctionContext(std::move(context),
1728  target_index);
1729  }
1730 }
1731 
1732 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
1733  const Analyzer::WindowFunction* window_func,
1734  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
1735  const RelAlgExecutionUnit& ra_exe_unit,
1736  const std::vector<InputTableInfo>& query_infos,
1737  const CompilationOptions& co,
1738  ColumnCacheMap& column_cache_map,
1739  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1740  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
1741  ? MemoryLevel::GPU_LEVEL
1742  : MemoryLevel::CPU_LEVEL;
1743  const auto join_table_or_err =
1744  executor_->buildHashTableForQualifier(partition_key_cond,
1745  query_infos,
1746  memory_level,
1747  JoinHashTableInterface::HashType::OneToMany,
1748  column_cache_map);
1749  if (!join_table_or_err.fail_reason.empty()) {
1750  throw std::runtime_error(join_table_or_err.fail_reason);
1751  }
1752  CHECK(join_table_or_err.hash_table->getHashType() ==
1753  JoinHashTableInterface::HashType::OneToMany);
1754  const auto& order_keys = window_func->getOrderKeys();
1755  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
1756  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
1757  auto context = std::make_unique<WindowFunctionContext>(window_func,
1758  join_table_or_err.hash_table,
1759  elem_count,
1760  co.device_type,
1761  row_set_mem_owner);
1762  for (const auto& order_key : order_keys) {
1763  const auto order_col =
1764  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
1765  if (!order_col) {
1766  throw std::runtime_error("Only order by columns supported for now");
1767  }
1768  const int8_t* column;
1769  size_t join_col_elem_count;
1770  std::tie(column, join_col_elem_count) =
1771  ColumnFetcher::getOneColumnFragment(executor_,
1772  *order_col,
1773  query_infos.front().info.fragments.front(),
1774  memory_level,
1775  0,
1776  nullptr,
1777  chunks_owner,
1778  column_cache_map);
1779  CHECK_EQ(join_col_elem_count, elem_count);
1780  context->addOrderColumn(column, order_col.get(), chunks_owner);
1781  }
1782  return context;
1783 }
1784 
1785 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
1786  const CompilationOptions& co,
1787  const ExecutionOptions& eo,
1788  RenderInfo* render_info,
1789  const int64_t queue_time_ms) {
1790  auto timer = DEBUG_TIMER(__func__);
1791  const auto work_unit =
1792  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1793  return executeWorkUnit(
1794  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
1795 }
1796 
1797 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
1798  std::vector<TargetMetaInfo> const& rhs) {
1799  if (lhs.size() == rhs.size()) {
1800  for (size_t i = 0; i < lhs.size(); ++i) {
1801  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
1802  return false;
1803  }
1804  }
1805  return true;
1806  }
1807  return false;
1808 }
1809 
1810 bool isGeometry(TargetMetaInfo const& target_meta_info) {
1811  return target_meta_info.get_type_info().is_geometry();
1812 }
1813 
1814 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
1815  const RaExecutionSequence& seq,
1816  const CompilationOptions& co,
1817  const ExecutionOptions& eo,
1818  RenderInfo* render_info,
1819  const int64_t queue_time_ms) {
1820  auto timer = DEBUG_TIMER(__func__);
1821  if (!logical_union->isAll()) {
1822  throw std::runtime_error("UNION without ALL is not supported yet.");
1823  }
1824  if (!logical_union->inputMetainfoTypesMatch()) {
1825  throw std::runtime_error("Subqueries of a UNION must have exact same data types.");
1826  }
1827  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
1828  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
1829  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
1830  }
1831  // Only Projections and Aggregates from a UNION are supported for now.
1832  query_dag_->eachNode([logical_union](RelAlgNode const* node) {
1833  if (node->hasInput(logical_union) &&
1834  !shared::dynamic_castable_to_any<RelProject, RelLogicalUnion, RelAggregate>(
1835  node)) {
1836  throw std::runtime_error("UNION ALL not yet supported in this context.");
1837  }
1838  });
1839 
1840  auto work_unit =
1841  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
1842  return executeWorkUnit(work_unit,
1843  logical_union->getOutputMetainfo(),
1844  false,
1845  CompilationOptions::makeCpuOnly(co),
1846  eo,
1847  render_info,
1848  queue_time_ms);
1849 }
1850 
1851 ExecutionResult RelAlgExecutor::executeLogicalValues(
1852  const RelLogicalValues* logical_values,
1853  const ExecutionOptions& eo) {
1854  auto timer = DEBUG_TIMER(__func__);
1855  if (eo.just_explain) {
1856  throw std::runtime_error("EXPLAIN not supported for LogicalValues");
1857  }
1858 
1859  QueryMemoryDescriptor query_mem_desc(executor_,
1860  logical_values->getNumRows(),
1861  QueryDescriptionType::Projection,
1862  /*is_table_function=*/false);
1863 
1864  auto tuple_type = logical_values->getTupleType();
1865  for (size_t i = 0; i < tuple_type.size(); ++i) {
1866  auto& target_meta_info = tuple_type[i];
1867  if (target_meta_info.get_type_info().is_varlen()) {
1868  throw std::runtime_error("Variable length types not supported in VALUES yet.");
1869  }
1870  if (target_meta_info.get_type_info().get_type() == kNULLT) {
1871  // replace w/ bigint
1872  tuple_type[i] =
1873  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
1874  }
1875  query_mem_desc.addColSlotInfo(
1876  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
1877  }
1878  logical_values->setOutputMetainfo(tuple_type);
1879 
1880  std::vector<TargetInfo> target_infos;
1881  for (const auto& tuple_type_component : tuple_type) {
1882  target_infos.emplace_back(TargetInfo{false,
1883  kCOUNT,
1884  tuple_type_component.get_type_info(),
1885  SQLTypeInfo(kNULLT, false),
1886  false,
1887  false});
1888  }
1889  auto rs = std::make_shared<ResultSet>(target_infos,
1890  ExecutorDeviceType::CPU,
1891  query_mem_desc,
1892  executor_->getRowSetMemoryOwner(),
1893  executor_);
1894 
1895  if (logical_values->hasRows()) {
1896  CHECK_EQ(logical_values->getRowsSize(), logical_values->size());
1897 
1898  auto storage = rs->allocateStorage();
1899  auto buff = storage->getUnderlyingBuffer();
1900 
1901  for (size_t i = 0; i < logical_values->getNumRows(); i++) {
1902  std::vector<std::shared_ptr<Analyzer::Expr>> row_literals;
1903  int8_t* ptr = buff + i * query_mem_desc.getRowSize();
1904 
1905  for (size_t j = 0; j < logical_values->getRowsSize(); j++) {
1906  auto rex_literal =
1907  dynamic_cast<const RexLiteral*>(logical_values->getValueAt(i, j));
1908  CHECK(rex_literal);
1909  const auto expr = RelAlgTranslator::translateLiteral(rex_literal);
1910  const auto constant = std::dynamic_pointer_cast<Analyzer::Constant>(expr);
1911  CHECK(constant);
1912 
1913  if (constant->get_is_null()) {
1914  CHECK(!target_infos[j].sql_type.is_varlen());
1915  *reinterpret_cast<int64_t*>(ptr) =
1916  inline_int_null_val(target_infos[j].sql_type);
1917  } else {
1918  const auto ti = constant->get_type_info();
1919  const auto datum = constant->get_constval();
1920 
1921  // Initialize the entire 8-byte slot
1922  *reinterpret_cast<int64_t*>(ptr) = EMPTY_KEY_64;
1923 
1924  const auto sz = ti.get_size();
1925  CHECK_GE(sz, int(0));
1926  std::memcpy(ptr, &datum, sz);
1927  }
1928  ptr += 8;
1929  }
1930  }
1931  }
1932  return {rs, tuple_type};
1933 }
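// Worked layout example (follows directly from the loop above): addColSlotInfo pads
// every target to an 8-byte slot, so for VALUES (1, 'a') the literal for column j of
// row i is written at buff + i * query_mem_desc.getRowSize() + j * 8, and a NULL
// literal stores the inline integer null sentinel for its type instead of a value.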
1934 
1935 namespace {
1936 
1937 template <class T>
1938 int64_t insert_one_dict_str(T* col_data,
1939  const std::string& columnName,
1940  const SQLTypeInfo& columnType,
1941  const Analyzer::Constant* col_cv,
1942  const Catalog_Namespace::Catalog& catalog) {
1943  if (col_cv->get_is_null()) {
1944  *col_data = inline_fixed_encoding_null_val(columnType);
1945  } else {
1946  const int dict_id = columnType.get_comp_param();
1947  const auto col_datum = col_cv->get_constval();
1948  const auto& str = *col_datum.stringval;
1949  const auto dd = catalog.getMetadataForDict(dict_id);
1950  CHECK(dd && dd->stringDict);
1951  int32_t str_id = dd->stringDict->getOrAdd(str);
1952  if (!dd->dictIsTemp) {
1953  const auto checkpoint_ok = dd->stringDict->checkpoint();
1954  if (!checkpoint_ok) {
1955  throw std::runtime_error("Failed to checkpoint dictionary for column " +
1956  columnName);
1957  }
1958  }
1959  const bool invalid = str_id > max_valid_int_value<T>();
1960  if (invalid || str_id == inline_int_null_value<int32_t>()) {
1961  if (invalid) {
1962  LOG(ERROR) << "Could not encode string: " << str
1963  << ", the encoded value doesn't fit in " << sizeof(T) * 8
1964  << " bits. Will store NULL instead.";
1965  }
1966  str_id = inline_fixed_encoding_null_val(columnType);
1967  }
1968  *col_data = str_id;
1969  }
1970  return *col_data;
1971 }
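// Usage sketch (assumed: cd describes a dictionary-encoded 1-byte text column):
//
//   uint8_t slot;
//   const int64_t id = insert_one_dict_str(&slot, cd, col_cv, catalog);
//   // slot now holds the dictionary id of the string literal, or the inline null
//   // sentinel when the literal is NULL or the id does not fit in 8 bits.
//
// Note the checkpoint for non-temporary dictionaries: presumably this makes the new
// id durable before the row data itself is written.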
1972 
1973 template <class T>
1974 int64_t insert_one_dict_str(T* col_data,
1975  const ColumnDescriptor* cd,
1976  const Analyzer::Constant* col_cv,
1977  const Catalog_Namespace::Catalog& catalog) {
1978  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
1979 }
1980 
1981 } // namespace
1982 
1983 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
1984  const ExecutionOptions& eo) {
1985  auto timer = DEBUG_TIMER(__func__);
1986  if (eo.just_explain) {
1987  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
1988  }
1989 
1990  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
1991  ExecutorDeviceType::CPU,
1992  QueryMemoryDescriptor(),
1993  executor_->getRowSetMemoryOwner(),
1994  executor_);
1995 
1996  std::vector<TargetMetaInfo> empty_targets;
1997  return {rs, empty_targets};
1998 }
1999 
2000 ExecutionResult RelAlgExecutor::executeSimpleInsert(const Analyzer::Query& query) {
2001  // Note: We currently obtain an executor for this method, but we do not need it.
2002  // Therefore, we skip the executor state setup in the regular execution path. In the
2003  // future, we will likely want to use the executor to evaluate expressions in the insert
2004  // statement.
2005 
2006  const auto& targets = query.get_targetlist();
2007  const int table_id = query.get_result_table_id();
2008  const auto& col_id_list = query.get_result_col_list();
2009 
2010  std::vector<const ColumnDescriptor*> col_descriptors;
2011  std::vector<int> col_ids;
2012  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2013  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2014  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2015 
2016  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2017  CHECK(table_descriptor);
2018  const auto shard_tables = cat_.getPhysicalTablesDescriptors(table_descriptor);
2019  const TableDescriptor* shard{nullptr};
2020 
2021  for (const int col_id : col_id_list) {
2022  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2023  const auto col_enc = cd->columnType.get_compression();
2024  if (cd->columnType.is_string()) {
2025  switch (col_enc) {
2026  case kENCODING_NONE: {
2027  auto it_ok =
2028  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2029  CHECK(it_ok.second);
2030  break;
2031  }
2032  case kENCODING_DICT: {
2033  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2034  CHECK(dd);
2035  const auto it_ok = col_buffers.emplace(
2036  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2037  CHECK(it_ok.second);
2038  break;
2039  }
2040  default:
2041  CHECK(false);
2042  }
2043  } else if (cd->columnType.is_geometry()) {
2044  auto it_ok =
2045  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2046  CHECK(it_ok.second);
2047  } else if (cd->columnType.is_array()) {
2048  auto it_ok =
2049  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2050  CHECK(it_ok.second);
2051  } else {
2052  const auto it_ok = col_buffers.emplace(
2053  col_id,
2054  std::unique_ptr<uint8_t[]>(
2055  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2056  // the buffer
2057  CHECK(it_ok.second);
2058  }
2059  col_descriptors.push_back(cd);
2060  col_ids.push_back(col_id);
2061  }
2062  size_t col_idx = 0;
2063  Fragmenter_Namespace::InsertData insert_data;
2064  insert_data.databaseId = cat_.getCurrentDB().dbId;
2065  insert_data.tableId = table_id;
2066  int64_t int_col_val{0};
2067  for (auto target_entry : targets) {
2068  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2069  if (!col_cv) {
2070  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2071  CHECK(col_cast);
2072  CHECK_EQ(kCAST, col_cast->get_optype());
2073  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2074  }
2075  CHECK(col_cv);
2076  const auto cd = col_descriptors[col_idx];
2077  auto col_datum = col_cv->get_constval();
2078  auto col_type = cd->columnType.get_type();
2079  uint8_t* col_data_bytes{nullptr};
2080  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2081  (!cd->columnType.is_string() ||
2082  cd->columnType.get_compression() == kENCODING_DICT)) {
2083  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2084  CHECK(col_data_bytes_it != col_buffers.end());
2085  col_data_bytes = col_data_bytes_it->second.get();
2086  }
2087  switch (col_type) {
2088  case kBOOLEAN: {
2089  auto col_data = col_data_bytes;
2090  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2091  : (col_datum.boolval ? 1 : 0);
2092  break;
2093  }
2094  case kTINYINT: {
2095  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2096  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2097  : col_datum.tinyintval;
2098  int_col_val = col_datum.tinyintval;
2099  break;
2100  }
2101  case kSMALLINT: {
2102  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2103  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2104  : col_datum.smallintval;
2105  int_col_val = col_datum.smallintval;
2106  break;
2107  }
2108  case kINT: {
2109  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2110  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2111  : col_datum.intval;
2112  int_col_val = col_datum.intval;
2113  break;
2114  }
2115  case kBIGINT:
2116  case kDECIMAL:
2117  case kNUMERIC: {
2118  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2119  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2120  : col_datum.bigintval;
2121  int_col_val = col_datum.bigintval;
2122  break;
2123  }
2124  case kFLOAT: {
2125  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2126  *col_data = col_datum.floatval;
2127  break;
2128  }
2129  case kDOUBLE: {
2130  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2131  *col_data = col_datum.doubleval;
2132  break;
2133  }
2134  case kTEXT:
2135  case kVARCHAR:
2136  case kCHAR: {
2137  switch (cd->columnType.get_compression()) {
2138  case kENCODING_NONE:
2139  str_col_buffers[col_ids[col_idx]].push_back(
2140  col_datum.stringval ? *col_datum.stringval : "");
2141  break;
2142  case kENCODING_DICT: {
2143  switch (cd->columnType.get_size()) {
2144  case 1:
2145  int_col_val = insert_one_dict_str(
2146  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2147  break;
2148  case 2:
2149  int_col_val = insert_one_dict_str(
2150  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2151  break;
2152  case 4:
2153  int_col_val = insert_one_dict_str(
2154  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2155  break;
2156  default:
2157  CHECK(false);
2158  }
2159  break;
2160  }
2161  default:
2162  CHECK(false);
2163  }
2164  break;
2165  }
2166  case kTIME:
2167  case kTIMESTAMP:
2168  case kDATE: {
2169  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2170  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2171  : col_datum.bigintval;
2172  break;
2173  }
2174  case kARRAY: {
2175  const auto is_null = col_cv->get_is_null();
2176  const auto size = cd->columnType.get_size();
2177  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2178  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2179  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2180  if (is_null && !is_point_coords) {
2181  if (size > 0) {
2182  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2183  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2184  throw std::runtime_error("Column " + cd->columnName +
2185  " doesn't accept NULL values");
2186  }
2187  int8_t* buf = (int8_t*)checked_malloc(size);
2188  put_null_array(static_cast<void*>(buf), elem_ti, "");
2189  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2190  p += elem_ti.get_size()) {
2191  put_null(static_cast<void*>(p), elem_ti, "");
2192  }
2193  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2194  } else {
2195  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2196  }
2197  break;
2198  }
2199  const auto l = col_cv->get_value_list();
2200  size_t len = l.size() * elem_ti.get_size();
2201  if (size > 0 && static_cast<size_t>(size) != len) {
2202  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2203  std::to_string(size / elem_ti.get_size()) +
2204  " values, " + "received " + std::to_string(l.size()));
2205  }
2206  if (elem_ti.is_string()) {
2207  CHECK(kENCODING_DICT == elem_ti.get_compression());
2208  CHECK(4 == elem_ti.get_size());
2209 
2210  int8_t* buf = (int8_t*)checked_malloc(len);
2211  int32_t* p = reinterpret_cast<int32_t*>(buf);
2212 
2213  int elemIndex = 0;
2214  for (auto& e : l) {
2215  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2216  CHECK(c);
2217 
2218  int_col_val = insert_one_dict_str(
2219  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2220 
2221  elemIndex++;
2222  }
2223  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2224 
2225  } else {
2226  int8_t* buf = (int8_t*)checked_malloc(len);
2227  int8_t* p = buf;
2228  for (auto& e : l) {
2229  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2230  CHECK(c);
2231  p = appendDatum(p, c->get_constval(), elem_ti);
2232  }
2233  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2234  }
2235  break;
2236  }
2237  case kPOINT:
2238  case kLINESTRING:
2239  case kPOLYGON:
2240  case kMULTIPOLYGON:
2241  str_col_buffers[col_ids[col_idx]].push_back(
2242  col_datum.stringval ? *col_datum.stringval : "");
2243  break;
2244  default:
2245  CHECK(false);
2246  }
2247  ++col_idx;
2248  if (col_idx == static_cast<size_t>(table_descriptor->shardedColumnId)) {
2249  const auto shard_count = shard_tables.size();
2250  const size_t shard_idx = SHARD_FOR_KEY(int_col_val, shard_count);
2251  shard = shard_tables[shard_idx];
2252  }
2253  }
2254  for (const auto& kv : col_buffers) {
2255  insert_data.columnIds.push_back(kv.first);
2256  DataBlockPtr p;
2257  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2258  insert_data.data.push_back(p);
2259  }
2260  for (auto& kv : str_col_buffers) {
2261  insert_data.columnIds.push_back(kv.first);
2262  DataBlockPtr p;
2263  p.stringsPtr = &kv.second;
2264  insert_data.data.push_back(p);
2265  }
2266  for (auto& kv : arr_col_buffers) {
2267  insert_data.columnIds.push_back(kv.first);
2268  DataBlockPtr p;
2269  p.arraysPtr = &kv.second;
2270  insert_data.data.push_back(p);
2271  }
2272  insert_data.numRows = 1;
2273  if (shard) {
2274  shard->fragmenter->insertData(insert_data);
2275  } else {
2276  table_descriptor->fragmenter->insertData(insert_data);
2277  }
2278 
2279  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2280  ExecutorDeviceType::CPU,
2281  QueryMemoryDescriptor(),
2282  executor_->getRowSetMemoryOwner(),
2283  executor_);
2284  std::vector<TargetMetaInfo> empty_targets;
2285  return {rs, empty_targets};
2286 }
2287 
2288 namespace {
2289 
2290 // TODO(alex): Once we're fully migrated to the relational algebra model, change
2291 // the executor interface to use the collation directly and remove this conversion.
2292 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
2293  std::list<Analyzer::OrderEntry> result;
2294  for (size_t i = 0; i < sort->collationCount(); ++i) {
2295  const auto sort_field = sort->getCollation(i);
2296  result.emplace_back(sort_field.getField() + 1,
2297  sort_field.getSortDir() == SortDirection::Descending,
2298  sort_field.getNullsPosition() == NullSortedPosition::First);
2299  }
2300  return result;
2301 }
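// Example: an ORDER BY on the second target, descending with nulls first, arrives as
// a collation on field 1 and becomes Analyzer::OrderEntry{/*tle_no=*/2,
// /*is_desc=*/true, /*nulls_first=*/true}; tle_no is 1-based, hence getField() + 1.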
2302 
2303 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
2304  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
2305  if (aggregate) {
2306  return 0;
2307  }
2308  const auto compound = dynamic_cast<const RelCompound*>(ra);
2309  return (compound && compound->isAggregate()) ? 0 : limit;
2310 }
2311 
2312 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
2313  return !order_entries.empty() && order_entries.front().is_desc;
2314 }
2315 
2316 } // namespace
2317 
2318 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
2319  const CompilationOptions& co,
2320  const ExecutionOptions& eo,
2321  RenderInfo* render_info,
2322  const int64_t queue_time_ms) {
2323  auto timer = DEBUG_TIMER(__func__);
2325  const auto source = sort->getInput(0);
2326  const bool is_aggregate = node_is_aggregate(source);
2327  auto it = leaf_results_.find(sort->getId());
2328  if (it != leaf_results_.end()) {
2329  // Add any transient string literals to the string dictionary proxy on the aggregator
2330  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2331  add_transient_string_literals(
2332  source_work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2333  // Handle push-down for LIMIT for multi-node
2334  auto& aggregated_result = it->second;
2335  auto& result_rows = aggregated_result.rs;
2336  const size_t limit = sort->getLimit();
2337  const size_t offset = sort->getOffset();
2338  const auto order_entries = get_order_entries(sort);
2339  if (limit || offset) {
2340  if (!order_entries.empty()) {
2341  result_rows->sort(order_entries, limit + offset);
2342  }
2343  result_rows->dropFirstN(offset);
2344  if (limit) {
2345  result_rows->keepFirstN(limit);
2346  }
2347  }
2348  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2349  sort->setOutputMetainfo(aggregated_result.targets_meta);
2350  return result;
2351  }
2352 
2353  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2354  bool is_desc{false};
2355 
2356  auto execute_sort_query = [this,
2357  sort,
2358  &source,
2359  &is_aggregate,
2360  &eo,
2361  &co,
2362  render_info,
2363  queue_time_ms,
2364  &groupby_exprs,
2365  &is_desc]() -> ExecutionResult {
2366  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2367  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2368  ExecutionOptions eo_copy = {
2369  eo.output_columnar_hint,
2370  eo.allow_multifrag,
2371  eo.just_explain,
2372  eo.allow_loop_joins,
2373  eo.with_watchdog,
2374  eo.jit_debug,
2375  eo.just_validate || sort->isEmptyResult(),
2376  eo.with_dynamic_watchdog,
2377  eo.dynamic_watchdog_time_limit,
2378  eo.find_push_down_candidates,
2379  eo.just_calcite_explain,
2380  eo.gpu_input_mem_limit_percent,
2381  eo.allow_runtime_query_interrupt,
2382  eo.runtime_query_interrupt_frequency,
2383  eo.executor_type,
2384  };
2385 
2386  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2387  auto source_result = executeWorkUnit(source_work_unit,
2388  source->getOutputMetainfo(),
2389  is_aggregate,
2390  co,
2391  eo_copy,
2392  render_info,
2393  queue_time_ms);
2394  if (render_info && render_info->isPotentialInSituRender()) {
2395  return source_result;
2396  }
2397  if (source_result.isFilterPushDownEnabled()) {
2398  return source_result;
2399  }
2400  auto rows_to_sort = source_result.getRows();
2401  if (eo.just_explain) {
2402  return {rows_to_sort, {}};
2403  }
2404  const size_t limit = sort->getLimit();
2405  const size_t offset = sort->getOffset();
2406  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2407  !use_speculative_top_n(source_work_unit.exe_unit,
2408  rows_to_sort->getQueryMemDesc())) {
2409  rows_to_sort->sort(source_work_unit.exe_unit.sort_info.order_entries,
2410  limit + offset);
2411  }
2412  if (limit || offset) {
2413  if (g_cluster && sort->collationCount() == 0) {
2414  if (offset >= rows_to_sort->rowCount()) {
2415  rows_to_sort->dropFirstN(offset);
2416  } else {
2417  rows_to_sort->keepFirstN(limit + offset);
2418  }
2419  } else {
2420  rows_to_sort->dropFirstN(offset);
2421  if (limit) {
2422  rows_to_sort->keepFirstN(limit);
2423  }
2424  }
2425  }
2426  return {rows_to_sort, source_result.getTargetsMeta()};
2427  };
2428 
2429  try {
2430  return execute_sort_query();
2431  } catch (const SpeculativeTopNFailed& e) {
2432  CHECK_EQ(size_t(1), groupby_exprs.size());
2433  CHECK(groupby_exprs.front());
2434  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2435  return execute_sort_query();
2436  }
2437 }
2438 
2439 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
2440  const RelSort* sort,
2441  const ExecutionOptions& eo) {
2442  const auto source = sort->getInput(0);
2443  const size_t limit = sort->getLimit();
2444  const size_t offset = sort->getOffset();
2445  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2446  const size_t scan_total_limit =
2447  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2448  size_t max_groups_buffer_entry_guess{
2449  scan_total_limit ? scan_total_limit : max_groups_buffer_entry_default_guess};
2450  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
2451  const auto order_entries = get_order_entries(sort);
2452  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2453  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2454  const auto& source_exe_unit = source_work_unit.exe_unit;
2455 
2456  // we do not allow sorting geometry or array types
2457  for (auto order_entry : order_entries) {
2458  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-based index
2459  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2460  const auto& ti = get_target_info(te, false);
2461  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2462  throw std::runtime_error(
2463  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2464  }
2465  }
2466 
2467  if (source_exe_unit.groupby_exprs.size() == 1) {
2468  if (!source_exe_unit.groupby_exprs.front()) {
2469  sort_algorithm = SortAlgorithm::StreamingTopN;
2470  } else {
2471  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2472  first_oe_is_desc(order_entries))) {
2473  sort_algorithm = SortAlgorithm::Default;
2474  }
2475  }
2476  }
2477 
2478  sort->setOutputMetainfo(source->getOutputMetainfo());
2479  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2480  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2481  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2482  std::move(source_exe_unit.input_col_descs),
2483  source_exe_unit.simple_quals,
2484  source_exe_unit.quals,
2485  source_exe_unit.join_quals,
2486  source_exe_unit.groupby_exprs,
2487  source_exe_unit.target_exprs,
2488  nullptr,
2489  {sort_info.order_entries, sort_algorithm, limit, offset},
2490  scan_total_limit,
2491  source_exe_unit.use_bump_allocator,
2492  source_exe_unit.union_all,
2493  source_exe_unit.query_state},
2494  source,
2495  max_groups_buffer_entry_guess,
2496  std::move(source_work_unit.query_rewriter),
2497  source_work_unit.input_permutation,
2498  source_work_unit.left_deep_join_input_sizes};
2499 }
2500 
2501 namespace {
2502 
2509 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
2510  CHECK(!table_infos.empty());
2511  const auto& first_table = table_infos.front();
2512  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
2513  for (const auto& table_info : table_infos) {
2514  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
2515  max_num_groups = table_info.info.getNumTuplesUpperBound();
2516  }
2517  }
2518  return std::max(max_num_groups, size_t(1));
2519 }
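// E.g. for inputs with tuple upper bounds {250, 10000} this returns 10000 (and never
// less than 1); callers below use it only as a coarse upper-bound guess when sizing
// the group-by output buffer.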
2520 
2527 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
2528  for (const auto target_expr : ra_exe_unit.target_exprs) {
2529  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
2530  return false;
2531  }
2532  }
2533  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
2534  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
2535  return true;
2536  }
2537  return false;
2538 }
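// Reading of the predicate above: a pure projection (no aggregate targets, a single
// empty group-by slot) whose scan limit is absent or above Executor::high_scan_limit
// must have its output size computed before execution; executeWorkUnit then either
// switches to the bump allocator or runs a pre-flight filtered COUNT(*) to size the
// buffer exactly.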
2539 
2540 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
2541  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
2542  ra_exe_unit.simple_quals.empty());
2543 }
2544 
2545 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
2546  const RelAlgExecutionUnit& ra_exe_unit_in,
2547  const std::vector<InputTableInfo>& table_infos,
2548  const Executor* executor,
2549  const ExecutorDeviceType device_type_in,
2550  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
2551  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
2552  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
2553  const auto target_expr = ra_exe_unit.target_exprs[i];
2554  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2555  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
2556  continue;
2557  }
2558  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
2559  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
2560  CHECK(arg);
2561  const auto& arg_ti = arg->get_type_info();
2562  // Avoid calling getExpressionRange for variable length types (string and array),
2563  // it'd trigger an assertion since that API expects to be called only for types
2564  // for which the notion of range is well-defined. A bit of a kludge, but the
2565  // logic to reject these types anyway is at lower levels in the stack and not
2566  // really worth pulling into a separate function for now.
2567  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
2568  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
2569  continue;
2570  }
2571  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
2572  if (arg_range.getType() != ExpressionRangeType::Integer) {
2573  continue;
2574  }
2575  // When running distributed, the threshold for using the precise implementation
2576  // must be consistent across all leaves, otherwise we could have a mix of precise
2577  // and approximate bitmaps and we cannot aggregate them.
2578  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
2579  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
2580  const auto sub_bitmap_count =
2581  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
2582  int64_t approx_bitmap_sz_bits{0};
2583  const auto error_rate =
2584  static_cast<Analyzer::AggExpr*>(target_expr)->get_error_rate();
2585  if (error_rate) {
2586  CHECK(error_rate->get_type_info().get_type() == kINT);
2587  CHECK_GE(error_rate->get_constval().intval, 1);
2588  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
2589  } else {
2590  approx_bitmap_sz_bits = g_hll_precision_bits;
2591  }
2592  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
2593  arg_range.getIntMin(),
2594  approx_bitmap_sz_bits,
2595  true,
2596  device_type,
2597  sub_bitmap_count};
2598  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
2599  arg_range.getIntMin(),
2600  bitmap_sz_bits,
2601  false,
2602  device_type,
2603  sub_bitmap_count};
2604  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
2605  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
2606  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
2607  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
2608  target_exprs_owned.push_back(precise_count_distinct);
2609  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
2610  }
2611  }
2612  return ra_exe_unit;
2613 }
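// Worked example (hypothetical range): for APPROX_COUNT_DISTINCT(x) with an integer
// x spanning [0, 1000], the exact bitmap needs only ~1001 bits per group, which can
// be smaller than the padded HLL sketch implied by g_hll_precision_bits; in that
// case the target is silently rewritten to an exact COUNT(DISTINCT x), giving a
// precise answer at no extra memory cost.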
2614 
2615 void build_render_targets(RenderInfo& render_info,
2616  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
2617  const std::vector<TargetMetaInfo>& targets_meta) {
2618  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
2619  render_info.targets.clear();
2620  for (size_t i = 0; i < targets_meta.size(); ++i) {
2621  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
2622  targets_meta[i].get_resname(),
2623  work_unit_target_exprs[i]->get_shared_ptr(),
2624  false));
2625  }
2626 }
2627 
2628 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
2629  const CompilationOptions& co,
2630  const ExecutionOptions& eo) {
2631  return g_enable_bump_allocator && co.device_type == ExecutorDeviceType::GPU &&
2632  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
2633 }
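// i.e. the bump allocator is only ever considered for unsorted, non-columnar GPU
// projections, and g_enable_bump_allocator gates it globally; the caller in
// executeWorkUnit additionally skips it for render queries.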
2634 
2635 } // namespace
2636 
2637 ExecutionResult RelAlgExecutor::executeWorkUnit(
2638  const RelAlgExecutor::WorkUnit& work_unit,
2639  const std::vector<TargetMetaInfo>& targets_meta,
2640  const bool is_agg,
2641  const CompilationOptions& co_in,
2642  const ExecutionOptions& eo,
2643  RenderInfo* render_info,
2644  const int64_t queue_time_ms,
2645  const ssize_t previous_count) {
2647  auto timer = DEBUG_TIMER(__func__);
2648 
2649  auto co = co_in;
2650  ColumnCacheMap column_cache;
2651  if (is_window_execution_unit(work_unit.exe_unit)) {
2652  if (!g_enable_window_functions) {
2653  throw std::runtime_error("Window functions support is disabled");
2654  }
2656  co.allow_lazy_fetch = false;
2657  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
2658  }
2659  if (!eo.just_explain && eo.find_push_down_candidates) {
2660  // find potential candidates:
2661  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
2662  if (!selected_filters.empty() || eo.just_calcite_explain) {
2663  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
2664  }
2665  }
2666  if (render_info && render_info->isPotentialInSituRender()) {
2667  co.allow_lazy_fetch = false;
2668  }
2669  const auto body = work_unit.body;
2670  CHECK(body);
2671  auto it = leaf_results_.find(body->getId());
2672  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
2673  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
2674  if (it != leaf_results_.end()) {
2675  add_transient_string_literals(
2676  work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2677  auto& aggregated_result = it->second;
2678  auto& result_rows = aggregated_result.rs;
2679  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2680  body->setOutputMetainfo(aggregated_result.targets_meta);
2681  if (render_info) {
2682  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2683  }
2684  return result;
2685  }
2686  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2687 
2688  auto ra_exe_unit = decide_approx_count_distinct_implementation(
2689  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
2690  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2691  if (is_window_execution_unit(ra_exe_unit)) {
2692  CHECK_EQ(table_infos.size(), size_t(1));
2693  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
2694  max_groups_buffer_entry_guess =
2695  table_infos.front().info.fragments.front().getNumTuples();
2696  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
2697  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
2698  if (previous_count > 0 && !exe_unit_has_quals(ra_exe_unit)) {
2699  ra_exe_unit.scan_limit = static_cast<size_t>(previous_count);
2700  } else {
2701  // TODO(adb): enable bump allocator path for render queries
2702  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
2703  ra_exe_unit.scan_limit = 0;
2704  ra_exe_unit.use_bump_allocator = true;
2705  } else if (eo.executor_type == ::ExecutorType::Extern) {
2706  ra_exe_unit.scan_limit = 0;
2707  } else if (!eo.just_explain) {
2708  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
2709  if (filter_count_all >= 0) {
2710  ra_exe_unit.scan_limit = std::max(filter_count_all, ssize_t(1));
2711  }
2712  }
2713  }
2714  }
2715 
2716  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2717  co.device_type,
2718  QueryMemoryDescriptor(),
2719  nullptr,
2720  executor_),
2721  {}};
2722 
2723  auto execute_and_handle_errors =
2724  [&](const auto max_groups_buffer_entry_guess_in,
2725  const bool has_cardinality_estimation) -> ExecutionResult {
2726  // Note that the groups buffer entry guess may be modified during query execution.
2727  // Create a local copy so we can track those changes if we need to attempt a retry
2728  // due to OOM
2729  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
2730  try {
2731  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
2732  is_agg,
2733  table_infos,
2734  ra_exe_unit,
2735  co,
2736  eo,
2737  cat_,
2738  executor_->row_set_mem_owner_,
2739  render_info,
2740  has_cardinality_estimation,
2741  column_cache),
2742  targets_meta};
2743  } catch (const QueryExecutionError& e) {
2744  handlePersistentError(e.getErrorCode());
2745  return handleOutOfMemoryRetry(
2746  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
2747  targets_meta,
2748  is_agg,
2749  co,
2750  eo,
2751  render_info,
2752  e.wasMultifragKernelLaunch(),
2753  queue_time_ms);
2754  }
2755  };
2756  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
2757  try {
2758  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2759  auto card = cached_cardinality.second;
2760  if (cached_cardinality.first && card >= 0) {
2761  result = execute_and_handle_errors(card, true);
2762  } else {
2763  result = execute_and_handle_errors(
2764  max_groups_buffer_entry_guess,
2765  groups_approx_upper_bound(table_infos) <= g_big_group_threshold);
2766  }
2767  VLOG(3) << "result.getRows()->entryCount()=" << result.getRows()->entryCount();
2768  } catch (const CardinalityEstimationRequired&) {
2769  // check the cardinality cache
2770  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2771  auto card = cached_cardinality.second;
2772  if (cached_cardinality.first && card >= 0) {
2773  result = execute_and_handle_errors(card, true);
2774  } else {
2775  const auto estimated_groups_buffer_entry_guess =
2776  2 * std::min(groups_approx_upper_bound(table_infos),
2777  getNDVEstimation(work_unit, is_agg, co, eo));
2778  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
2779  result = execute_and_handle_errors(estimated_groups_buffer_entry_guess, true);
2780  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
2781  }
2782  }
2783 
2784  result.setQueueTime(queue_time_ms);
2785  if (render_info) {
2786  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2787  if (render_info->isPotentialInSituRender()) {
2788  // return an empty result (with the same queue time, and zero render time)
2789  return {std::make_shared<ResultSet>(
2790  queue_time_ms,
2791  0,
2792  executor_->row_set_mem_owner_
2793  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
2794  : nullptr),
2795  {}};
2796  }
2797  }
2798  return result;
2799 }
2800 
2801 ssize_t RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
2802  const bool is_agg,
2803  const CompilationOptions& co,
2804  const ExecutionOptions& eo) {
2805  const auto count =
2806  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
2807  kCOUNT,
2808  nullptr,
2809  false,
2810  nullptr);
2811  const auto count_all_exe_unit =
2812  create_count_all_execution_unit(work_unit.exe_unit, count);
2813  size_t one{1};
2814  ResultSetPtr count_all_result;
2815  try {
2816  ColumnCacheMap column_cache;
2817  count_all_result =
2818  executor_->executeWorkUnit(one,
2819  is_agg,
2820  get_table_infos(work_unit.exe_unit, executor_),
2821  count_all_exe_unit,
2822  co,
2823  eo,
2824  cat_,
2825  executor_->row_set_mem_owner_,
2826  nullptr,
2827  false,
2828  column_cache);
2829  } catch (const std::exception& e) {
2830  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
2831  return -1;
2832  }
2833  const auto count_row = count_all_result->getNextRow(false, false);
2834  CHECK_EQ(size_t(1), count_row.size());
2835  const auto& count_tv = count_row.front();
2836  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
2837  CHECK(count_scalar_tv);
2838  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
2839  CHECK(count_ptr);
2840  CHECK_GE(*count_ptr, 0);
2841  auto count_upper_bound = static_cast<size_t>(*count_ptr);
2842  return std::max(count_upper_bound, size_t(1));
2843 }
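// Usage note: executeWorkUnit calls this as a pre-flight "SELECT COUNT(*)" with the
// query's own filters so projections can allocate an exactly-sized output buffer; a
// negative return (the count itself failed) simply leaves the conservative
// scan-limit path in place.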
2844 
2845 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
2846  const auto& ra_exe_unit = work_unit.exe_unit;
2847  if (ra_exe_unit.input_descs.size() != 1) {
2848  return false;
2849  }
2850  const auto& table_desc = ra_exe_unit.input_descs.front();
2851  if (table_desc.getSourceType() != InputSourceType::TABLE) {
2852  return false;
2853  }
2854  const int table_id = table_desc.getTableId();
2855  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
2856  const auto comp_expr =
2857  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
2858  if (!comp_expr || comp_expr->get_optype() != kEQ) {
2859  return false;
2860  }
2861  const auto lhs = comp_expr->get_left_operand();
2862  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
2863  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
2864  return false;
2865  }
2866  const auto rhs = comp_expr->get_right_operand();
2867  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
2868  if (!rhs_const) {
2869  return false;
2870  }
2871  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
2872  if (cd->isVirtualCol) {
2873  CHECK_EQ("rowid", cd->columnName);
2874  return true;
2875  }
2876  }
2877  return false;
2878 }
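// A matching pattern (sketch): SELECT ... FROM t WHERE rowid = 42, a single physical
// table whose simple qualifier is an equality between the virtual rowid column and a
// literal. Such queries are effectively point lookups, so executeWorkUnit skips the
// output-size estimation for them.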
2879 
2880 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
2881  const RelAlgExecutor::WorkUnit& work_unit,
2882  const std::vector<TargetMetaInfo>& targets_meta,
2883  const bool is_agg,
2884  const CompilationOptions& co,
2885  const ExecutionOptions& eo,
2886  RenderInfo* render_info,
2887  const bool was_multifrag_kernel_launch,
2888  const int64_t queue_time_ms) {
2889  // Disable the bump allocator
2890 // Note that this will have basically the same effect as using the bump allocator for
2891  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
2892  // path and the bump allocator path for kernel per fragment execution.
2893  auto ra_exe_unit_in = work_unit.exe_unit;
2894  ra_exe_unit_in.use_bump_allocator = false;
2895 
2896  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2897  co.device_type,
2898  QueryMemoryDescriptor(),
2899  nullptr,
2900  executor_),
2901  {}};
2902 
2903  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
2904  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2905  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
2906  false,
2907  false,
2908  eo.allow_loop_joins,
2909  eo.with_watchdog,
2910  eo.jit_debug,
2911  false,
2912  eo.with_dynamic_watchdog,
2913  eo.dynamic_watchdog_time_limit,
2914  false,
2915  false,
2916  eo.gpu_input_mem_limit_percent,
2917  false,
2918  eo.runtime_query_interrupt_frequency,
2919  eo.executor_type,
2920  };
2921 
2922  if (was_multifrag_kernel_launch) {
2923  try {
2924  // Attempt to retry using the kernel per fragment path. The smaller input size
2925  // required may allow the entire kernel to execute in GPU memory.
2926  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
2927  "kernels disabled.";
2928  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2929  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
2930  ColumnCacheMap column_cache;
2931  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2932  is_agg,
2933  table_infos,
2934  ra_exe_unit,
2935  co,
2936  eo_no_multifrag,
2937  cat_,
2938  executor_->row_set_mem_owner_,
2939  nullptr,
2940  true,
2941  column_cache),
2942  targets_meta};
2943  result.setQueueTime(queue_time_ms);
2944  } catch (const QueryExecutionError& e) {
2945  handlePersistentError(e.getErrorCode());
2946  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
2947  }
2948  }
2949 
2950  if (render_info) {
2951  render_info->setForceNonInSituData();
2952  }
2953 
2954  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
2955  // Only reset the group buffer entry guess if we ran out of slots, which
2956  // suggests a
2957  // highly pathological input which prevented a good estimation of distinct tuple
2958  // count. For projection queries, this will force a per-fragment scan limit, which is
2959  // compatible with the CPU path
2960  VLOG(1) << "Resetting max groups buffer entry guess.";
2961  max_groups_buffer_entry_guess = 0;
2962 
2963  int iteration_ctr = -1;
2964  while (true) {
2965  iteration_ctr++;
2966  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2967  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
2968  ColumnCacheMap column_cache;
2969  try {
2970  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2971  is_agg,
2972  table_infos,
2973  ra_exe_unit,
2974  co_cpu,
2975  eo_no_multifrag,
2976  cat_,
2977  executor_->row_set_mem_owner_,
2978  nullptr,
2979  true,
2980  column_cache),
2981  targets_meta};
2982  } catch (const QueryExecutionError& e) {
2983  // Ran out of slots
2984  if (e.getErrorCode() < 0) {
2985  // Even the conservative guess failed; it should only happen when we group
2986  // by a huge cardinality array. Maybe we should throw an exception instead?
2987  // Such a heavy query is entirely capable of exhausting all the host memory.
2988  CHECK(max_groups_buffer_entry_guess);
2989  // Only allow two iterations of increasingly large entry guesses up to a maximum
2990  // of 512MB per column per kernel
2991  if (g_enable_watchdog || iteration_ctr > 1) {
2992  throw std::runtime_error("Query ran out of output slots in the result");
2993  }
2994  max_groups_buffer_entry_guess *= 2;
2995  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
2996  "groups buffer entry "
2997  "guess equal to "
2998  << max_groups_buffer_entry_guess;
2999  } else {
3000  handlePersistentError(e.getErrorCode());
3001  }
3002  continue;
3003  }
3004  result.setQueueTime(queue_time_ms);
3005  return result;
3006  }
3007  return result;
3008 }
3009 
3010 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
3011  LOG(ERROR) << "Query execution failed with error "
3012  << getErrorMessageFromCode(error_code);
3013  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
3014  // We ran out of GPU memory, this doesn't count as an error if the query is
3015  // allowed to continue on CPU because retry on CPU is explicitly allowed through
3016  // --allow-cpu-retry.
3017  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
3018  if (!g_allow_cpu_retry) {
3019  throw std::runtime_error(
3020  "Query ran out of GPU memory, unable to automatically retry on CPU");
3021  }
3022  return;
3023  }
3024  throw std::runtime_error(getErrorMessageFromCode(error_code));
3025 }
3026 
3028  if (error_code < 0) {
3029  return "Ran out of slots in the query output buffer";
3030  }
3031  switch (error_code) {
3032  case Executor::ERR_DIV_BY_ZERO:
3033  return "Division by zero";
3034  case Executor::ERR_OUT_OF_GPU_MEM:
3035  return "Query couldn't keep the entire working set of columns in GPU memory";
3036  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
3037  return "Self joins not supported yet";
3038  case Executor::ERR_OUT_OF_CPU_MEM:
3039  return "Not enough host memory to execute the query";
3040  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
3041  return "Overflow or underflow";
3042  case Executor::ERR_OUT_OF_TIME:
3043  return "Query execution has exceeded the time limit";
3044  case Executor::ERR_INTERRUPTED:
3045  return "Query execution has been interrupted";
3046  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
3047  return "Columnar conversion not supported for variable length types";
3048  case Executor::ERR_TOO_MANY_LITERALS:
3049  return "Too many literals in the query";
3050  case Executor::ERR_STRING_CONST_IN_RESULTSET:
3051  return "NONE ENCODED String types are not supported as input result set.";
3052  case Executor::ERR_OUT_OF_RENDER_MEM:
3053  return "Not enough OpenGL memory to render the query results";
3054  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
3055  return "Streaming-Top-N not supported in Render Query";
3056  case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
3057  return "Multiple distinct values encountered";
3058  case Executor::ERR_GEOS:
3059  return "Geos call failure";
3060  }
3061  return "Other error: code " + std::to_string(error_code);
3062 }
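// Example of the two paths above (sketch): ERR_OUT_OF_GPU_MEM with --allow-cpu-retry
// set makes handlePersistentError return normally, so callers fall through to a CPU
// retry; any other code, e.g. ERR_DIV_BY_ZERO, surfaces immediately as
// std::runtime_error("Division by zero").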
3063 
3064 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
3065  const SortInfo& sort_info,
3066  const ExecutionOptions& eo) {
3067  const auto compound = dynamic_cast<const RelCompound*>(node);
3068  if (compound) {
3069  return createCompoundWorkUnit(compound, sort_info, eo);
3070  }
3071  const auto project = dynamic_cast<const RelProject*>(node);
3072  if (project) {
3073  return createProjectWorkUnit(project, sort_info, eo);
3074  }
3075  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3076  if (aggregate) {
3077  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3078  }
3079  const auto filter = dynamic_cast<const RelFilter*>(node);
3080  if (filter) {
3081  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3082  }
3083  LOG(FATAL) << "Unhandled node type: " << node->toString();
3084  return {};
3085 }
3086 
3087 namespace {
3088 
3089 JoinType get_join_type(const RelAlgNode* ra) {
3090  auto sink = get_data_sink(ra);
3091  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
3092  return join->getJoinType();
3093  }
3094  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
3095  return JoinType::INNER;
3096  }
3097 
3098  return JoinType::INVALID;
3099 }
3100 
3101 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
3102  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3103  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
3104  return nullptr;
3105  }
3106  const auto equi_join_condition =
3107  dynamic_cast<const RexOperator*>(condition->getOperand(0));
3108  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
3109  return nullptr;
3110  }
3111  const auto both_are_null_condition =
3112  dynamic_cast<const RexOperator*>(condition->getOperand(1));
3113  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
3114  both_are_null_condition->size() != 2) {
3115  return nullptr;
3116  }
3117  const auto lhs_is_null =
3118  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
3119  const auto rhs_is_null =
3120  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
3121  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
3122  rhs_is_null->getOperator() != kISNULL) {
3123  return nullptr;
3124  }
3125  CHECK_EQ(size_t(1), lhs_is_null->size());
3126  CHECK_EQ(size_t(1), rhs_is_null->size());
3127  CHECK_EQ(size_t(2), equi_join_condition->size());
3128  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
3129  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
3130  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
3131  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
3132  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
3133  return nullptr;
3134  }
3135  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
3136  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
3137  RexDeepCopyVisitor deep_copy_visitor;
3138  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
3139  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
3140  eq_operands.emplace_back(lhs_op_copy.release());
3141  eq_operands.emplace_back(rhs_op_copy.release());
3142  return boost::make_unique<const RexOperator>(
3143  kBW_EQ, eq_operands, equi_join_condition->getType());
3144  }
3145  return nullptr;
3146 }
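// Example of the pattern matched above: NULL-safe equality (IS NOT DISTINCT
// FROM) typically reaches us in the expanded form
//   t0.x = t1.x OR (t0.x IS NULL AND t1.x IS NULL)
// and is collapsed into the single operator
//   t0.x BW_EQ t1.x
// so the condition remains usable as an equi-join key for hash joins;
// get_bitwise_equals_conjunction() below applies the same rewrite to each
// conjunct of an AND.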
3147 
3148 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
3149  const RexScalar* scalar) {
3150  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3151  if (condition && condition->getOperator() == kAND) {
3152  CHECK_GE(condition->size(), size_t(2));
3153  auto acc = get_bitwise_equals(condition->getOperand(0));
3154  if (!acc) {
3155  return nullptr;
3156  }
3157  for (size_t i = 1; i < condition->size(); ++i) {
3158  std::vector<std::unique_ptr<const RexScalar>> and_operands;
3159  and_operands.emplace_back(std::move(acc));
3160  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
3161  acc =
3162  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
3163  }
3164  return acc;
3165  }
3166  return get_bitwise_equals(scalar);
3167 }
3168 
3169 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
3170  CHECK_GE(left_deep_join->inputCount(), size_t(2));
3171  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
3172  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
3173  ++nesting_level) {
3174  if (left_deep_join->getOuterCondition(nesting_level)) {
3175  join_types[nesting_level - 1] = JoinType::LEFT;
3176  }
3177  }
3178  return join_types;
3179 }
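// For example, a left-deep tree over three inputs where only nesting level 2
// has an outer condition (`a JOIN b ON ... LEFT JOIN c ON ...`) yields
// join_types == {INNER, LEFT}.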
3180 
3181 template <class RA>
3182 std::vector<size_t> do_table_reordering(
3183  std::vector<InputDescriptor>& input_descs,
3184  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
3185  const JoinQualsPerNestingLevel& left_deep_join_quals,
3186  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3187  const RA* node,
3188  const std::vector<InputTableInfo>& query_infos,
3189  const Executor* executor) {
3190  if (g_cluster) {
3191  // Disable table reordering in distributed mode. The aggregator does not have enough
3192  // information to break ties.
3193  return {};
3194  }
3195  const auto& cat = *executor->getCatalog();
3196  for (const auto& table_info : query_infos) {
3197  if (table_info.table_id < 0) {
3198  continue;
3199  }
3200  const auto td = cat.getMetadataForTable(table_info.table_id);
3201  CHECK(td);
3202  if (table_is_replicated(td)) {
3203  return {};
3204  }
3205  }
3206  const auto input_permutation =
3207  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
3208  input_to_nest_level = get_input_nest_levels(node, input_permutation);
3209  std::tie(input_descs, input_col_descs, std::ignore) =
3210  get_input_desc(node, input_to_nest_level, input_permutation, cat);
3211  return input_permutation;
3212 }
3213 
3214 std::vector<size_t> get_left_deep_join_input_sizes(
3215  const RelLeftDeepInnerJoin* left_deep_join) {
3216  std::vector<size_t> input_sizes;
3217  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
3218  const auto inputs = get_node_output(left_deep_join->getInput(i));
3219  input_sizes.push_back(inputs.size());
3220  }
3221  return input_sizes;
3222 }
3223 
3224 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
3225  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
3226  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
3227  for (const auto& qual : quals) {
3228  const auto rewritten_qual = rewrite_expr(qual.get());
3229  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
3230  }
3231  return rewritten_quals;
3232 }
3233 
3234 } // namespace
3235 
3236 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
3237  const RelCompound* compound,
3238  const SortInfo& sort_info,
3239  const ExecutionOptions& eo) {
3240  std::vector<InputDescriptor> input_descs;
3241  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3242  auto input_to_nest_level = get_input_nest_levels(compound, {});
3243  std::tie(input_descs, input_col_descs, std::ignore) =
3244  get_input_desc(compound, input_to_nest_level, {}, cat_);
3245  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
3246  const auto query_infos = get_table_infos(input_descs, executor_);
3247  CHECK_EQ(size_t(1), compound->inputCount());
3248  const auto left_deep_join =
3249  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3250  JoinQualsPerNestingLevel left_deep_join_quals;
3251  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3252  : std::vector<JoinType>{get_join_type(compound)};
3253  std::vector<size_t> input_permutation;
3254  std::vector<size_t> left_deep_join_input_sizes;
3255  if (left_deep_join) {
3256  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3257  left_deep_join_quals = translateLeftDeepJoinFilter(
3258  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3259  if (g_from_table_reordering &&
3260  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3261  join_types.end()) {
3262  input_permutation = do_table_reordering(input_descs,
3263  input_col_descs,
3264  left_deep_join_quals,
3265  input_to_nest_level,
3266  compound,
3267  query_infos,
3268  executor_);
3269  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3270  std::tie(input_descs, input_col_descs, std::ignore) =
3271  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3272  left_deep_join_quals = translateLeftDeepJoinFilter(
3273  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3274  }
3275  }
3276  RelAlgTranslator translator(cat_,
3277  query_state_,
3278  executor_,
3279  input_to_nest_level,
3280  join_types,
3281  now_,
3282  eo.just_explain);
3283  const auto scalar_sources =
3284  translate_scalar_sources(compound, translator, eo.executor_type);
3285  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3286  const auto quals_cf = translate_quals(compound, translator);
3287  const auto target_exprs = translate_targets(target_exprs_owned_,
3288  scalar_sources,
3289  groupby_exprs,
3290  compound,
3291  translator,
3292  eo.executor_type);
3293  CHECK_EQ(compound->size(), target_exprs.size());
3294  const RelAlgExecutionUnit exe_unit = {input_descs,
3295  input_col_descs,
3296  quals_cf.simple_quals,
3297  rewrite_quals(quals_cf.quals),
3298  left_deep_join_quals,
3299  groupby_exprs,
3300  target_exprs,
3301  nullptr,
3302  sort_info,
3303  0,
3304  false,
3305  std::nullopt,
3306  query_state_};
3307  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3308  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3309  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3310  compound->setOutputMetainfo(targets_meta);
3311  return {rewritten_exe_unit,
3312  compound,
3313  max_groups_buffer_entry_default_guess,
3314  std::move(query_rewriter),
3315  input_permutation,
3316  left_deep_join_input_sizes};
3317 }
3318 
3319 namespace {
3320 
3321 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
3322  CHECK(qual_expr);
3323  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
3324  if (!bin_oper || bin_oper->getOperator() != kAND) {
3325  return {qual_expr};
3326  }
3327  CHECK_GE(bin_oper->size(), size_t(2));
3328  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
3329  for (size_t i = 1; i < bin_oper->size(); ++i) {
3330  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
3331  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
3332  }
3333  return lhs_cf;
3334 }
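// Example: `x > 0 AND (y = 1 AND z < 2)` flattens recursively into the
// conjunct list {x > 0, y = 1, z < 2}; any non-AND expression comes back as a
// single-element list.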
3335 
3336 std::shared_ptr<Analyzer::Expr> build_logical_expression(
3337  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
3338  const SQLOps sql_op) {
3339  CHECK(!factors.empty());
3340  auto acc = factors.front();
3341  for (size_t i = 1; i < factors.size(); ++i) {
3342  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
3343  }
3344  return acc;
3345 }
3346 
3347 template <class QualsList>
3348 bool list_contains_expression(const QualsList& haystack,
3349  const std::shared_ptr<Analyzer::Expr>& needle) {
3350  for (const auto& qual : haystack) {
3351  if (*qual == *needle) {
3352  return true;
3353  }
3354  }
3355  return false;
3356 }
3357 
3358 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
3359 // evaluations of `p` and allows use of the original form in joins if `p`
3360 // can be used for hash joins.
3361 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
3362  const std::shared_ptr<Analyzer::Expr>& expr) {
3363  const auto expr_terms = qual_to_disjunctive_form(expr);
3364  CHECK_GE(expr_terms.size(), size_t(1));
3365  const auto& first_term = expr_terms.front();
3366  const auto first_term_factors = qual_to_conjunctive_form(first_term);
3367  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
3368  // First, collect the conjunctive components common to all the disjunctive components.
3369  // Don't do it for simple qualifiers; we only care about expensive or join qualifiers.
3370  for (const auto& first_term_factor : first_term_factors.quals) {
3371  bool is_common =
3372  expr_terms.size() > 1; // Only report common factors for disjunction.
3373  for (size_t i = 1; i < expr_terms.size(); ++i) {
3374  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
3375  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
3376  is_common = false;
3377  break;
3378  }
3379  }
3380  if (is_common) {
3381  common_factors.push_back(first_term_factor);
3382  }
3383  }
3384  if (common_factors.empty()) {
3385  return expr;
3386  }
3387  // Now that the common expressions are known, collect the remaining expressions.
3388  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
3389  for (const auto& term : expr_terms) {
3390  const auto term_cf = qual_to_conjunctive_form(term);
3391  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
3392  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
3393  for (const auto& qual : term_cf.quals) {
3394  if (!list_contains_expression(common_factors, qual)) {
3395  remaining_quals.push_back(qual);
3396  }
3397  }
3398  if (!remaining_quals.empty()) {
3399  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
3400  }
3401  }
3402  // Reconstruct the expression with the transformation applied.
3403  const auto common_expr = build_logical_expression(common_factors, kAND);
3404  if (remaining_terms.empty()) {
3405  return common_expr;
3406  }
3407  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
3408  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
3409 }
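// Worked example for the transformation above, assuming p, q and r are
// non-simple qualifiers (simple quals are deliberately skipped): for
//   expr = (p AND q) OR (p AND r)
// the disjunctive terms are {p AND q, p AND r}, the common factor list is {p},
// the remaining terms are {q, r}, and the rebuilt expression is p AND (q OR r).
// If every factor is common, remaining_terms is empty and just the common
// conjunction is returned.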
3410 
3411 } // namespace
3412 
3413 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
3414  const RexScalar* join_condition,
3415  const std::vector<JoinType>& join_types,
3416  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3417  const bool just_explain) const {
3418  RelAlgTranslator translator(
3419  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
3420  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
3421  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
3422  for (const auto rex_condition_component : rex_condition_cf) {
3423  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
3424  const auto join_condition =
3425  reverse_logical_distribution(translator.translateScalarRex(
3426  bw_equals ? bw_equals.get() : rex_condition_component));
3427  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
3428  join_condition_quals.insert(join_condition_quals.end(),
3429  join_condition_cf.quals.begin(),
3430  join_condition_cf.quals.end());
3431  join_condition_quals.insert(join_condition_quals.end(),
3432  join_condition_cf.simple_quals.begin(),
3433  join_condition_cf.simple_quals.end());
3434  }
3435  return combine_equi_join_conditions(join_condition_quals);
3436 }
3437 
3438 // Translate left deep join filter and separate the conjunctive form qualifiers
3439 // per nesting level. The code generated for hash table lookups on each level
3440 // must dominate its uses in deeper nesting levels.
3441 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
3442  const RelLeftDeepInnerJoin* join,
3443  const std::vector<InputDescriptor>& input_descs,
3444  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3445  const bool just_explain) {
3446  const auto join_types = left_deep_join_types(join);
3447  const auto join_condition_quals = makeJoinQuals(
3448  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
3449  MaxRangeTableIndexVisitor rte_idx_visitor;
3450  JoinQualsPerNestingLevel result(input_descs.size() - 1);
3451  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
3452  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
3453  const auto outer_condition = join->getOuterCondition(rte_idx);
3454  if (outer_condition) {
3455  result[rte_idx - 1].quals =
3456  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
3457  CHECK_LE(rte_idx, join_types.size());
3458  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
3459  result[rte_idx - 1].type = JoinType::LEFT;
3460  continue;
3461  }
3462  for (const auto& qual : join_condition_quals) {
3463  if (visited_quals.count(qual)) {
3464  continue;
3465  }
3466  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
3467  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
3468  const auto it_ok = visited_quals.emplace(qual);
3469  CHECK(it_ok.second);
3470  result[rte_idx - 1].quals.push_back(qual);
3471  }
3472  }
3473  CHECK_LE(rte_idx, join_types.size());
3474  CHECK(join_types[rte_idx - 1] == JoinType::INNER);
3475  result[rte_idx - 1].type = JoinType::INNER;
3476  }
3477  return result;
3478 }
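// Example of the level assignment above: for inner quals {t0.a = t1.a,
// t1.b = t2.b, t0.c = t2.c} over inputs t0, t1, t2, the max range-table-index
// visitor places t0.a = t1.a at nesting level 1 and the other two quals at
// level 2, so each level's hash-table lookup only references inputs that are
// already in scope.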
3479 
3480 namespace {
3481 
3482 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
3483  const RelAlgNode* ra_node,
3484  const size_t nest_level,
3485  const std::vector<TargetMetaInfo>& in_metainfo,
3486  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3487  CHECK_LE(size_t(1), ra_node->inputCount());
3488  CHECK_GE(size_t(2), ra_node->inputCount());
3489  const auto input = ra_node->getInput(nest_level);
3490  const auto it_rte_idx = input_to_nest_level.find(input);
3491  CHECK(it_rte_idx != input_to_nest_level.end());
3492  const int rte_idx = it_rte_idx->second;
3493  const int table_id = table_id_from_ra(input);
3494  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
3495  const auto scan_ra = dynamic_cast<const RelScan*>(input);
3496  int input_idx = 0;
3497  for (const auto& input_meta : in_metainfo) {
3498  inputs.push_back(
3499  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
3500  table_id,
3501  scan_ra ? input_idx + 1 : input_idx,
3502  rte_idx));
3503  ++input_idx;
3504  }
3505  return inputs;
3506 }
3507 
3508 } // namespace
3509 
3510 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
3511  const RelAggregate* aggregate,
3512  const SortInfo& sort_info,
3513  const bool just_explain) {
3514  std::vector<InputDescriptor> input_descs;
3515  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3516  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3517  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
3518  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3519  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
3520  const auto join_type = get_join_type(aggregate);
3521 
3522  RelAlgTranslator translator(cat_,
3523  query_state_,
3524  executor_,
3525  input_to_nest_level,
3526  {join_type},
3527  now_,
3528  just_explain);
3529  CHECK_EQ(size_t(1), aggregate->inputCount());
3530  const auto source = aggregate->getInput(0);
3531  const auto& in_metainfo = source->getOutputMetainfo();
3532  const auto scalar_sources =
3533  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
3534  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
3535  const auto target_exprs = translate_targets(
3536  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
3537  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
3538  aggregate->setOutputMetainfo(targets_meta);
3539  return {RelAlgExecutionUnit{input_descs,
3540  input_col_descs,
3541  {},
3542  {},
3543  {},
3544  groupby_exprs,
3545  target_exprs,
3546  nullptr,
3547  sort_info,
3548  0,
3549  false,
3550  std::nullopt,
3551  query_state_},
3552  aggregate,
3553  max_groups_buffer_entry_default_guess,
3554  nullptr};
3555 }
3556 
3557 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
3558  const RelProject* project,
3559  const SortInfo& sort_info,
3560  const ExecutionOptions& eo) {
3561  std::vector<InputDescriptor> input_descs;
3562  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3563  auto input_to_nest_level = get_input_nest_levels(project, {});
3564  std::tie(input_descs, input_col_descs, std::ignore) =
3565  get_input_desc(project, input_to_nest_level, {}, cat_);
3566  const auto query_infos = get_table_infos(input_descs, executor_);
3567 
3568  const auto left_deep_join =
3569  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
3570  JoinQualsPerNestingLevel left_deep_join_quals;
3571  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3572  : std::vector<JoinType>{get_join_type(project)};
3573  std::vector<size_t> input_permutation;
3574  std::vector<size_t> left_deep_join_input_sizes;
3575  if (left_deep_join) {
3576  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3577  const auto query_infos = get_table_infos(input_descs, executor_);
3578  left_deep_join_quals = translateLeftDeepJoinFilter(
3579  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3580  if (g_from_table_reordering) {
3581  input_permutation = do_table_reordering(input_descs,
3582  input_col_descs,
3583  left_deep_join_quals,
3584  input_to_nest_level,
3585  project,
3586  query_infos,
3587  executor_);
3588  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3589  std::tie(input_descs, input_col_descs, std::ignore) =
3590  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3591  left_deep_join_quals = translateLeftDeepJoinFilter(
3592  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3593  }
3594  }
3595 
3596  RelAlgTranslator translator(cat_,
3597  query_state_,
3598  executor_,
3599  input_to_nest_level,
3600  join_types,
3601  now_,
3602  eo.just_explain);
3603  const auto target_exprs_owned =
3604  translate_scalar_sources(project, translator, eo.executor_type);
3605  target_exprs_owned_.insert(
3606  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3607  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3608 
3609  const RelAlgExecutionUnit exe_unit = {input_descs,
3610  input_col_descs,
3611  {},
3612  {},
3613  left_deep_join_quals,
3614  {nullptr},
3615  target_exprs,
3616  nullptr,
3617  sort_info,
3618  0,
3619  false,
3620  std::nullopt,
3621  query_state_};
3622  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3623  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3624  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3625  project->setOutputMetainfo(targets_meta);
3626  return {rewritten_exe_unit,
3627  project,
3628  max_groups_buffer_entry_default_guess,
3629  std::move(query_rewriter),
3630  input_permutation,
3631  left_deep_join_input_sizes};
3632 }
3633 
3634 namespace {
3635 
3636 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
3637  RelAlgNode const* input_node) {
3638  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
3639  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
3640  const int negative_node_id = -input_node->getId();
3641  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
3642  target_exprs.reserve(tmis.size());
3643  for (size_t i = 0; i < tmis.size(); ++i) {
3644  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
3645  tmis[i].get_type_info(), negative_node_id, i, 0));
3646  }
3647  return target_exprs;
3648 }
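// The negated node id used above is a convention: a negative "table id" marks
// a ColumnVar as reading another RelAlgNode's output (a temporary table)
// rather than a physical table, e.g. a union input node with id 7 produces
// ColumnVars on table -7 at nest level 0.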
3649 
3650 } // namespace
3651 
3652 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
3653  const RelLogicalUnion* logical_union,
3654  const SortInfo& sort_info,
3655  const ExecutionOptions& eo) {
3656  std::vector<InputDescriptor> input_descs;
3657  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3658  // Map ra input ptr to index (0, 1).
3659  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
3660  std::tie(input_descs, input_col_descs, std::ignore) =
3661  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
3662  const auto query_infos = get_table_infos(input_descs, executor_);
3663  auto const max_num_tuples =
3664  std::accumulate(query_infos.cbegin(),
3665  query_infos.cend(),
3666  size_t(0),
3667  [](auto max, auto const& query_info) {
3668  return std::max(max, query_info.info.getNumTuples());
3669  });
3670 
3671  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
3672  for (auto& pair : input_to_nest_level) {
3673  VLOG(3) << " (" << pair.first->toString() << ", " << pair.second << ')';
3674  }
3675 
3676  RelAlgTranslator translator(
3677  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
3678 
3679  auto const input_exprs_owned = target_exprs_for_union(logical_union->getInput(0));
3680  CHECK(!input_exprs_owned.empty())
3681  << "No metainfo found for input node " << logical_union->getInput(0)->toString();
3682  VLOG(3) << "input_exprs_owned.size()=" << input_exprs_owned.size();
3683  for (auto& input_expr : input_exprs_owned) {
3684  VLOG(3) << " " << input_expr->toString();
3685  }
3686  target_exprs_owned_.insert(
3687  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3688  const auto target_exprs = get_exprs_not_owned(input_exprs_owned);
3689 
3690  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
3691  << " input_col_descs=" << shared::printContainer(input_col_descs)
3692  << " target_exprs.size()=" << target_exprs.size()
3693  << " max_num_tuples=" << max_num_tuples;
3694 
3695  const RelAlgExecutionUnit exe_unit = {input_descs,
3696  input_col_descs,
3697  {}, // quals_cf.simple_quals,
3698  {}, // rewrite_quals(quals_cf.quals),
3699  {},
3700  {nullptr},
3701  target_exprs,
3702  nullptr,
3703  sort_info,
3704  max_num_tuples,
3705  false,
3706  logical_union->isAll(),
3707  query_state_};
3708  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3709  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3710 
3711  RelAlgNode const* input0 = logical_union->getInput(0);
3712  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
3713  logical_union->setOutputMetainfo(
3714  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3715  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
3716  logical_union->setOutputMetainfo(
3717  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3718  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
3719  logical_union->setOutputMetainfo(
3720  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3721  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
3722  logical_union->setOutputMetainfo(
3723  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3724  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
3725  logical_union->setOutputMetainfo(
3726  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3727  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
3728  logical_union->setOutputMetainfo(
3729  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3730  } else if (dynamic_cast<const RelSort*>(input0)) {
3731  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
3732  } else {
3733  throw QueryNotSupported("Unsupported input type: " + input0->toString());
3734  }
3735  VLOG(3) << "logical_union->getOutputMetainfo()="
3736  << shared::printContainer(logical_union->getOutputMetainfo())
3737  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
3738  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
3739 
3740  return {rewritten_exe_unit,
3741  logical_union,
3742  max_groups_buffer_entry_default_guess,
3743  std::move(query_rewriter)};
3744 }
3745 
3746 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
3747  const RelTableFunction* table_func,
3748  const bool just_explain) {
3749  std::vector<InputDescriptor> input_descs;
3750  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3751  auto input_to_nest_level = get_input_nest_levels(table_func, {});
3752  std::tie(input_descs, input_col_descs, std::ignore) =
3753  get_input_desc(table_func, input_to_nest_level, {}, cat_);
3754  const auto query_infos = get_table_infos(input_descs, executor_);
3755  CHECK_EQ(size_t(1), table_func->inputCount());
3756 
3757  RelAlgTranslator translator(
3758  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
3759  const auto input_exprs_owned =
3760  translate_scalar_sources(table_func, translator, ::ExecutorType::Native);
3761  target_exprs_owned_.insert(
3762  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3763  const auto input_exprs = get_exprs_not_owned(input_exprs_owned);
3764 
3765  std::vector<Analyzer::ColumnVar*> input_col_exprs;
3766  for (auto input_expr : input_exprs) {
3767  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
3768  input_col_exprs.push_back(col_var);
3769  }
3770  }
3771  CHECK_EQ(input_col_exprs.size(), table_func->getColInputsSize());
3772 
3773  const auto& table_function_impl =
3774  table_functions::TableFunctionsFactory::get(table_func->getFunctionName());
3775 
3776  std::vector<Analyzer::Expr*> table_func_outputs;
3777  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
3778  const auto ti = table_function_impl.getOutputSQLType(i);
3779  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
3780  table_func_outputs.push_back(target_exprs_owned_.back().get());
3781  }
3782 
3783  std::optional<size_t> output_row_multiplier;
3784  if (table_function_impl.hasUserSpecifiedOutputMultiplier()) {
3785  const auto parameter_index = table_function_impl.getOutputRowParameter();
3786  CHECK_GT(parameter_index, size_t(0));
3787  const auto parameter_expr = table_func->getTableFuncInputAt(parameter_index - 1);
3788  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
3789  if (!parameter_expr_literal) {
3790  throw std::runtime_error(
3791  "Provided output buffer multiplier parameter is not a literal. Only literal "
3792  "values are supported with output buffer multiplier configured table "
3793  "functions.");
3794  }
3795  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
3796  if (literal_val < 0) {
3797  throw std::runtime_error("Provided output row multiplier " +
3798  std::to_string(literal_val) +
3799  " is not valid for table functions.");
3800  }
3801  output_row_multiplier = static_cast<size_t>(literal_val);
3802  }
3803 
3804  const TableFunctionExecutionUnit exe_unit = {
3805  input_descs,
3806  input_col_descs,
3807  input_exprs, // table function inputs
3808  input_col_exprs, // table function column inputs (duplicates w/ above)
3809  table_func_outputs, // table function projected exprs
3810  output_row_multiplier, // output buffer multiplier
3811  table_func->getFunctionName()};
3812  const auto targets_meta = get_targets_meta(table_func, exe_unit.target_exprs);
3813  table_func->setOutputMetainfo(targets_meta);
3814  return {exe_unit, table_func};
3815 }
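// Sketch of the sizing contract assumed above: for a table function declared
// with a user-specified output row multiplier, the output buffer is sized as
//   output_rows = output_row_multiplier * input_row_count
// which is why the multiplier argument must be a non-negative literal that is
// known before execution.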
3816 
3817 namespace {
3818 
3819 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
3820 get_inputs_meta(const RelFilter* filter,
3821  const RelAlgTranslator& translator,
3822  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
3823  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3824  std::vector<TargetMetaInfo> in_metainfo;
3825  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
3826  const auto data_sink_node = get_data_sink(filter);
3827  auto input_it = inputs_owned.begin();
3828  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
3829  const auto source = data_sink_node->getInput(nest_level);
3830  const auto scan_source = dynamic_cast<const RelScan*>(source);
3831  if (scan_source) {
3832  CHECK(source->getOutputMetainfo().empty());
3833  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
3834  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
3835  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
3836  }
3837  const auto source_metadata =
3838  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
3839  in_metainfo.insert(
3840  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3841  exprs_owned.insert(
3842  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3843  } else {
3844  const auto& source_metadata = source->getOutputMetainfo();
3845  input_it += source_metadata.size();
3846  in_metainfo.insert(
3847  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3848  const auto scalar_sources_owned = synthesize_inputs(
3849  data_sink_node, nest_level, source_metadata, input_to_nest_level);
3850  exprs_owned.insert(
3851  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3852  }
3853  }
3854  return std::make_pair(in_metainfo, exprs_owned);
3855 }
3856 
3857 } // namespace
3858 
3859 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
3860  const SortInfo& sort_info,
3861  const bool just_explain) {
3862  CHECK_EQ(size_t(1), filter->inputCount());
3863  std::vector<InputDescriptor> input_descs;
3864  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3865  std::vector<TargetMetaInfo> in_metainfo;
3866  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3867  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
3868 
3869  const auto input_to_nest_level = get_input_nest_levels(filter, {});
3870  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3871  get_input_desc(filter, input_to_nest_level, {}, cat_);
3872  const auto join_type = get_join_type(filter);
3873  RelAlgTranslator translator(cat_,
3874  query_state_,
3875  executor_,
3876  input_to_nest_level,
3877  {join_type},
3878  now_,
3879  just_explain);
3880  std::tie(in_metainfo, target_exprs_owned) =
3881  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
3882  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
3883  const auto qual = fold_expr(filter_expr.get());
3884  target_exprs_owned_.insert(
3885  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3886  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3887  filter->setOutputMetainfo(in_metainfo);
3888  const auto rewritten_qual = rewrite_expr(qual.get());
3889  return {{input_descs,
3890  input_col_descs,
3891  {},
3892  {rewritten_qual ? rewritten_qual : qual},
3893  {},
3894  {nullptr},
3895  target_exprs,
3896  nullptr,
3897  sort_info,
3898  0},
3899  filter,
3900  max_groups_buffer_entry_default_guess,
3901  nullptr};
3902 }
3903 
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:1407
const RelAlgNode * get_data_sink(const RelAlgNode *ra_node)
const unsigned dynamic_watchdog_time_limit
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
static const int32_t ERR_OUT_OF_CPU_MEM
Definition: Execute.h:1029
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:612
const TableDescriptor * getTableDescriptor() const
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
std::string columnName
Definition: sqldefs.h:72
const std::vector< std::shared_ptr< TargetEntry > > & get_targetlist() const
Definition: Analyzer.h:1566
static size_t getArenaBlockSize()
Definition: Execute.cpp:197
Executor * executor_
int getNestLevel() const
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20
std::list< std::shared_ptr< Analyzer::Expr > > combine_equi_join_conditions(const std::list< std::shared_ptr< Analyzer::Expr >> &join_quals)
#define IS_GEO(T)
Definition: sqltypes.h:172
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:108
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:40
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
const int getColumnIdBySpi(const int tableId, const size_t spi) const
Definition: Catalog.cpp:1540
const bool with_watchdog
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:119
bool g_enable_table_functions
Definition: Execute.cpp:99
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
static std::shared_ptr< Analyzer::Expr > translateAggregateRex(const RexAgg *rex, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
void prepareLeafExecution(const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:325
const RexScalar * getScalarSource(const size_t i) const
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
void set_transient_dict_maybe(std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::shared_ptr< Analyzer::Expr > &expr)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)