OmniSciDB  8a228a1076
RelAlgExecutor.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RelAlgExecutor.h"
18 #include "Parser/ParserNode.h"
31 #include "QueryEngine/RexVisitor.h"
35 #include "Shared/measure.h"
36 #include "Shared/misc.h"
37 #include "Shared/shard_key.h"
38 
39 #include <boost/algorithm/cxx11/any_of.hpp>
40 #include <boost/range/adaptor/reversed.hpp>
41 
42 #include <algorithm>
43 #include <functional>
44 #include <numeric>
45 
47 extern bool g_enable_bump_allocator;
48 bool g_enable_interop{false};
49 bool g_enable_union{false};
50 
51 namespace {
52 
53 bool node_is_aggregate(const RelAlgNode* ra) {
54  const auto compound = dynamic_cast<const RelCompound*>(ra);
55  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
56  return ((compound && compound->isAggregate()) || aggregate);
57 }
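
// A minimal sketch of the RTTI-based node classification used above and
// throughout this file: each RelAlgNode subtype is probed with dynamic_cast,
// which yields nullptr on a mismatch (stand-in types, not the real hierarchy):
//
//   struct Node { virtual ~Node() = default; };
//   struct Aggregate : Node {};
//   struct Compound : Node {
//     bool isAggregate() const { return agg_; }
//     bool agg_ = false;
//   };
//
//   bool is_aggregate_node(const Node* n) {
//     const auto compound = dynamic_cast<const Compound*>(n);
//     const auto aggregate = dynamic_cast<const Aggregate*>(n);
//     return (compound && compound->isAggregate()) || aggregate;
//   }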
58 
59 std::unordered_set<PhysicalInput> get_physical_inputs(
60  const Catalog_Namespace::Catalog& cat,
61  const RelAlgNode* ra) {
62  auto phys_inputs = get_physical_inputs(ra);
63  std::unordered_set<PhysicalInput> phys_inputs2;
64  for (auto& phi : phys_inputs) {
65  phys_inputs2.insert(
66  PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
67  }
68  return phys_inputs2;
69 }
70 
71 } // namespace
72 
73 size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
74  const ExecutionOptions& eo) {
75  if (eo.find_push_down_candidates) {
76  return 0;
77  }
78 
79  if (eo.just_explain) {
80  return 0;
81  }
82 
83  CHECK(query_dag_);
84 
85  query_dag_->resetQueryExecutionState();
86  const auto& ra = query_dag_->getRootNode();
87 
88  mapd_shared_lock<mapd_shared_mutex> lock(executor_->execute_mutex_);
89  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
90  const auto phys_inputs = get_physical_inputs(cat_, &ra);
91  const auto phys_table_ids = get_physical_table_inputs(&ra);
92  executor_->setCatalog(&cat_);
93  executor_->setupCaching(phys_inputs, phys_table_ids);
94 
95  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
96  auto ed_seq = RaExecutionSequence(&ra);
97 
98  if (!getSubqueries().empty()) {
99  return 0;
100  }
101 
102  CHECK(!ed_seq.empty());
103  if (ed_seq.size() > 1) {
104  return 0;
105  }
106 
107  decltype(temporary_tables_)().swap(temporary_tables_);
108  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
109  executor_->catalog_ = &cat_;
110  executor_->temporary_tables_ = &temporary_tables_;
111 
113  auto exec_desc_ptr = ed_seq.getDescriptor(0);
114  CHECK(exec_desc_ptr);
115  auto& exec_desc = *exec_desc_ptr;
116  const auto body = exec_desc.getBody();
117  if (body->isNop()) {
118  return 0;
119  }
120 
121  const auto project = dynamic_cast<const RelProject*>(body);
122  if (project) {
123  auto work_unit =
124  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
125 
126  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
127  executor_);
128  }
129 
130  const auto compound = dynamic_cast<const RelCompound*>(body);
131  if (compound) {
132  if (compound->isDeleteViaSelect()) {
133  return 0;
134  } else if (compound->isUpdateViaSelect()) {
135  return 0;
136  } else {
137  if (compound->isAggregate()) {
138  return 0;
139  }
140 
141  const auto work_unit =
142  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
143 
144  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
145  executor_);
146  }
147  }
148 
149  return 0;
150 }
151 
152 ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
153  const ExecutionOptions& eo,
154  const bool just_explain_plan,
155  RenderInfo* render_info) {
156  CHECK(query_dag_);
157  auto timer = DEBUG_TIMER(__func__);
158  INJECT_TIMER(executeRelAlgQuery);
159 
160  try {
161  return executeRelAlgQueryNoRetry(co, eo, just_explain_plan, render_info);
162  } catch (const QueryMustRunOnCpu&) {
163  if (!g_allow_cpu_retry) {
164  throw;
165  }
166  }
167  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
168  auto co_cpu = CompilationOptions::makeCpuOnly(co);
169 
170  if (render_info) {
171  render_info->setForceNonInSituData();
172  }
173  return executeRelAlgQueryNoRetry(co_cpu, eo, just_explain_plan, render_info);
174 }
175 
176 namespace {
177 
178 struct ExecutorMutexHolder {
179  mapd_shared_lock<mapd_shared_mutex> shared_lock;
180  mapd_unique_lock<mapd_shared_mutex> unique_lock;
181 };
182 
183 } // namespace
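
// ExecutorMutexHolder engages exactly one of its two locks. A self-contained
// sketch of the same pattern with the standard library (mapd_shared_mutex is
// essentially a std::shared_mutex; all names here are illustrative):
//
//   #include <shared_mutex>
//
//   struct MutexHolder {
//     std::shared_lock<std::shared_mutex> shared_lock;
//     std::unique_lock<std::shared_mutex> unique_lock;
//   };
//
//   MutexHolder acquire(std::shared_mutex& m, const bool exclusive) {
//     MutexHolder holder;
//     if (exclusive) {
//       holder.unique_lock = std::unique_lock<std::shared_mutex>(m);
//     } else {
//       holder.shared_lock = std::shared_lock<std::shared_mutex>(m);
//     }
//     return holder;  // whichever lock is engaged releases at end of scope
//   }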
184 
185 ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
186  const ExecutionOptions& eo,
187  const bool just_explain_plan,
188  RenderInfo* render_info) {
189  INJECT_TIMER(executeRelAlgQueryNoRetry);
190  auto timer = DEBUG_TIMER(__func__);
191  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
192 
193  query_dag_->resetQueryExecutionState();
194  const auto& ra = query_dag_->getRootNode();
195 
196  // capture the lock acquisition time
197  auto clock_begin = timer_start();
198  if (g_enable_dynamic_watchdog) {
199  executor_->resetInterrupt();
200  }
201  std::string query_session = "";
202  if (g_enable_runtime_query_interrupt) {
203  // a query execution request without a session id can happen (e.g., a test
204  // query); in that case we fall back to the original behavior: a runtime
205  // query interrupt without per-session management (similar to dynamic watchdog)
206  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
207  executor_->executor_session_mutex_);
208  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
209  query_session = query_state_->getConstSessionInfo()->get_session_id();
210  } else if (executor_->getCurrentQuerySession(session_read_lock) != query_session) {
211  query_session = executor_->getCurrentQuerySession(session_read_lock);
212  }
213  session_read_lock.unlock();
214  if (query_session != "") {
215  // if session is valid, then we allow per-session runtime query interrupt
216  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
217  executor_->executor_session_mutex_);
218  executor_->addToQuerySessionList(query_session, session_write_lock);
219  session_write_lock.unlock();
220  // hybrid spinlock: if it fails to acquire the lock, it sleeps for
221  // {g_runtime_query_interrupt_frequency} milliseconds before retrying
222  while (executor_->execute_spin_lock_.test_and_set(std::memory_order_acquire)) {
223  // failed to get the spinlock: check whether query is interrupted
224  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
225  executor_->executor_session_mutex_);
226  bool isQueryInterrupted =
227  executor_->checkIsQuerySessionInterrupted(query_session, session_read_lock);
228  session_read_lock.unlock();
229  if (isQueryInterrupted) {
230  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
231  executor_->executor_session_mutex_);
232  executor_->removeFromQuerySessionList(query_session, session_write_lock);
233  session_write_lock.unlock();
234  throw std::runtime_error(
235  "Query execution has been interrupted (pending query)");
236  }
237  // failed to acquire the spinlock again; sleep before the next attempt
238  std::this_thread::sleep_for(
239  std::chrono::milliseconds(g_runtime_query_interrupt_frequency));
240  };
241  }
242  // currently, atomic_flag does not provide a way to query its status,
243  // i.e., there is no spinlock.is_locked(), so we additionally lock
244  // execute_mutex_ right after acquiring the spinlock so that other parts
245  // of the code can tell whether a query is running on the executor
246  }
247  auto aquire_execute_mutex = [](Executor* executor) -> ExecutorMutexHolder {
248  ExecutorMutexHolder ret;
249  if (executor->executor_id_ == Executor::UNITARY_EXECUTOR_ID) {
250  // Only one unitary executor can run at a time
251  ret.unique_lock = mapd_unique_lock<mapd_shared_mutex>(executor->execute_mutex_);
252  } else {
253  ret.shared_lock = mapd_shared_lock<mapd_shared_mutex>(executor->execute_mutex_);
254  }
255  return ret;
256  };
257  auto lock = aquire_execute_mutex(executor_);
258  ScopeGuard clearRuntimeInterruptStatus = [this] {
259  // reset the runtime query interrupt status
260  if (g_enable_runtime_query_interrupt) {
261  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
262  executor_->executor_session_mutex_);
263  std::string curSession = executor_->getCurrentQuerySession(session_read_lock);
264  session_read_lock.unlock();
265  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
266  executor_->executor_session_mutex_);
267  executor_->removeFromQuerySessionList(curSession, session_write_lock);
268  executor_->invalidateQuerySession(session_write_lock);
269  executor_->execute_spin_lock_.clear(std::memory_order_release);
270  session_write_lock.unlock();
271  executor_->resetInterrupt();
272  VLOG(1) << "RESET runtime query interrupt status of Executor " << this;
273  }
274  };
275 
276  if (g_enable_runtime_query_interrupt) {
277  // check whether this query session is already interrupted
278  // this case occurs when there is a very short gap between being interrupted
279  // and taking the execute lock
280  // if so, we interrupt the query session and remove it from the running session list
281  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
282  executor_->executor_session_mutex_);
283  bool isAlreadyInterrupted =
284  executor_->checkIsQuerySessionInterrupted(query_session, session_read_lock);
285  session_read_lock.unlock();
286  if (isAlreadyInterrupted) {
287  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
288  executor_->executor_session_mutex_);
289  executor_->removeFromQuerySessionList(query_session, session_write_lock);
290  session_write_lock.unlock();
291  throw std::runtime_error("Query execution has been interrupted");
292  }
293 
294  // make sure to set the running session ID
295  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
296  executor_->executor_session_mutex_);
297  executor_->invalidateQuerySession(session_write_lock);
298  executor_->setCurrentQuerySession(query_session, session_write_lock);
299  session_write_lock.unlock();
300  }
301 
302  int64_t queue_time_ms = timer_stop(clock_begin);
303  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
304  const auto phys_inputs = get_physical_inputs(cat_, &ra);
305  const auto phys_table_ids = get_physical_table_inputs(&ra);
306  executor_->setCatalog(&cat_);
307  executor_->setupCaching(phys_inputs, phys_table_ids);
308 
309  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
310  auto ed_seq = RaExecutionSequence(&ra);
311 
312  if (just_explain_plan) {
313  std::stringstream ss;
314  std::vector<const RelAlgNode*> nodes;
315  for (size_t i = 0; i < ed_seq.size(); i++) {
316  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
317  }
318  size_t ctr = nodes.size();
319  size_t tab_ctr = 0;
320  for (auto& body : boost::adaptors::reverse(nodes)) {
321  const auto index = ctr--;
322  const auto tabs = std::string(tab_ctr++, '\t');
323  CHECK(body);
324  ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
325  if (auto sort = dynamic_cast<const RelSort*>(body)) {
326  ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
327  }
328  if (dynamic_cast<const RelProject*>(body) ||
329  dynamic_cast<const RelCompound*>(body)) {
330  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
331  ss << tabs << " : " << join->toString() << "\n";
332  }
333  }
334  }
335  auto rs = std::make_shared<ResultSet>(ss.str());
336  return {rs, {}};
337  }
338 
339  if (render_info) {
340  // set render to be non-insitu in certain situations.
342  ed_seq.size() > 1) {
343  // old logic
344  // disallow if more than one ED
345  render_info->setInSituDataIfUnset(false);
346  }
347  }
348 
349  if (eo.find_push_down_candidates) {
350  // this extra logic is mainly due to current limitations on multi-step queries
351  // and/or subqueries.
352  return executeRelAlgQueryWithFilterPushDown(
353  ed_seq, co, eo, render_info, queue_time_ms);
354  }
355  timer_setup.stop();
356 
357  // Dispatch the subqueries first
358  for (auto subquery : getSubqueries()) {
359  const auto subquery_ra = subquery->getRelAlg();
360  CHECK(subquery_ra);
361  if (subquery_ra->hasContextData()) {
362  continue;
363  }
364  // Execute the subquery and cache the result.
365  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
366  RaExecutionSequence subquery_seq(subquery_ra);
367  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
368  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
369  }
370  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
371 }
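
// A self-contained sketch of the hybrid spinlock pattern used in the function
// above: spin on an std::atomic_flag, but on every failed attempt check for
// interruption and sleep instead of busy-waiting (names are illustrative):
//
//   #include <atomic>
//   #include <chrono>
//   #include <thread>
//
//   std::atomic_flag spin = ATOMIC_FLAG_INIT;
//
//   bool lock_or_give_up(const std::atomic<bool>& interrupted,
//                        const std::chrono::milliseconds poll_interval) {
//     while (spin.test_and_set(std::memory_order_acquire)) {
//       if (interrupted.load()) {
//         return false;  // caller reports "interrupted (pending query)"
//       }
//       std::this_thread::sleep_for(poll_interval);
//     }
//     return true;  // caller must eventually spin.clear(std::memory_order_release)
//   }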
372 
373 AggregatedColRange RelAlgExecutor::computeColRangesCache() {
374  AggregatedColRange agg_col_range_cache;
375  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
376  return executor_->computeColRangesCache(phys_inputs);
377 }
378 
379 StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
380  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
381  return executor_->computeStringDictionaryGenerations(phys_inputs);
382 }
383 
384 TableGenerations RelAlgExecutor::computeTableGenerations() {
385  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
386  return executor_->computeTableGenerations(phys_table_ids);
387 }
388 
389 Executor* RelAlgExecutor::getExecutor() const {
390  return executor_;
391 }
392 
393 void RelAlgExecutor::cleanupPostExecution() {
394  CHECK(executor_);
395  executor_->row_set_mem_owner_ = nullptr;
396  executor_->lit_str_dict_proxy_ = nullptr;
397 }
398 
399 namespace {
400 
401 void check_sort_node_source_constraint(const RelSort* sort) {
402  CHECK_EQ(size_t(1), sort->inputCount());
403  const auto source = sort->getInput(0);
404  if (dynamic_cast<const RelSort*>(source)) {
405  throw std::runtime_error("Sort node not supported as input to another sort");
406  }
407 }
408 
409 } // namespace
410 
411 QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
412  const RaExecutionSequence& seq,
413  const size_t step_idx,
414  const CompilationOptions& co,
415  const ExecutionOptions& eo,
416  RenderInfo* render_info) {
417  INJECT_TIMER(executeRelAlgQueryStep);
418  auto exe_desc_ptr = seq.getDescriptor(step_idx);
419  CHECK(exe_desc_ptr);
420  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
421 
422  size_t shard_count{0};
423  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
424  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
425  };
426 
427  if (sort) {
429  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
430  shard_count = GroupByAndAggregate::shard_count_for_top_groups(
431  source_work_unit.exe_unit, *executor_->getCatalog());
432  if (!shard_count) {
433  // No point in sorting on the leaf, only execute the input to the sort node.
434  CHECK_EQ(size_t(1), sort->inputCount());
435  const auto source = sort->getInput(0);
436  if (sort->collationCount() || node_is_aggregate(source)) {
437  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
438  CHECK_EQ(temp_seq.size(), size_t(1));
439  // Use subseq to avoid clearing existing temporary tables
440  return {executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo, nullptr, 0),
441  merge_type(source),
442  source->getId(),
443  false};
444  }
445  }
446  }
447  return {executeRelAlgSubSeq(seq,
448  std::make_pair(step_idx, step_idx + 1),
449  co,
450  eo,
451  render_info,
452  queue_time_ms_),
453  merge_type(exe_desc_ptr->getBody()),
454  exe_desc_ptr->getBody()->getId(),
455  false};
456 }
457 
458 void RelAlgExecutor::prepareLeafExecution(
459  const AggregatedColRange& agg_col_range,
460  const StringDictionaryGenerations& string_dictionary_generations,
461  const TableGenerations& table_generations) {
462  // capture the lock acquisition time
463  auto clock_begin = timer_start();
464  if (g_enable_dynamic_watchdog) {
465  executor_->resetInterrupt();
466  }
467  queue_time_ms_ = timer_stop(clock_begin);
468  executor_->row_set_mem_owner_ =
469  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize());
470  executor_->table_generations_ = table_generations;
471  executor_->agg_col_range_cache_ = agg_col_range;
472  executor_->string_dictionary_generations_ = string_dictionary_generations;
473 }
474 
475 ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
476  const CompilationOptions& co,
477  const ExecutionOptions& eo,
478  RenderInfo* render_info,
479  const int64_t queue_time_ms,
480  const bool with_existing_temp_tables) {
481  INJECT_TIMER(executeRelAlgSeq);
482  auto timer = DEBUG_TIMER(__func__);
483  if (!with_existing_temp_tables) {
484  decltype(temporary_tables_)().swap(temporary_tables_);
485  }
486  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
487  executor_->catalog_ = &cat_;
488  executor_->temporary_tables_ = &temporary_tables_;
489 
490  time(&now_);
491  CHECK(!seq.empty());
492  const auto exec_desc_count = eo.just_explain ? size_t(1) : seq.size();
493 
494  for (size_t i = 0; i < exec_desc_count; i++) {
495  VLOG(1) << "Executing query step " << i;
496  // only render on the last step
497  try {
498  executeRelAlgStep(seq,
499  i,
500  co,
501  eo,
502  (i == exec_desc_count - 1) ? render_info : nullptr,
503  queue_time_ms);
504  } catch (const NativeExecutionError&) {
505  if (!g_enable_interop) {
506  throw;
507  }
508  auto eo_extern = eo;
509  eo_extern.executor_type = ::ExecutorType::Extern;
510  auto exec_desc_ptr = seq.getDescriptor(i);
511  const auto body = exec_desc_ptr->getBody();
512  const auto compound = dynamic_cast<const RelCompound*>(body);
513  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
514  LOG(INFO) << "Also failed to run the query using interoperability";
515  throw;
516  }
517  executeRelAlgStep(seq,
518  i,
519  co,
520  eo_extern,
521  (i == exec_desc_count - 1) ? render_info : nullptr,
522  queue_time_ms);
523  }
524  }
525 
526  return seq.getDescriptor(exec_desc_count - 1)->getResult();
527 }
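
// The catch block above is a retry-with-relaxed-options pattern: on a native
// execution failure, rerun the same step with the executor type switched to
// Extern, and let a second failure propagate. A generic sketch (the exception
// and options types are illustrative stand-ins):
//
//   struct Options { bool use_extern_executor = false; };
//   struct NativeExecutionError {};
//
//   template <class Step>
//   void run_with_fallback(Step&& step, const Options& opts, const bool interop_enabled) {
//     try {
//       step(opts);
//     } catch (const NativeExecutionError&) {
//       if (!interop_enabled) {
//         throw;  // no fallback configured
//       }
//       auto retry_opts = opts;  // copy, then relax
//       retry_opts.use_extern_executor = true;
//       step(retry_opts);  // a second failure propagates to the caller
//     }
//   }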
528 
529 ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
530  const RaExecutionSequence& seq,
531  const std::pair<size_t, size_t> interval,
532  const CompilationOptions& co,
533  const ExecutionOptions& eo,
534  RenderInfo* render_info,
535  const int64_t queue_time_ms) {
536  INJECT_TIMER(executeRelAlgSubSeq);
537  executor_->catalog_ = &cat_;
538  executor_->temporary_tables_ = &temporary_tables_;
539 
540  time(&now_);
541  for (size_t i = interval.first; i < interval.second; i++) {
542  // only render on the last step
543  executeRelAlgStep(seq,
544  i,
545  co,
546  eo,
547  (i == interval.second - 1) ? render_info : nullptr,
548  queue_time_ms);
549  }
550 
551  return seq.getDescriptor(interval.second - 1)->getResult();
552 }
553 
554 void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
555  const size_t step_idx,
556  const CompilationOptions& co,
557  const ExecutionOptions& eo,
558  RenderInfo* render_info,
559  const int64_t queue_time_ms) {
560  INJECT_TIMER(executeRelAlgStep);
561  auto timer = DEBUG_TIMER(__func__);
563  auto exec_desc_ptr = seq.getDescriptor(step_idx);
564  CHECK(exec_desc_ptr);
565  auto& exec_desc = *exec_desc_ptr;
566  const auto body = exec_desc.getBody();
567  if (body->isNop()) {
568  handleNop(exec_desc);
569  return;
570  }
571  const ExecutionOptions eo_work_unit{
573  eo.allow_multifrag,
574  eo.just_explain,
575  eo.allow_loop_joins,
576  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
577  eo.jit_debug,
578  eo.just_validate,
586  eo.executor_type,
587  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
588 
589  const auto compound = dynamic_cast<const RelCompound*>(body);
590  if (compound) {
591  if (compound->isDeleteViaSelect()) {
592  executeDelete(compound, co, eo_work_unit, queue_time_ms);
593  } else if (compound->isUpdateViaSelect()) {
594  executeUpdate(compound, co, eo_work_unit, queue_time_ms);
595  } else {
596  exec_desc.setResult(
597  executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
598  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
599  << static_cast<int>(-compound->getId()) << ", ...)"
600  << " exec_desc.getResult().getDataPtr()->rowCount()="
601  << exec_desc.getResult().getDataPtr()->rowCount();
602  if (exec_desc.getResult().isFilterPushDownEnabled()) {
603  return;
604  }
605  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
606  }
607  return;
608  }
609  const auto project = dynamic_cast<const RelProject*>(body);
610  if (project) {
611  if (project->isDeleteViaSelect()) {
612  executeDelete(project, co, eo_work_unit, queue_time_ms);
613  } else if (project->isUpdateViaSelect()) {
614  executeUpdate(project, co, eo_work_unit, queue_time_ms);
615  } else {
616  ssize_t prev_count = -1;
617  // Disabling the intermediate count optimization in distributed, as the previous
618  // execution descriptor will likely not hold the aggregated result.
619  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
620  auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
621  CHECK(prev_exec_desc);
622  RelAlgNode const* prev_body = prev_exec_desc->getBody();
623  // This optimization needs to be restricted in its application for UNION, which
624  // can have two input nodes, neither of which should restrict the count of the
625  // other. However, some non-UNION queries are measurably slower with this
626  // restriction, so it is only applied when g_enable_union is true.
627  bool const parent_check =
628  !g_enable_union || project->getInput(0)->getId() == prev_body->getId();
629  // If the previous node produced a reliable count, skip the pre-flight count
630  if (parent_check && (dynamic_cast<const RelCompound*>(prev_body) ||
631  dynamic_cast<const RelLogicalValues*>(prev_body))) {
632  const auto& prev_exe_result = prev_exec_desc->getResult();
633  const auto prev_result = prev_exe_result.getRows();
634  if (prev_result) {
635  prev_count = static_cast<ssize_t>(prev_result->rowCount());
636  }
637  VLOG(3) << "prev_exec_desc->getBody()->toString()="
638  << prev_exec_desc->getBody()->toString()
639  << " prev_count=" << prev_count;
640  }
641  }
642  exec_desc.setResult(executeProject(
643  project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
644  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
645  << static_cast<int>(-project->getId()) << ", ...)"
646  << " exec_desc.getResult().getDataPtr()->rowCount()="
647  << exec_desc.getResult().getDataPtr()->rowCount();
648  if (exec_desc.getResult().isFilterPushDownEnabled()) {
649  return;
650  }
651  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
652  }
653  return;
654  }
655  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
656  if (aggregate) {
657  exec_desc.setResult(
658  executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
659  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
660  return;
661  }
662  const auto filter = dynamic_cast<const RelFilter*>(body);
663  if (filter) {
664  exec_desc.setResult(
665  executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
666  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
667  return;
668  }
669  const auto sort = dynamic_cast<const RelSort*>(body);
670  if (sort) {
671  exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
672  if (exec_desc.getResult().isFilterPushDownEnabled()) {
673  return;
674  }
675  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
676  return;
677  }
678  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
679  if (logical_values) {
680  exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
681  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
682  return;
683  }
684  const auto modify = dynamic_cast<const RelModify*>(body);
685  if (modify) {
686  exec_desc.setResult(executeModify(modify, eo_work_unit));
687  return;
688  }
689  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
690  if (logical_union) {
691  exec_desc.setResult(
692  executeUnion(logical_union, seq, co, eo_work_unit, render_info, queue_time_ms));
693  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
694  return;
695  }
696  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
697  if (table_func) {
698  exec_desc.setResult(
699  executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
700  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
701  return;
702  }
703  LOG(FATAL) << "Unhandled body type: " << body->toString();
704 }
705 
706 void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
707  // just set the result of the previous node as the result of no op
708  auto body = ed.getBody();
709  CHECK(dynamic_cast<const RelAggregate*>(body));
710  CHECK_EQ(size_t(1), body->inputCount());
711  const auto input = body->getInput(0);
712  body->setOutputMetainfo(input->getOutputMetainfo());
713  const auto it = temporary_tables_.find(-input->getId());
714  CHECK(it != temporary_tables_.end());
715  // set up temp table as it could be used by the outer query or next step
716  addTemporaryTable(-body->getId(), it->second);
717 
718  ed.setResult({it->second, input->getOutputMetainfo()});
719 }
720 
721 namespace {
722 
723 class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
724  public:
725  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : RexVisitor(), cat_(cat) {}
726 
727  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
728  return synthesized_physical_inputs_owned;
729  }
730 
731  std::unordered_set<const RexInput*> visitInput(
732  const RexInput* rex_input) const override {
733  const auto input_ra = rex_input->getSourceNode();
734  CHECK(input_ra);
735  const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
736  if (scan_ra) {
737  const auto td = scan_ra->getTableDescriptor();
738  if (td) {
739  const auto col_id = rex_input->getIndex();
740  const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
741  if (cd && cd->columnType.get_physical_cols() > 0) {
742  CHECK(IS_GEO(cd->columnType.get_type()));
743  std::unordered_set<const RexInput*> synthesized_physical_inputs;
744  for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
745  auto physical_input =
746  new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
747  synthesized_physical_inputs_owned.emplace_back(physical_input);
748  synthesized_physical_inputs.insert(physical_input);
749  }
750  return synthesized_physical_inputs;
751  }
752  }
753  }
754  return {rex_input};
755  }
756 
757  protected:
758  std::unordered_set<const RexInput*> aggregateResult(
759  const std::unordered_set<const RexInput*>& aggregate,
760  const std::unordered_set<const RexInput*>& next_result) const override {
761  auto result = aggregate;
762  result.insert(next_result.begin(), next_result.end());
763  return result;
764  }
765 
766  private:
767  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
768  const Catalog_Namespace::Catalog& cat_;
769 };
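
// aggregateResult() folds per-subexpression results by set union, so visiting
// an expression tree yields the set of every input it touches. The combining
// step in isolation (the element type is illustrative):
//
//   #include <unordered_set>
//
//   std::unordered_set<int> aggregate_result(std::unordered_set<int> acc,
//                                            const std::unordered_set<int>& next) {
//     acc.insert(next.begin(), next.end());
//     return acc;
//   }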
770 
771 const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
772  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
773  return table_func;
774  }
775  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
776  CHECK_EQ(size_t(2), join->inputCount());
777  return join;
778  }
779  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
780  CHECK_EQ(size_t(1), ra_node->inputCount());
781  }
782  auto only_src = ra_node->getInput(0);
783  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
784  dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
785  return is_join ? only_src : ra_node;
786 }
787 
788 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
789 get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
790  RexUsedInputsVisitor visitor(cat);
791  const auto filter_expr = compound->getFilterExpr();
792  std::unordered_set<const RexInput*> used_inputs =
793  filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
794  const auto sources_size = compound->getScalarSourcesSize();
795  for (size_t i = 0; i < sources_size; ++i) {
796  const auto source_inputs = visitor.visit(compound->getScalarSource(i));
797  used_inputs.insert(source_inputs.begin(), source_inputs.end());
798  }
799  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
800  return std::make_pair(used_inputs, used_inputs_owned);
801 }
802 
803 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
804 get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
805  CHECK_EQ(size_t(1), aggregate->inputCount());
806  std::unordered_set<const RexInput*> used_inputs;
807  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
808  const auto source = aggregate->getInput(0);
809  const auto& in_metainfo = source->getOutputMetainfo();
810  const auto group_count = aggregate->getGroupByCount();
811  CHECK_GE(in_metainfo.size(), group_count);
812  for (size_t i = 0; i < group_count; ++i) {
813  auto synthesized_used_input = new RexInput(source, i);
814  used_inputs_owned.emplace_back(synthesized_used_input);
815  used_inputs.insert(synthesized_used_input);
816  }
817  for (const auto& agg_expr : aggregate->getAggExprs()) {
818  for (size_t i = 0; i < agg_expr->size(); ++i) {
819  const auto operand_idx = agg_expr->getOperand(i);
820  CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
821  auto synthesized_used_input = new RexInput(source, operand_idx);
822  used_inputs_owned.emplace_back(synthesized_used_input);
823  used_inputs.insert(synthesized_used_input);
824  }
825  }
826  return std::make_pair(used_inputs, used_inputs_owned);
827 }
828 
829 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
830 get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
831  RexUsedInputsVisitor visitor(cat);
832  std::unordered_set<const RexInput*> used_inputs;
833  for (size_t i = 0; i < project->size(); ++i) {
834  const auto proj_inputs = visitor.visit(project->getProjectAt(i));
835  used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
836  }
837  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
838  return std::make_pair(used_inputs, used_inputs_owned);
839 }
840 
841 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
842 get_used_inputs(const RelTableFunction* table_func,
843  const Catalog_Namespace::Catalog& cat) {
844  RexUsedInputsVisitor visitor(cat);
845  std::unordered_set<const RexInput*> used_inputs;
846  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
847  const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
848  used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
849  }
850  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
851  return std::make_pair(used_inputs, used_inputs_owned);
852 }
853 
854 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
855 get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
856  std::unordered_set<const RexInput*> used_inputs;
857  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
858  const auto data_sink_node = get_data_sink(filter);
859  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
860  const auto source = data_sink_node->getInput(nest_level);
861  const auto scan_source = dynamic_cast<const RelScan*>(source);
862  if (scan_source) {
863  CHECK(source->getOutputMetainfo().empty());
864  for (size_t i = 0; i < scan_source->size(); ++i) {
865  auto synthesized_used_input = new RexInput(scan_source, i);
866  used_inputs_owned.emplace_back(synthesized_used_input);
867  used_inputs.insert(synthesized_used_input);
868  }
869  } else {
870  const auto& partial_in_metadata = source->getOutputMetainfo();
871  for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
872  auto synthesized_used_input = new RexInput(source, i);
873  used_inputs_owned.emplace_back(synthesized_used_input);
874  used_inputs.insert(synthesized_used_input);
875  }
876  }
877  }
878  return std::make_pair(used_inputs, used_inputs_owned);
879 }
880 
881 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
882 get_used_inputs(const RelLogicalUnion* logical_union, const Catalog_Namespace::Catalog&) {
883  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
884  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
885  used_inputs_owned.reserve(logical_union->inputCount());
886  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
887  auto const n_inputs = logical_union->inputCount();
888  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
889  auto input = logical_union->getInput(nest_level);
890  for (size_t i = 0; i < input->size(); ++i) {
891  used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
892  used_inputs.insert(used_inputs_owned.back().get());
893  }
894  }
895  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
896 }
897 
898 int table_id_from_ra(const RelAlgNode* ra_node) {
899  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
900  if (scan_ra) {
901  const auto td = scan_ra->getTableDescriptor();
902  CHECK(td);
903  return td->tableId;
904  }
905  return -ra_node->getId();
906 }
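
// Physical scans resolve to the positive catalog tableId; any other node
// contributes its intermediate result under the negated node id, which is why
// the addTemporaryTable(-node->getId(), ...) calls elsewhere in this file line
// up with this function. A sketch of the convention (stub types, illustrative):
//
//   #include <memory>
//   #include <unordered_map>
//
//   struct ResultSetStub {};
//   using TemporaryTables = std::unordered_map<int, std::shared_ptr<ResultSetStub>>;
//
//   std::shared_ptr<ResultSetStub> lookup(const TemporaryTables& tmp, const int table_id) {
//     // Negative ids denote the intermediate result of node -table_id; they
//     // can never collide with (positive) catalog table ids.
//     const auto it = tmp.find(table_id);
//     return it == tmp.end() ? nullptr : it->second;
//   }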
907 
908 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
909  const RelAlgNode* ra_node,
910  const std::vector<size_t>& input_permutation) {
911  const auto data_sink_node = get_data_sink(ra_node);
912  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
913  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
914  const auto input_node_idx =
915  input_permutation.empty() ? input_idx : input_permutation[input_idx];
916  const auto input_ra = data_sink_node->getInput(input_node_idx);
917  // Having a non-zero mapped value (input_idx) results in the query being interpreted
918  // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
919  // mapped value (input_idx) which originates here. This would be incorrect for UNION.
920  size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
921  const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
922  CHECK(it_ok.second);
923  LOG_IF(INFO, !input_permutation.empty())
924  << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
925  }
926  return input_to_nest_level;
927 }
928 
929 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
930 get_join_source_used_inputs(const RelAlgNode* ra_node,
931  const Catalog_Namespace::Catalog& cat) {
932  const auto data_sink_node = get_data_sink(ra_node);
933  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
934  CHECK_EQ(join->inputCount(), 2u);
935  const auto condition = join->getCondition();
936  RexUsedInputsVisitor visitor(cat);
937  auto condition_inputs = visitor.visit(condition);
938  std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
939  visitor.get_inputs_owned());
940  return std::make_pair(condition_inputs, condition_inputs_owned);
941  }
942 
943  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
944  CHECK_GE(left_deep_join->inputCount(), 2u);
945  const auto condition = left_deep_join->getInnerCondition();
946  RexUsedInputsVisitor visitor(cat);
947  auto result = visitor.visit(condition);
948  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
949  ++nesting_level) {
950  const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
951  if (outer_condition) {
952  const auto outer_result = visitor.visit(outer_condition);
953  result.insert(outer_result.begin(), outer_result.end());
954  }
955  }
956  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
957  return std::make_pair(result, used_inputs_owned);
958  }
959 
960  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
961  CHECK_GT(ra_node->inputCount(), 1u) << ra_node->toString();
962  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
963  CHECK_GT(ra_node->inputCount(), 0u) << ra_node->toString();
964  } else {
965  CHECK_EQ(ra_node->inputCount(), 1u) << ra_node->toString();
966  }
967  return std::make_pair(std::unordered_set<const RexInput*>{},
968  std::vector<std::shared_ptr<RexInput>>{});
969 }
970 
971 void collect_used_input_desc(
972  std::vector<InputDescriptor>& input_descs,
973  const Catalog_Namespace::Catalog& cat,
974  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
975  const RelAlgNode* ra_node,
976  const std::unordered_set<const RexInput*>& source_used_inputs,
977  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
978  VLOG(3) << "ra_node=" << ra_node->toString()
979  << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
980  << " source_used_inputs.size()=" << source_used_inputs.size();
981  for (const auto used_input : source_used_inputs) {
982  const auto input_ra = used_input->getSourceNode();
983  const int table_id = table_id_from_ra(input_ra);
984  const auto col_id = used_input->getIndex();
985  auto it = input_to_nest_level.find(input_ra);
986  if (it != input_to_nest_level.end()) {
987  const int input_desc = it->second;
988  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
989  dynamic_cast<const RelScan*>(input_ra)
990  ? cat.getColumnIdBySpi(table_id, col_id + 1)
991  : col_id,
992  table_id,
993  input_desc));
994  } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
995  throw std::runtime_error("Bushy joins not supported");
996  }
997  }
998 }
999 
1000 template <class RA>
1001 std::pair<std::vector<InputDescriptor>,
1002  std::list<std::shared_ptr<const InputColDescriptor>>>
1003 get_input_desc_impl(const RA* ra_node,
1004  const std::unordered_set<const RexInput*>& used_inputs,
1005  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1006  const std::vector<size_t>& input_permutation,
1007  const Catalog_Namespace::Catalog& cat) {
1008  std::vector<InputDescriptor> input_descs;
1009  const auto data_sink_node = get_data_sink(ra_node);
1010  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1011  const auto input_node_idx =
1012  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1013  auto input_ra = data_sink_node->getInput(input_node_idx);
1014  const int table_id = table_id_from_ra(input_ra);
1015  input_descs.emplace_back(table_id, input_idx);
1016  }
1017  std::sort(input_descs.begin(),
1018  input_descs.end(),
1019  [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
1020  return lhs.getNestLevel() < rhs.getNestLevel();
1021  });
1022  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1023  collect_used_input_desc(input_descs,
1024  cat,
1025  input_col_descs_unique, // modified
1026  ra_node,
1027  used_inputs,
1028  input_to_nest_level);
1029  std::unordered_set<const RexInput*> join_source_used_inputs;
1030  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1031  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1032  get_join_source_used_inputs(ra_node, cat);
1033  collect_used_input_desc(input_descs,
1034  cat,
1035  input_col_descs_unique, // modified
1036  ra_node,
1037  join_source_used_inputs,
1038  input_to_nest_level);
1039  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1040  input_col_descs_unique.begin(), input_col_descs_unique.end());
1041 
1042  std::sort(input_col_descs.begin(),
1043  input_col_descs.end(),
1044  [](std::shared_ptr<const InputColDescriptor> const& lhs,
1045  std::shared_ptr<const InputColDescriptor> const& rhs) {
1046  return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1047  lhs->getColId(),
1048  lhs->getScanDesc().getTableId()) <
1049  std::make_tuple(rhs->getScanDesc().getNestLevel(),
1050  rhs->getColId(),
1051  rhs->getScanDesc().getTableId());
1052  });
1053  return {input_descs,
1054  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1055  input_col_descs.end())};
1056 }
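
// The second std::sort above leans on std::tuple's lexicographic operator< to
// impose a deterministic (nest level, column id, table id) ordering. The idiom
// in isolation (field names are illustrative):
//
//   #include <algorithm>
//   #include <tuple>
//   #include <vector>
//
//   struct Col { int nest_level; int col_id; int table_id; };
//
//   void sort_cols(std::vector<Col>& cols) {
//     std::sort(cols.begin(), cols.end(), [](const Col& a, const Col& b) {
//       return std::make_tuple(a.nest_level, a.col_id, a.table_id) <
//              std::make_tuple(b.nest_level, b.col_id, b.table_id);
//     });
//   }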
1057 
1058 template <class RA>
1059 std::tuple<std::vector<InputDescriptor>,
1060  std::list<std::shared_ptr<const InputColDescriptor>>,
1061  std::vector<std::shared_ptr<RexInput>>>
1062 get_input_desc(const RA* ra_node,
1063  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1064  const std::vector<size_t>& input_permutation,
1065  const Catalog_Namespace::Catalog& cat) {
1066  std::unordered_set<const RexInput*> used_inputs;
1067  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1068  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
1069  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1070  auto input_desc_pair = get_input_desc_impl(
1071  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1072  return std::make_tuple(
1073  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1074 }
1075 
1076 size_t get_scalar_sources_size(const RelCompound* compound) {
1077  return compound->getScalarSourcesSize();
1078 }
1079 
1080 size_t get_scalar_sources_size(const RelProject* project) {
1081  return project->size();
1082 }
1083 
1084 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1085  return table_func->getTableFuncInputsSize();
1086 }
1087 
1088 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1089  return compound->getScalarSource(i);
1090 }
1091 
1092 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1093  return project->getProjectAt(i);
1094 }
1095 
1096 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1097  return table_func->getTableFuncInputAt(i);
1098 }
1099 
1100 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1101  const std::shared_ptr<Analyzer::Expr> expr) {
1102  const auto& ti = expr->get_type_info();
1103  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1104  return expr;
1105  }
1106  auto transient_dict_ti = ti;
1107  transient_dict_ti.set_compression(kENCODING_DICT);
1108  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1109  transient_dict_ti.set_fixed_size();
1110  return expr->add_cast(transient_dict_ti);
1111 }
1112 
1113 void set_transient_dict_maybe(
1114  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1115  const std::shared_ptr<Analyzer::Expr>& expr) {
1116  try {
1117  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1118  } catch (...) {
1119  scalar_sources.push_back(fold_expr(expr.get()));
1120  }
1121 }
1122 
1123 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1124  const std::shared_ptr<Analyzer::Expr>& input) {
1125  const auto& input_ti = input->get_type_info();
1126  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1127  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1128  }
1129  return input;
1130 }
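
// Note that set_transient_dict() and cast_dict_to_none() are mirror images:
// the former narrows a none-encoded string to a transient dictionary (so
// group-by and comparisons operate on fixed-size ids), while the latter
// widens a dictionary-encoded string back to plain kTEXT, which is used below
// for the non-native executor path that cannot resolve transient dictionary ids.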
1131 
1132 template <class RA>
1133 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1134  const RA* ra_node,
1135  const RelAlgTranslator& translator,
1136  const ::ExecutorType executor_type) {
1137  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1138  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1139  VLOG(3) << "get_scalar_sources_size(" << ra_node->toString()
1140  << ") = " << scalar_sources_size;
1141  for (size_t i = 0; i < scalar_sources_size; ++i) {
1142  const auto scalar_rex = scalar_at(i, ra_node);
1143  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1144  // RexRef are synthetic scalars we append at the end of the real ones
1145  // for the sake of taking memory ownership, no real work needed here.
1146  continue;
1147  }
1148 
1149  const auto scalar_expr =
1150  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
1151  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1152  if (executor_type == ExecutorType::Native) {
1153  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1154  } else {
1155  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1156  }
1157  }
1158 
1159  return scalar_sources;
1160 }
1161 
1162 template <class RA>
1163 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1164  const RA* ra_node,
1165  const RelAlgTranslator& translator,
1166  int32_t tableId,
1167  const Catalog_Namespace::Catalog& cat,
1168  const ColumnNameList& colNames,
1169  size_t starting_projection_column_idx) {
1170  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1171  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1172  const auto scalar_rex = scalar_at(i, ra_node);
1173  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1174  // RexRef are synthetic scalars we append at the end of the real ones
1175  // for the sake of taking memory ownership, no real work needed here.
1176  continue;
1177  }
1178 
1179  std::shared_ptr<Analyzer::Expr> translated_expr;
1180  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1181  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
1182  tableId,
1183  cat,
1184  colNames[i - starting_projection_column_idx]);
1185  } else {
1186  translated_expr = translator.translateScalarRex(scalar_rex);
1187  }
1188  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1189  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1190  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1191  }
1192 
1193  return scalar_sources;
1194 }
1195 
1196 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1197  const RelCompound* compound,
1198  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1199  if (!compound->isAggregate()) {
1200  return {nullptr};
1201  }
1202  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1203  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1204  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1205  }
1206  return groupby_exprs;
1207 }
1208 
1209 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1210  const RelAggregate* aggregate,
1211  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1212  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1213  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1214  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1215  }
1216  return groupby_exprs;
1217 }
1218 
1219 QualsConjunctiveForm translate_quals(const RelCompound* compound,
1220  const RelAlgTranslator& translator) {
1221  const auto filter_rex = compound->getFilterExpr();
1222  const auto filter_expr =
1223  filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
1224  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1225  : QualsConjunctiveForm{};
1226 }
1227 
1228 std::vector<Analyzer::Expr*> translate_targets(
1229  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1230  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1231  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1232  const RelCompound* compound,
1233  const RelAlgTranslator& translator,
1234  const ExecutorType executor_type) {
1235  std::vector<Analyzer::Expr*> target_exprs;
1236  for (size_t i = 0; i < compound->size(); ++i) {
1237  const auto target_rex = compound->getTargetExpr(i);
1238  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1239  std::shared_ptr<Analyzer::Expr> target_expr;
1240  if (target_rex_agg) {
1241  target_expr =
1242  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1243  } else {
1244  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1245  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1246  if (target_rex_ref) {
1247  const auto ref_idx = target_rex_ref->getIndex();
1248  CHECK_GE(ref_idx, size_t(1));
1249  CHECK_LE(ref_idx, groupby_exprs.size());
1250  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1251  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1252  } else {
1253  target_expr = translator.translateScalarRex(target_rex_scalar);
1254  auto rewritten_expr = rewrite_expr(target_expr.get());
1255  target_expr = fold_expr(rewritten_expr.get());
1256  if (executor_type == ExecutorType::Native) {
1257  try {
1258  target_expr = set_transient_dict(target_expr);
1259  } catch (...) {
1260  // noop
1261  }
1262  } else {
1263  target_expr = cast_dict_to_none(target_expr);
1264  }
1265  }
1266  }
1267  CHECK(target_expr);
1268  target_exprs_owned.push_back(target_expr);
1269  target_exprs.push_back(target_expr.get());
1270  }
1271  return target_exprs;
1272 }
1273 
1274 std::vector<Analyzer::Expr*> translate_targets(
1275  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1276  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1277  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1278  const RelAggregate* aggregate,
1279  const RelAlgTranslator& translator) {
1280  std::vector<Analyzer::Expr*> target_exprs;
1281  size_t group_key_idx = 1;
1282  for (const auto& groupby_expr : groupby_exprs) {
1283  auto target_expr =
1284  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1285  target_exprs_owned.push_back(target_expr);
1286  target_exprs.push_back(target_expr.get());
1287  }
1288 
1289  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1290  auto target_expr =
1291  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1292  CHECK(target_expr);
1293  target_expr = fold_expr(target_expr.get());
1294  target_exprs_owned.push_back(target_expr);
1295  target_exprs.push_back(target_expr.get());
1296  }
1297  return target_exprs;
1298 }
1299 
1300 bool is_count_distinct(const Analyzer::Expr* expr) {
1301  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1302  return agg_expr && agg_expr->get_is_distinct();
1303 }
1304 
1305 bool is_agg(const Analyzer::Expr* expr) {
1306  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1307  if (agg_expr && agg_expr->get_contains_agg()) {
1308  auto agg_type = agg_expr->get_aggtype();
1309  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1310  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1311  return true;
1312  }
1313  }
1314  return false;
1315 }
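
// Note: is_agg() matches only MIN/MAX/SUM/AVG; COUNT is excluded here, and
// COUNT(DISTINCT ...) is special-cased to BIGINT by get_logical_type_for_expr()
// below, which gives the matched aggregates a separately adjusted (nullable)
// logical result type.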
1316 
1317 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
1318  if (is_count_distinct(&expr)) {
1319  return SQLTypeInfo(kBIGINT, false);
1320  } else if (is_agg(&expr)) {
1321  return get_nullable_logical_type_info(expr.get_type_info());
1322  }
1323  return get_logical_type_info(expr.get_type_info());
1324 }
1325 
1326 template <class RA>
1327 std::vector<TargetMetaInfo> get_targets_meta(
1328  const RA* ra_node,
1329  const std::vector<Analyzer::Expr*>& target_exprs) {
1330  std::vector<TargetMetaInfo> targets_meta;
1331  for (size_t i = 0; i < ra_node->size(); ++i) {
1332  CHECK(target_exprs[i]);
1333  // TODO(alex): remove the count distinct type fixup.
1334  targets_meta.emplace_back(ra_node->getFieldName(i),
1335  get_logical_type_for_expr(*target_exprs[i]),
1336  target_exprs[i]->get_type_info());
1337  }
1338  return targets_meta;
1339 }
1340 
1341 template <>
1342 std::vector<TargetMetaInfo> get_targets_meta(
1343  const RelFilter* filter,
1344  const std::vector<Analyzer::Expr*>& target_exprs) {
1345  RelAlgNode const* input0 = filter->getInput(0);
1346  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1347  return get_targets_meta(input, target_exprs);
1348  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1349  return get_targets_meta(input, target_exprs);
1350  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1351  return get_targets_meta(input, target_exprs);
1352  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1353  return get_targets_meta(input, target_exprs);
1354  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1355  return get_targets_meta(input, target_exprs);
1356  }
1357  UNREACHABLE() << "Unhandled node type: " << input0->toString();
1358  return {};
1359 }
1360 
1361 } // namespace
1362 
1363 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
1364  const CompilationOptions& co_in,
1365  const ExecutionOptions& eo_in,
1366  const int64_t queue_time_ms) {
1367  CHECK(node);
1368  auto timer = DEBUG_TIMER(__func__);
1369 
1371 
1372  auto co = co_in;
1373  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1374  // encoded string updates
1375 
1376  auto execute_update_for_node =
1377  [this, &co, &eo_in](const auto node, auto& work_unit, const bool is_aggregate) {
1378  UpdateTransactionParameters update_params(node->getModifiedTableDescriptor(),
1379  node->getTargetColumns(),
1380  node->getOutputMetainfo(),
1381  node->isVarlenUpdateRequired());
1382 
1383  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1384 
1385  auto execute_update_ra_exe_unit =
1386  [this, &co, &eo_in, &table_infos, &update_params](
1387  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
1389 
1390  auto eo = eo_in;
1391  if (update_params.tableIsTemporary()) {
1392  eo.output_columnar_hint = true;
1393  co_project.allow_lazy_fetch = false;
1394  co_project.filter_on_deleted_column =
1395  false; // project the entire delete column for columnar update
1396  }
1397 
1398  auto update_callback = yieldUpdateCallback(update_params);
1399  executor_->executeUpdate(ra_exe_unit,
1400  table_infos,
1401  co_project,
1402  eo,
1403  cat_,
1404  executor_->row_set_mem_owner_,
1405  update_callback,
1406  is_aggregate);
1407  update_params.finalizeTransaction();
1408  };
1409 
1410  if (update_params.tableIsTemporary()) {
1411  // hold owned target exprs during execution if rewriting
1412  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1413  // rewrite temp table updates to generate the full column by moving the where
1414  // clause into a case if such a rewrite is not possible, bail on the update
1415  // operation build an expr for the update target
1416  const auto td = update_params.getTableDescriptor();
1417  CHECK(td);
1418  const auto update_column_names = update_params.getUpdateColumnNames();
1419  if (update_column_names.size() > 1) {
1420  throw std::runtime_error(
1421  "Multi-column update is not yet supported for temporary tables.");
1422  }
1423 
1424  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1425  CHECK(cd);
1426  auto projected_column_to_update =
1427  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1428  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1429  work_unit.exe_unit, projected_column_to_update);
1430  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1431  throw std::runtime_error(
1432  "Variable length updates not yet supported on temporary tables.");
1433  }
1434  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1435  } else {
1436  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1437  }
1438  };
1439 
1440  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1441  auto work_unit =
1442  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1443 
1444  execute_update_for_node(compound, work_unit, compound->isAggregate());
1445  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1446  auto work_unit =
1447  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1448 
1449  if (project->isSimple()) {
1450  CHECK_EQ(size_t(1), project->inputCount());
1451  const auto input_ra = project->getInput(0);
1452  if (dynamic_cast<const RelSort*>(input_ra)) {
1453  const auto& input_table =
1454  get_temporary_table(&temporary_tables_, -input_ra->getId());
1455  CHECK(input_table);
1456  work_unit.exe_unit.scan_limit = input_table->rowCount();
1457  }
1458  }
1459 
1460  execute_update_for_node(project, work_unit, false);
1461  } else {
1462  throw std::runtime_error("Unsupported parent node for update: " + node->toString());
1463  }
1464 }
1465 
1466 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
1467  const CompilationOptions& co,
1468  const ExecutionOptions& eo_in,
1469  const int64_t queue_time_ms) {
1470  CHECK(node);
1471  auto timer = DEBUG_TIMER(__func__);
1472 
1474 
1475  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1476  auto& work_unit,
1477  const bool is_aggregate) {
1478  auto* table_descriptor = node->getModifiedTableDescriptor();
1479  CHECK(table_descriptor);
1480  if (!table_descriptor->hasDeletedCol) {
1481  throw std::runtime_error(
1482  "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
1483  }
1484 
1485  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1486 
1487  auto execute_delete_ra_exe_unit =
1488  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1489  const bool is_aggregate) {
1490  DeleteTransactionParameters delete_params(table_is_temporary(table_descriptor));
1491  auto delete_callback = yieldDeleteCallback(delete_params);
1493 
1494  auto eo = eo_in;
1495  if (delete_params.tableIsTemporary()) {
1496  eo.output_columnar_hint = true;
1497  co_delete.filter_on_deleted_column =
1498  false; // project the entire delete column for columnar update
1499  } else {
1500  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1501  }
1502 
1503  executor_->executeUpdate(exe_unit,
1504  table_infos,
1505  co_delete,
1506  eo,
1507  cat_,
1508  executor_->row_set_mem_owner_,
1509  delete_callback,
1510  is_aggregate);
1511  delete_params.finalizeTransaction();
1512  };
1513 
1514  if (table_is_temporary(table_descriptor)) {
1515  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1516  auto cd = cat_.getDeletedColumn(table_descriptor);
1517  CHECK(cd);
1518  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1519  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1520  const auto rewritten_exe_unit =
1521  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1522  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1523  } else {
1524  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1525  }
1526  };
1527 
1528  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1529  const auto work_unit =
1530  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1531  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1532  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1533  auto work_unit =
1534  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1535  if (project->isSimple()) {
1536  CHECK_EQ(size_t(1), project->inputCount());
1537  const auto input_ra = project->getInput(0);
1538  if (dynamic_cast<const RelSort*>(input_ra)) {
1539  const auto& input_table =
1540  get_temporary_table(&temporary_tables_, -input_ra->getId());
1541  CHECK(input_table);
1542  work_unit.exe_unit.scan_limit = input_table->rowCount();
1543  }
1544  }
1545  execute_delete_for_node(project, work_unit, false);
1546  } else {
1547  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1548  }
1549 }
1550 
1551 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
1552  const CompilationOptions& co,
1553  const ExecutionOptions& eo,
1554  RenderInfo* render_info,
1555  const int64_t queue_time_ms) {
1556  auto timer = DEBUG_TIMER(__func__);
1557  const auto work_unit =
1558  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1559  CompilationOptions co_compound = co;
1560  return executeWorkUnit(work_unit,
1561  compound->getOutputMetainfo(),
1562  compound->isAggregate(),
1563  co_compound,
1564  eo,
1565  render_info,
1566  queue_time_ms);
1567 }
1568 
1569 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
1570  const CompilationOptions& co,
1571  const ExecutionOptions& eo,
1572  RenderInfo* render_info,
1573  const int64_t queue_time_ms) {
1574  auto timer = DEBUG_TIMER(__func__);
1575  const auto work_unit = createAggregateWorkUnit(
1576  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1577  return executeWorkUnit(work_unit,
1578  aggregate->getOutputMetainfo(),
1579  true,
1580  co,
1581  eo,
1582  render_info,
1583  queue_time_ms);
1584 }
1585 
1586 namespace {
1587 
1588 // Returns true iff the execution unit contains window functions.
1589 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
1590  return std::any_of(ra_exe_unit.target_exprs.begin(),
1591  ra_exe_unit.target_exprs.end(),
1592  [](const Analyzer::Expr* expr) {
1593  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1594  });
1595 }
1596 
1597 } // namespace
1598 
1599 ExecutionResult RelAlgExecutor::executeProject(const RelProject* project,
1600  const CompilationOptions& co,
1601  const ExecutionOptions& eo,
1602  RenderInfo* render_info,
1603  const int64_t queue_time_ms,
1604  const ssize_t previous_count) {
1605  auto timer = DEBUG_TIMER(__func__);
1606  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1607  CompilationOptions co_project = co;
1608  if (project->isSimple()) {
1609  CHECK_EQ(size_t(1), project->inputCount());
1610  const auto input_ra = project->getInput(0);
1611  if (dynamic_cast<const RelSort*>(input_ra)) {
1612  co_project.device_type = ExecutorDeviceType::CPU;
1613  const auto& input_table =
1614  get_temporary_table(&temporary_tables_, -input_ra->getId());
1615  CHECK(input_table);
1616  work_unit.exe_unit.scan_limit =
1617  std::min(input_table->getLimit(), input_table->rowCount());
1618  }
1619  }
1620  return executeWorkUnit(work_unit,
1621  project->getOutputMetainfo(),
1622  false,
1623  co_project,
1624  eo,
1625  render_info,
1626  queue_time_ms,
1627  previous_count);
1628 }
1629 
1630 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
1631  const CompilationOptions& co_in,
1632  const ExecutionOptions& eo,
1633  const int64_t queue_time_ms) {
1634  INJECT_TIMER(executeTableFunction);
1635  auto timer = DEBUG_TIMER(__func__);
1636 
1637  auto co = co_in;
1638 
1639  if (g_cluster) {
1640  throw std::runtime_error("Table functions not supported in distributed mode yet");
1641  }
1642  if (!g_enable_table_functions) {
1643  throw std::runtime_error("Table function support is disabled");
1644  }
1645 
1646  auto table_func_work_unit = createTableFunctionWorkUnit(table_func, eo.just_explain);
1647  const auto body = table_func_work_unit.body;
1648  CHECK(body);
1649 
1650  const auto table_infos =
1651  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1652 
1653  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1654  co.device_type,
1655  QueryMemoryDescriptor(),
1656  nullptr,
1657  executor_),
1658  {}};
1659 
1660  try {
1661  result = {executor_->executeTableFunction(
1662  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1663  body->getOutputMetainfo()};
1664  } catch (const QueryExecutionError& e) {
1665  handlePersistentError(e.getErrorCode());
1667  throw std::runtime_error("Table function ran out of memory during execution");
1668  }
1669  result.setQueueTime(queue_time_ms);
1670  return result;
1671 }
1672 
1673 namespace {
1674 
1675 // Creates a new expression which has the range table index set to 1. This is needed to
1676 // reuse the hash join construction helpers to generate a hash table for the window
1677 // function partition: create an equals expression with left and right sides identical
1678 // except for the range table index.
1679 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
1680  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
1681  if (tuple) {
1682  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
1683  for (const auto& element : tuple->getTuple()) {
1684  transformed_tuple.push_back(transform_to_inner(element.get()));
1685  }
1686  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
1687  }
1688  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
1689  if (!col) {
1690  throw std::runtime_error("Only columns supported in the window partition for now");
1691  }
1692  return makeExpr<Analyzer::ColumnVar>(
1693  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
1694 }
1695 
1696 } // namespace
1697 
1698 void RelAlgExecutor::computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
1699  const CompilationOptions& co,
1700  const ExecutionOptions& eo,
1701  ColumnCacheMap& column_cache_map,
1702  const int64_t queue_time_ms) {
1703  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1704  CHECK_EQ(query_infos.size(), size_t(1));
1705  if (query_infos.front().info.fragments.size() != 1) {
1706  throw std::runtime_error(
1707  "Only single fragment tables supported for window functions for now");
1708  }
1709  if (eo.executor_type == ::ExecutorType::Extern) {
1710  return;
1711  }
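  // The partition hash table is built as a self-join of the input with itself,
  // so the single input's info is duplicated to serve as the inner side (see
  // transform_to_inner below, which bumps the range table index to 1).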
1712  query_infos.push_back(query_infos.front());
1713  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
1714  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1715  ++target_index) {
1716  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1717  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1718  if (!window_func) {
1719  continue;
1720  }
1721  // Always use baseline layout hash tables for now, make the expression a tuple.
1722  const auto& partition_keys = window_func->getPartitionKeys();
1723  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1724  if (partition_keys.size() > 1) {
1725  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1726  } else {
1727  if (partition_keys.empty()) {
1728  throw std::runtime_error(
1729  "Empty window function partitions are not supported yet");
1730  }
1731  CHECK_EQ(partition_keys.size(), size_t(1));
1732  partition_key_tuple = partition_keys.front();
1733  }
1734  // Creates a tautology equality with the partition expression on both sides.
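  // kBW_EQ is the NULL-safe (bitwise) equality, so rows whose partition key is
  // NULL still hash into a common partition instead of being dropped.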
1735  const auto partition_key_cond =
1736  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1737  kBW_EQ,
1738  kONE,
1739  partition_key_tuple,
1740  transform_to_inner(partition_key_tuple.get()));
1741  auto context = createWindowFunctionContext(window_func,
1742  partition_key_cond,
1743  ra_exe_unit,
1744  query_infos,
1745  co,
1746  column_cache_map,
1747  executor_->getRowSetMemoryOwner());
1748  context->compute();
1749  window_project_node_context->addWindowFunctionContext(std::move(context),
1750  target_index);
1751  }
1752 }
1753 
1754 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
1755  const Analyzer::WindowFunction* window_func,
1756  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
1757  const RelAlgExecutionUnit& ra_exe_unit,
1758  const std::vector<InputTableInfo>& query_infos,
1759  const CompilationOptions& co,
1760  ColumnCacheMap& column_cache_map,
1761  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1762  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
1763  ? MemoryLevel::GPU_LEVEL
1764  : MemoryLevel::CPU_LEVEL;
1765  const auto join_table_or_err =
1766  executor_->buildHashTableForQualifier(partition_key_cond,
1767  query_infos,
1768  memory_level,
1769  JoinHashTableInterface::HashType::OneToMany,
1770  column_cache_map);
1771  if (!join_table_or_err.fail_reason.empty()) {
1772  throw std::runtime_error(join_table_or_err.fail_reason);
1773  }
1774  CHECK(join_table_or_err.hash_table->getHashType() ==
1775  JoinHashTableInterface::HashType::OneToMany);
1776  const auto& order_keys = window_func->getOrderKeys();
1777  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
1778  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
1779  auto context = std::make_unique<WindowFunctionContext>(window_func,
1780  join_table_or_err.hash_table,
1781  elem_count,
1782  co.device_type,
1783  row_set_mem_owner);
1784  for (const auto& order_key : order_keys) {
1785  const auto order_col =
1786  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
1787  if (!order_col) {
1788  throw std::runtime_error("Only order by columns supported for now");
1789  }
1790  const int8_t* column;
1791  size_t join_col_elem_count;
1792  std::tie(column, join_col_elem_count) =
1793  ColumnFetcher::getOneColumnFragment(executor_,
1794  *order_col,
1795  query_infos.front().info.fragments.front(),
1796  memory_level,
1797  0,
1798  nullptr,
1799  chunks_owner,
1800  column_cache_map);
1801  CHECK_EQ(join_col_elem_count, elem_count);
1802  context->addOrderColumn(column, order_col.get(), chunks_owner);
1803  }
1804  return context;
1805 }
1806 
1807 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
1808  const CompilationOptions& co,
1809  const ExecutionOptions& eo,
1810  RenderInfo* render_info,
1811  const int64_t queue_time_ms) {
1812  auto timer = DEBUG_TIMER(__func__);
1813  const auto work_unit =
1814  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1815  return executeWorkUnit(
1816  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
1817 }
1818 
1819 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
1820  std::vector<TargetMetaInfo> const& rhs) {
1821  if (lhs.size() == rhs.size()) {
1822  for (size_t i = 0; i < lhs.size(); ++i) {
1823  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
1824  return false;
1825  }
1826  }
1827  return true;
1828  }
1829  return false;
1830 }
1831 
1832 bool isGeometry(TargetMetaInfo const& target_meta_info) {
1833  return target_meta_info.get_type_info().is_geometry();
1834 }
1835 
1836 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
1837  const RaExecutionSequence& seq,
1838  const CompilationOptions& co,
1839  const ExecutionOptions& eo,
1840  RenderInfo* render_info,
1841  const int64_t queue_time_ms) {
1842  auto timer = DEBUG_TIMER(__func__);
1843  if (!logical_union->isAll()) {
1844  throw std::runtime_error("UNION without ALL is not supported yet.");
1845  }
1846  // Will throw a std::runtime_error if types don't match.
1847  logical_union->checkForMatchingMetaInfoTypes();
1848  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
1849  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
1850  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
1851  }
1852  // Only Projections and Aggregates from a UNION are supported for now.
1853  query_dag_->eachNode([logical_union](RelAlgNode const* node) {
1854  if (node->hasInput(logical_union) &&
1855  !shared::dynamic_castable_to_any<RelProject, RelLogicalUnion, RelAggregate>(
1856  node)) {
1857  throw std::runtime_error("UNION ALL not yet supported in this context.");
1858  }
1859  });
1860 
1861  auto work_unit =
1862  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
1863  return executeWorkUnit(work_unit,
1864  logical_union->getOutputMetainfo(),
1865  false,
1866  co,
1867  eo,
1868  render_info,
1869  queue_time_ms);
1870 }
1871 
1872 ExecutionResult RelAlgExecutor::executeLogicalValues(
1873  const RelLogicalValues* logical_values,
1874  const ExecutionOptions& eo) {
1875  auto timer = DEBUG_TIMER(__func__);
1876  if (eo.just_explain) {
1877  throw std::runtime_error("EXPLAIN not supported for LogicalValues");
1878  }
1879 
1880  QueryMemoryDescriptor query_mem_desc(executor_,
1881  logical_values->getNumRows(),
1882  QueryDescriptionType::Projection,
1883  /*is_table_function=*/false);
1884 
1885  auto tuple_type = logical_values->getTupleType();
1886  for (size_t i = 0; i < tuple_type.size(); ++i) {
1887  auto& target_meta_info = tuple_type[i];
1888  if (target_meta_info.get_type_info().is_varlen()) {
1889  throw std::runtime_error("Variable length types not supported in VALUES yet.");
1890  }
1891  if (target_meta_info.get_type_info().get_type() == kNULLT) {
1892  // untyped NULL literal: replace with BIGINT
1893  tuple_type[i] =
1894  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
1895  }
1896  query_mem_desc.addColSlotInfo(
1897  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
1898  }
1899  logical_values->setOutputMetainfo(tuple_type);
1900 
1901  std::vector<TargetInfo> target_infos;
1902  for (const auto& tuple_type_component : tuple_type) {
1903  target_infos.emplace_back(TargetInfo{false,
1904  kCOUNT,
1905  tuple_type_component.get_type_info(),
1906  SQLTypeInfo(kNULLT, false),
1907  false,
1908  false});
1909  }
1910  auto rs = std::make_shared<ResultSet>(target_infos,
1911  ExecutorDeviceType::CPU,
1912  query_mem_desc,
1913  executor_->getRowSetMemoryOwner(),
1914  executor_);
1915 
1916  if (logical_values->hasRows()) {
1917  CHECK_EQ(logical_values->getRowsSize(), logical_values->size());
1918 
1919  auto storage = rs->allocateStorage();
1920  auto buff = storage->getUnderlyingBuffer();
1921 
1922  for (size_t i = 0; i < logical_values->getNumRows(); i++) {
1923  std::vector<std::shared_ptr<Analyzer::Expr>> row_literals;
1924  int8_t* ptr = buff + i * query_mem_desc.getRowSize();
1925 
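      // Row-wise layout: every target occupies one 8-byte slot (see the
      // addColSlotInfo calls above), so ptr advances by 8 per column below and
      // narrower values are memcpy'd into the low-order bytes of their slot.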
1926  for (size_t j = 0; j < logical_values->getRowsSize(); j++) {
1927  auto rex_literal =
1928  dynamic_cast<const RexLiteral*>(logical_values->getValueAt(i, j));
1929  CHECK(rex_literal);
1930  const auto expr = RelAlgTranslator::translateLiteral(rex_literal);
1931  const auto constant = std::dynamic_pointer_cast<Analyzer::Constant>(expr);
1932  CHECK(constant);
1933 
1934  if (constant->get_is_null()) {
1935  CHECK(!target_infos[j].sql_type.is_varlen());
1936  *reinterpret_cast<int64_t*>(ptr) =
1937  inline_int_null_val(target_infos[j].sql_type);
1938  } else {
1939  const auto ti = constant->get_type_info();
1940  const auto datum = constant->get_constval();
1941 
1942  // Initialize the entire 8-byte slot
1943  *reinterpret_cast<int64_t*>(ptr) = EMPTY_KEY_64;
1944 
1945  const auto sz = ti.get_size();
1946  CHECK_GE(sz, int(0));
1947  std::memcpy(ptr, &datum, sz);
1948  }
1949  ptr += 8;
1950  }
1951  }
1952  }
1953  return {rs, tuple_type};
1954 }
1955 
1956 namespace {
1957 
1958 template <class T>
1959 int64_t insert_one_dict_str(T* col_data,
1960  const std::string& columnName,
1961  const SQLTypeInfo& columnType,
1962  const Analyzer::Constant* col_cv,
1963  const Catalog_Namespace::Catalog& catalog) {
1964  if (col_cv->get_is_null()) {
1965  *col_data = inline_fixed_encoding_null_val(columnType);
1966  } else {
1967  const int dict_id = columnType.get_comp_param();
1968  const auto col_datum = col_cv->get_constval();
1969  const auto& str = *col_datum.stringval;
1970  const auto dd = catalog.getMetadataForDict(dict_id);
1971  CHECK(dd && dd->stringDict);
1972  int32_t str_id = dd->stringDict->getOrAdd(str);
1973  if (!dd->dictIsTemp) {
1974  const auto checkpoint_ok = dd->stringDict->checkpoint();
1975  if (!checkpoint_ok) {
1976  throw std::runtime_error("Failed to checkpoint dictionary for column " +
1977  columnName);
1978  }
1979  }
1980  const bool invalid = str_id > max_valid_int_value<T>();
1981  if (invalid || str_id == inline_int_null_value<int32_t>()) {
1982  if (invalid) {
1983  LOG(ERROR) << "Could not encode string: " << str
1984  << ", the encoded value doesn't fit in " << sizeof(T) * 8
1985  << " bits. Will store NULL instead.";
1986  }
1987  str_id = inline_fixed_encoding_null_val(columnType);
1988  }
1989  *col_data = str_id;
1990  }
1991  return *col_data;
1992 }
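// Instantiated with 1-, 2- and 4-byte integer column buffers to match the
// storage size of the dictionary-encoded column (see the kENCODING_DICT
// switch in the simple-insert path below).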
1993 
1994 template <class T>
1995 int64_t insert_one_dict_str(T* col_data,
1996  const ColumnDescriptor* cd,
1997  const Analyzer::Constant* col_cv,
1998  const Catalog_Namespace::Catalog& catalog) {
1999  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2000 }
2001 
2002 } // namespace
2003 
2004 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2005  const ExecutionOptions& eo) {
2006  auto timer = DEBUG_TIMER(__func__);
2007  if (eo.just_explain) {
2008  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2009  }
2010 
2011  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2012  ExecutorDeviceType::CPU,
2013  QueryMemoryDescriptor(),
2014  executor_->getRowSetMemoryOwner(),
2015  executor_);
2016 
2017  std::vector<TargetMetaInfo> empty_targets;
2018  return {rs, empty_targets};
2019 }
2020 
2021 ExecutionResult RelAlgExecutor::executeSimpleInsert(const Analyzer::Query& query) {
2022  // Note: We currently obtain an executor for this method, but we do not need it.
2023  // Therefore, we skip the executor state setup in the regular execution path. In the
2024  // future, we will likely want to use the executor to evaluate expressions in the insert
2025  // statement.
2026 
2027  const auto& targets = query.get_targetlist();
2028  const int table_id = query.get_result_table_id();
2029  const auto& col_id_list = query.get_result_col_list();
2030 
2031  std::vector<const ColumnDescriptor*> col_descriptors;
2032  std::vector<int> col_ids;
2033  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2034  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2035  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2036 
2037  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2038  CHECK(table_descriptor);
2039  const auto shard_tables = cat_.getPhysicalTablesDescriptors(table_descriptor);
2040  const TableDescriptor* shard{nullptr};
2041 
2042  for (const int col_id : col_id_list) {
2043  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2044  const auto col_enc = cd->columnType.get_compression();
2045  if (cd->columnType.is_string()) {
2046  switch (col_enc) {
2047  case kENCODING_NONE: {
2048  auto it_ok =
2049  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2050  CHECK(it_ok.second);
2051  break;
2052  }
2053  case kENCODING_DICT: {
2054  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2055  CHECK(dd);
2056  const auto it_ok = col_buffers.emplace(
2057  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2058  CHECK(it_ok.second);
2059  break;
2060  }
2061  default:
2062  CHECK(false);
2063  }
2064  } else if (cd->columnType.is_geometry()) {
2065  auto it_ok =
2066  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2067  CHECK(it_ok.second);
2068  } else if (cd->columnType.is_array()) {
2069  auto it_ok =
2070  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2071  CHECK(it_ok.second);
2072  } else {
2073  const auto it_ok = col_buffers.emplace(
2074  col_id,
2075  std::unique_ptr<uint8_t[]>(
2076  new uint8_t[cd->columnType.get_logical_size()]())); // zero-init the buffer
2078  CHECK(it_ok.second);
2079  }
2080  col_descriptors.push_back(cd);
2081  col_ids.push_back(col_id);
2082  }
2083  size_t col_idx = 0;
2084  Fragmenter_Namespace::InsertData insert_data;
2085  insert_data.databaseId = cat_.getCurrentDB().dbId;
2086  insert_data.tableId = table_id;
2087  int64_t int_col_val{0};
2088  for (auto target_entry : targets) {
2089  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2090  if (!col_cv) {
2091  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2092  CHECK(col_cast);
2093  CHECK_EQ(kCAST, col_cast->get_optype());
2094  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2095  }
2096  CHECK(col_cv);
2097  const auto cd = col_descriptors[col_idx];
2098  auto col_datum = col_cv->get_constval();
2099  auto col_type = cd->columnType.get_type();
2100  uint8_t* col_data_bytes{nullptr};
2101  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2102  (!cd->columnType.is_string() ||
2103  cd->columnType.get_compression() == kENCODING_DICT)) {
2104  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2105  CHECK(col_data_bytes_it != col_buffers.end());
2106  col_data_bytes = col_data_bytes_it->second.get();
2107  }
2108  switch (col_type) {
2109  case kBOOLEAN: {
2110  auto col_data = col_data_bytes;
2111  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2112  : (col_datum.boolval ? 1 : 0);
2113  break;
2114  }
2115  case kTINYINT: {
2116  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2117  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2118  : col_datum.tinyintval;
2119  int_col_val = col_datum.tinyintval;
2120  break;
2121  }
2122  case kSMALLINT: {
2123  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2124  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2125  : col_datum.smallintval;
2126  int_col_val = col_datum.smallintval;
2127  break;
2128  }
2129  case kINT: {
2130  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2131  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2132  : col_datum.intval;
2133  int_col_val = col_datum.intval;
2134  break;
2135  }
2136  case kBIGINT:
2137  case kDECIMAL:
2138  case kNUMERIC: {
2139  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2140  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2141  : col_datum.bigintval;
2142  int_col_val = col_datum.bigintval;
2143  break;
2144  }
2145  case kFLOAT: {
2146  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2147  *col_data = col_datum.floatval;
2148  break;
2149  }
2150  case kDOUBLE: {
2151  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2152  *col_data = col_datum.doubleval;
2153  break;
2154  }
2155  case kTEXT:
2156  case kVARCHAR:
2157  case kCHAR: {
2158  switch (cd->columnType.get_compression()) {
2159  case kENCODING_NONE:
2160  str_col_buffers[col_ids[col_idx]].push_back(
2161  col_datum.stringval ? *col_datum.stringval : "");
2162  break;
2163  case kENCODING_DICT: {
2164  switch (cd->columnType.get_size()) {
2165  case 1:
2166  int_col_val = insert_one_dict_str(
2167  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2168  break;
2169  case 2:
2170  int_col_val = insert_one_dict_str(
2171  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2172  break;
2173  case 4:
2174  int_col_val = insert_one_dict_str(
2175  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2176  break;
2177  default:
2178  CHECK(false);
2179  }
2180  break;
2181  }
2182  default:
2183  CHECK(false);
2184  }
2185  break;
2186  }
2187  case kTIME:
2188  case kTIMESTAMP:
2189  case kDATE: {
2190  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2191  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2192  : col_datum.bigintval;
2193  break;
2194  }
2195  case kARRAY: {
2196  const auto is_null = col_cv->get_is_null();
2197  const auto size = cd->columnType.get_size();
2198  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2199  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2200  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2201  if (is_null && !is_point_coords) {
2202  if (size > 0) {
2203  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2204  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2205  throw std::runtime_error("Column " + cd->columnName +
2206  " doesn't accept NULL values");
2207  }
2208  int8_t* buf = (int8_t*)checked_malloc(size);
2209  put_null_array(static_cast<void*>(buf), elem_ti, "");
2210  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2211  p += elem_ti.get_size()) {
2212  put_null(static_cast<void*>(p), elem_ti, "");
2213  }
2214  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2215  } else {
2216  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2217  }
2218  break;
2219  }
2220  const auto l = col_cv->get_value_list();
2221  size_t len = l.size() * elem_ti.get_size();
2222  if (size > 0 && static_cast<size_t>(size) != len) {
2223  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2224  std::to_string(size / elem_ti.get_size()) +
2225  " values, received " + std::to_string(l.size()));
2226  }
2227  if (elem_ti.is_string()) {
2228  CHECK(kENCODING_DICT == elem_ti.get_compression());
2229  CHECK(4 == elem_ti.get_size());
2230 
2231  int8_t* buf = (int8_t*)checked_malloc(len);
2232  int32_t* p = reinterpret_cast<int32_t*>(buf);
2233 
2234  int elemIndex = 0;
2235  for (auto& e : l) {
2236  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2237  CHECK(c);
2238 
2239  int_col_val = insert_one_dict_str(
2240  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2241 
2242  elemIndex++;
2243  }
2244  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2245 
2246  } else {
2247  int8_t* buf = (int8_t*)checked_malloc(len);
2248  int8_t* p = buf;
2249  for (auto& e : l) {
2250  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2251  CHECK(c);
2252  p = appendDatum(p, c->get_constval(), elem_ti);
2253  }
2254  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2255  }
2256  break;
2257  }
2258  case kPOINT:
2259  case kLINESTRING:
2260  case kPOLYGON:
2261  case kMULTIPOLYGON:
2262  str_col_buffers[col_ids[col_idx]].push_back(
2263  col_datum.stringval ? *col_datum.stringval : "");
2264  break;
2265  default:
2266  CHECK(false);
2267  }
2268  ++col_idx;
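  // If the column just written is the designated sharding key, route this row
  // to the physical shard table selected by the key's value.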
2269  if (col_idx == static_cast<size_t>(table_descriptor->shardedColumnId)) {
2270  const auto shard_count = shard_tables.size();
2271  const size_t shard_idx = SHARD_FOR_KEY(int_col_val, shard_count);
2272  shard = shard_tables[shard_idx];
2273  }
2274  }
2275  for (const auto& kv : col_buffers) {
2276  insert_data.columnIds.push_back(kv.first);
2277  DataBlockPtr p;
2278  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2279  insert_data.data.push_back(p);
2280  }
2281  for (auto& kv : str_col_buffers) {
2282  insert_data.columnIds.push_back(kv.first);
2283  DataBlockPtr p;
2284  p.stringsPtr = &kv.second;
2285  insert_data.data.push_back(p);
2286  }
2287  for (auto& kv : arr_col_buffers) {
2288  insert_data.columnIds.push_back(kv.first);
2289  DataBlockPtr p;
2290  p.arraysPtr = &kv.second;
2291  insert_data.data.push_back(p);
2292  }
2293  insert_data.numRows = 1;
2294  if (shard) {
2295  shard->fragmenter->insertData(insert_data);
2296  } else {
2297  table_descriptor->fragmenter->insertData(insert_data);
2298  }
2299 
2300  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2301  ExecutorDeviceType::CPU,
2302  QueryMemoryDescriptor(),
2303  executor_->getRowSetMemoryOwner(),
2304  executor_);
2305  std::vector<TargetMetaInfo> empty_targets;
2306  return {rs, empty_targets};
2307 }
2308 
2309 namespace {
2310 
2311 // TODO(alex): Once we're fully migrated to the relational algebra model, change
2312 // the executor interface to use the collation directly and remove this conversion.
2313 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
2314  std::list<Analyzer::OrderEntry> result;
2315  for (size_t i = 0; i < sort->collationCount(); ++i) {
2316  const auto sort_field = sort->getCollation(i);
2317  result.emplace_back(sort_field.getField() + 1,
2318  sort_field.getSortDir() == SortDirection::Descending,
2319  sort_field.getNullsPosition() == NullSortedPosition::First);
2320  }
2321  return result;
2322 }
2323 
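// A LIMIT can only be pushed down into the scan when the node does not
// aggregate: an aggregate must consume all input rows before any output row
// exists, so the scan limit is cleared for aggregating nodes below.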
2324 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
2325  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
2326  if (aggregate) {
2327  return 0;
2328  }
2329  const auto compound = dynamic_cast<const RelCompound*>(ra);
2330  return (compound && compound->isAggregate()) ? 0 : limit;
2331 }
2332 
2333 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
2334  return !order_entries.empty() && order_entries.front().is_desc;
2335 }
2336 
2337 } // namespace
2338 
2339 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
2340  const CompilationOptions& co,
2341  const ExecutionOptions& eo,
2342  RenderInfo* render_info,
2343  const int64_t queue_time_ms) {
2344  auto timer = DEBUG_TIMER(__func__);
2346  const auto source = sort->getInput(0);
2347  const bool is_aggregate = node_is_aggregate(source);
2348  auto it = leaf_results_.find(sort->getId());
2349  if (it != leaf_results_.end()) {
2350  // Add any transient string literals to the sdp on the agg
2351  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2352  add_transient_string_literals(
2353  source_work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2354  // Handle push-down for LIMIT for multi-node
2355  auto& aggregated_result = it->second;
2356  auto& result_rows = aggregated_result.rs;
2357  const size_t limit = sort->getLimit();
2358  const size_t offset = sort->getOffset();
2359  const auto order_entries = get_order_entries(sort);
2360  if (limit || offset) {
2361  if (!order_entries.empty()) {
2362  result_rows->sort(order_entries, limit + offset);
2363  }
2364  result_rows->dropFirstN(offset);
2365  if (limit) {
2366  result_rows->keepFirstN(limit);
2367  }
2368  }
2369  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2370  sort->setOutputMetainfo(aggregated_result.targets_meta);
2371  return result;
2372  }
2373 
2374  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2375  bool is_desc{false};
2376 
2377  auto execute_sort_query = [this,
2378  sort,
2379  &source,
2380  &is_aggregate,
2381  &eo,
2382  &co,
2383  render_info,
2384  queue_time_ms,
2385  &groupby_exprs,
2386  &is_desc]() -> ExecutionResult {
2387  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2388  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2389  ExecutionOptions eo_copy = {
2390  eo.output_columnar_hint,
2391  eo.allow_multifrag,
2392  eo.just_explain,
2393  eo.allow_loop_joins,
2394  eo.with_watchdog,
2395  eo.jit_debug,
2396  eo.just_validate || sort->isEmptyResult(),
2397  eo.with_dynamic_watchdog,
2398  eo.dynamic_watchdog_time_limit,
2399  eo.find_push_down_candidates,
2400  eo.just_calcite_explain,
2401  eo.gpu_input_mem_limit_percent,
2402  eo.allow_runtime_query_interrupt,
2403  eo.runtime_query_interrupt_frequency,
2404  eo.executor_type,
2405  };
2406 
2407  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2408  auto source_result = executeWorkUnit(source_work_unit,
2409  source->getOutputMetainfo(),
2410  is_aggregate,
2411  co,
2412  eo_copy,
2413  render_info,
2414  queue_time_ms);
2415  if (render_info && render_info->isPotentialInSituRender()) {
2416  return source_result;
2417  }
2418  if (source_result.isFilterPushDownEnabled()) {
2419  return source_result;
2420  }
2421  auto rows_to_sort = source_result.getRows();
2422  if (eo.just_explain) {
2423  return {rows_to_sort, {}};
2424  }
2425  const size_t limit = sort->getLimit();
2426  const size_t offset = sort->getOffset();
2427  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2428  !use_speculative_top_n(source_work_unit.exe_unit,
2429  rows_to_sort->getQueryMemDesc())) {
2430  rows_to_sort->sort(source_work_unit.exe_unit.sort_info.order_entries,
2431  limit + offset);
2432  }
2433  if (limit || offset) {
2434  if (g_cluster && sort->collationCount() == 0) {
2435  if (offset >= rows_to_sort->rowCount()) {
2436  rows_to_sort->dropFirstN(offset);
2437  } else {
2438  rows_to_sort->keepFirstN(limit + offset);
2439  }
2440  } else {
2441  rows_to_sort->dropFirstN(offset);
2442  if (limit) {
2443  rows_to_sort->keepFirstN(limit);
2444  }
2445  }
2446  }
2447  return {rows_to_sort, source_result.getTargetsMeta()};
2448  };
2449 
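  // If the speculative top-n path overflows, blacklist the group-by expression
  // (and sort direction) so the retry below falls back to the default sort
  // algorithm (see createSortInputWorkUnit).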
2450  try {
2451  return execute_sort_query();
2452  } catch (const SpeculativeTopNFailed& e) {
2453  CHECK_EQ(size_t(1), groupby_exprs.size());
2454  CHECK(groupby_exprs.front());
2455  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2456  return execute_sort_query();
2457  }
2458 }
2459 
2461  const RelSort* sort,
2462  const ExecutionOptions& eo) {
2463  const auto source = sort->getInput(0);
2464  const size_t limit = sort->getLimit();
2465  const size_t offset = sort->getOffset();
2466  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2467  const size_t scan_total_limit =
2468  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2469  size_t max_groups_buffer_entry_guess{
2470  scan_total_limit ? scan_total_limit : max_groups_buffer_entry_default_guess};
2471  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
2472  const auto order_entries = get_order_entries(sort);
2473  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2474  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2475  const auto& source_exe_unit = source_work_unit.exe_unit;
2476 
2477  // we do not allow sorting geometry or array types
2478  for (auto order_entry : order_entries) {
2479  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-based index
2480  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2481  const auto& ti = get_target_info(te, false);
2482  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2483  throw std::runtime_error(
2484  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2485  }
2486  }
2487 
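  // Sort strategy selection: a query with no group-by keys can stream its
  // top-n directly; a single-key group-by keeps the speculative top-n default
  // unless a prior failure blacklisted this key/direction (see executeSort).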
2488  if (source_exe_unit.groupby_exprs.size() == 1) {
2489  if (!source_exe_unit.groupby_exprs.front()) {
2490  sort_algorithm = SortAlgorithm::StreamingTopN;
2491  } else {
2492  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2493  first_oe_is_desc(order_entries))) {
2494  sort_algorithm = SortAlgorithm::Default;
2495  }
2496  }
2497  }
2498 
2499  sort->setOutputMetainfo(source->getOutputMetainfo());
2500  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2501  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2502  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2503  std::move(source_exe_unit.input_col_descs),
2504  source_exe_unit.simple_quals,
2505  source_exe_unit.quals,
2506  source_exe_unit.join_quals,
2507  source_exe_unit.groupby_exprs,
2508  source_exe_unit.target_exprs,
2509  nullptr,
2510  {sort_info.order_entries, sort_algorithm, limit, offset},
2511  scan_total_limit,
2512  source_exe_unit.use_bump_allocator,
2513  source_exe_unit.union_all,
2514  source_exe_unit.query_state},
2515  source,
2516  max_groups_buffer_entry_guess,
2517  std::move(source_work_unit.query_rewriter),
2518  source_work_unit.input_permutation,
2519  source_work_unit.left_deep_join_input_sizes};
2520 }
2521 
2522 namespace {
2523 
2524 /**
2525  * Upper bound estimation for the number of groups. Not strictly correct and not
2526  * tight, but if the tables involved are really small we shouldn't give up even
2527  * if the estimate is pessimistic.
2528  */
2530 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
2531  CHECK(!table_infos.empty());
2532  const auto& first_table = table_infos.front();
2533  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
2534  for (const auto& table_info : table_infos) {
2535  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
2536  max_num_groups = table_info.info.getNumTuplesUpperBound();
2537  }
2538  }
2539  return std::max(max_num_groups, size_t(1));
2540 }
2541 
2542 /**
2543  * Determines whether a query needs to compute the size of its output buffer. Returns
2544  * true for projection queries with no LIMIT or a LIMIT that exceeds the high scan
2545  * limit threshold (meaning it would be cheaper to compute the number of rows passing
2546  * the filter than allocate the max output buffer size).
2547  */
2548 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
2549  for (const auto target_expr : ra_exe_unit.target_exprs) {
2550  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
2551  return false;
2552  }
2553  }
2554  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
2555  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
2556  return true;
2557  }
2558  return false;
2559 }
2560 
2561 inline bool exe_unit_has_quals(const RelAlgExecutionUnit& ra_exe_unit) {
2562  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
2563  ra_exe_unit.simple_quals.empty());
2564 }
2565 
2565 
2566 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
2567  const RelAlgExecutionUnit& ra_exe_unit_in,
2568  const std::vector<InputTableInfo>& table_infos,
2569  const Executor* executor,
2570  const ExecutorDeviceType device_type_in,
2571  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
2572  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
2573  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
2574  const auto target_expr = ra_exe_unit.target_exprs[i];
2575  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2576  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
2577  continue;
2578  }
2579  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
2580  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
2581  CHECK(arg);
2582  const auto& arg_ti = arg->get_type_info();
2583  // Avoid calling getExpressionRange for variable length types (string and array),
2584  // it'd trigger an assertion since that API expects to be called only for types
2585  // for which the notion of range is well-defined. A bit of a kludge, but the
2586  // logic to reject these types anyway is at lower levels in the stack and not
2587  // really worth pulling into a separate function for now.
2588  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
2589  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
2590  continue;
2591  }
2592  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
2593  if (arg_range.getType() != ExpressionRangeType::Integer) {
2594  continue;
2595  }
2596  // When running distributed, the threshold for using the precise implementation
2597  // must be consistent across all leaves, otherwise we could have a mix of precise
2598  // and approximate bitmaps and we cannot aggregate them.
2599  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
2600  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
2601  const auto sub_bitmap_count =
2602  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
2603  int64_t approx_bitmap_sz_bits{0};
2604  const auto error_rate =
2605  static_cast<Analyzer::AggExpr*>(target_expr)->get_error_rate();
2606  if (error_rate) {
2607  CHECK(error_rate->get_type_info().get_type() == kINT);
2608  CHECK_GE(error_rate->get_constval().intval, 1);
2609  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
2610  } else {
2611  approx_bitmap_sz_bits = g_hll_precision_bits;
2612  }
2613  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
2614  arg_range.getIntMin(),
2615  approx_bitmap_sz_bits,
2616  true,
2617  device_type,
2618  sub_bitmap_count};
2619  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
2620  arg_range.getIntMin(),
2621  bitmap_sz_bits,
2622  false,
2623  device_type,
2624  sub_bitmap_count};
2625  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
2626  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
2627  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
2628  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
2629  target_exprs_owned.push_back(precise_count_distinct);
2630  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
2631  }
2632  }
2633  return ra_exe_unit;
2634 }
2635 
2636 void build_render_targets(RenderInfo& render_info,
2637  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
2638  const std::vector<TargetMetaInfo>& targets_meta) {
2639  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
2640  render_info.targets.clear();
2641  for (size_t i = 0; i < targets_meta.size(); ++i) {
2642  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
2643  targets_meta[i].get_resname(),
2644  work_unit_target_exprs[i]->get_shared_ptr(),
2645  false));
2646  }
2647 }
2648 
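// The bump allocator lets projection kernels grow their output buffer on
// demand instead of preallocating for a worst-case row count; per the
// conditions below it is GPU-only and incompatible with columnar output and
// ORDER BY.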
2649 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
2650  const CompilationOptions& co,
2651  const ExecutionOptions& eo) {
2652  return g_enable_bump_allocator && co.device_type == ExecutorDeviceType::GPU &&
2653  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
2654 }
2655 
2656 } // namespace
2657 
2658 ExecutionResult RelAlgExecutor::executeWorkUnit(
2659  const RelAlgExecutor::WorkUnit& work_unit,
2660  const std::vector<TargetMetaInfo>& targets_meta,
2661  const bool is_agg,
2662  const CompilationOptions& co_in,
2663  const ExecutionOptions& eo,
2664  RenderInfo* render_info,
2665  const int64_t queue_time_ms,
2666  const ssize_t previous_count) {
2667  INJECT_TIMER(executeWorkUnit);
2668  auto timer = DEBUG_TIMER(__func__);
2669 
2670  auto co = co_in;
2671  ColumnCacheMap column_cache;
2672  if (is_window_execution_unit(work_unit.exe_unit)) {
2673  if (!g_enable_window_functions) {
2674  throw std::runtime_error("Window functions support is disabled");
2675  }
2676  co.device_type = ExecutorDeviceType::CPU;
2677  co.allow_lazy_fetch = false;
2678  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
2679  }
2680  if (!eo.just_explain && eo.find_push_down_candidates) {
2681  // find potential candidates:
2682  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
2683  if (!selected_filters.empty() || eo.just_calcite_explain) {
2684  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
2685  }
2686  }
2687  if (render_info && render_info->isPotentialInSituRender()) {
2688  co.allow_lazy_fetch = false;
2689  }
2690  const auto body = work_unit.body;
2691  CHECK(body);
2692  auto it = leaf_results_.find(body->getId());
2693  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
2694  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
2695  if (it != leaf_results_.end()) {
2696  add_transient_string_literals(
2697  work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2698  auto& aggregated_result = it->second;
2699  auto& result_rows = aggregated_result.rs;
2700  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2701  body->setOutputMetainfo(aggregated_result.targets_meta);
2702  if (render_info) {
2703  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2704  }
2705  return result;
2706  }
2707  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2708 
2709  auto ra_exe_unit = decide_approx_count_distinct_implementation(
2710  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
2711  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2712  if (is_window_execution_unit(ra_exe_unit)) {
2713  CHECK_EQ(table_infos.size(), size_t(1));
2714  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
2715  max_groups_buffer_entry_guess =
2716  table_infos.front().info.fragments.front().getNumTuples();
2717  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
2718  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
2719  if (previous_count > 0 && !exe_unit_has_quals(ra_exe_unit)) {
2720  ra_exe_unit.scan_limit = static_cast<size_t>(previous_count);
2721  } else {
2722  // TODO(adb): enable bump allocator path for render queries
2723  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
2724  ra_exe_unit.scan_limit = 0;
2725  ra_exe_unit.use_bump_allocator = true;
2726  } else if (eo.executor_type == ::ExecutorType::Extern) {
2727  ra_exe_unit.scan_limit = 0;
2728  } else if (!eo.just_explain) {
2729  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
2730  if (filter_count_all >= 0) {
2731  ra_exe_unit.scan_limit = std::max(filter_count_all, ssize_t(1));
2732  }
2733  }
2734  }
2735  }
2736  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2737  co.device_type,
2738  QueryMemoryDescriptor(),
2739  nullptr,
2740  executor_),
2741  {}};
2742 
2743  auto execute_and_handle_errors =
2744  [&](const auto max_groups_buffer_entry_guess_in,
2745  const bool has_cardinality_estimation) -> ExecutionResult {
2746  // Note that the groups buffer entry guess may be modified during query execution.
2747  // Create a local copy so we can track those changes if we need to attempt a retry
2748  // due to OOM
2749  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
2750  try {
2751  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
2752  is_agg,
2753  table_infos,
2754  ra_exe_unit,
2755  co,
2756  eo,
2757  cat_,
2758  render_info,
2759  has_cardinality_estimation,
2760  column_cache),
2761  targets_meta};
2762  } catch (const QueryExecutionError& e) {
2763  handlePersistentError(e.getErrorCode());
2764  return handleOutOfMemoryRetry(
2765  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
2766  targets_meta,
2767  is_agg,
2768  co,
2769  eo,
2770  render_info,
2771  eo.allow_multifrag,
2772  queue_time_ms);
2773  }
2774  };
2775 
2776  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
2777  try {
2778  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2779  auto card = cached_cardinality.second;
2780  if (cached_cardinality.first && card >= 0) {
2781  result = execute_and_handle_errors(card, true);
2782  } else {
2783  result = execute_and_handle_errors(
2784  max_groups_buffer_entry_guess,
2785  groups_approx_upper_bound(table_infos) <= g_big_group_threshold);
2786  }
2787  } catch (const CardinalityEstimationRequired& e) {
2788  // check the cardinality cache
2789  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2790  auto card = cached_cardinality.second;
2791  if (cached_cardinality.first && card >= 0) {
2792  result = execute_and_handle_errors(card, true);
2793  } else {
2794  const auto estimated_groups_buffer_entry_guess =
2795  2 * std::min(groups_approx_upper_bound(table_infos),
2796  getNDVEstimation(work_unit, e.range(), is_agg, co, eo));
2797  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
2798  result = execute_and_handle_errors(estimated_groups_buffer_entry_guess, true);
2799  if (!(eo.just_validate || eo.just_explain)) {
2800  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
2801  }
2802  }
2803  }
2804 
2805  result.setQueueTime(queue_time_ms);
2806  if (render_info) {
2807  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2808  if (render_info->isPotentialInSituRender()) {
2809  // return an empty result (with the same queue time, and zero render time)
2810  return {std::make_shared<ResultSet>(
2811  queue_time_ms,
2812  0,
2813  executor_->row_set_mem_owner_
2814  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
2815  : nullptr),
2816  {}};
2817  }
2818  }
2819  return result;
2820 }
2821 
2822 ssize_t RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
2823  const bool is_agg,
2824  const CompilationOptions& co,
2825  const ExecutionOptions& eo) {
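  // Pre-flight pass: run the work unit as a COUNT(*) so a projection can size
  // its output buffer exactly; returns -1 on failure so the caller falls back
  // to a conservative estimate.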
2826  const auto count =
2827  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
2828  kCOUNT,
2829  nullptr,
2830  false,
2831  nullptr);
2832  const auto count_all_exe_unit =
2833  create_count_all_execution_unit(work_unit.exe_unit, count);
2834  size_t one{1};
2835  ResultSetPtr count_all_result;
2836  try {
2837  ColumnCacheMap column_cache;
2838  count_all_result =
2839  executor_->executeWorkUnit(one,
2840  is_agg,
2841  get_table_infos(work_unit.exe_unit, executor_),
2842  count_all_exe_unit,
2843  co,
2844  eo,
2845  cat_,
2846  nullptr,
2847  false,
2848  column_cache);
2849  } catch (const std::exception& e) {
2850  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
2851  return -1;
2852  }
2853  const auto count_row = count_all_result->getNextRow(false, false);
2854  CHECK_EQ(size_t(1), count_row.size());
2855  const auto& count_tv = count_row.front();
2856  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
2857  CHECK(count_scalar_tv);
2858  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
2859  CHECK(count_ptr);
2860  CHECK_GE(*count_ptr, 0);
2861  auto count_upper_bound = static_cast<size_t>(*count_ptr);
2862  return std::max(count_upper_bound, size_t(1));
2863 }
2864 
2865 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
2866  const auto& ra_exe_unit = work_unit.exe_unit;
2867  if (ra_exe_unit.input_descs.size() != 1) {
2868  return false;
2869  }
2870  const auto& table_desc = ra_exe_unit.input_descs.front();
2871  if (table_desc.getSourceType() != InputSourceType::TABLE) {
2872  return false;
2873  }
2874  const int table_id = table_desc.getTableId();
2875  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
2876  const auto comp_expr =
2877  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
2878  if (!comp_expr || comp_expr->get_optype() != kEQ) {
2879  return false;
2880  }
2881  const auto lhs = comp_expr->get_left_operand();
2882  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
2883  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
2884  return false;
2885  }
2886  const auto rhs = comp_expr->get_right_operand();
2887  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
2888  if (!rhs_const) {
2889  return false;
2890  }
2891  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
2892  if (cd->isVirtualCol) {
2893  CHECK_EQ("rowid", cd->columnName);
2894  return true;
2895  }
2896  }
2897  return false;
2898 }
2899 
2900 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
2901  const RelAlgExecutor::WorkUnit& work_unit,
2902  const std::vector<TargetMetaInfo>& targets_meta,
2903  const bool is_agg,
2904  const CompilationOptions& co,
2905  const ExecutionOptions& eo,
2906  RenderInfo* render_info,
2907  const bool was_multifrag_kernel_launch,
2908  const int64_t queue_time_ms) {
2909  // Disable the bump allocator
2910  // Note that this will have basically the same effect as using the bump allocator for
2911  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
2912  // path and the bump allocator path for kernel per fragment execution.
2913  auto ra_exe_unit_in = work_unit.exe_unit;
2914  ra_exe_unit_in.use_bump_allocator = false;
2915 
2916  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2917  co.device_type,
2918  QueryMemoryDescriptor(),
2919  nullptr,
2920  executor_),
2921  {}};
2922 
2923  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
2924  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2925  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
2926  false,
2927  false,
2928  eo.allow_loop_joins,
2929  eo.with_watchdog,
2930  eo.jit_debug,
2931  false,
2932  eo.with_dynamic_watchdog,
2933  eo.dynamic_watchdog_time_limit,
2934  false,
2935  false,
2936  eo.gpu_input_mem_limit_percent,
2937  false,
2938  eo.runtime_query_interrupt_frequency,
2939  eo.executor_type,
2940  };
2941 
2942  if (was_multifrag_kernel_launch) {
2943  try {
2944  // Attempt to retry using the kernel per fragment path. The smaller input size
2945  // required may allow the entire kernel to execute in GPU memory.
2946  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
2947  "kernels disabled.";
2948  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2949  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
2950  ColumnCacheMap column_cache;
2951  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2952  is_agg,
2953  table_infos,
2954  ra_exe_unit,
2955  co,
2956  eo_no_multifrag,
2957  cat_,
2958  nullptr,
2959  true,
2960  column_cache),
2961  targets_meta};
2962  result.setQueueTime(queue_time_ms);
2963  } catch (const QueryExecutionError& e) {
2964  handlePersistentError(e.getErrorCode());
2965  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
2966  }
2967  }
2968 
2969  if (render_info) {
2970  render_info->setForceNonInSituData();
2971  }
2972 
2973  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
2974  // Only reset the group buffer entry guess if we ran out of slots, which
2975  // suggests a highly pathological input which prevented a good estimation of
2976  // distinct tuple count. For projection queries, this will force a per-fragment
2977  // scan limit, which is compatible with the CPU path.
2979  VLOG(1) << "Resetting max groups buffer entry guess.";
2980  max_groups_buffer_entry_guess = 0;
2981 
2982  int iteration_ctr = -1;
2983  while (true) {
2984  iteration_ctr++;
2985  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2986  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
2987  ColumnCacheMap column_cache;
2988  try {
2989  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2990  is_agg,
2991  table_infos,
2992  ra_exe_unit,
2993  co_cpu,
2994  eo_no_multifrag,
2995  cat_,
2996  nullptr,
2997  true,
2998  column_cache),
2999  targets_meta};
3000  } catch (const QueryExecutionError& e) {
3001  // Ran out of slots
3002  if (e.getErrorCode() < 0) {
3003  // Even the conservative guess failed; it should only happen when we group
3004  // by a huge cardinality array. Maybe we should throw an exception instead?
3005  // Such a heavy query is entirely capable of exhausting all the host memory.
3006  CHECK(max_groups_buffer_entry_guess);
3007  // Only allow two iterations of increasingly large entry guesses up to a maximum
3008  // of 512MB per column per kernel
3009  if (g_enable_watchdog || iteration_ctr > 1) {
3010  throw std::runtime_error("Query ran out of output slots in the result");
3011  }
3012  max_groups_buffer_entry_guess *= 2;
3013  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
3014  "groups buffer entry "
3015  "guess equal to "
3016  << max_groups_buffer_entry_guess;
3017  } else {
3018  handlePersistentError(e.getErrorCode());
3019  }
3020  continue;
3021  }
3022  result.setQueueTime(queue_time_ms);
3023  return result;
3024  }
3025  return result;
3026 }
3027 
3028 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
3029  LOG(ERROR) << "Query execution failed with error "
3030  << getErrorMessageFromCode(error_code);
3031  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
3032  // We ran out of GPU memory, this doesn't count as an error if the query is
3033  // allowed to continue on CPU because retry on CPU is explicitly allowed through
3034  // --allow-cpu-retry.
3035  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
3036  if (!g_allow_cpu_retry) {
3037  throw std::runtime_error(
3038  "Query ran out of GPU memory, unable to automatically retry on CPU");
3039  }
3040  return;
3041  }
3042  throw std::runtime_error(getErrorMessageFromCode(error_code));
3043 }
3044 
3045 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
3046  if (error_code < 0) {
3047  return "Ran out of slots in the query output buffer";
3048  }
3049  switch (error_code) {
3050  case Executor::ERR_DIV_BY_ZERO:
3051  return "Division by zero";
3052  case Executor::ERR_OUT_OF_GPU_MEM:
3053  return "Query couldn't keep the entire working set of columns in GPU memory";
3054  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
3055  return "Self joins not supported yet";
3056  case Executor::ERR_OUT_OF_CPU_MEM:
3057  return "Not enough host memory to execute the query";
3058  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
3059  return "Overflow or underflow";
3060  case Executor::ERR_OUT_OF_TIME:
3061  return "Query execution has exceeded the time limit";
3062  case Executor::ERR_INTERRUPTED:
3063  return "Query execution has been interrupted";
3064  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
3065  return "Columnar conversion not supported for variable length types";
3066  case Executor::ERR_TOO_MANY_LITERALS:
3067  return "Too many literals in the query";
3068  case Executor::ERR_STRING_CONST_IN_RESULTSET:
3069  return "NONE ENCODED String types are not supported as input result set.";
3070  case Executor::ERR_OUT_OF_RENDER_MEM:
3071  return "Not enough OpenGL memory to render the query results";
3072  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
3073  return "Streaming-Top-N not supported in Render Query";
3074  case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
3075  return "Multiple distinct values encountered";
3076  case Executor::ERR_GEOS:
3077  return "Geos call failure";
3078  }
3079  return "Other error: code " + std::to_string(error_code);
3080 }
3081 
3083  const SortInfo& sort_info,
3084  const ExecutionOptions& eo) {
3085  const auto compound = dynamic_cast<const RelCompound*>(node);
3086  if (compound) {
3087  return createCompoundWorkUnit(compound, sort_info, eo);
3088  }
3089  const auto project = dynamic_cast<const RelProject*>(node);
3090  if (project) {
3091  return createProjectWorkUnit(project, sort_info, eo);
3092  }
3093  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3094  if (aggregate) {
3095  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3096  }
3097  const auto filter = dynamic_cast<const RelFilter*>(node);
3098  if (filter) {
3099  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3100  }
3101  LOG(FATAL) << "Unhandled node type: " << node->toString();
3102  return {};
3103 }
3104 
3105 namespace {
3106 
3107 JoinType get_join_type(const RelAlgNode* ra) {
3108  auto sink = get_data_sink(ra);
3109  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
3110  return join->getJoinType();
3111  }
3112  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
3113  return JoinType::INNER;
3114  }
3115 
3116  return JoinType::INVALID;
3117 }
3118 
3119 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
3120  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3121  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
3122  return nullptr;
3123  }
3124  const auto equi_join_condition =
3125  dynamic_cast<const RexOperator*>(condition->getOperand(0));
3126  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
3127  return nullptr;
3128  }
3129  const auto both_are_null_condition =
3130  dynamic_cast<const RexOperator*>(condition->getOperand(1));
3131  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
3132  both_are_null_condition->size() != 2) {
3133  return nullptr;
3134  }
3135  const auto lhs_is_null =
3136  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
3137  const auto rhs_is_null =
3138  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
3139  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
3140  rhs_is_null->getOperator() != kISNULL) {
3141  return nullptr;
3142  }
3143  CHECK_EQ(size_t(1), lhs_is_null->size());
3144  CHECK_EQ(size_t(1), rhs_is_null->size());
3145  CHECK_EQ(size_t(2), equi_join_condition->size());
3146  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
3147  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
3148  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
3149  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
3150  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
3151  return nullptr;
3152  }
3153  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
3154  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
3155  RexDeepCopyVisitor deep_copy_visitor;
3156  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
3157  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
3158  eq_operands.emplace_back(lhs_op_copy.release());
3159  eq_operands.emplace_back(rhs_op_copy.release());
3160  return boost::make_unique<const RexOperator>(
3161  kBW_EQ, eq_operands, equi_join_condition->getType());
3162  }
3163  return nullptr;
3164 }
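// Annotation (worked example, not part of the source file): a null-safe
// equality condition written in SQL as
//   a = b OR (a IS NULL AND b IS NULL)
// arrives as the Rex tree OR(EQ(a, b), AND(ISNULL(a), ISNULL(b))) and is
// rewritten into the single operator BW_EQ(a, b); any other tree shape makes
// get_bitwise_equals return nullptr and leaves the condition untouched.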
3165 
3166 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
3167  const RexScalar* scalar) {
3168  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3169  if (condition && condition->getOperator() == kAND) {
3170  CHECK_GE(condition->size(), size_t(2));
3171  auto acc = get_bitwise_equals(condition->getOperand(0));
3172  if (!acc) {
3173  return nullptr;
3174  }
3175  for (size_t i = 1; i < condition->size(); ++i) {
3176  std::vector<std::unique_ptr<const RexScalar>> and_operands;
3177  and_operands.emplace_back(std::move(acc));
3178  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
3179  acc =
3180  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
3181  }
3182  return acc;
3183  }
3184  return get_bitwise_equals(scalar);
3185 }
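// Annotation (worked example, not part of the source file): for a top-level
// conjunction AND(c1, c2, c3), the first operand must match the null-safe
// pattern directly and the remaining operands are rewritten recursively, the
// results being folded left into nested AND nodes: AND(AND(BW_EQ_1, BW_EQ_2), BW_EQ_3).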
3186 
3187 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
3188  CHECK_GE(left_deep_join->inputCount(), size_t(2));
3189  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
3190  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
3191  ++nesting_level) {
3192  if (left_deep_join->getOuterCondition(nesting_level)) {
3193  join_types[nesting_level - 1] = JoinType::LEFT;
3194  }
3195  }
3196  return join_types;
3197 }
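// Annotation (example, not part of the source file): for a three-input
// left-deep join T0, T1, T2 where only nesting level 2 carries an outer
// condition, this returns {INNER, LEFT}: one entry per join level, LEFT
// wherever an outer condition is present.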
3198 
3199 template <class RA>
3200 std::vector<size_t> do_table_reordering(
3201  std::vector<InputDescriptor>& input_descs,
3202  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
3203  const JoinQualsPerNestingLevel& left_deep_join_quals,
3204  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3205  const RA* node,
3206  const std::vector<InputTableInfo>& query_infos,
3207  const Executor* executor) {
3208  if (g_cluster) {
3209  // Disable table reordering in distributed mode. The aggregator does not have
3210  // enough information to break ties.
3211  return {};
3212  }
3213  const auto& cat = *executor->getCatalog();
3214  for (const auto& table_info : query_infos) {
3215  if (table_info.table_id < 0) {
3216  continue;
3217  }
3218  const auto td = cat.getMetadataForTable(table_info.table_id);
3219  CHECK(td);
3220  if (table_is_replicated(td)) {
3221  return {};
3222  }
3223  }
3224  const auto input_permutation =
3225  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
3226  input_to_nest_level = get_input_nest_levels(node, input_permutation);
3227  std::tie(input_descs, input_col_descs, std::ignore) =
3228  get_input_desc(node, input_to_nest_level, input_permutation, cat);
3229  return input_permutation;
3230 }
3231 
3232 std::vector<size_t> get_left_deep_join_input_sizes(
3233  const RelLeftDeepInnerJoin* left_deep_join) {
3234  std::vector<size_t> input_sizes;
3235  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
3236  const auto inputs = get_node_output(left_deep_join->getInput(i));
3237  input_sizes.push_back(inputs.size());
3238  }
3239  return input_sizes;
3240 }
3241 
3242 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
3243  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
3244  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
3245  for (const auto& qual : quals) {
3246  const auto rewritten_qual = rewrite_expr(qual.get());
3247  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
3248  }
3249  return rewritten_quals;
3250 }
3251 
3252 } // namespace
3253 
3254 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
3255  const RelCompound* compound,
3256  const SortInfo& sort_info,
3257  const ExecutionOptions& eo) {
3258  std::vector<InputDescriptor> input_descs;
3259  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3260  auto input_to_nest_level = get_input_nest_levels(compound, {});
3261  std::tie(input_descs, input_col_descs, std::ignore) =
3262  get_input_desc(compound, input_to_nest_level, {}, cat_);
3263  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
3264  const auto query_infos = get_table_infos(input_descs, executor_);
3265  CHECK_EQ(size_t(1), compound->inputCount());
3266  const auto left_deep_join =
3267  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3268  JoinQualsPerNestingLevel left_deep_join_quals;
3269  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3270  : std::vector<JoinType>{get_join_type(compound)};
3271  std::vector<size_t> input_permutation;
3272  std::vector<size_t> left_deep_join_input_sizes;
3273  if (left_deep_join) {
3274  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3275  left_deep_join_quals = translateLeftDeepJoinFilter(
3276  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3277  if (g_from_table_reordering &&
3278  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3279  join_types.end()) {
3280  input_permutation = do_table_reordering(input_descs,
3281  input_col_descs,
3282  left_deep_join_quals,
3283  input_to_nest_level,
3284  compound,
3285  query_infos,
3286  executor_);
3287  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3288  std::tie(input_descs, input_col_descs, std::ignore) =
3289  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3290  left_deep_join_quals = translateLeftDeepJoinFilter(
3291  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3292  }
3293  }
3294  RelAlgTranslator translator(cat_,
3295  query_state_,
3296  executor_,
3297  input_to_nest_level,
3298  join_types,
3299  now_,
3300  eo.just_explain);
3301  const auto scalar_sources =
3302  translate_scalar_sources(compound, translator, eo.executor_type);
3303  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3304  const auto quals_cf = translate_quals(compound, translator);
3305  const auto target_exprs = translate_targets(target_exprs_owned_,
3306  scalar_sources,
3307  groupby_exprs,
3308  compound,
3309  translator,
3310  eo.executor_type);
3311  CHECK_EQ(compound->size(), target_exprs.size());
3312  const RelAlgExecutionUnit exe_unit = {input_descs,
3313  input_col_descs,
3314  quals_cf.simple_quals,
3315  rewrite_quals(quals_cf.quals),
3316  left_deep_join_quals,
3317  groupby_exprs,
3318  target_exprs,
3319  nullptr,
3320  sort_info,
3321  0,
3322  false,
3323  std::nullopt,
3324  query_state_};
3325  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3326  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3327  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3328  compound->setOutputMetainfo(targets_meta);
3329  return {rewritten_exe_unit,
3330  compound,
3331  max_groups_buffer_entry_default_guess,
3332  std::move(query_rewriter),
3333  input_permutation,
3334  left_deep_join_input_sizes};
3335 }
3336 
3337 namespace {
3338 
3339 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
3340  CHECK(qual_expr);
3341  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
3342  if (!bin_oper || bin_oper->getOperator() != kAND) {
3343  return {qual_expr};
3344  }
3345  CHECK_GE(bin_oper->size(), size_t(2));
3346  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
3347  for (size_t i = 1; i < bin_oper->size(); ++i) {
3348  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
3349  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
3350  }
3351  return lhs_cf;
3352 }
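// Annotation (worked example, not part of the source file): a nested
// conjunction such as AND(AND(a, b), c) is flattened recursively into the
// list {a, b, c}; any non-AND expression is returned as a single-element list.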
3353 
3354 std::shared_ptr<Analyzer::Expr> build_logical_expression(
3355  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
3356  const SQLOps sql_op) {
3357  CHECK(!factors.empty());
3358  auto acc = factors.front();
3359  for (size_t i = 1; i < factors.size(); ++i) {
3360  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
3361  }
3362  return acc;
3363 }
3364 
3365 template <class QualsList>
3366 bool list_contains_expression(const QualsList& haystack,
3367  const std::shared_ptr<Analyzer::Expr>& needle) {
3368  for (const auto& qual : haystack) {
3369  if (*qual == *needle) {
3370  return true;
3371  }
3372  }
3373  return false;
3374 }
3375 
3376 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
3377 // evaluations of `p` and allows use of the original form in joins if `p`
3378 // can be used for hash joins.
3379 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
3380  const std::shared_ptr<Analyzer::Expr>& expr) {
3381  const auto expr_terms = qual_to_disjunctive_form(expr);
3382  CHECK_GE(expr_terms.size(), size_t(1));
3383  const auto& first_term = expr_terms.front();
3384  const auto first_term_factors = qual_to_conjunctive_form(first_term);
3385  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
3386  // First, collect the conjunctive components common to all the disjunctive components.
3387  // Don't do it for simple qualifiers; we only care about expensive or join qualifiers.
3388  for (const auto& first_term_factor : first_term_factors.quals) {
3389  bool is_common =
3390  expr_terms.size() > 1; // Only report common factors for disjunction.
3391  for (size_t i = 1; i < expr_terms.size(); ++i) {
3392  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
3393  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
3394  is_common = false;
3395  break;
3396  }
3397  }
3398  if (is_common) {
3399  common_factors.push_back(first_term_factor);
3400  }
3401  }
3402  if (common_factors.empty()) {
3403  return expr;
3404  }
3405  // Now that the common expressions are known, collect the remaining expressions.
3406  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
3407  for (const auto& term : expr_terms) {
3408  const auto term_cf = qual_to_conjunctive_form(term);
3409  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
3410  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
3411  for (const auto& qual : term_cf.quals) {
3412  if (!list_contains_expression(common_factors, qual)) {
3413  remaining_quals.push_back(qual);
3414  }
3415  }
3416  if (!remaining_quals.empty()) {
3417  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
3418  }
3419  }
3420  // Reconstruct the expression with the transformation applied.
3421  const auto common_expr = build_logical_expression(common_factors, kAND);
3422  if (remaining_terms.empty()) {
3423  return common_expr;
3424  }
3425  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
3426  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
3427 }
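// Annotation (worked example, not part of the source file): a filter such as
//   (t.x = s.x AND t.y > 5) OR (t.x = s.x AND t.z < 3)
// shares the factor t.x = s.x across every disjunct, so it is rebuilt as
//   t.x = s.x AND (t.y > 5 OR t.z < 3)
// which evaluates the common qualifier once and keeps the equi-join condition
// in a form the hash join framework can still recognize.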
3428 
3429 } // namespace
3430 
3431 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
3432  const RexScalar* join_condition,
3433  const std::vector<JoinType>& join_types,
3434  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3435  const bool just_explain) const {
3436  RelAlgTranslator translator(
3437  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
3438  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
3439  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
3440  for (const auto rex_condition_component : rex_condition_cf) {
3441  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
3442  const auto join_condition =
3443  reverse_logical_distribution(translator.translateScalarRex(
3444  bw_equals ? bw_equals.get() : rex_condition_component));
3445  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
3446  join_condition_quals.insert(join_condition_quals.end(),
3447  join_condition_cf.quals.begin(),
3448  join_condition_cf.quals.end());
3449  join_condition_quals.insert(join_condition_quals.end(),
3450  join_condition_cf.simple_quals.begin(),
3451  join_condition_cf.simple_quals.end());
3452  }
3453  return combine_equi_join_conditions(join_condition_quals);
3454 }
3455 
3456 // Translate left deep join filter and separate the conjunctive form qualifiers
3457 // per nesting level. The code generated for hash table lookups on each level
3458 // must dominate its uses in deeper nesting levels.
3459 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
3460  const RelLeftDeepInnerJoin* join,
3461  const std::vector<InputDescriptor>& input_descs,
3462  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3463  const bool just_explain) {
3464  const auto join_types = left_deep_join_types(join);
3465  const auto join_condition_quals = makeJoinQuals(
3466  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
3467  MaxRangeTableIndexVisitor rte_idx_visitor;
3468  JoinQualsPerNestingLevel result(input_descs.size() - 1);
3469  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
3470  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
3471  const auto outer_condition = join->getOuterCondition(rte_idx);
3472  if (outer_condition) {
3473  result[rte_idx - 1].quals =
3474  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
3475  CHECK_LE(rte_idx, join_types.size());
3476  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
3477  result[rte_idx - 1].type = JoinType::LEFT;
3478  continue;
3479  }
3480  for (const auto& qual : join_condition_quals) {
3481  if (visited_quals.count(qual)) {
3482  continue;
3483  }
3484  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
3485  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
3486  const auto it_ok = visited_quals.emplace(qual);
3487  CHECK(it_ok.second);
3488  result[rte_idx - 1].quals.push_back(qual);
3489  }
3490  }
3491  CHECK_LE(rte_idx, join_types.size());
3492  CHECK(join_types[rte_idx - 1] == JoinType::INNER);
3493  result[rte_idx - 1].type = JoinType::INNER;
3494  }
3495  return result;
3496 }
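// Annotation (example, not part of the source file): in a three-table
// left-deep join, a qualifier whose highest referenced range-table index is 1
// is attached to result[0] (the first join level), while one referencing
// index 2 lands at result[1]; each qualifier is placed exactly once, at the
// shallowest level where all of its inputs are available.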
3497 
3498 namespace {
3499 
3500 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
3501  const RelAlgNode* ra_node,
3502  const size_t nest_level,
3503  const std::vector<TargetMetaInfo>& in_metainfo,
3504  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3505  CHECK_LE(size_t(1), ra_node->inputCount());
3506  CHECK_GE(size_t(2), ra_node->inputCount());
3507  const auto input = ra_node->getInput(nest_level);
3508  const auto it_rte_idx = input_to_nest_level.find(input);
3509  CHECK(it_rte_idx != input_to_nest_level.end());
3510  const int rte_idx = it_rte_idx->second;
3511  const int table_id = table_id_from_ra(input);
3512  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
3513  const auto scan_ra = dynamic_cast<const RelScan*>(input);
3514  int input_idx = 0;
3515  for (const auto& input_meta : in_metainfo) {
3516  inputs.push_back(
3517  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
3518  table_id,
3519  scan_ra ? input_idx + 1 : input_idx,
3520  rte_idx));
3521  ++input_idx;
3522  }
3523  return inputs;
3524 }
3525 
3526 } // namespace
3527 
3528 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
3529  const RelAggregate* aggregate,
3530  const SortInfo& sort_info,
3531  const bool just_explain) {
3532  std::vector<InputDescriptor> input_descs;
3533  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3534  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3535  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
3536  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3537  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
3538  const auto join_type = get_join_type(aggregate);
3539 
3540  RelAlgTranslator translator(cat_,
3541  query_state_,
3542  executor_,
3543  input_to_nest_level,
3544  {join_type},
3545  now_,
3546  just_explain);
3547  CHECK_EQ(size_t(1), aggregate->inputCount());
3548  const auto source = aggregate->getInput(0);
3549  const auto& in_metainfo = source->getOutputMetainfo();
3550  const auto scalar_sources =
3551  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
3552  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
3553  const auto target_exprs = translate_targets(
3554  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
3555  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
3556  aggregate->setOutputMetainfo(targets_meta);
3557  return {RelAlgExecutionUnit{input_descs,
3558  input_col_descs,
3559  {},
3560  {},
3561  {},
3562  groupby_exprs,
3563  target_exprs,
3564  nullptr,
3565  sort_info,
3566  0,
3567  false,
3568  std::nullopt,
3569  query_state_},
3570  aggregate,
3571  max_groups_buffer_entry_default_guess,
3572  nullptr};
3573 }
3574 
3575 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
3576  const RelProject* project,
3577  const SortInfo& sort_info,
3578  const ExecutionOptions& eo) {
3579  std::vector<InputDescriptor> input_descs;
3580  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3581  auto input_to_nest_level = get_input_nest_levels(project, {});
3582  std::tie(input_descs, input_col_descs, std::ignore) =
3583  get_input_desc(project, input_to_nest_level, {}, cat_);
3584  const auto query_infos = get_table_infos(input_descs, executor_);
3585 
3586  const auto left_deep_join =
3587  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
3588  JoinQualsPerNestingLevel left_deep_join_quals;
3589  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3590  : std::vector<JoinType>{get_join_type(project)};
3591  std::vector<size_t> input_permutation;
3592  std::vector<size_t> left_deep_join_input_sizes;
3593  if (left_deep_join) {
3594  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3595  const auto query_infos = get_table_infos(input_descs, executor_);
3596  left_deep_join_quals = translateLeftDeepJoinFilter(
3597  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3598  if (g_from_table_reordering) {
3599  input_permutation = do_table_reordering(input_descs,
3600  input_col_descs,
3601  left_deep_join_quals,
3602  input_to_nest_level,
3603  project,
3604  query_infos,
3605  executor_);
3606  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3607  std::tie(input_descs, input_col_descs, std::ignore) =
3608  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3609  left_deep_join_quals = translateLeftDeepJoinFilter(
3610  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3611  }
3612  }
3613 
3614  RelAlgTranslator translator(cat_,
3615  query_state_,
3616  executor_,
3617  input_to_nest_level,
3618  join_types,
3619  now_,
3620  eo.just_explain);
3621  const auto target_exprs_owned =
3622  translate_scalar_sources(project, translator, eo.executor_type);
3623  target_exprs_owned_.insert(
3624  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3625  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3626 
3627  const RelAlgExecutionUnit exe_unit = {input_descs,
3628  input_col_descs,
3629  {},
3630  {},
3631  left_deep_join_quals,
3632  {nullptr},
3633  target_exprs,
3634  nullptr,
3635  sort_info,
3636  0,
3637  false,
3638  std::nullopt,
3639  query_state_};
3640  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3641  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3642  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3643  project->setOutputMetainfo(targets_meta);
3644  return {rewritten_exe_unit,
3645  project,
3646  max_groups_buffer_entry_default_guess,
3647  std::move(query_rewriter),
3648  input_permutation,
3649  left_deep_join_input_sizes};
3650 }
3651 
3652 namespace {
3653 
3654 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
3655  RelAlgNode const* input_node) {
3656  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
3657  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
3658  const int negative_node_id = -input_node->getId();
3659  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
3660  target_exprs.reserve(tmis.size());
3661  for (size_t i = 0; i < tmis.size(); ++i) {
3662  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
3663  tmis[i].get_type_info(), negative_node_id, i, 0));
3664  }
3665  return target_exprs;
3666 }
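// Annotation (example, not part of the source file): the negated node id acts
// as a synthetic table id, so for an input node with id 3 the union reads its
// columns as ColumnVar(type, -3, i, 0); negative table ids refer to the
// temporary table holding that node's result rather than to a physical table.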
3667 
3668 } // namespace
3669 
3670 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
3671  const RelLogicalUnion* logical_union,
3672  const SortInfo& sort_info,
3673  const ExecutionOptions& eo) {
3674  std::vector<InputDescriptor> input_descs;
3675  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3676  // Map ra input ptr to index (0, 1).
3677  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
3678  std::tie(input_descs, input_col_descs, std::ignore) =
3679  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
3680  const auto query_infos = get_table_infos(input_descs, executor_);
3681  auto const max_num_tuples =
3682  std::accumulate(query_infos.cbegin(),
3683  query_infos.cend(),
3684  size_t(0),
3685  [](auto max, auto const& query_info) {
3686  return std::max(max, query_info.info.getNumTuples());
3687  });
3688 
3689  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
3690  for (auto& pair : input_to_nest_level) {
3691  VLOG(3) << " (" << pair.first->toString() << ", " << pair.second << ')';
3692  }
3693 
3694  RelAlgTranslator translator(
3695  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
3696 
3697  auto const input_exprs_owned = target_exprs_for_union(logical_union->getInput(0));
3698  CHECK(!input_exprs_owned.empty())
3699  << "No metainfo found for input node " << logical_union->getInput(0)->toString();
3700  VLOG(3) << "input_exprs_owned.size()=" << input_exprs_owned.size();
3701  for (auto& input_expr : input_exprs_owned) {
3702  VLOG(3) << " " << input_expr->toString();
3703  }
3704  target_exprs_owned_.insert(
3705  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3706  const auto target_exprs = get_exprs_not_owned(input_exprs_owned);
3707 
3708  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
3709  << " input_col_descs=" << shared::printContainer(input_col_descs)
3710  << " target_exprs.size()=" << target_exprs.size()
3711  << " max_num_tuples=" << max_num_tuples;
3712 
3713  const RelAlgExecutionUnit exe_unit = {input_descs,
3714  input_col_descs,
3715  {}, // quals_cf.simple_quals,
3716  {}, // rewrite_quals(quals_cf.quals),
3717  {},
3718  {nullptr},
3719  target_exprs,
3720  nullptr,
3721  sort_info,
3722  max_num_tuples,
3723  false,
3724  logical_union->isAll(),
3725  query_state_};
3726  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3727  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3728 
3729  RelAlgNode const* input0 = logical_union->getInput(0);
3730  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
3731  logical_union->setOutputMetainfo(
3732  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3733  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
3734  logical_union->setOutputMetainfo(
3735  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3736  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
3737  logical_union->setOutputMetainfo(
3738  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3739  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
3740  logical_union->setOutputMetainfo(
3741  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3742  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
3743  logical_union->setOutputMetainfo(
3744  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3745  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
3746  logical_union->setOutputMetainfo(
3747  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3748  } else if (dynamic_cast<const RelSort*>(input0)) {
3749  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
3750  } else {
3751  throw QueryNotSupported("Unsupported input type: " + input0->toString());
3752  }
3753  VLOG(3) << "logical_union->getOutputMetainfo()="
3754  << shared::printContainer(logical_union->getOutputMetainfo())
3755  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
3756  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
3757 
3758  return {rewritten_exe_unit,
3759  logical_union,
3760  max_groups_buffer_entry_default_guess,
3761  std::move(query_rewriter)};
3762 }
3763 
3764 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
3765  const RelTableFunction* table_func,
3766  const bool just_explain) {
3767  std::vector<InputDescriptor> input_descs;
3768  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3769  auto input_to_nest_level = get_input_nest_levels(table_func, {});
3770  std::tie(input_descs, input_col_descs, std::ignore) =
3771  get_input_desc(table_func, input_to_nest_level, {}, cat_);
3772  const auto query_infos = get_table_infos(input_descs, executor_);
3773 
3774  RelAlgTranslator translator(
3775  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
3776  const auto input_exprs_owned =
3777  translate_scalar_sources(table_func, translator, ::ExecutorType::Native);
3778  target_exprs_owned_.insert(
3779  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3780  const auto input_exprs = get_exprs_not_owned(input_exprs_owned);
3781 
3782  const auto& table_function_impl =
3783  table_functions::TableFunctionsFactory::get(table_func->getFunctionName());
3784 
3785  std::optional<size_t> output_row_multiplier;
3786  if (table_function_impl.hasUserSpecifiedOutputMultiplier()) {
3787  const auto parameter_index = table_function_impl.getOutputRowParameter();
3788  CHECK_GT(parameter_index, size_t(0));
3789  const auto parameter_expr = table_func->getTableFuncInputAt(parameter_index - 1);
3790  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
3791  if (!parameter_expr_literal) {
3792  throw std::runtime_error(
3793  "Provided output buffer multiplier parameter is not a literal. Only literal "
3794  "values are supported with output buffer multiplier configured table "
3795  "functions.");
3796  }
3797  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
3798  if (literal_val < 0) {
3799  throw std::runtime_error("Provided output row multiplier " +
3800  std::to_string(literal_val) +
3801  " is not valid for table functions.");
3802  }
3803  output_row_multiplier = static_cast<size_t>(literal_val);
3804  }
3805 
3806  std::vector<Analyzer::ColumnVar*> input_col_exprs;
3807  size_t index = 0;
3808  for (auto input_expr : input_exprs) {
3809  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
3810  input_expr->set_type_info(table_function_impl.getInputSQLType(index));
3811  input_col_exprs.push_back(col_var);
3812  }
3813  index++;
3814  }
3815  CHECK_EQ(input_col_exprs.size(), table_func->getColInputsSize());
3816 
3817  std::vector<Analyzer::Expr*> table_func_outputs;
3818  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
3819  const auto ti = table_function_impl.getOutputSQLType(i);
3820  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
3821  table_func_outputs.push_back(target_exprs_owned_.back().get());
3822  }
3823 
3824  const TableFunctionExecutionUnit exe_unit = {
3825  input_descs,
3826  input_col_descs,
3827  input_exprs, // table function inputs
3828  input_col_exprs, // table function column inputs (duplicates w/ above)
3829  table_func_outputs, // table function projected exprs
3830  output_row_multiplier, // output buffer multiplier
3831  table_func->getFunctionName()};
3832  const auto targets_meta = get_targets_meta(table_func, exe_unit.target_exprs);
3833  table_func->setOutputMetainfo(targets_meta);
3834  return {exe_unit, table_func};
3835 }
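// Annotation (hypothetical example, not part of the source file): for a table
// function declared with a user-specified output multiplier in parameter
// position 1, a call such as my_table_func(cursor, 2) must pass the literal 2;
// the value is recorded as output_row_multiplier and used to size the output
// buffer relative to the input. The name my_table_func is invented for this
// annotation.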
3836 
3837 namespace {
3838 
3839 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
3840 get_inputs_meta(const RelFilter* filter,
3841  const RelAlgTranslator& translator,
3842  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
3843  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3844  std::vector<TargetMetaInfo> in_metainfo;
3845  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
3846  const auto data_sink_node = get_data_sink(filter);
3847  auto input_it = inputs_owned.begin();
3848  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
3849  const auto source = data_sink_node->getInput(nest_level);
3850  const auto scan_source = dynamic_cast<const RelScan*>(source);
3851  if (scan_source) {
3852  CHECK(source->getOutputMetainfo().empty());
3853  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
3854  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
3855  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
3856  }
3857  const auto source_metadata =
3858  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
3859  in_metainfo.insert(
3860  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3861  exprs_owned.insert(
3862  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3863  } else {
3864  const auto& source_metadata = source->getOutputMetainfo();
3865  input_it += source_metadata.size();
3866  in_metainfo.insert(
3867  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3868  const auto scalar_sources_owned = synthesize_inputs(
3869  data_sink_node, nest_level, source_metadata, input_to_nest_level);
3870  exprs_owned.insert(
3871  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3872  }
3873  }
3874  return std::make_pair(in_metainfo, exprs_owned);
3875 }
3876 
3877 } // namespace
3878 
3879 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
3880  const SortInfo& sort_info,
3881  const bool just_explain) {
3882  CHECK_EQ(size_t(1), filter->inputCount());
3883  std::vector<InputDescriptor> input_descs;
3884  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3885  std::vector<TargetMetaInfo> in_metainfo;
3886  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3887  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
3888 
3889  const auto input_to_nest_level = get_input_nest_levels(filter, {});
3890  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3891  get_input_desc(filter, input_to_nest_level, {}, cat_);
3892  const auto join_type = get_join_type(filter);
3893  RelAlgTranslator translator(cat_,
3894  query_state_,
3895  executor_,
3896  input_to_nest_level,
3897  {join_type},
3898  now_,
3899  just_explain);
3900  std::tie(in_metainfo, target_exprs_owned) =
3901  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
3902  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
3903  const auto qual = fold_expr(filter_expr.get());
3904  target_exprs_owned_.insert(
3905  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3906  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3907  filter->setOutputMetainfo(in_metainfo);
3908  const auto rewritten_qual = rewrite_expr(qual.get());
3909  return {{input_descs,
3910  input_col_descs,
3911  {},
3912  {rewritten_qual ? rewritten_qual : qual},
3913  {},
3914  {nullptr},
3915  target_exprs,
3916  nullptr,
3917  sort_info,
3918  0},
3919  filter,
3920  max_groups_buffer_entry_default_guess,
3921  nullptr};
3922 }
3923 
std::string getFunctionName() const
bool is_agg(const Analyzer::Expr *expr)
Analyzer::ExpressionPtr rewrite_array_elements(Analyzer::Expr const *expr)
std::vector< Analyzer::Expr * > target_exprs
const std::vector< TargetMetaInfo > getTupleType() const
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::pair< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > > > get_input_desc_impl(const RA *ra_node, const std::unordered_set< const RexInput *> &used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const ExecutionResult & getResult() const
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelAggregate *aggregate, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const ssize_t previous_count)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
const size_t getGroupByCount() const
JoinType
Definition: sqldefs.h:107
#define EMPTY_KEY_64
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain)
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
const TableDescriptor * getTableDescriptor() const
bool is_string() const
Definition: sqltypes.h:416
const std::vector< std::shared_ptr< TargetEntry > > & get_targetlist() const
Definition: Analyzer.h:1611
AggregatedColRange computeColRangesCache()
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1190
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:945
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:86
Definition: sqltypes.h:51
int64_t insert_one_dict_str(T *col_data, const ColumnDescriptor *cd, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
size_t collationCount() const
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
size_t size() const override
int hll_size_for_rate(const int err_percent)
Definition: HyperLogLog.h:115
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:150
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
size_t size() const override
unsigned getId() const
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:151
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
const RelAlgNode * body
ExecutorDeviceType
void setForceNonInSituData()
Definition: RenderInfo.cpp:45
#define SPIMAP_GEO_PHYSICAL_INPUT(c, i)
Definition: Catalog.h:75
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
bool g_skip_intermediate_count
std::unique_ptr< const RexOperator > get_bitwise_equals_conjunction(const RexScalar *scalar)
RexUsedInputsVisitor(const Catalog_Namespace::Catalog &cat)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
TableGenerations computeTableGenerations()
size_t getColInputsSize() const
SQLTypeInfo get_nullable_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:840
#define LOG(tag)
Definition: Logger.h:188
const unsigned runtime_query_interrupt_frequency
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:78
size_t size() const override
static SpeculativeTopNBlacklist speculative_topn_blacklist_
void checkForMatchingMetaInfoTypes() const
bool wasMultifragKernelLaunch() const
Definition: ErrorHandling.h:57
RelAlgExecutionUnit exe_unit
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< std::shared_ptr< Analyzer::TargetEntry > > targets
Definition: RenderInfo.h:37
SQLOps
Definition: sqldefs.h:29
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1456
const RexScalar * getProjectAt(const size_t idx) const
const std::list< Analyzer::OrderEntry > order_entries
bool isAll() const
const Rex * getTargetExpr(const size_t i) const
const Executor * executor_
Definition: ResultSet.h:850
bool setInSituDataIfUnset(const bool is_in_situ_data)
Definition: RenderInfo.cpp:98
std::string join(T const &container, std::string const &delim)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:268
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources_for_update(const RA *ra_node, const RelAlgTranslator &translator, int32_t tableId, const Catalog_Namespace::Catalog &cat, const ColumnNameList &colNames, size_t starting_projection_column_idx)
int get_result_table_id() const
Definition: Analyzer.h:1627
Definition: sqldefs.h:38
std::vector< InputDescriptor > input_descs
std::shared_ptr< Analyzer::Var > var_ref(const Analyzer::Expr *expr, const Analyzer::Var::WhichRow which_row, const int varno)
Definition: Analyzer.h:1670
#define UNREACHABLE()
Definition: Logger.h:241
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
#define CHECK_GE(x, y)
Definition: Logger.h:210
int64_t const int32_t sz
static WindowProjectNodeContext * create(Executor *executor)
const RexScalar * getTableFuncInputAt(const size_t idx) const
static const TableFunction & get(const std::string &name)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:819
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:46
Definition: sqldefs.h:49
Definition: sqldefs.h:30
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
bool g_enable_union
void check_sort_node_source_constraint(const RelSort *sort)
static const int32_t ERR_GEOS
Definition: Execute.h:951
SQLTypeInfo get_agg_type(const SQLAgg agg_kind, const Analyzer::Expr *arg_expr)
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:947
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:75
std::shared_ptr< Analyzer::Expr > set_transient_dict(const std::shared_ptr< Analyzer::Expr > expr)
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:192
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
static void invalidateCaches()
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:267
int g_hll_precision_bits
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
#define CHECK_GT(x, y)
Definition: Logger.h:209
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
std::vector< const RexScalar * > rex_to_conjunctive_form(const RexScalar *qual_expr)
bool is_count_distinct(const Analyzer::Expr *expr)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::string to_string(char const *&&v)
void handleNop(RaExecutionDesc &ed)
#define LOG_IF(severity, condition)
Definition: Logger.h:287
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
const RelAlgNode * getBody() const
std::shared_ptr< Analyzer::Expr > cast_dict_to_none(const std::shared_ptr< Analyzer::Expr > &input)
SortField getCollation(const size_t i) const
bool disallow_in_situ_only_if_final_ED_is_aggregate
Definition: RenderInfo.h:41
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:374
size_t getIndex() const
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
bool list_contains_expression(const QualsList &haystack, const std::shared_ptr< Analyzer::Expr > &needle)
size_t get_count_distinct_sub_bitmap_count(const size_t bitmap_sz_bits, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:948
Definition: sqldefs.h:73
ExecutorType
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:131
bool g_enable_interop
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:949
Datum get_constval() const
Definition: Analyzer.h:336
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:946
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:157
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:937
bool hasInput(const RelAlgNode *needle) const
static std::shared_ptr< Analyzer::Expr > translateLiteral(const RexLiteral *)
const bool allow_multifrag
size_t getOuterFragmentCount(const CompilationOptions &co, const ExecutionOptions &eo)
std::string cat(Ts &&... args)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
const bool find_push_down_candidates
bool g_from_table_reordering
Definition: Execute.cpp:81
const bool just_validate
bool isSimple() const
Classes representing a parse tree.
const RelAlgNode * getSourceNode() const
bool isGeometry(TargetMetaInfo const &target_meta_info)
const SortInfo sort_info
ExecutorType executor_type
std::list< std::shared_ptr< Analyzer::Expr > > makeJoinQuals(const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
#define INJECT_TIMER(DESC)
Definition: measure.h:91
const bool with_dynamic_watchdog
const std::list< int > & get_result_col_list() const
Definition: Analyzer.h:1628
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:941
const JoinQualsPerNestingLevel join_quals
std::shared_ptr< Analyzer::Expr > translateScalarRex(const RexScalar *rex) const
bool g_enable_bump_allocator
Definition: Execute.cpp:104
static void reset(Executor *executor)
SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr &expr)
bool hasRows() const
int getNestLevel() const
std::vector< std::shared_ptr< RexInput > > synthesized_physical_inputs_owned
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
SortAlgorithm
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr *> &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
const double gpu_input_mem_limit_percent
MergeType
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
A container for relational algebra descriptors defining the execution order for a relational algebra ...
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
size_t g_big_group_threshold
Definition: Execute.cpp:97
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:943
const size_t getScalarSourcesSize() const
const RexScalar * getInnerCondition() const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:944
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
bool g_bigint_count
size_t groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
Definition: sqldefs.h:37
Definition: sqldefs.h:75
std::shared_ptr< Analyzer::Expr > build_logical_expression(const std::vector< std::shared_ptr< Analyzer::Expr >> &factors, const SQLOps sql_op)
static const int32_t ERR_UNSUPPORTED_SELF_JOIN
Definition: Execute.h:940
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
specifies the content in-memory of a row in the column metadata table
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:950
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
JoinType get_join_type(const RelAlgNode *ra)
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:938
size_t getOffset() const
std::shared_ptr< Analyzer::Expr > reverse_logical_distribution(const std::shared_ptr< Analyzer::Expr > &expr)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1449
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_used_inputs(const RelLogicalUnion *logical_union, const Catalog_Namespace::Catalog &)
std::string * stringval
Definition: sqltypes.h:143
static std::shared_ptr< Analyzer::Expr > normalize(const SQLOps optype, const SQLQualifier qual, std::shared_ptr< Analyzer::Expr > left_expr, std::shared_ptr< Analyzer::Expr > right_expr)
Definition: ParserNode.cpp:272
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
bool g_enable_window_functions
Definition: Execute.cpp:98
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
virtual T visit(const RexScalar *rex_scalar) const
Definition: RexVisitor.h:27
std::vector< TargetMetaInfo > get_targets_meta(const RelFilter *filter, const std::vector< Analyzer::Expr *> &target_exprs)
const RexScalar * getOuterCondition(const size_t nesting_level) const
const std::vector< size_t > outer_fragment_indices
bool get_is_null() const
Definition: Analyzer.h:335
unsigned g_runtime_query_interrupt_frequency
Definition: Execute.cpp:110
Definition: sqltypes.h:54
Definition: sqltypes.h:55
bool is_geometry() const
Definition: sqltypes.h:428
ssize_t getFilteredCountAll(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
Definition: sqldefs.h:69
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
ExecutionResult executeSimpleInsert(const Analyzer::Query &insert_query)
#define TRANSIENT_DICT_ID
Definition: sqltypes.h:198
const bool just_calcite_explain
const RexScalar * scalar_at(const size_t i, const RelTableFunction *table_func)
#define CHECK_LE(x, y)
Definition: Logger.h:208
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: sqltypes.h:870
const RexScalar * getCondition() const
const size_t getGroupByCount() const
Executor * getExecutor() const
unsigned getIndex() const
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool table_is_temporary(const TableDescriptor *const td)
bool is_null(const T &v, const SQLTypeInfo &t)
size_t getLimit() const
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:64
bool table_is_replicated(const TableDescriptor *td)
const RexScalar * getScalarSource(const size_t i) const
std::unordered_set< const RexInput * > visitInput(const RexInput *rex_input) const override
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_join_source_used_inputs(const RelAlgNode *ra_node, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
SQLAgg get_aggtype() const
Definition: Analyzer.h:1096
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:1452
bool isAggregate() const
Definition: sqldefs.h:76
std::unordered_set< const RexInput * > aggregateResult(const std::unordered_set< const RexInput *> &aggregate, const std::unordered_set< const RexInput *> &next_result) const override
bool get_is_distinct() const
Definition: Analyzer.h:1099
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool isRowidLookup(const WorkUnit &work_unit)
bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit)
Definition: sqltypes.h:43
int table_id_from_ra(const RelAlgNode *ra_node)
std::unordered_set< PhysicalInput > get_physical_inputs(const Catalog_Namespace::Catalog &cat, const RelAlgNode *ra)
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
static void handlePersistentError(const int32_t error_code)
bool compute_output_buffer_size(const RelAlgExecutionUnit &ra_exe_unit)
const int getColumnIdBySpi(const int tableId, const size_t spi) const
Definition: Catalog.cpp:1540
const std::vector< std::shared_ptr< RexInput > > & get_inputs_owned() const
const size_t max_groups_buffer_entry_guess
std::list< std::shared_ptr< Analyzer::Expr > > quals
const bool allow_loop_joins
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:79
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const ssize_t previous_count=-1)
StringDictionaryGenerations computeStringDictionaryGenerations()
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
const RexScalar * getValueAt(const size_t row_idx, const size_t col_idx) const
void collect_used_input_desc(std::vector< InputDescriptor > &input_descs, const Catalog_Namespace::Catalog &cat, std::unordered_set< std::shared_ptr< const InputColDescriptor >> &input_col_descs_unique, const RelAlgNode *ra_node, const std::unordered_set< const RexInput *> &source_used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
RaExecutionDesc * getDescriptor(size_t idx) const
const RexScalar * getFilterExpr() const
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:623
virtual std::string toString() const =0
#define CHECK(condition)
Definition: Logger.h:197
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:313
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
std::vector< std::shared_ptr< Analyzer::Expr > > qual_to_disjunctive_form(const std::shared_ptr< Analyzer::Expr > &qual_expr)
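As a rough illustration of what a disjunctive-form rewrite does, the sketch below flattens the top-level OR nodes of a toy expression tree into a list of disjuncts. The Expr type and Op tag here are simplified stand-ins for illustration, not the engine's Analyzer::Expr hierarchy.

// Illustrative only: a toy expression tree, not Analyzer::Expr.
#include <memory>
#include <string>
#include <vector>

enum class Op { kOR, kLEAF };

struct Expr {
  Op op;
  std::string label;               // used for leaves
  std::shared_ptr<Expr> lhs, rhs;  // used for kOR
};

// Collect the disjuncts of an OR-tree: (a OR (b OR c)) -> {a, b, c}.
void collect_disjuncts(const std::shared_ptr<Expr>& e,
                       std::vector<std::shared_ptr<Expr>>& out) {
  if (e->op == Op::kOR) {
    collect_disjuncts(e->lhs, out);
    collect_disjuncts(e->rhs, out);
  } else {
    out.push_back(e);
  }
}

int main() {
  auto a = std::make_shared<Expr>(Expr{Op::kLEAF, "a", nullptr, nullptr});
  auto b = std::make_shared<Expr>(Expr{Op::kLEAF, "b", nullptr, nullptr});
  auto root = std::make_shared<Expr>(Expr{Op::kOR, "", a, b});
  std::vector<std::shared_ptr<Expr>> disjuncts;
  collect_disjuncts(root, disjuncts);
  // disjuncts now holds {a, b}
}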
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
Definition: sqldefs.h:31
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
RANodeOutput get_node_output(const RelAlgNode *ra_node)
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
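Fixed-width integer columns store NULL inline as a sentinel at the edge of the type's range. A minimal sketch of that idea using std::numeric_limits; the exact sentinel per type and encoding is determined by the two functions above, not by this snippet.

#include <cstdint>
#include <iostream>
#include <limits>

// Sketch: use the minimum representable value as the inline NULL sentinel
// for a signed integer type. The real inline_int_null_val() also accounts
// for the column's encoding.
template <typename T>
constexpr T inline_null_sentinel() {
  return std::numeric_limits<T>::min();
}

int main() {
  std::cout << inline_null_sentinel<int32_t>() << '\n';  // -2147483648
  std::cout << inline_null_sentinel<int64_t>() << '\n';  // -9223372036854775808
}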
std::unique_ptr< const RexOperator > get_bitwise_equals(const RexScalar *scalar)
Estimators to be used when precise cardinality isn't useful.
bool g_cluster
bool g_allow_cpu_retry
Definition: Execute.cpp:78
static std::string getErrorMessageFromCode(const int32_t error_code)
const RelAlgNode * getInput(const size_t idx) const
bool g_enable_watchdog
Definition: Execute.cpp:74
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
void cleanupPostExecution()
std::vector< std::string > ColumnNameList
size_t getRowsSize() const
Definition: sqltypes.h:47
std::vector< Analyzer::Expr * > target_exprs
SQLTypeInfo columnType
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:63
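printContainer wraps a container so it can be streamed into a log statement. A hedged sketch of the pattern follows; the wrapper type and operator<< below are illustrative, not the definitions in misc.h.

#include <iostream>
#include <vector>

// Sketch: hold a reference to the container and stream its elements.
template <typename CONTAINER>
struct PrintContainer {
  const CONTAINER& c;
};

template <typename CONTAINER>
PrintContainer<CONTAINER> printContainer(const CONTAINER& c) {
  return {c};
}

template <typename CONTAINER>
std::ostream& operator<<(std::ostream& os, const PrintContainer<CONTAINER>& pc) {
  os << '(';
  for (auto it = pc.c.begin(); it != pc.c.end(); ++it) {
    os << (it == pc.c.begin() ? "" : ", ") << *it;
  }
  return os << ')';
}

int main() {
  std::vector<int> v{1, 2, 3};
  std::cout << printContainer(v) << '\n';  // (1, 2, 3)
}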
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
size_t get_scalar_sources_size(const RelTableFunction *table_func)
specifies the in-memory content of a row in the table metadata table
void sort(const std::list< Analyzer::OrderEntry > &order_entries, const size_t top_n)
Definition: ResultSet.cpp:521
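ResultSet::sort takes the order entries plus a top_n bound, which permits a partial sort when the query carries a LIMIT. The standalone sketch below shows the partial-sort idea on a plain vector; it is not the ResultSet implementation.

#include <algorithm>
#include <iostream>
#include <vector>

int main() {
  std::vector<int> rows{7, 3, 9, 1, 8, 2};
  const size_t top_n = 3;
  // Only the first top_n positions end up sorted; the rest are unspecified.
  std::partial_sort(rows.begin(), rows.begin() + top_n, rows.end());
  for (size_t i = 0; i < top_n; ++i) {
    std::cout << rows[i] << ' ';  // 1 2 3
  }
  std::cout << '\n';
}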
const bool allow_runtime_query_interrupt
const std::vector< std::unique_ptr< const RexAgg > > & getAggExprs() const
int8_t * numbersPtr
Definition: sqltypes.h:149
FirstStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
RelAlgExecutionUnit decide_approx_count_distinct_implementation(const RelAlgExecutionUnit &ra_exe_unit_in, const std::vector< InputTableInfo > &table_infos, const Executor *executor, const ExecutorDeviceType device_type_in, std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned)
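decide_approx_count_distinct_implementation picks a concrete approximate-distinct-count strategy per target from the input table statistics. A hedged sketch of the core decision, assuming a bitmap is preferred when the value range is small and HyperLogLog otherwise; the threshold below is made up for illustration.

#include <cstdint>

enum class ApproxDistinctImpl { Bitmap, HyperLogLog };

// Sketch: exact bitmap counting is cheap when the column's value range is
// small; fall back to HyperLogLog for wide ranges.
ApproxDistinctImpl choose_approx_distinct_impl(const int64_t min_val,
                                               const int64_t max_val) {
  constexpr int64_t kBitmapRangeLimit = 1'000'000;  // illustrative threshold
  const int64_t range = max_val - min_val + 1;
  return range <= kBitmapRangeLimit ? ApproxDistinctImpl::Bitmap
                                    : ApproxDistinctImpl::HyperLogLog;
}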
size_t get_frag_count_of_table(const int table_id, Executor *executor)
Definition: sqldefs.h:74
bool sameTypeInfo(std::vector< TargetMetaInfo > const &lhs, std::vector< TargetMetaInfo > const &rhs)
const RelAlgNode * get_data_sink(const RelAlgNode *ra_node)
const unsigned dynamic_watchdog_time_limit
static const int32_t ERR_OUT_OF_CPU_MEM
Definition: Execute.h:942
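Error constants such as ERR_OUT_OF_CPU_MEM are translated into user-facing text by getErrorMessageFromCode and routed through handlePersistentError. A hedged sketch of that kind of mapping; the code value and message wording here are illustrative, the real definitions live in Execute.h and Execute.cpp.

#include <cstdint>
#include <string>

// Hypothetical code for illustration only.
constexpr int32_t ERR_OUT_OF_CPU_MEM_SKETCH = 6;

std::string error_message_from_code_sketch(const int32_t error_code) {
  switch (error_code) {
    case ERR_OUT_OF_CPU_MEM_SKETCH:
      return "Not enough host memory to execute the query";
    default:
      return "Unknown error code " + std::to_string(error_code);
  }
}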
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
void setResult(const ExecutionResult &result)
bool isEmptyResult() const
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
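Taken together, the Fragmenter_Namespace entries above describe the insert payload handed to the fragment manager: a target table, the column ids being populated, the per-column data blocks, and a row count. A hedged mock-up of such a struct; field names follow the tooltips, while the real definition lives in Fragmenter.h.

#include <cstddef>
#include <vector>

// Illustrative stand-in for a per-column data block pointer.
using DataBlockPtr = const void*;

// Sketch of an insert payload, mirroring the documented fields.
struct InsertDataSketch {
  int tableId;                     // identifies the table being inserted into
  std::vector<int> columnIds;      // which columns the blocks below populate
  std::vector<DataBlockPtr> data;  // one data block per column id
  size_t numRows;                  // the number of rows being inserted
};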
std::string columnName
size_t getTableFuncInputsSize() const
Definition: sqldefs.h:72
static size_t getArenaBlockSize()
Definition: Execute.cpp:195
size_t getNumRows() const
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20
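SHARD_FOR_KEY maps a shard-key value to a shard index. A minimal sketch, assuming simple modulo placement; the actual macro in shard_key.h may handle signs differently.

#include <cstdint>
#include <iostream>

// Sketch: modulo placement over num_shards, with negative keys folded to
// non-negative first so the result is always a valid shard index. The
// -(key + 1) fold avoids undefined behavior when negating INT64_MIN.
inline size_t shard_for_key(int64_t key, size_t num_shards) {
  const uint64_t k =
      key < 0 ? static_cast<uint64_t>(-(key + 1)) : static_cast<uint64_t>(key);
  return k % num_shards;
}

int main() {
  std::cout << shard_for_key(42, 4) << '\n';  // 2
  std::cout << shard_for_key(-7, 4) << '\n';  // folded, then mod 4
}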
std::list< std::shared_ptr< Analyzer::Expr > > combine_equi_join_conditions(const std::list< std::shared_ptr< Analyzer::Expr >> &join_quals)
#define IS_GEO(T)
Definition: sqltypes.h:174
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:108
const SQLTypeInfo & get_type_info() const
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:40
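timer_start in measure.h returns a start timestamp that is later paired with a matching stop call to report elapsed time. A plausible sketch of the pattern using std::chrono; the timer_stop_ms name is illustrative.

#include <chrono>
#include <iostream>

using Clock = std::chrono::steady_clock;

Clock::time_point timer_start() {
  return Clock::now();
}

int64_t timer_stop_ms(const Clock::time_point& start) {
  return std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start)
      .count();
}

int main() {
  auto clock_begin = timer_start();
  // ... work under measurement ...
  std::cout << "elapsed: " << timer_stop_ms(clock_begin) << " ms\n";
}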
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
const bool with_watchdog
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:120
bool g_enable_table_functions
Definition: Execute.cpp:99
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
static std::shared_ptr< Analyzer::Expr > translateAggregateRex(const RexAgg *rex, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
void prepareLeafExecution(const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:303
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelAggregate *aggregate, const RelAlgTranslator &translator)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
void set_transient_dict_maybe(std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::shared_ptr< Analyzer::Expr > &expr)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)
const Expr * get_left_operand() const
Definition: Analyzer.h:443