OmniSciDB  7bf56492aa
RelAlgExecutor.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RelAlgExecutor.h"
18 
19 #include <algorithm>
20 #include <boost/range/adaptor/reversed.hpp>
21 #include <numeric>
22 
23 #include "Parser/ParserNode.h"
38 #include "QueryEngine/RexVisitor.h"
43 #include "Shared/measure.h"
44 #include "Shared/shard_key.h"
45 
47 extern bool g_enable_bump_allocator;
48 bool g_enable_interop{false};
49 
50 namespace {
51 
52 bool node_is_aggregate(const RelAlgNode* ra) {
53  const auto compound = dynamic_cast<const RelCompound*>(ra);
54  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
55  return ((compound && compound->isAggregate()) || aggregate);
56 }
57 
58 std::unordered_set<PhysicalInput> get_physical_inputs(
59  const Catalog_Namespace::Catalog& cat,
60  const RelAlgNode* ra) {
61  auto phys_inputs = get_physical_inputs(ra);
62  std::unordered_set<PhysicalInput> phys_inputs2;
63  for (auto& phi : phys_inputs) {
64  phys_inputs2.insert(
65  PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
66  }
67  return phys_inputs2;
68 }
69 
70 } // namespace
71 
72 size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
73  const ExecutionOptions& eo) {
74  if (eo.find_push_down_candidates) {
75  return 0;
76  }
77 
78  if (eo.just_explain) {
79  return 0;
80  }
81 
82  CHECK(query_dag_);
83 
84  query_dag_->resetQueryExecutionState();
85  const auto& ra = query_dag_->getRootNode();
86 
87  std::lock_guard<std::mutex> lock(executor_->execute_mutex_);
88  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
89  const auto phys_inputs = get_physical_inputs(cat_, &ra);
90  const auto phys_table_ids = get_physical_table_inputs(&ra);
91  executor_->setCatalog(&cat_);
92  executor_->setupCaching(phys_inputs, phys_table_ids);
93 
94  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
95  auto ed_seq = RaExecutionSequence(&ra);
96 
97  if (!getSubqueries().empty()) {
98  return 0;
99  }
100 
101  CHECK(!ed_seq.empty());
102  if (ed_seq.size() > 1) {
103  return 0;
104  }
105 
106  decltype(temporary_tables_)().swap(temporary_tables_);
107  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
108  executor_->catalog_ = &cat_;
109  executor_->temporary_tables_ = &temporary_tables_;
110 
112  auto exec_desc_ptr = ed_seq.getDescriptor(0);
113  CHECK(exec_desc_ptr);
114  auto& exec_desc = *exec_desc_ptr;
115  const auto body = exec_desc.getBody();
116  if (body->isNop()) {
117  return 0;
118  }
119 
120  const auto project = dynamic_cast<const RelProject*>(body);
121  if (project) {
122  auto work_unit =
123  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
124 
125  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
126  executor_);
127  }
128 
129  const auto compound = dynamic_cast<const RelCompound*>(body);
130  if (compound) {
131  if (compound->isDeleteViaSelect()) {
132  return 0;
133  } else if (compound->isUpdateViaSelect()) {
134  return 0;
135  } else {
136  if (compound->isAggregate()) {
137  return 0;
138  }
139 
140  const auto work_unit =
141  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
142 
143  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
144  executor_);
145  }
146  }
147 
148  return 0;
149 }
150 
151 ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
152  const ExecutionOptions& eo,
153  const bool just_explain_plan,
154  RenderInfo* render_info) {
155  CHECK(query_dag_);
156  auto timer = DEBUG_TIMER(__func__);
158  try {
159  return executeRelAlgQueryNoRetry(co, eo, just_explain_plan, render_info);
160  } catch (const QueryMustRunOnCpu&) {
161  if (!g_allow_cpu_retry) {
162  throw;
163  }
164  }
165  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
166  auto co_cpu = CompilationOptions::makeCpuOnly(co);
167 
168  if (render_info) {
169  render_info->setForceNonInSituData();
170  }
171  return executeRelAlgQueryNoRetry(co_cpu, eo, just_explain_plan, render_info);
172 }
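
// NOTE: illustrative sketch, not part of RelAlgExecutor.cpp. It distills the
// CPU-retry pattern used by executeRelAlgQuery above into standard C++. The
// names QueryMustRunOnCpuEx, Options and run_with_cpu_retry are hypothetical
// stand-ins for the engine's QueryMustRunOnCpu / CompilationOptions machinery.
#include <stdexcept>

struct QueryMustRunOnCpuEx : std::runtime_error {
  QueryMustRunOnCpuEx() : std::runtime_error("query must run on CPU") {}
};

struct Options {
  bool cpu_only = false;
};

int run_with_cpu_retry(int (*run_query)(const Options&), Options opts) {
  try {
    return run_query(opts);  // first attempt, possibly on GPU
  } catch (const QueryMustRunOnCpuEx&) {
    // fall through to the CPU-only retry below
  }
  opts.cpu_only = true;  // plays the role of CompilationOptions::makeCpuOnly(co)
  return run_query(opts);
}
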
173 
174 ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
175  const ExecutionOptions& eo,
176  const bool just_explain_plan,
177  RenderInfo* render_info) {
179  auto timer = DEBUG_TIMER(__func__);
180  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
181 
182  query_dag_->resetQueryExecutionState();
183  const auto& ra = query_dag_->getRootNode();
184 
185  // capture the lock acquisition time
186  auto clock_begin = timer_start();
187  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
188  executor_->resetInterrupt();
189  }
190 
191  std::string query_session = "";
192  if (g_enable_runtime_query_interrupt) {
193  // a query execution request without a session id can happen, e.g., a test query;
194  // if so, we fall back to the original behavior: a runtime query interrupt
195  // without per-session management (similar to the dynamic watchdog)
196  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
197  query_session = query_state_->getConstSessionInfo()->get_session_id();
198  } else if (executor_->getCurrentQuerySession() != query_session) {
199  query_session = executor_->getCurrentQuerySession();
200  }
201  if (query_session != "") {
202  // if session is valid, then we allow per-session runtime query interrupt
203  executor_->addToQuerySessionList(query_session);
204  // hybrid spinlock: if it fails to acquire the lock, it sleeps for
205  // {g_runtime_query_interrupt_frequency} milliseconds before retrying.
206  while (executor_->execute_spin_lock_.test_and_set(std::memory_order_acquire)) {
207  // failed to get the spinlock: check whether query is interrupted
208  if (executor_->checkIsQuerySessionInterrupted(query_session)) {
209  executor_->removeFromQuerySessionList(query_session);
210  VLOG(1) << "Kill the Interrupted pending query.";
211  throw std::runtime_error(
212  "Query execution has been interrupted (pending query).");
213  }
214  // failed to acquire the lock; sleep before retrying
215  std::this_thread::sleep_for(
216  std::chrono::milliseconds(g_runtime_query_interrupt_frequency));
217  };
218  }
219  // currently, atomic_flag does not provide a way to query its status,
220  // i.e., spinlock.is_locked(), so we additionally lock the execute_mutex_
221  // right after acquiring the spinlock so that other parts of the code
222  // can know whether there is a running query on the executor
223  }
224  std::lock_guard<std::mutex> lock(executor_->execute_mutex_);
225 
226  ScopeGuard clearRuntimeInterruptStatus = [this] {
227  // reset the runtime query interrupt status
228  if (g_enable_runtime_query_interrupt) {
229  executor_->removeFromQuerySessionList(executor_->getCurrentQuerySession());
230  executor_->invalidateQuerySession();
231  executor_->resetInterrupt();
232  executor_->execute_spin_lock_.clear(std::memory_order_acquire);
233  VLOG(1) << "RESET runtime query interrupt status of Executor " << this;
234  }
235  };
236 
237  if (g_enable_runtime_query_interrupt) {
238  // make sure to set the running session ID
239  executor_->invalidateQuerySession();
240  executor_->setCurrentQuerySession(query_session);
241  }
242 
243  int64_t queue_time_ms = timer_stop(clock_begin);
244  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
245  const auto phys_inputs = get_physical_inputs(cat_, &ra);
246  const auto phys_table_ids = get_physical_table_inputs(&ra);
247  executor_->setCatalog(&cat_);
248  executor_->setupCaching(phys_inputs, phys_table_ids);
249 
250  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
251  auto ed_seq = RaExecutionSequence(&ra);
252 
253  if (just_explain_plan) {
254  std::stringstream ss;
255  std::vector<const RelAlgNode*> nodes;
256  for (size_t i = 0; i < ed_seq.size(); i++) {
257  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
258  }
259  size_t ctr = nodes.size();
260  size_t tab_ctr = 0;
261  for (auto& body : boost::adaptors::reverse(nodes)) {
262  const auto index = ctr--;
263  const auto tabs = std::string(tab_ctr++, '\t');
264  CHECK(body);
265  ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
266  if (auto sort = dynamic_cast<const RelSort*>(body)) {
267  ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
268  }
269  }
270  auto rs = std::make_shared<ResultSet>(ss.str());
271  return {rs, {}};
272  }
273 
274  if (render_info) {
275  // set render to be non-insitu in certain situations.
276  if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
277  ed_seq.size() > 1) {
278  // old logic: disallow in-situ rendering
279  // if there is more than one execution descriptor (ED)
280  render_info->setInSituDataIfUnset(false);
281  }
282  }
283 
284  if (eo.find_push_down_candidates) {
285  // this extra logic is mainly due to current limitations on multi-step queries
286  // and/or subqueries.
287  return executeRelAlgQueryWithFilterPushDown(
288  ed_seq, co, eo, render_info, queue_time_ms);
289  }
290  timer_setup.stop();
291 
292  // Dispatch the subqueries first
293  for (auto subquery : getSubqueries()) {
294  const auto subquery_ra = subquery->getRelAlg();
295  CHECK(subquery_ra);
296  if (subquery_ra->hasContextData()) {
297  continue;
298  }
299  // Execute the subquery and cache the result.
300  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
301  RaExecutionSequence subquery_seq(subquery_ra);
302  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
303  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
304  }
305  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
306 }
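
// NOTE: illustrative sketch, not part of RelAlgExecutor.cpp. It shows the
// hybrid spinlock idiom used in executeRelAlgQueryNoRetry above with plain
// std::atomic_flag. query_interrupted() and kSleepMs are hypothetical
// stand-ins for the per-session interrupt check and
// g_runtime_query_interrupt_frequency.
#include <atomic>
#include <chrono>
#include <stdexcept>
#include <thread>

std::atomic_flag example_spin_lock = ATOMIC_FLAG_INIT;
constexpr int kSleepMs = 1000;  // stand-in for g_runtime_query_interrupt_frequency

bool query_interrupted() { return false; }  // hypothetical interrupt check

void acquire_hybrid_spin_lock() {
  while (example_spin_lock.test_and_set(std::memory_order_acquire)) {
    if (query_interrupted()) {
      throw std::runtime_error("Query execution has been interrupted (pending query).");
    }
    // back off instead of busy-waiting at full speed
    std::this_thread::sleep_for(std::chrono::milliseconds(kSleepMs));
  }
}

void release_hybrid_spin_lock() {
  example_spin_lock.clear(std::memory_order_release);
}
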
307 
308 AggregatedColRange RelAlgExecutor::computeColRangesCache() {
309  AggregatedColRange agg_col_range_cache;
310  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
311  return executor_->computeColRangesCache(phys_inputs);
312 }
313 
314 StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
315  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
316  return executor_->computeStringDictionaryGenerations(phys_inputs);
317 }
318 
319 TableGenerations RelAlgExecutor::computeTableGenerations() {
320  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
321  return executor_->computeTableGenerations(phys_table_ids);
322 }
323 
324 Executor* RelAlgExecutor::getExecutor() const {
325  return executor_;
326 }
327 
328 void RelAlgExecutor::cleanupPostExecution() {
329  CHECK(executor_);
330  executor_->row_set_mem_owner_ = nullptr;
331  executor_->lit_str_dict_proxy_ = nullptr;
332 }
333 
334 namespace {
335 
336 inline void check_sort_node_source_constraint(const RelSort* sort) {
337  CHECK_EQ(size_t(1), sort->inputCount());
338  const auto source = sort->getInput(0);
339  if (dynamic_cast<const RelSort*>(source)) {
340  throw std::runtime_error("Sort node not supported as input to another sort");
341  }
342 }
343 
344 } // namespace
345 
346 FirstStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
347  const RaExecutionSequence& seq,
348  const size_t step_idx,
349  const CompilationOptions& co,
350  const ExecutionOptions& eo,
351  RenderInfo* render_info) {
352  INJECT_TIMER(executeRelAlgQueryStep);
353  auto exe_desc_ptr = seq.getDescriptor(step_idx);
354  CHECK(exe_desc_ptr);
355  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
356 
357  size_t shard_count{0};
358  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
359  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
360  };
361 
362  if (sort) {
364  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
365  shard_count = GroupByAndAggregate::shard_count_for_top_groups(
366  source_work_unit.exe_unit, *executor_->getCatalog());
367  if (!shard_count) {
368  // No point in sorting on the leaf, only execute the input to the sort node.
369  CHECK_EQ(size_t(1), sort->inputCount());
370  const auto source = sort->getInput(0);
371  if (sort->collationCount() || node_is_aggregate(source)) {
372  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
373  CHECK_EQ(temp_seq.size(), size_t(1));
374  // Use subseq to avoid clearing existing temporary tables
375  return {executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo, nullptr, 0),
376  merge_type(source),
377  source->getId(),
378  false};
379  }
380  }
381  }
382  return {executeRelAlgSubSeq(seq,
383  std::make_pair(step_idx, step_idx + 1),
384  co,
385  eo,
386  render_info,
387  queue_time_ms_),
388  merge_type(exe_desc_ptr->getBody()),
389  exe_desc_ptr->getBody()->getId(),
390  false};
391 }
392 
393 void RelAlgExecutor::prepareLeafExecution(
394  const AggregatedColRange& agg_col_range,
395  const StringDictionaryGenerations& string_dictionary_generations,
396  const TableGenerations& table_generations) {
397  // capture the lock acquisition time
398  auto clock_begin = timer_start();
399  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
400  executor_->resetInterrupt();
401  }
402  queue_time_ms_ = timer_stop(clock_begin);
403  executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
404  executor_->table_generations_ = table_generations;
405  executor_->agg_col_range_cache_ = agg_col_range;
406  executor_->string_dictionary_generations_ = string_dictionary_generations;
407 }
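
// NOTE: illustrative sketch, not part of RelAlgExecutor.cpp. It spells out
// what the timer_start()/timer_stop() pair used above amounts to, assuming
// the Shared/measure.h helpers wrap a steady clock; the example_ names are
// hypothetical.
#include <chrono>
#include <cstdint>

using Clock = std::chrono::steady_clock;

Clock::time_point example_timer_start() {
  return Clock::now();
}

int64_t example_timer_stop(const Clock::time_point begin) {
  // elapsed wall time in milliseconds, e.g. the queue_time_ms_ recorded above
  return std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - begin)
      .count();
}
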
408 
409 ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
410  const CompilationOptions& co,
411  const ExecutionOptions& eo,
412  RenderInfo* render_info,
413  const int64_t queue_time_ms,
414  const bool with_existing_temp_tables) {
416  auto timer = DEBUG_TIMER(__func__);
417  if (!with_existing_temp_tables) {
418  decltype(temporary_tables_)().swap(temporary_tables_);
419  }
420  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
421  executor_->catalog_ = &cat_;
422  executor_->temporary_tables_ = &temporary_tables_;
423 
424  time(&now_);
425  CHECK(!seq.empty());
426  const auto exec_desc_count = eo.just_explain ? size_t(1) : seq.size();
427 
428  for (size_t i = 0; i < exec_desc_count; i++) {
429  // only render on the last step
430  try {
431  executeRelAlgStep(seq,
432  i,
433  co,
434  eo,
435  (i == exec_desc_count - 1) ? render_info : nullptr,
436  queue_time_ms);
437  } catch (const NativeExecutionError&) {
438  if (!g_enable_interop) {
439  throw;
440  }
441  auto eo_extern = eo;
442  eo_extern.executor_type = ::ExecutorType::Extern;
443  auto exec_desc_ptr = seq.getDescriptor(i);
444  const auto body = exec_desc_ptr->getBody();
445  const auto compound = dynamic_cast<const RelCompound*>(body);
446  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
447  LOG(INFO) << "Also failed to run the query using interoperability";
448  throw;
449  }
450  executeRelAlgStep(seq,
451  i,
452  co,
453  eo_extern,
454  (i == exec_desc_count - 1) ? render_info : nullptr,
455  queue_time_ms);
456  }
457  }
458 
459  return seq.getDescriptor(exec_desc_count - 1)->getResult();
460 }
461 
462 ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
463  const RaExecutionSequence& seq,
464  const std::pair<size_t, size_t> interval,
465  const CompilationOptions& co,
466  const ExecutionOptions& eo,
467  RenderInfo* render_info,
468  const int64_t queue_time_ms) {
470  executor_->catalog_ = &cat_;
471  executor_->temporary_tables_ = &temporary_tables_;
472 
473  time(&now_);
474  CHECK(!eo.just_explain);
475 
476  for (size_t i = interval.first; i < interval.second; i++) {
477  // only render on the last step
478  executeRelAlgStep(seq,
479  i,
480  co,
481  eo,
482  (i == interval.second - 1) ? render_info : nullptr,
483  queue_time_ms);
484  }
485 
486  return seq.getDescriptor(interval.second - 1)->getResult();
487 }
488 
489 void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
490  const size_t step_idx,
491  const CompilationOptions& co,
492  const ExecutionOptions& eo,
493  RenderInfo* render_info,
494  const int64_t queue_time_ms) {
495  INJECT_TIMER(executeRelAlgStep);
496  auto timer = DEBUG_TIMER(__func__);
497  WindowProjectNodeContext::reset();
498  auto exec_desc_ptr = seq.getDescriptor(step_idx);
499  CHECK(exec_desc_ptr);
500  auto& exec_desc = *exec_desc_ptr;
501  const auto body = exec_desc.getBody();
502  if (body->isNop()) {
503  handleNop(exec_desc);
504  return;
505  }
506  const ExecutionOptions eo_work_unit{
507  eo.output_columnar_hint,
508  eo.allow_multifrag,
509  eo.just_explain,
510  eo.allow_loop_joins,
511  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
512  eo.jit_debug,
513  eo.just_validate,
514  eo.with_dynamic_watchdog,
515  eo.dynamic_watchdog_time_limit,
516  eo.find_push_down_candidates,
517  eo.just_calcite_explain,
518  eo.gpu_input_mem_limit_percent,
519  eo.allow_runtime_query_interrupt,
520  eo.runtime_query_interrupt_frequency,
521  eo.executor_type,
522  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
523 
524  const auto compound = dynamic_cast<const RelCompound*>(body);
525  if (compound) {
526  if (compound->isDeleteViaSelect()) {
527  executeDelete(compound, co, eo_work_unit, queue_time_ms);
528  } else if (compound->isUpdateViaSelect()) {
529  executeUpdate(compound, co, eo_work_unit, queue_time_ms);
530  } else {
531  exec_desc.setResult(
532  executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
533  if (exec_desc.getResult().isFilterPushDownEnabled()) {
534  return;
535  }
536  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
537  }
538  return;
539  }
540  const auto project = dynamic_cast<const RelProject*>(body);
541  if (project) {
542  if (project->isDeleteViaSelect()) {
543  executeDelete(project, co, eo_work_unit, queue_time_ms);
544  } else if (project->isUpdateViaSelect()) {
545  executeUpdate(project, co, eo_work_unit, queue_time_ms);
546  } else {
547  ssize_t prev_count = -1;
548  // Disabling the intermediate count optimization in distributed, as the previous
549  // execution descriptor will likely not hold the aggregated result.
550  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
551  auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
552  CHECK(prev_exec_desc);
553  // If the previous node produced a reliable count, skip the pre-flight count
554  if (dynamic_cast<const RelCompound*>(prev_exec_desc->getBody()) ||
555  dynamic_cast<const RelLogicalValues*>(prev_exec_desc->getBody())) {
556  const auto& prev_exe_result = prev_exec_desc->getResult();
557  const auto prev_result = prev_exe_result.getRows();
558  if (prev_result) {
559  prev_count = static_cast<ssize_t>(prev_result->rowCount());
560  }
561  }
562  }
563  exec_desc.setResult(executeProject(
564  project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
565  if (exec_desc.getResult().isFilterPushDownEnabled()) {
566  return;
567  }
568  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
569  }
570  return;
571  }
572  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
573  if (aggregate) {
574  exec_desc.setResult(
575  executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
576  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
577  return;
578  }
579  const auto filter = dynamic_cast<const RelFilter*>(body);
580  if (filter) {
581  exec_desc.setResult(
582  executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
583  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
584  return;
585  }
586  const auto sort = dynamic_cast<const RelSort*>(body);
587  if (sort) {
588  exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
589  if (exec_desc.getResult().isFilterPushDownEnabled()) {
590  return;
591  }
592  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
593  return;
594  }
595  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
596  if (logical_values) {
597  exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
598  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
599  return;
600  }
601  const auto modify = dynamic_cast<const RelModify*>(body);
602  if (modify) {
603  exec_desc.setResult(executeModify(modify, eo_work_unit));
604  return;
605  }
606  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
607  if (table_func) {
608  exec_desc.setResult(
609  executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
610  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
611  return;
612  }
613  CHECK(false);
614 }
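
// NOTE: illustrative sketch, not part of RelAlgExecutor.cpp. It shows the
// dispatch style executeRelAlgStep uses above: a chain of dynamic_casts over a
// closed node hierarchy, ending in an unreachable check. Node, ProjectNode and
// SortNode are hypothetical stand-ins for the RelAlgNode hierarchy.
#include <cassert>

struct Node { virtual ~Node() = default; };
struct ProjectNode : Node {};
struct SortNode : Node {};

void dispatch_step(const Node* body) {
  if (auto project = dynamic_cast<const ProjectNode*>(body)) {
    (void)project;  // would call executeProject(...)
    return;
  }
  if (auto sort = dynamic_cast<const SortNode*>(body)) {
    (void)sort;  // would call executeSort(...)
    return;
  }
  assert(false && "unhandled node type");  // mirrors the CHECK(false) above
}
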
615 
616 void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
617  // just set the result of the previous node as the result of no op
618  auto body = ed.getBody();
619  CHECK(dynamic_cast<const RelAggregate*>(body));
620  CHECK_EQ(size_t(1), body->inputCount());
621  const auto input = body->getInput(0);
622  body->setOutputMetainfo(input->getOutputMetainfo());
623  const auto it = temporary_tables_.find(-input->getId());
624  CHECK(it != temporary_tables_.end());
625  // set up temp table as it could be used by the outer query or next step
626  addTemporaryTable(-body->getId(), it->second);
627 
628  ed.setResult({it->second, input->getOutputMetainfo()});
629 }
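
// NOTE: illustrative sketch, not part of RelAlgExecutor.cpp. It shows the
// temporary-table convention relied on by handleNop and addTemporaryTable
// above: results are keyed by the negated node id, which can never collide
// with a (positive) physical table id. ResultPtr is a hypothetical stand-in
// for the engine's result type.
#include <memory>
#include <unordered_map>
#include <utility>

using ResultPtr = std::shared_ptr<int>;  // hypothetical result type

std::unordered_map<int, ResultPtr> example_temporary_tables;

void example_add_temporary_table(const unsigned node_id, ResultPtr result) {
  example_temporary_tables[-static_cast<int>(node_id)] = std::move(result);
}
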
630 
631 namespace {
632 
633 class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
634  public:
635  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : RexVisitor(), cat_(cat) {}
636 
637  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
638  return synthesized_physical_inputs_owned;
639  }
640 
641  std::unordered_set<const RexInput*> visitInput(
642  const RexInput* rex_input) const override {
643  const auto input_ra = rex_input->getSourceNode();
644  CHECK(input_ra);
645  const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
646  if (scan_ra) {
647  const auto td = scan_ra->getTableDescriptor();
648  if (td) {
649  const auto col_id = rex_input->getIndex();
650  const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
651  if (cd && cd->columnType.get_physical_cols() > 0) {
652  CHECK(IS_GEO(cd->columnType.get_type()));
653  std::unordered_set<const RexInput*> synthesized_physical_inputs;
654  for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
655  auto physical_input =
656  new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
657  synthesized_physical_inputs_owned.emplace_back(physical_input);
658  synthesized_physical_inputs.insert(physical_input);
659  }
660  return synthesized_physical_inputs;
661  }
662  }
663  }
664  return {rex_input};
665  }
666 
667  protected:
668  std::unordered_set<const RexInput*> aggregateResult(
669  const std::unordered_set<const RexInput*>& aggregate,
670  const std::unordered_set<const RexInput*>& next_result) const override {
671  auto result = aggregate;
672  result.insert(next_result.begin(), next_result.end());
673  return result;
674  }
675 
676  private:
677  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
678  const Catalog_Namespace::Catalog& cat_;
679 };
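
// NOTE: illustrative sketch, not part of RelAlgExecutor.cpp. It isolates the
// aggregateResult strategy above: per-operand results are folded into one set
// by union, which is the generic pattern RexVisitor relies on. Shown here with
// plain ints instead of RexInput pointers.
#include <unordered_set>

std::unordered_set<int> example_aggregate_result(
    const std::unordered_set<int>& aggregate,
    const std::unordered_set<int>& next_result) {
  auto result = aggregate;  // copy, then union in the new operand's inputs
  result.insert(next_result.begin(), next_result.end());
  return result;
}
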
680 
681 const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
682  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
683  CHECK_EQ(size_t(2), join->inputCount());
684  return join;
685  }
686  CHECK_EQ(size_t(1), ra_node->inputCount());
687  auto only_src = ra_node->getInput(0);
688  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
689  dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
690  return is_join ? only_src : ra_node;
691 }
692 
693 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
694 get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
695  RexUsedInputsVisitor visitor(cat);
696  const auto filter_expr = compound->getFilterExpr();
697  std::unordered_set<const RexInput*> used_inputs =
698  filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
699  const auto sources_size = compound->getScalarSourcesSize();
700  for (size_t i = 0; i < sources_size; ++i) {
701  const auto source_inputs = visitor.visit(compound->getScalarSource(i));
702  used_inputs.insert(source_inputs.begin(), source_inputs.end());
703  }
704  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
705  return std::make_pair(used_inputs, used_inputs_owned);
706 }
707 
708 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
709 get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
710  CHECK_EQ(size_t(1), aggregate->inputCount());
711  std::unordered_set<const RexInput*> used_inputs;
712  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
713  const auto source = aggregate->getInput(0);
714  const auto& in_metainfo = source->getOutputMetainfo();
715  const auto group_count = aggregate->getGroupByCount();
716  CHECK_GE(in_metainfo.size(), group_count);
717  for (size_t i = 0; i < group_count; ++i) {
718  auto synthesized_used_input = new RexInput(source, i);
719  used_inputs_owned.emplace_back(synthesized_used_input);
720  used_inputs.insert(synthesized_used_input);
721  }
722  for (const auto& agg_expr : aggregate->getAggExprs()) {
723  for (size_t i = 0; i < agg_expr->size(); ++i) {
724  const auto operand_idx = agg_expr->getOperand(i);
725  CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
726  auto synthesized_used_input = new RexInput(source, operand_idx);
727  used_inputs_owned.emplace_back(synthesized_used_input);
728  used_inputs.insert(synthesized_used_input);
729  }
730  }
731  return std::make_pair(used_inputs, used_inputs_owned);
732 }
733 
734 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
735 get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
736  RexUsedInputsVisitor visitor(cat);
737  std::unordered_set<const RexInput*> used_inputs;
738  for (size_t i = 0; i < project->size(); ++i) {
739  const auto proj_inputs = visitor.visit(project->getProjectAt(i));
740  used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
741  }
742  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
743  return std::make_pair(used_inputs, used_inputs_owned);
744 }
745 
746 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
747 get_used_inputs(const RelTableFunction* table_func,
748  const Catalog_Namespace::Catalog& cat) {
749  RexUsedInputsVisitor visitor(cat);
750  std::unordered_set<const RexInput*> used_inputs;
751  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
752  const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
753  used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
754  }
755  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
756  return std::make_pair(used_inputs, used_inputs_owned);
757 }
758 
759 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
760 get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
761  std::unordered_set<const RexInput*> used_inputs;
762  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
763  const auto data_sink_node = get_data_sink(filter);
764  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
765  const auto source = data_sink_node->getInput(nest_level);
766  const auto scan_source = dynamic_cast<const RelScan*>(source);
767  if (scan_source) {
768  CHECK(source->getOutputMetainfo().empty());
769  for (size_t i = 0; i < scan_source->size(); ++i) {
770  auto synthesized_used_input = new RexInput(scan_source, i);
771  used_inputs_owned.emplace_back(synthesized_used_input);
772  used_inputs.insert(synthesized_used_input);
773  }
774  } else {
775  const auto& partial_in_metadata = source->getOutputMetainfo();
776  for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
777  auto synthesized_used_input = new RexInput(source, i);
778  used_inputs_owned.emplace_back(synthesized_used_input);
779  used_inputs.insert(synthesized_used_input);
780  }
781  }
782  }
783  return std::make_pair(used_inputs, used_inputs_owned);
784 }
785 
786 int table_id_from_ra(const RelAlgNode* ra_node) {
787  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
788  if (scan_ra) {
789  const auto td = scan_ra->getTableDescriptor();
790  CHECK(td);
791  return td->tableId;
792  }
793  return -ra_node->getId();
794 }
795 
796 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
797  const RelAlgNode* ra_node,
798  const std::vector<size_t>& input_permutation) {
799  const auto data_sink_node = get_data_sink(ra_node);
800  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
801  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
802  const auto input_node_idx =
803  input_permutation.empty() ? input_idx : input_permutation[input_idx];
804  const auto input_ra = data_sink_node->getInput(input_node_idx);
805  const auto it_ok = input_to_nest_level.emplace(input_ra, input_idx);
806  CHECK(it_ok.second);
807  LOG_IF(INFO, !input_permutation.empty())
808  << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
809  }
810  return input_to_nest_level;
811 }
812 
813 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
814 get_join_source_used_inputs(const RelAlgNode* ra_node,
815  const Catalog_Namespace::Catalog& cat) {
816  const auto data_sink_node = get_data_sink(ra_node);
817  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
818  CHECK_EQ(join->inputCount(), 2u);
819  const auto condition = join->getCondition();
820  RexUsedInputsVisitor visitor(cat);
821  auto condition_inputs = visitor.visit(condition);
822  std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
823  visitor.get_inputs_owned());
824  return std::make_pair(condition_inputs, condition_inputs_owned);
825  }
826 
827  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
828  CHECK_GE(left_deep_join->inputCount(), 2u);
829  const auto condition = left_deep_join->getInnerCondition();
830  RexUsedInputsVisitor visitor(cat);
831  auto result = visitor.visit(condition);
832  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
833  ++nesting_level) {
834  const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
835  if (outer_condition) {
836  const auto outer_result = visitor.visit(outer_condition);
837  result.insert(outer_result.begin(), outer_result.end());
838  }
839  }
840  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
841  return std::make_pair(result, used_inputs_owned);
842  }
843 
844  CHECK_EQ(ra_node->inputCount(), 1u);
845  return std::make_pair(std::unordered_set<const RexInput*>{},
846  std::vector<std::shared_ptr<RexInput>>{});
847 }
848 
849 std::vector<const RelAlgNode*> get_non_join_sequence(const RelAlgNode* ra) {
850  std::vector<const RelAlgNode*> seq;
851  for (auto join = dynamic_cast<const RelJoin*>(ra); join;
852  join = static_cast<const RelJoin*>(join->getInput(0))) {
853  CHECK_EQ(size_t(2), join->inputCount());
854  seq.emplace_back(join->getInput(1));
855  auto lhs = join->getInput(0);
856  if (!dynamic_cast<const RelJoin*>(lhs)) {
857  seq.emplace_back(lhs);
858  break;
859  }
860  }
861  std::reverse(seq.begin(), seq.end());
862  return seq;
863 }
864 
865 void collect_used_input_desc(
866  std::vector<InputDescriptor>& input_descs,
867  const Catalog_Namespace::Catalog& cat,
868  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
869  const RelAlgNode* ra_node,
870  const std::unordered_set<const RexInput*>& source_used_inputs,
871  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
872  std::unordered_set<InputDescriptor> input_descs_unique(input_descs.begin(),
873  input_descs.end());
874  const auto non_join_src_seq = get_non_join_sequence(get_data_sink(ra_node));
875  std::unordered_map<const RelAlgNode*, int> non_join_to_nest_level;
876  for (const auto node : non_join_src_seq) {
877  non_join_to_nest_level.insert(std::make_pair(node, non_join_to_nest_level.size()));
878  }
879  for (const auto used_input : source_used_inputs) {
880  const auto input_ra = used_input->getSourceNode();
881  const int table_id = table_id_from_ra(input_ra);
882  const auto col_id = used_input->getIndex();
883  auto it = input_to_nest_level.find(input_ra);
884  if (it == input_to_nest_level.end()) {
885  throw std::runtime_error("Bushy joins not supported");
886  }
887  const int input_desc = it->second;
888  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
889  dynamic_cast<const RelScan*>(input_ra)
890  ? cat.getColumnIdBySpi(table_id, col_id + 1)
891  : col_id,
892  table_id,
893  input_desc));
894  }
895 }
896 
897 template <class RA>
898 std::pair<std::vector<InputDescriptor>,
899  std::list<std::shared_ptr<const InputColDescriptor>>>
900 get_input_desc_impl(const RA* ra_node,
901  const std::unordered_set<const RexInput*>& used_inputs,
902  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
903  const std::vector<size_t>& input_permutation,
904  const Catalog_Namespace::Catalog& cat) {
905  std::vector<InputDescriptor> input_descs;
906  const auto data_sink_node = get_data_sink(ra_node);
907  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
908  const auto input_node_idx =
909  input_permutation.empty() ? input_idx : input_permutation[input_idx];
910  const auto input_ra = data_sink_node->getInput(input_node_idx);
911  const int table_id = table_id_from_ra(input_ra);
912  input_descs.emplace_back(table_id, input_idx);
913  }
914  std::sort(input_descs.begin(),
915  input_descs.end(),
916  [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
917  return lhs.getNestLevel() < rhs.getNestLevel();
918  });
919  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
920  collect_used_input_desc(input_descs,
921  cat,
922  input_col_descs_unique,
923  ra_node,
924  used_inputs,
925  input_to_nest_level);
926  std::unordered_set<const RexInput*> join_source_used_inputs;
927  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
928  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
929  get_join_source_used_inputs(ra_node, cat);
930  collect_used_input_desc(input_descs,
931  cat,
932  input_col_descs_unique,
933  ra_node,
934  join_source_used_inputs,
935  input_to_nest_level);
936  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
937  input_col_descs_unique.begin(), input_col_descs_unique.end());
938 
939  std::sort(
940  input_col_descs.begin(),
941  input_col_descs.end(),
942  [](std::shared_ptr<const InputColDescriptor> const& lhs,
943  std::shared_ptr<const InputColDescriptor> const& rhs) {
944  if (lhs->getScanDesc().getNestLevel() == rhs->getScanDesc().getNestLevel()) {
945  return lhs->getColId() < rhs->getColId();
946  }
947  return lhs->getScanDesc().getNestLevel() < rhs->getScanDesc().getNestLevel();
948  });
949  return {input_descs,
950  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
951  input_col_descs.end())};
952 }
953 
954 template <class RA>
955 std::tuple<std::vector<InputDescriptor>,
956  std::list<std::shared_ptr<const InputColDescriptor>>,
957  std::vector<std::shared_ptr<RexInput>>>
958 get_input_desc(const RA* ra_node,
959  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
960  const std::vector<size_t>& input_permutation,
961  const Catalog_Namespace::Catalog& cat) {
962  std::unordered_set<const RexInput*> used_inputs;
963  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
964  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
965  auto input_desc_pair = get_input_desc_impl(
966  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
967  return std::make_tuple(
968  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
969 }
970 
971 size_t get_scalar_sources_size(const RelCompound* compound) {
972  return compound->getScalarSourcesSize();
973 }
974 
975 size_t get_scalar_sources_size(const RelProject* project) {
976  return project->size();
977 }
978 
979 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
980  return table_func->getTableFuncInputsSize();
981 }
982 
983 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
984  return compound->getScalarSource(i);
985 }
986 
987 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
988  return project->getProjectAt(i);
989 }
990 
991 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
992  return table_func->getTableFuncInputAt(i);
993 }
994 
995 std::shared_ptr<Analyzer::Expr> set_transient_dict(
996  const std::shared_ptr<Analyzer::Expr> expr) {
997  const auto& ti = expr->get_type_info();
998  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
999  return expr;
1000  }
1001  auto transient_dict_ti = ti;
1002  transient_dict_ti.set_compression(kENCODING_DICT);
1003  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1004  transient_dict_ti.set_fixed_size();
1005  return expr->add_cast(transient_dict_ti);
1006 }
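
// NOTE: illustrative sketch, not part of RelAlgExecutor.cpp. It shows what a
// transient string dictionary does conceptually: strings seen during a single
// query are assigned integer ids on the fly, so group-by and comparisons can
// run on integers. The real machinery behind TRANSIENT_DICT_ID lives in the
// string dictionary proxy; ExampleTransientDict is a hypothetical stand-in.
#include <string>
#include <unordered_map>

class ExampleTransientDict {
 public:
  // returns a stable integer id for the string, assigning one on first sight
  int getOrAddId(const std::string& str) {
    const auto it = str_to_id_.find(str);
    if (it != str_to_id_.end()) {
      return it->second;
    }
    const int id = static_cast<int>(str_to_id_.size());
    str_to_id_.emplace(str, id);
    return id;
  }

 private:
  std::unordered_map<std::string, int> str_to_id_;
};
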
1007 
1008 void set_transient_dict_maybe(
1009  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1010  const std::shared_ptr<Analyzer::Expr>& expr) {
1011  try {
1012  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1013  } catch (...) {
1014  scalar_sources.push_back(fold_expr(expr.get()));
1015  }
1016 }
1017 
1018 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1019  const std::shared_ptr<Analyzer::Expr>& input) {
1020  const auto& input_ti = input->get_type_info();
1021  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1022  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1023  }
1024  return input;
1025 }
1026 
1027 template <class RA>
1028 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1029  const RA* ra_node,
1030  const RelAlgTranslator& translator,
1031  const ::ExecutorType executor_type) {
1032  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1033  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1034  const auto scalar_rex = scalar_at(i, ra_node);
1035  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1036  // RexRef are synthetic scalars we append at the end of the real ones
1037  // for the sake of taking memory ownership, no real work needed here.
1038  continue;
1039  }
1040 
1041  const auto scalar_expr =
1042  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
1043  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1044  if (executor_type == ExecutorType::Native) {
1045  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1046  } else {
1047  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1048  }
1049  }
1050 
1051  return scalar_sources;
1052 }
1053 
1054 template <class RA>
1055 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1056  const RA* ra_node,
1057  const RelAlgTranslator& translator,
1058  int32_t tableId,
1059  const Catalog_Namespace::Catalog& cat,
1060  const ColumnNameList& colNames,
1061  size_t starting_projection_column_idx) {
1062  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1063  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1064  const auto scalar_rex = scalar_at(i, ra_node);
1065  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1066  // RexRef are synthetic scalars we append at the end of the real ones
1067  // for the sake of taking memory ownership, no real work needed here.
1068  continue;
1069  }
1070 
1071  std::shared_ptr<Analyzer::Expr> translated_expr;
1072  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1073  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
1074  tableId,
1075  cat,
1076  colNames[i - starting_projection_column_idx]);
1077  } else {
1078  translated_expr = translator.translateScalarRex(scalar_rex);
1079  }
1080  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1081  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1082  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1083  }
1084 
1085  return scalar_sources;
1086 }
1087 
1088 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1089  const RelCompound* compound,
1090  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1091  if (!compound->isAggregate()) {
1092  return {nullptr};
1093  }
1094  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1095  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1096  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1097  }
1098  return groupby_exprs;
1099 }
1100 
1101 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1102  const RelAggregate* aggregate,
1103  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1104  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1105  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1106  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1107  }
1108  return groupby_exprs;
1109 }
1110 
1111 std::list<std::shared_ptr<Analyzer::Expr>> translate_quals(const RelCompound* compound,
1112  const RelAlgTranslator& translator) {
1113  const auto filter_rex = compound->getFilterExpr();
1114  const auto filter_expr =
1115  filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
1116  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1117  : std::list<std::shared_ptr<Analyzer::Expr>>{};
1118 }
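
// NOTE: illustrative sketch, not part of RelAlgExecutor.cpp. It shows what
// qual_to_conjunctive_form does at a high level: flattening a tree of AND
// nodes into a list of conjuncts the execution unit can evaluate
// independently. ExampleExpr and ExampleAnd are hypothetical stand-ins for
// the Analyzer expression types.
#include <list>
#include <memory>

struct ExampleExpr {
  virtual ~ExampleExpr() = default;
};

struct ExampleAnd : ExampleExpr {
  std::shared_ptr<ExampleExpr> lhs, rhs;
};

void collect_conjuncts(const std::shared_ptr<ExampleExpr>& expr,
                       std::list<std::shared_ptr<ExampleExpr>>& quals) {
  if (auto conj = std::dynamic_pointer_cast<ExampleAnd>(expr)) {
    collect_conjuncts(conj->lhs, quals);
    collect_conjuncts(conj->rhs, quals);
    return;
  }
  quals.push_back(expr);  // a leaf qualifier
}
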
1119 
1120 std::vector<Analyzer::Expr*> translate_targets(
1121  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1122  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1123  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1124  const RelCompound* compound,
1125  const RelAlgTranslator& translator,
1126  const ExecutorType executor_type) {
1127  std::vector<Analyzer::Expr*> target_exprs;
1128  for (size_t i = 0; i < compound->size(); ++i) {
1129  const auto target_rex = compound->getTargetExpr(i);
1130  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1131  std::shared_ptr<Analyzer::Expr> target_expr;
1132  if (target_rex_agg) {
1133  target_expr =
1134  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1135  } else {
1136  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1137  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1138  if (target_rex_ref) {
1139  const auto ref_idx = target_rex_ref->getIndex();
1140  CHECK_GE(ref_idx, size_t(1));
1141  CHECK_LE(ref_idx, groupby_exprs.size());
1142  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1143  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1144  } else {
1145  target_expr = translator.translateScalarRex(target_rex_scalar);
1146  auto rewritten_expr = rewrite_expr(target_expr.get());
1147  target_expr = fold_expr(rewritten_expr.get());
1148  if (executor_type == ExecutorType::Native) {
1149  try {
1150  target_expr = set_transient_dict(target_expr);
1151  } catch (...) {
1152  // noop
1153  }
1154  } else {
1155  target_expr = cast_dict_to_none(target_expr);
1156  }
1157  }
1158  }
1159  CHECK(target_expr);
1160  target_exprs_owned.push_back(target_expr);
1161  target_exprs.push_back(target_expr.get());
1162  }
1163  return target_exprs;
1164 }
1165 
1166 std::vector<Analyzer::Expr*> translate_targets(
1167  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1168  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1169  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1170  const RelAggregate* aggregate,
1171  const RelAlgTranslator& translator) {
1172  std::vector<Analyzer::Expr*> target_exprs;
1173  size_t group_key_idx = 0;
1174  for (const auto& groupby_expr : groupby_exprs) {
1175  auto target_expr =
1176  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1177  target_exprs_owned.push_back(target_expr);
1178  target_exprs.push_back(target_expr.get());
1179  }
1180 
1181  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1182  auto target_expr =
1183  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1184  CHECK(target_expr);
1185  target_expr = fold_expr(target_expr.get());
1186  target_exprs_owned.push_back(target_expr);
1187  target_exprs.push_back(target_expr.get());
1188  }
1189  return target_exprs;
1190 }
1191 
1192 bool is_count_distinct(const Analyzer::Expr* expr) {
1193  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1194  return agg_expr && agg_expr->get_is_distinct();
1195 }
1196 
1197 bool is_agg(const Analyzer::Expr* expr) {
1198  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1199  if (agg_expr && agg_expr->get_contains_agg()) {
1200  auto agg_type = agg_expr->get_aggtype();
1201  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1202  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1203  return true;
1204  }
1205  }
1206  return false;
1207 }
1208 
1209 inline SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
1210  if (is_count_distinct(&expr)) {
1211  return SQLTypeInfo(kBIGINT, false);
1212  } else if (is_agg(&expr)) {
1213  return get_nullable_logical_type_info(expr.get_type_info());
1214  }
1215  return get_logical_type_info(expr.get_type_info());
1216 }
1217 
1218 template <class RA>
1219 std::vector<TargetMetaInfo> get_targets_meta(
1220  const RA* ra_node,
1221  const std::vector<Analyzer::Expr*>& target_exprs) {
1222  std::vector<TargetMetaInfo> targets_meta;
1223  for (size_t i = 0; i < ra_node->size(); ++i) {
1224  CHECK(target_exprs[i]);
1225  // TODO(alex): remove the count distinct type fixup.
1226  targets_meta.emplace_back(ra_node->getFieldName(i),
1227  get_logical_type_for_expr(*target_exprs[i]),
1228  target_exprs[i]->get_type_info());
1229  }
1230  return targets_meta;
1231 }
1232 
1233 } // namespace
1234 
1235 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
1236  const CompilationOptions& co_in,
1237  const ExecutionOptions& eo_in,
1238  const int64_t queue_time_ms) {
1239  CHECK(node);
1240  auto timer = DEBUG_TIMER(__func__);
1241 
1243 
1244  auto co = co_in;
1245  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1246  // encoded string updates
1247 
1248  auto execute_update_for_node =
1249  [this, &co, &eo_in](const auto node, auto& work_unit, const bool is_aggregate) {
1250  UpdateTransactionParameters update_params(node->getModifiedTableDescriptor(),
1251  node->getTargetColumns(),
1252  node->getOutputMetainfo(),
1253  node->isVarlenUpdateRequired());
1254 
1255  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1256 
1257  auto execute_update_ra_exe_unit =
1258  [this, &co, &eo_in, &table_infos, &update_params](
1259  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
1260  CompilationOptions co_project = co;
1261 
1262  auto eo = eo_in;
1263  if (update_params.tableIsTemporary()) {
1264  eo.output_columnar_hint = true;
1265  co_project.allow_lazy_fetch = false;
1266  }
1267 
1268  auto update_callback = yieldUpdateCallback(update_params);
1269  executor_->executeUpdate(ra_exe_unit,
1270  table_infos,
1271  co_project,
1272  eo,
1273  cat_,
1274  executor_->row_set_mem_owner_,
1275  update_callback,
1276  is_aggregate);
1277  update_params.finalizeTransaction();
1278  };
1279 
1280  if (update_params.tableIsTemporary()) {
1281  // hold owned target exprs during execution if rewriting
1282  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1283  // rewrite temp table updates to generate the full column by moving the where
1284  // clause into a case statement; if such a rewrite is not possible, bail on the
1285  // update operation, then build an expr for the update target
1286  const auto td = update_params.getTableDescriptor();
1287  CHECK(td);
1288  const auto update_column_names = update_params.getUpdateColumnNames();
1289  if (update_column_names.size() > 1) {
1290  throw std::runtime_error(
1291  "Multi-column update is not yet supported for temporary tables.");
1292  }
1293 
1294  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1295  CHECK(cd);
1296  auto projected_column_to_update =
1297  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1298  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1299  work_unit.exe_unit, projected_column_to_update);
1300  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1301  throw std::runtime_error(
1302  "Variable length updates not yet supported on temporary tables.");
1303  }
1304  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1305  } else {
1306  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1307  }
1308  };
1309 
1310  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1311  auto work_unit =
1312  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1313 
1314  execute_update_for_node(compound, work_unit, compound->isAggregate());
1315  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1316  auto work_unit =
1317  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1318 
1319  if (project->isSimple()) {
1320  CHECK_EQ(size_t(1), project->inputCount());
1321  const auto input_ra = project->getInput(0);
1322  if (dynamic_cast<const RelSort*>(input_ra)) {
1323  const auto& input_table =
1324  get_temporary_table(&temporary_tables_, -input_ra->getId());
1325  CHECK(input_table);
1326  work_unit.exe_unit.scan_limit = input_table->rowCount();
1327  }
1328  }
1329 
1330  execute_update_for_node(project, work_unit, false);
1331  } else {
1332  throw std::runtime_error("Unsupported parent node for update: " + node->toString());
1333  }
1334 }
1335 
1336 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
1337  const CompilationOptions& co,
1338  const ExecutionOptions& eo_in,
1339  const int64_t queue_time_ms) {
1340  CHECK(node);
1341  auto timer = DEBUG_TIMER(__func__);
1342 
1344 
1345  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1346  auto& work_unit,
1347  const bool is_aggregate) {
1348  auto* table_descriptor = node->getModifiedTableDescriptor();
1349  CHECK(table_descriptor);
1350  if (!table_descriptor->hasDeletedCol) {
1351  throw std::runtime_error(
1352  "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
1353  }
1354 
1355  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1356 
1357  auto execute_delete_ra_exe_unit =
1358  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1359  const bool is_aggregate) {
1360  DeleteTransactionParameters delete_params(table_is_temporary(table_descriptor));
1361  auto delete_callback = yieldDeleteCallback(delete_params);
1362  CompilationOptions co_delete = co;
1363 
1364  auto eo = eo_in;
1365  if (delete_params.tableIsTemporary()) {
1366  eo.output_columnar_hint = true;
1367  co_delete.add_delete_column =
1368  false; // project the entire delete column for columnar update
1369  } else {
1370  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1371  }
1372 
1373  executor_->executeUpdate(exe_unit,
1374  table_infos,
1375  co_delete,
1376  eo,
1377  cat_,
1378  executor_->row_set_mem_owner_,
1379  delete_callback,
1380  is_aggregate);
1381  delete_params.finalizeTransaction();
1382  };
1383 
1384  if (table_is_temporary(table_descriptor)) {
1385  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1386  auto cd = cat_.getDeletedColumn(table_descriptor);
1387  CHECK(cd);
1388  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1389  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1390  const auto rewritten_exe_unit =
1391  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1392  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1393  } else {
1394  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1395  }
1396  };
1397 
1398  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1399  const auto work_unit =
1400  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1401  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1402  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1403  auto work_unit =
1404  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1405  if (project->isSimple()) {
1406  CHECK_EQ(size_t(1), project->inputCount());
1407  const auto input_ra = project->getInput(0);
1408  if (dynamic_cast<const RelSort*>(input_ra)) {
1409  const auto& input_table =
1410  get_temporary_table(&temporary_tables_, -input_ra->getId());
1411  CHECK(input_table);
1412  work_unit.exe_unit.scan_limit = input_table->rowCount();
1413  }
1414  }
1415  execute_delete_for_node(project, work_unit, false);
1416  } else {
1417  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1418  }
1419 }
1420 
1421 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
1422  const CompilationOptions& co,
1423  const ExecutionOptions& eo,
1424  RenderInfo* render_info,
1425  const int64_t queue_time_ms) {
1426  auto timer = DEBUG_TIMER(__func__);
1427  const auto work_unit =
1428  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1429  CompilationOptions co_compound = co;
1430  return executeWorkUnit(work_unit,
1431  compound->getOutputMetainfo(),
1432  compound->isAggregate(),
1433  co_compound,
1434  eo,
1435  render_info,
1436  queue_time_ms);
1437 }
1438 
1439 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
1440  const CompilationOptions& co,
1441  const ExecutionOptions& eo,
1442  RenderInfo* render_info,
1443  const int64_t queue_time_ms) {
1444  auto timer = DEBUG_TIMER(__func__);
1445  const auto work_unit = createAggregateWorkUnit(
1446  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1447  return executeWorkUnit(work_unit,
1448  aggregate->getOutputMetainfo(),
1449  true,
1450  co,
1451  eo,
1452  render_info,
1453  queue_time_ms);
1454 }
1455 
1456 namespace {
1457 
1458 // Returns true iff the execution unit contains window functions.
1459 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
1460  return std::any_of(ra_exe_unit.target_exprs.begin(),
1461  ra_exe_unit.target_exprs.end(),
1462  [](const Analyzer::Expr* expr) {
1463  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1464  });
1465 }
1466 
1467 } // namespace
1468 
1469 ExecutionResult RelAlgExecutor::executeProject(const RelProject* project,
1470  const CompilationOptions& co,
1471  const ExecutionOptions& eo,
1472  RenderInfo* render_info,
1473  const int64_t queue_time_ms,
1474  const ssize_t previous_count) {
1475  auto timer = DEBUG_TIMER(__func__);
1476  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1477  CompilationOptions co_project = co;
1478  if (project->isSimple()) {
1479  CHECK_EQ(size_t(1), project->inputCount());
1480  const auto input_ra = project->getInput(0);
1481  if (dynamic_cast<const RelSort*>(input_ra)) {
1482  co_project.device_type = ExecutorDeviceType::CPU;
1483  const auto& input_table =
1484  get_temporary_table(&temporary_tables_, -input_ra->getId());
1485  CHECK(input_table);
1486  work_unit.exe_unit.scan_limit =
1487  std::min(input_table->getLimit(), input_table->rowCount());
1488  }
1489  }
1490  return executeWorkUnit(work_unit,
1491  project->getOutputMetainfo(),
1492  false,
1493  co_project,
1494  eo,
1495  render_info,
1496  queue_time_ms,
1497  previous_count);
1498 }
1499 
1500 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
1501  const CompilationOptions& co_in,
1502  const ExecutionOptions& eo,
1503  const int64_t queue_time_ms) {
1505  auto timer = DEBUG_TIMER(__func__);
1506 
1507  auto co = co_in;
1508 
1509  if (g_cluster) {
1510  throw std::runtime_error("Table functions not supported in distributed mode yet");
1511  }
1512  if (!g_enable_table_functions) {
1513  throw std::runtime_error("Table function support is disabled");
1514  }
1515 
1516  auto table_func_work_unit = createTableFunctionWorkUnit(table_func, eo.just_explain);
1517  const auto body = table_func_work_unit.body;
1518  CHECK(body);
1519 
1520  const auto table_infos =
1521  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1522 
1523  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1524  co.device_type,
1525  QueryMemoryDescriptor(),
1526  nullptr,
1527  executor_),
1528  {}};
1529 
1530  try {
1531  result = {executor_->executeTableFunction(
1532  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1533  body->getOutputMetainfo()};
1534  } catch (const QueryExecutionError& e) {
1537  throw std::runtime_error("Table function ran out of memory during execution");
1538  }
1539  result.setQueueTime(queue_time_ms);
1540  return result;
1541 }
1542 
1543 namespace {
1544 
1545 // Creates a new expression which has the range table index set to 1. This is needed to
1546 // reuse the hash join construction helpers to generate a hash table for the window
1547 // function partition: create an equals expression with left and right sides identical
1548 // except for the range table index.
1549 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
1550  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
1551  if (tuple) {
1552  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
1553  for (const auto& element : tuple->getTuple()) {
1554  transformed_tuple.push_back(transform_to_inner(element.get()));
1555  }
1556  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
1557  }
1558  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
1559  if (!col) {
1560  throw std::runtime_error("Only columns supported in the window partition for now");
1561  }
1562  return makeExpr<Analyzer::ColumnVar>(
1563  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
1564 }
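
// NOTE: illustrative sketch, not part of RelAlgExecutor.cpp. It shows the
// rewrite performed by transform_to_inner above on a toy column reference:
// rebuilding the expression with range table index 1 lets two otherwise
// identical sides form the inner/outer halves of an equality the hash join
// helpers can consume. ExampleColumnRef is a hypothetical stand-in for
// Analyzer::ColumnVar.
#include <memory>

struct ExampleColumnRef {
  int table_id;
  int column_id;
  int rte_idx;  // range table index: 0 = outer side, 1 = inner side
};

std::shared_ptr<ExampleColumnRef> example_to_inner(const ExampleColumnRef& col) {
  // identical column, but tagged as belonging to the inner side of a join
  return std::make_shared<ExampleColumnRef>(
      ExampleColumnRef{col.table_id, col.column_id, 1});
}
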
1565 
1566 } // namespace
1567 
1568 void RelAlgExecutor::computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
1569  const CompilationOptions& co,
1570  const ExecutionOptions& eo,
1571  ColumnCacheMap& column_cache_map,
1572  const int64_t queue_time_ms) {
1573  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1574  CHECK_EQ(query_infos.size(), size_t(1));
1575  if (query_infos.front().info.fragments.size() != 1) {
1576  throw std::runtime_error(
1577  "Only single fragment tables supported for window functions for now");
1578  }
1579  if (eo.executor_type == ::ExecutorType::Extern) {
1580  return;
1581  }
1582  query_infos.push_back(query_infos.front());
1583  auto window_project_node_context = WindowProjectNodeContext::create();
1584  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1585  ++target_index) {
1586  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1587  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1588  if (!window_func) {
1589  continue;
1590  }
1591  // Always use baseline layout hash tables for now, make the expression a tuple.
1592  const auto& partition_keys = window_func->getPartitionKeys();
1593  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1594  if (partition_keys.size() > 1) {
1595  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1596  } else {
1597  if (partition_keys.empty()) {
1598  throw std::runtime_error(
1599  "Empty window function partitions are not supported yet");
1600  }
1601  CHECK_EQ(partition_keys.size(), size_t(1));
1602  partition_key_tuple = partition_keys.front();
1603  }
1604  // Creates a tautology equality with the partition expression on both sides.
1605  const auto partition_key_cond =
1606  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1607  kBW_EQ,
1608  kONE,
1609  partition_key_tuple,
1610  transform_to_inner(partition_key_tuple.get()));
1611  auto context = createWindowFunctionContext(
1612  window_func, partition_key_cond, ra_exe_unit, query_infos, co, column_cache_map);
1613  context->compute();
1614  window_project_node_context->addWindowFunctionContext(std::move(context),
1615  target_index);
1616  }
1617 }
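// Shape of the result, as a hedged example: for a query such as
//   SELECT x, ROW_NUMBER() OVER (PARTITION BY y ORDER BY z) FROM t;
// the loop above registers one WindowFunctionContext (hash table over the
// partition key `y`, order column `z`) in the WindowProjectNodeContext,
// keyed by the target index of the window expression; plain targets such
// as `x` are skipped.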
1618 
1619 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
1620  const Analyzer::WindowFunction* window_func,
1621  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
1622  const RelAlgExecutionUnit& ra_exe_unit,
1623  const std::vector<InputTableInfo>& query_infos,
1624  const CompilationOptions& co,
1625  ColumnCacheMap& column_cache_map) {
1626  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
1627  ? Data_Namespace::GPU_LEVEL
1628  : Data_Namespace::CPU_LEVEL;
1629  const auto join_table_or_err =
1630  executor_->buildHashTableForQualifier(partition_key_cond,
1631  query_infos,
1632  memory_level,
1633  JoinHashTableInterface::HashType::OneToMany,
1634  column_cache_map);
1635  if (!join_table_or_err.fail_reason.empty()) {
1636  throw std::runtime_error(join_table_or_err.fail_reason);
1637  }
1638  CHECK(join_table_or_err.hash_table->getHashType() ==
1639  JoinHashTableInterface::HashType::OneToMany);
1640  const auto& order_keys = window_func->getOrderKeys();
1641  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
1642  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
1643  auto context = std::make_unique<WindowFunctionContext>(
1644  window_func, join_table_or_err.hash_table, elem_count, co.device_type);
1645  for (const auto& order_key : order_keys) {
1646  const auto order_col =
1647  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
1648  if (!order_col) {
1649  throw std::runtime_error("Only order by columns supported for now");
1650  }
1651  const int8_t* column;
1652  size_t join_col_elem_count;
1653  std::tie(column, join_col_elem_count) =
1654  ColumnFetcher::getOneColumnFragment(executor_,
1655  *order_col,
1656  query_infos.front().info.fragments.front(),
1657  memory_level,
1658  0,
1659  chunks_owner,
1660  column_cache_map);
1661  CHECK_EQ(join_col_elem_count, elem_count);
1662  context->addOrderColumn(column, order_col.get(), chunks_owner);
1663  }
1664  return context;
1665 }
1666 
1667 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
1668  const CompilationOptions& co,
1669  const ExecutionOptions& eo,
1670  RenderInfo* render_info,
1671  const int64_t queue_time_ms) {
1672  auto timer = DEBUG_TIMER(__func__);
1673  const auto work_unit =
1674  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1675  return executeWorkUnit(
1676  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
1677 }
1678 
1679 ExecutionResult RelAlgExecutor::executeLogicalValues(
1680  const RelLogicalValues* logical_values,
1681  const ExecutionOptions& eo) {
1682  auto timer = DEBUG_TIMER(__func__);
1683  if (eo.just_explain) {
1684  throw std::runtime_error("EXPLAIN not supported for LogicalValues");
1685  }
1686 
1687  QueryMemoryDescriptor query_mem_desc(executor_,
1688  logical_values->getNumRows(),
1689  QueryDescriptionType::Projection,
1690  /*is_table_function=*/false);
1691 
1692  auto tuple_type = logical_values->getTupleType();
1693  for (size_t i = 0; i < tuple_type.size(); ++i) {
1694  auto& target_meta_info = tuple_type[i];
1695  if (target_meta_info.get_type_info().is_varlen()) {
1696  throw std::runtime_error("Variable length types not supported in VALUES yet.");
1697  }
1698  if (target_meta_info.get_type_info().get_type() == kNULLT) {
1699  // Replace the untyped NULL with BIGINT
1700  tuple_type[i] =
1701  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
1702  }
1703  query_mem_desc.addColSlotInfo(
1704  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
1705  }
1706  logical_values->setOutputMetainfo(tuple_type);
1707 
1708  std::vector<TargetInfo> target_infos;
1709  for (const auto& tuple_type_component : tuple_type) {
1710  target_infos.emplace_back(TargetInfo{false,
1711  kCOUNT,
1712  tuple_type_component.get_type_info(),
1713  SQLTypeInfo(kNULLT, false),
1714  false,
1715  false});
1716  }
1717  auto rs = std::make_shared<ResultSet>(target_infos,
1718  ExecutorDeviceType::CPU,
1719  query_mem_desc,
1720  executor_->getRowSetMemoryOwner(),
1721  executor_);
1722 
1723  if (logical_values->hasRows()) {
1724  CHECK_EQ(logical_values->getRowsSize(), logical_values->size());
1725 
1726  auto storage = rs->allocateStorage();
1727  auto buff = storage->getUnderlyingBuffer();
1728 
1729  for (size_t i = 0; i < logical_values->getNumRows(); i++) {
1730  std::vector<std::shared_ptr<Analyzer::Expr>> row_literals;
1731  int8_t* ptr = buff + i * query_mem_desc.getRowSize();
1732 
1733  for (size_t j = 0; j < logical_values->getRowsSize(); j++) {
1734  auto rex_literal =
1735  dynamic_cast<const RexLiteral*>(logical_values->getValueAt(i, j));
1736  CHECK(rex_literal);
1737  const auto expr = RelAlgTranslator::translateLiteral(rex_literal);
1738  const auto constant = std::dynamic_pointer_cast<Analyzer::Constant>(expr);
1739  CHECK(constant);
1740 
1741  if (constant->get_is_null()) {
1742  CHECK(!target_infos[j].sql_type.is_varlen());
1743  *reinterpret_cast<int64_t*>(ptr) =
1744  inline_int_null_val(target_infos[j].sql_type);
1745  } else {
1746  const auto ti = constant->get_type_info();
1747  const auto datum = constant->get_constval();
1748 
1749  // Initialize the entire 8-byte slot
1750  *reinterpret_cast<int64_t*>(ptr) = EMPTY_KEY_64;
1751 
1752  const auto sz = ti.get_size();
1753  CHECK_GE(sz, int(0));
1754  std::memcpy(ptr, &datum, sz);
1755  }
1756  ptr += 8;
1757  }
1758  }
1759  }
1760  return {rs, tuple_type};
1761 }
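// Row layout sketch (illustrative): each VALUES column occupies one 8-byte
// slot, so a row like VALUES (1, 2.5) is materialized as
//   | int64 slot = 1 | double slot = 2.5 |
// Narrower types are memcpy'd into a slot pre-filled with EMPTY_KEY_64, and
// NULLs are stored as the type's inline integer null sentinel.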
1762 
1763 namespace {
1764 
1765 template <class T>
1766 int64_t insert_one_dict_str(T* col_data,
1767  const std::string& columnName,
1768  const SQLTypeInfo& columnType,
1769  const Analyzer::Constant* col_cv,
1770  const Catalog_Namespace::Catalog& catalog) {
1771  if (col_cv->get_is_null()) {
1772  *col_data = inline_fixed_encoding_null_val(columnType);
1773  } else {
1774  const int dict_id = columnType.get_comp_param();
1775  const auto col_datum = col_cv->get_constval();
1776  const auto& str = *col_datum.stringval;
1777  const auto dd = catalog.getMetadataForDict(dict_id);
1778  CHECK(dd && dd->stringDict);
1779  int32_t str_id = dd->stringDict->getOrAdd(str);
1780  if (!dd->dictIsTemp) {
1781  const auto checkpoint_ok = dd->stringDict->checkpoint();
1782  if (!checkpoint_ok) {
1783  throw std::runtime_error("Failed to checkpoint dictionary for column " +
1784  columnName);
1785  }
1786  }
1787  const bool invalid = str_id > max_valid_int_value<T>();
1788  if (invalid || str_id == inline_int_null_value<int32_t>()) {
1789  if (invalid) {
1790  LOG(ERROR) << "Could not encode string: " << str
1791  << ", the encoded value doesn't fit in " << sizeof(T) * 8
1792  << " bits. Will store NULL instead.";
1793  }
1794  str_id = inline_fixed_encoding_null_val(columnType);
1795  }
1796  *col_data = str_id;
1797  }
1798  return *col_data;
1799 }
1800 
1801 template <class T>
1802 int64_t insert_one_dict_str(T* col_data,
1803  const ColumnDescriptor* cd,
1804  const Analyzer::Constant* col_cv,
1805  const Catalog_Namespace::Catalog& catalog) {
1806  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
1807 }
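// Usage sketch with hypothetical values: for a TEXT column dictionary-encoded
// into a single byte, ids that exceed max_valid_int_value<uint8_t>() cannot be
// represented, so
//   uint8_t slot;
//   insert_one_dict_str(&slot, cd, col_cv, cat_);
// logs an error and stores the inline null sentinel instead of a truncated id.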
1808 
1809 } // namespace
1810 
1811 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
1812  const ExecutionOptions& eo) {
1813  auto timer = DEBUG_TIMER(__func__);
1814  if (eo.just_explain) {
1815  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
1816  }
1817 
1818  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
1819  ExecutorDeviceType::CPU,
1820  QueryMemoryDescriptor(),
1821  executor_->getRowSetMemoryOwner(),
1822  executor_);
1823 
1824  std::vector<TargetMetaInfo> empty_targets;
1825  return {rs, empty_targets};
1826 }
1827 
1828 ExecutionResult RelAlgExecutor::executeSimpleInsert(const Analyzer::Query& query) {
1829  // Note: We currently obtain an executor for this method, but we do not need it.
1830  // Therefore, we skip the executor state setup in the regular execution path. In the
1831  // future, we will likely want to use the executor to evaluate expressions in the insert
1832  // statement.
1833 
1834  const auto& targets = query.get_targetlist();
1835  const int table_id = query.get_result_table_id();
1836  const auto& col_id_list = query.get_result_col_list();
1837 
1838  std::vector<const ColumnDescriptor*> col_descriptors;
1839  std::vector<int> col_ids;
1840  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
1841  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
1842  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
1843 
1844  const auto table_descriptor = cat_.getMetadataForTable(table_id);
1845  CHECK(table_descriptor);
1846  const auto shard_tables = cat_.getPhysicalTablesDescriptors(table_descriptor);
1847  const TableDescriptor* shard{nullptr};
1848 
1849  for (const int col_id : col_id_list) {
1850  const auto cd = get_column_descriptor(col_id, table_id, cat_);
1851  const auto col_enc = cd->columnType.get_compression();
1852  if (cd->columnType.is_string()) {
1853  switch (col_enc) {
1854  case kENCODING_NONE: {
1855  auto it_ok =
1856  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
1857  CHECK(it_ok.second);
1858  break;
1859  }
1860  case kENCODING_DICT: {
1861  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
1862  CHECK(dd);
1863  const auto it_ok = col_buffers.emplace(
1864  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
1865  CHECK(it_ok.second);
1866  break;
1867  }
1868  default:
1869  CHECK(false);
1870  }
1871  } else if (cd->columnType.is_geometry()) {
1872  auto it_ok =
1873  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
1874  CHECK(it_ok.second);
1875  } else if (cd->columnType.is_array()) {
1876  auto it_ok =
1877  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
1878  CHECK(it_ok.second);
1879  } else {
1880  const auto it_ok = col_buffers.emplace(
1881  col_id,
1882  std::unique_ptr<uint8_t[]>(
1883  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
1884  // the buffer
1885  CHECK(it_ok.second);
1886  }
1887  col_descriptors.push_back(cd);
1888  col_ids.push_back(col_id);
1889  }
1890  size_t col_idx = 0;
1891  Fragmenter_Namespace::InsertData insert_data;
1892  insert_data.databaseId = cat_.getCurrentDB().dbId;
1893  insert_data.tableId = table_id;
1894  int64_t int_col_val{0};
1895  for (auto target_entry : targets) {
1896  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
1897  if (!col_cv) {
1898  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
1899  CHECK(col_cast);
1900  CHECK_EQ(kCAST, col_cast->get_optype());
1901  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
1902  }
1903  CHECK(col_cv);
1904  const auto cd = col_descriptors[col_idx];
1905  auto col_datum = col_cv->get_constval();
1906  auto col_type = cd->columnType.get_type();
1907  uint8_t* col_data_bytes{nullptr};
1908  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
1909  (!cd->columnType.is_string() ||
1910  cd->columnType.get_compression() == kENCODING_DICT)) {
1911  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
1912  CHECK(col_data_bytes_it != col_buffers.end());
1913  col_data_bytes = col_data_bytes_it->second.get();
1914  }
1915  switch (col_type) {
1916  case kBOOLEAN: {
1917  auto col_data = col_data_bytes;
1918  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
1919  : (col_datum.boolval ? 1 : 0);
1920  break;
1921  }
1922  case kTINYINT: {
1923  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
1924  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
1925  : col_datum.tinyintval;
1926  int_col_val = col_datum.tinyintval;
1927  break;
1928  }
1929  case kSMALLINT: {
1930  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
1931  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
1932  : col_datum.smallintval;
1933  int_col_val = col_datum.smallintval;
1934  break;
1935  }
1936  case kINT: {
1937  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
1938  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
1939  : col_datum.intval;
1940  int_col_val = col_datum.intval;
1941  break;
1942  }
1943  case kBIGINT:
1944  case kDECIMAL:
1945  case kNUMERIC: {
1946  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
1947  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
1948  : col_datum.bigintval;
1949  int_col_val = col_datum.bigintval;
1950  break;
1951  }
1952  case kFLOAT: {
1953  auto col_data = reinterpret_cast<float*>(col_data_bytes);
1954  *col_data = col_datum.floatval;
1955  break;
1956  }
1957  case kDOUBLE: {
1958  auto col_data = reinterpret_cast<double*>(col_data_bytes);
1959  *col_data = col_datum.doubleval;
1960  break;
1961  }
1962  case kTEXT:
1963  case kVARCHAR:
1964  case kCHAR: {
1965  switch (cd->columnType.get_compression()) {
1966  case kENCODING_NONE:
1967  str_col_buffers[col_ids[col_idx]].push_back(
1968  col_datum.stringval ? *col_datum.stringval : "");
1969  break;
1970  case kENCODING_DICT: {
1971  switch (cd->columnType.get_size()) {
1972  case 1:
1973  int_col_val = insert_one_dict_str(
1974  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
1975  break;
1976  case 2:
1977  int_col_val = insert_one_dict_str(
1978  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
1979  break;
1980  case 4:
1981  int_col_val = insert_one_dict_str(
1982  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
1983  break;
1984  default:
1985  CHECK(false);
1986  }
1987  break;
1988  }
1989  default:
1990  CHECK(false);
1991  }
1992  break;
1993  }
1994  case kTIME:
1995  case kTIMESTAMP:
1996  case kDATE: {
1997  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
1998  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
1999  : col_datum.bigintval;
2000  break;
2001  }
2002  case kARRAY: {
2003  const auto is_null = col_cv->get_is_null();
2004  const auto size = cd->columnType.get_size();
2005  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2006  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2007  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2008  if (is_null && !is_point_coords) {
2009  if (size > 0) {
2010  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2011  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2012  throw std::runtime_error("Column " + cd->columnName +
2013  " doesn't accept NULL values");
2014  }
2015  int8_t* buf = (int8_t*)checked_malloc(size);
2016  put_null_array(static_cast<void*>(buf), elem_ti, "");
2017  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2018  p += elem_ti.get_size()) {
2019  put_null(static_cast<void*>(p), elem_ti, "");
2020  }
2021  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2022  } else {
2023  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2024  }
2025  break;
2026  }
2027  const auto l = col_cv->get_value_list();
2028  size_t len = l.size() * elem_ti.get_size();
2029  if (size > 0 && static_cast<size_t>(size) != len) {
2030  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2031  std::to_string(size / elem_ti.get_size()) +
2032  " values, " + "received " + std::to_string(l.size()));
2033  }
2034  if (elem_ti.is_string()) {
2035  CHECK(kENCODING_DICT == elem_ti.get_compression());
2036  CHECK(4 == elem_ti.get_size());
2037 
2038  int8_t* buf = (int8_t*)checked_malloc(len);
2039  int32_t* p = reinterpret_cast<int32_t*>(buf);
2040 
2041  int elemIndex = 0;
2042  for (auto& e : l) {
2043  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2044  CHECK(c);
2045 
2046  int_col_val = insert_one_dict_str(
2047  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2048 
2049  elemIndex++;
2050  }
2051  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2052 
2053  } else {
2054  int8_t* buf = (int8_t*)checked_malloc(len);
2055  int8_t* p = buf;
2056  for (auto& e : l) {
2057  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2058  CHECK(c);
2059  p = appendDatum(p, c->get_constval(), elem_ti);
2060  }
2061  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2062  }
2063  break;
2064  }
2065  case kPOINT:
2066  case kLINESTRING:
2067  case kPOLYGON:
2068  case kMULTIPOLYGON:
2069  str_col_buffers[col_ids[col_idx]].push_back(
2070  col_datum.stringval ? *col_datum.stringval : "");
2071  break;
2072  default:
2073  CHECK(false);
2074  }
2075  ++col_idx;
2076  if (col_idx == static_cast<size_t>(table_descriptor->shardedColumnId)) {
2077  const auto shard_count = shard_tables.size();
2078  const size_t shard_idx = SHARD_FOR_KEY(int_col_val, shard_count);
2079  shard = shard_tables[shard_idx];
2080  }
2081  }
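// Sharding sketch (hypothetical): for a table sharded four ways on column k,
// a row whose shard key evaluates to SHARD_FOR_KEY(int_col_val, 4) == 2 is
// routed to the third physical shard table; for unsharded tables `shard`
// stays null and the insert below goes through the logical table's fragmenter.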
2082  for (const auto& kv : col_buffers) {
2083  insert_data.columnIds.push_back(kv.first);
2084  DataBlockPtr p;
2085  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2086  insert_data.data.push_back(p);
2087  }
2088  for (auto& kv : str_col_buffers) {
2089  insert_data.columnIds.push_back(kv.first);
2090  DataBlockPtr p;
2091  p.stringsPtr = &kv.second;
2092  insert_data.data.push_back(p);
2093  }
2094  for (auto& kv : arr_col_buffers) {
2095  insert_data.columnIds.push_back(kv.first);
2096  DataBlockPtr p;
2097  p.arraysPtr = &kv.second;
2098  insert_data.data.push_back(p);
2099  }
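// DataBlockPtr is a union: exactly one of numbersPtr, stringsPtr or arraysPtr
// is set per column, and it must match the column's type, since the fragmenter
// reinterprets the block according to the column descriptor.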
2100  insert_data.numRows = 1;
2101  if (shard) {
2102  shard->fragmenter->insertData(insert_data);
2103  } else {
2104  table_descriptor->fragmenter->insertData(insert_data);
2105  }
2106 
2107  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2108  ExecutorDeviceType::CPU,
2109  QueryMemoryDescriptor(),
2110  executor_->getRowSetMemoryOwner(),
2111  executor_);
2112  std::vector<TargetMetaInfo> empty_targets;
2113  return {rs, empty_targets};
2114 }
2115 
2116 namespace {
2117 
2118 // TODO(alex): Once we're fully migrated to the relational algebra model, change
2119 // the executor interface to use the collation directly and remove this conversion.
2120 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
2121  std::list<Analyzer::OrderEntry> result;
2122  for (size_t i = 0; i < sort->collationCount(); ++i) {
2123  const auto sort_field = sort->getCollation(i);
2124  result.emplace_back(sort_field.getField() + 1,
2125  sort_field.getSortDir() == SortDirection::Descending,
2126  sort_field.getNullsPosition() == NullSortedPosition::First);
2127  }
2128  return result;
2129 }
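// Mapping sketch: a collation entry equivalent to ORDER BY <field 0> DESC
// NULLS FIRST becomes Analyzer::OrderEntry{1, /*is_desc=*/true,
// /*nulls_first=*/true}; note the 1-based target index the executor expects.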
2130 
2131 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
2132  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
2133  if (aggregate) {
2134  return 0;
2135  }
2136  const auto compound = dynamic_cast<const RelCompound*>(ra);
2137  return (compound && compound->isAggregate()) ? 0 : limit;
2138 }
2139 
2140 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
2141  return !order_entries.empty() && order_entries.front().is_desc;
2142 }
2143 
2144 } // namespace
2145 
2146 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
2147  const CompilationOptions& co,
2148  const ExecutionOptions& eo,
2149  RenderInfo* render_info,
2150  const int64_t queue_time_ms) {
2151  auto timer = DEBUG_TIMER(__func__);
2152  CHECK_EQ(size_t(1), sort->inputCount());
2153  const auto source = sort->getInput(0);
2154  const bool is_aggregate = node_is_aggregate(source);
2155  auto it = leaf_results_.find(sort->getId());
2156  if (it != leaf_results_.end()) {
2157  // Add any transient string literals to the string dictionary proxy on the aggregator
2158  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2159  GroupByAndAggregate::addTransientStringLiterals(
2160  source_work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2161  // Handle push-down for LIMIT for multi-node
2162  auto& aggregated_result = it->second;
2163  auto& result_rows = aggregated_result.rs;
2164  const size_t limit = sort->getLimit();
2165  const size_t offset = sort->getOffset();
2166  const auto order_entries = get_order_entries(sort);
2167  if (limit || offset) {
2168  if (!order_entries.empty()) {
2169  result_rows->sort(order_entries, limit + offset);
2170  }
2171  result_rows->dropFirstN(offset);
2172  if (limit) {
2173  result_rows->keepFirstN(limit);
2174  }
2175  }
2176  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2177  sort->setOutputMetainfo(aggregated_result.targets_meta);
2178  return result;
2179  }
2180  while (true) {
2181  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2182  bool is_desc{false};
2183  try {
2184  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2185  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2186  ExecutionOptions eo_copy = {
2187  eo.output_columnar_hint,
2188  eo.allow_multifrag,
2189  eo.just_explain,
2190  eo.allow_loop_joins,
2191  eo.with_watchdog,
2192  eo.jit_debug,
2193  eo.just_validate || sort->isEmptyResult(),
2194  eo.with_dynamic_watchdog,
2195  eo.dynamic_watchdog_time_limit,
2196  eo.find_push_down_candidates,
2197  eo.just_calcite_explain,
2198  eo.gpu_input_mem_limit_percent,
2199  eo.allow_runtime_query_interrupt,
2201  eo.executor_type,
2202  };
2203 
2204  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2205  auto source_result = executeWorkUnit(source_work_unit,
2206  source->getOutputMetainfo(),
2207  is_aggregate,
2208  co,
2209  eo_copy,
2210  render_info,
2211  queue_time_ms);
2212  if (render_info && render_info->isPotentialInSituRender()) {
2213  return source_result;
2214  }
2215  if (source_result.isFilterPushDownEnabled()) {
2216  return source_result;
2217  }
2218  auto rows_to_sort = source_result.getRows();
2219  if (eo.just_explain) {
2220  return {rows_to_sort, {}};
2221  }
2222  const size_t limit = sort->getLimit();
2223  const size_t offset = sort->getOffset();
2224  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2225  !use_speculative_top_n(source_work_unit.exe_unit,
2226  rows_to_sort->getQueryMemDesc())) {
2227  rows_to_sort->sort(source_work_unit.exe_unit.sort_info.order_entries,
2228  limit + offset);
2229  }
2230  if (limit || offset) {
2231  if (g_cluster && sort->collationCount() == 0) {
2232  if (offset >= rows_to_sort->rowCount()) {
2233  rows_to_sort->dropFirstN(offset);
2234  } else {
2235  rows_to_sort->keepFirstN(limit + offset);
2236  }
2237  } else {
2238  rows_to_sort->dropFirstN(offset);
2239  if (limit) {
2240  rows_to_sort->keepFirstN(limit);
2241  }
2242  }
2243  }
2244  return {rows_to_sort, source_result.getTargetsMeta()};
2245  } catch (const SpeculativeTopNFailed&) {
2246  CHECK_EQ(size_t(1), groupby_exprs.size());
2247  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2248  }
2249  }
2250  CHECK(false);
2251  return {std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2252  co.device_type,
2253  QueryMemoryDescriptor(),
2254  nullptr,
2255  executor_),
2256  {}};
2257 }
2258 
2260  const RelSort* sort,
2261  const ExecutionOptions& eo) {
2262  const auto source = sort->getInput(0);
2263  const size_t limit = sort->getLimit();
2264  const size_t offset = sort->getOffset();
2265  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2266  const size_t scan_total_limit =
2267  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2268  size_t max_groups_buffer_entry_guess{
2269  scan_total_limit ? scan_total_limit : max_groups_buffer_entry_default_guess};
2270  auto sort_algorithm = SortAlgorithm::SpeculativeTopN;
2271  const auto order_entries = get_order_entries(sort);
2272  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2273  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2274  const auto& source_exe_unit = source_work_unit.exe_unit;
2275  if (source_exe_unit.groupby_exprs.size() == 1) {
2276  if (!source_exe_unit.groupby_exprs.front()) {
2277  sort_algorithm = SortAlgorithm::StreamingTopN;
2278  } else {
2279  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2280  first_oe_is_desc(order_entries))) {
2281  sort_algorithm = SortAlgorithm::Default;
2282  }
2283  }
2284  }
2285 
2286  sort->setOutputMetainfo(source->getOutputMetainfo());
2287  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2288  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2289  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2290  std::move(source_exe_unit.input_col_descs),
2291  source_exe_unit.simple_quals,
2292  source_exe_unit.quals,
2293  source_exe_unit.join_quals,
2294  source_exe_unit.groupby_exprs,
2295  source_exe_unit.target_exprs,
2296  nullptr,
2297  {sort_info.order_entries, sort_algorithm, limit, offset},
2298  scan_total_limit,
2299  source_exe_unit.query_features,
2300  source_exe_unit.use_bump_allocator,
2301  source_exe_unit.query_state},
2302  source,
2303  max_groups_buffer_entry_guess,
2304  std::move(source_work_unit.query_rewriter),
2305  source_work_unit.input_permutation,
2306  source_work_unit.left_deep_join_input_sizes};
2307 }
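// Algorithm selection sketch, following the logic above: a projection such as
//   SELECT x FROM t ORDER BY x DESC LIMIT 10;
// has a single null groupby slot and is routed to StreamingTopN, while a
// grouped query like
//   SELECT y, COUNT(*) FROM t GROUP BY y ORDER BY 2 DESC LIMIT 10;
// keeps the initial SpeculativeTopN choice unless its groupby key was
// previously blacklisted, in which case it falls back to the Default sort.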
2308 
2309 namespace {
2310 
2317 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
2318  CHECK(!table_infos.empty());
2319  const auto& first_table = table_infos.front();
2320  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
2321  for (const auto& table_info : table_infos) {
2322  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
2323  max_num_groups = table_info.info.getNumTuplesUpperBound();
2324  }
2325  }
2326  return std::max(max_num_groups, size_t(1));
2327 }
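// Worked example (hypothetical sizes): joining a 10M-row fact table with a
// 10K-row dimension table yields an upper bound of 10M groups. When
// cardinality estimation is required, executeWorkUnit sizes the buffer as
//   2 * std::min(groups_approx_upper_bound(table_infos), ndv_estimate)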
2328 
2335 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
2336  for (const auto target_expr : ra_exe_unit.target_exprs) {
2337  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
2338  return false;
2339  }
2340  }
2341  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
2342  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
2343  return true;
2344  }
2345  return false;
2346 }
2347 
2348 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
2349  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
2350  ra_exe_unit.simple_quals.empty());
2351 }
2352 
2353 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
2354  const RelAlgExecutionUnit& ra_exe_unit_in,
2355  const std::vector<InputTableInfo>& table_infos,
2356  const Executor* executor,
2357  const ExecutorDeviceType device_type_in,
2358  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
2359  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
2360  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
2361  const auto target_expr = ra_exe_unit.target_exprs[i];
2362  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2363  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
2364  continue;
2365  }
2366  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
2367  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
2368  CHECK(arg);
2369  const auto& arg_ti = arg->get_type_info();
2370  // Avoid calling getExpressionRange for variable length types (string and array),
2371  // it'd trigger an assertion since that API expects to be called only for types
2372  // for which the notion of range is well-defined. A bit of a kludge, but the
2373  // logic to reject these types anyway is at lower levels in the stack and not
2374  // really worth pulling into a separate function for now.
2375  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
2376  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
2377  continue;
2378  }
2379  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
2380  if (arg_range.getType() != ExpressionRangeType::Integer) {
2381  continue;
2382  }
2383  // When running distributed, the threshold for using the precise implementation
2384  // must be consistent across all leaves, otherwise we could have a mix of precise
2385  // and approximate bitmaps and we cannot aggregate them.
2386  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
2387  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
2388  const auto sub_bitmap_count =
2389  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
2390  int64_t approx_bitmap_sz_bits{0};
2391  const auto error_rate =
2392  static_cast<Analyzer::AggExpr*>(target_expr)->get_error_rate();
2393  if (error_rate) {
2394  CHECK(error_rate->get_type_info().get_type() == kINT);
2395  CHECK_GE(error_rate->get_constval().intval, 1);
2396  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
2397  } else {
2398  approx_bitmap_sz_bits = g_hll_precision_bits;
2399  }
2400  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
2401  arg_range.getIntMin(),
2402  approx_bitmap_sz_bits,
2403  true,
2404  device_type,
2405  sub_bitmap_count};
2406  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
2407  arg_range.getIntMin(),
2408  bitmap_sz_bits,
2409  false,
2410  device_type,
2411  sub_bitmap_count};
2412  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
2413  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
2414  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
2415  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
2416  target_exprs_owned.push_back(precise_count_distinct);
2417  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
2418  }
2419  }
2420  return ra_exe_unit;
2421 }
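// Sizing sketch with hypothetical numbers: for APPROX_COUNT_DISTINCT(x) where
// x ranges over [0, 1000), the precise bitmap needs ~1000 bits (~125 bytes)
// per group, while the HLL variant needs 2^g_hll_precision_bits one-byte
// registers; whenever the padded HLL buffer would be at least as large, the
// rewrite above silently upgrades the target to an exact COUNT(DISTINCT x).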
2422 
2423 void build_render_targets(RenderInfo& render_info,
2424  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
2425  const std::vector<TargetMetaInfo>& targets_meta) {
2426  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
2427  render_info.targets.clear();
2428  for (size_t i = 0; i < targets_meta.size(); ++i) {
2429  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
2430  targets_meta[i].get_resname(),
2431  work_unit_target_exprs[i]->get_shared_ptr(),
2432  false));
2433  }
2434 }
2435 
2436 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
2437  const CompilationOptions& co,
2438  const ExecutionOptions& eo) {
2439  return g_enable_bump_allocator && co.device_type == ExecutorDeviceType::GPU &&
2440  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
2441 }
2442 
2443 } // namespace
2444 
2445 ExecutionResult RelAlgExecutor::executeWorkUnit(
2446  const RelAlgExecutor::WorkUnit& work_unit,
2447  const std::vector<TargetMetaInfo>& targets_meta,
2448  const bool is_agg,
2449  const CompilationOptions& co_in,
2450  const ExecutionOptions& eo,
2451  RenderInfo* render_info,
2452  const int64_t queue_time_ms,
2453  const ssize_t previous_count) {
2454  INJECT_TIMER(executeWorkUnit);
2455  auto timer = DEBUG_TIMER(__func__);
2456 
2457  auto co = co_in;
2458  ColumnCacheMap column_cache;
2459  if (is_window_execution_unit(work_unit.exe_unit)) {
2460  if (!g_enable_window_functions) {
2461  throw std::runtime_error("Window functions support is disabled");
2462  }
2463  co.device_type = ExecutorDeviceType::CPU;
2464  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
2465  }
2466  if (!eo.just_explain && eo.find_push_down_candidates) {
2467  // find potential candidates:
2468  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
2469  if (!selected_filters.empty() || eo.just_calcite_explain) {
2470  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
2471  }
2472  }
2473  if (render_info && render_info->isPotentialInSituRender()) {
2474  co.allow_lazy_fetch = false;
2475  }
2476  const auto body = work_unit.body;
2477  CHECK(body);
2478  auto it = leaf_results_.find(body->getId());
2479  if (it != leaf_results_.end()) {
2480  GroupByAndAggregate::addTransientStringLiterals(
2481  work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2482  auto& aggregated_result = it->second;
2483  auto& result_rows = aggregated_result.rs;
2484  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2485  body->setOutputMetainfo(aggregated_result.targets_meta);
2486  if (render_info) {
2487  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2488  }
2489  return result;
2490  }
2491  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2492 
2493  auto ra_exe_unit = decide_approx_count_distinct_implementation(
2494  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
2495  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2496  if (is_window_execution_unit(ra_exe_unit)) {
2497  CHECK_EQ(table_infos.size(), size_t(1));
2498  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
2499  max_groups_buffer_entry_guess =
2500  table_infos.front().info.fragments.front().getNumTuples();
2501  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
2502  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
2503  if (previous_count > 0 && !exe_unit_has_quals(ra_exe_unit)) {
2504  ra_exe_unit.scan_limit = static_cast<size_t>(previous_count);
2505  } else {
2506  // TODO(adb): enable bump allocator path for render queries
2507  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
2508  ra_exe_unit.scan_limit = 0;
2509  ra_exe_unit.use_bump_allocator = true;
2510  } else if (eo.executor_type == ::ExecutorType::Extern) {
2511  ra_exe_unit.scan_limit = 0;
2512  } else if (!eo.just_explain) {
2513  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
2514  if (filter_count_all >= 0) {
2515  ra_exe_unit.scan_limit = std::max(filter_count_all, ssize_t(1));
2516  }
2517  }
2518  }
2519  }
2520 
2521  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2522  co.device_type,
2523  QueryMemoryDescriptor(),
2524  nullptr,
2525  executor_),
2526  {}};
2527 
2528  auto execute_and_handle_errors =
2529  [&](const auto max_groups_buffer_entry_guess_in,
2530  const bool has_cardinality_estimation) -> ExecutionResult {
2531  // Note that the groups buffer entry guess may be modified during query execution.
2532  // Create a local copy so we can track those changes if we need to attempt a retry
2533  // due to OOM
2534  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
2535  try {
2536  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
2537  is_agg,
2538  table_infos,
2539  ra_exe_unit,
2540  co,
2541  eo,
2542  cat_,
2543  executor_->row_set_mem_owner_,
2544  render_info,
2545  has_cardinality_estimation,
2546  column_cache),
2547  targets_meta};
2548  } catch (const QueryExecutionError& e) {
2549  handlePersistentError(e.getErrorCode());
2550  return handleOutOfMemoryRetry(
2551  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
2552  targets_meta,
2553  is_agg,
2554  co,
2555  eo,
2556  render_info,
2557  e.wasMultifragKernelLaunch(),
2558  queue_time_ms);
2559  }
2560  };
2561 
2562  try {
2563  result = execute_and_handle_errors(
2564  max_groups_buffer_entry_guess,
2565  groups_approx_upper_bound(table_infos) <= g_big_group_threshold);
2566  } catch (const CardinalityEstimationRequired&) {
2567  const auto estimated_groups_buffer_entry_guess =
2568  2 * std::min(groups_approx_upper_bound(table_infos),
2569  getNDVEstimation(work_unit, is_agg, co, eo));
2570  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
2571  result = execute_and_handle_errors(estimated_groups_buffer_entry_guess, true);
2572  }
2573 
2574  result.setQueueTime(queue_time_ms);
2575  if (render_info) {
2576  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2577  if (render_info->isPotentialInSituRender()) {
2578  // return an empty result (with the same queue time, and zero render time)
2579  return {std::make_shared<ResultSet>(
2580  queue_time_ms,
2581  0,
2582  executor_->row_set_mem_owner_
2583  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
2584  : nullptr),
2585  {}};
2586  }
2587  }
2588  return result;
2589 }
2590 
2591 ssize_t RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
2592  const bool is_agg,
2593  const CompilationOptions& co,
2594  const ExecutionOptions& eo) {
2595  const auto count =
2596  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
2597  kCOUNT,
2598  nullptr,
2599  false,
2600  nullptr);
2601  const auto count_all_exe_unit =
2602  create_count_all_execution_unit(work_unit.exe_unit, count);
2603  size_t one{1};
2604  ResultSetPtr count_all_result;
2605  try {
2606  ColumnCacheMap column_cache;
2607  count_all_result =
2608  executor_->executeWorkUnit(one,
2609  is_agg,
2610  get_table_infos(work_unit.exe_unit, executor_),
2611  count_all_exe_unit,
2612  co,
2613  eo,
2614  cat_,
2615  executor_->row_set_mem_owner_,
2616  nullptr,
2617  false,
2618  column_cache);
2619  } catch (const std::exception& e) {
2620  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
2621  return -1;
2622  }
2623  const auto count_row = count_all_result->getNextRow(false, false);
2624  CHECK_EQ(size_t(1), count_row.size());
2625  const auto& count_tv = count_row.front();
2626  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
2627  CHECK(count_scalar_tv);
2628  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
2629  CHECK(count_ptr);
2630  CHECK_GE(*count_ptr, 0);
2631  auto count_upper_bound = static_cast<size_t>(*count_ptr);
2632  return std::max(count_upper_bound, size_t(1));
2633 }
2634 
2635 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
2636  const auto& ra_exe_unit = work_unit.exe_unit;
2637  if (ra_exe_unit.input_descs.size() != 1) {
2638  return false;
2639  }
2640  const auto& table_desc = ra_exe_unit.input_descs.front();
2641  if (table_desc.getSourceType() != InputSourceType::TABLE) {
2642  return false;
2643  }
2644  const int table_id = table_desc.getTableId();
2645  for (const auto simple_qual : ra_exe_unit.simple_quals) {
2646  const auto comp_expr =
2647  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
2648  if (!comp_expr || comp_expr->get_optype() != kEQ) {
2649  return false;
2650  }
2651  const auto lhs = comp_expr->get_left_operand();
2652  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
2653  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
2654  return false;
2655  }
2656  const auto rhs = comp_expr->get_right_operand();
2657  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
2658  if (!rhs_const) {
2659  return false;
2660  }
2661  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
2662  if (cd->isVirtualCol) {
2663  CHECK_EQ("rowid", cd->columnName);
2664  return true;
2665  }
2666  }
2667  return false;
2668 }
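// Example of a qualifying unit: SELECT * FROM t WHERE rowid = 42; the single
// equality against the virtual `rowid` column lets executeWorkUnit skip the
// pre-flight filtered count used to size the output buffer.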
2669 
2670 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
2671  const RelAlgExecutor::WorkUnit& work_unit,
2672  const std::vector<TargetMetaInfo>& targets_meta,
2673  const bool is_agg,
2674  const CompilationOptions& co,
2675  const ExecutionOptions& eo,
2676  RenderInfo* render_info,
2677  const bool was_multifrag_kernel_launch,
2678  const int64_t queue_time_ms) {
2679  // Disable the bump allocator
2680  // Note that this will have basically the same effect as using the bump allocator for
2681  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
2682  // path and the bump allocator path for kernel per fragment execution.
2683  auto ra_exe_unit_in = work_unit.exe_unit;
2684  ra_exe_unit_in.use_bump_allocator = false;
2685 
2686  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2687  co.device_type,
2689  nullptr,
2690  executor_),
2691  {}};
2692 
2693  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
2694  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2695  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
2696  false,
2697  false,
2698  eo.allow_loop_joins,
2699  eo.with_watchdog,
2700  eo.jit_debug,
2701  false,
2702  eo.with_dynamic_watchdog,
2703  eo.dynamic_watchdog_time_limit,
2704  false,
2705  false,
2706  eo.gpu_input_mem_limit_percent,
2707  false};
2708 
2709  if (was_multifrag_kernel_launch) {
2710  try {
2711  // Attempt to retry using the kernel per fragment path. The smaller input size
2712  // required may allow the entire kernel to execute in GPU memory.
2713  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
2714  "kernels disabled.";
2715  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2716  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
2717  ColumnCacheMap column_cache;
2718  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2719  is_agg,
2720  table_infos,
2721  ra_exe_unit,
2722  co,
2723  eo_no_multifrag,
2724  cat_,
2725  executor_->row_set_mem_owner_,
2726  nullptr,
2727  true,
2728  column_cache),
2729  targets_meta};
2730  result.setQueueTime(queue_time_ms);
2731  } catch (const QueryExecutionError& e) {
2733  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
2734  }
2735  }
2736 
2737  if (render_info) {
2738  render_info->setForceNonInSituData();
2739  }
2740 
2741  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
2742  // Only reset the group buffer entry guess if we ran out of slots, which
2743  // suggests a highly pathological input which prevented a good estimation
2744  // of distinct tuple count. For projection queries, this will force a
2745  // per-fragment scan limit, which is compatible with the CPU path.
2747  VLOG(1) << "Resetting max groups buffer entry guess.";
2748  max_groups_buffer_entry_guess = 0;
2749 
2750  int iteration_ctr = -1;
2751  while (true) {
2752  iteration_ctr++;
2753  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2754  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
2755  ColumnCacheMap column_cache;
2756  try {
2757  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2758  is_agg,
2759  table_infos,
2760  ra_exe_unit,
2761  co_cpu,
2762  eo_no_multifrag,
2763  cat_,
2764  executor_->row_set_mem_owner_,
2765  nullptr,
2766  true,
2767  column_cache),
2768  targets_meta};
2769  } catch (const QueryExecutionError& e) {
2770  // Ran out of slots
2771  if (e.getErrorCode() < 0) {
2772  // Even the conservative guess failed; it should only happen when we group
2773  // by a huge cardinality array. Maybe we should throw an exception instead?
2774  // Such a heavy query is entirely capable of exhausting all the host memory.
2775  CHECK(max_groups_buffer_entry_guess);
2776  // Only allow two iterations of increasingly large entry guesses up to a maximum
2777  // of 512MB per column per kernel
2778  if (g_enable_watchdog || iteration_ctr > 1) {
2779  throw std::runtime_error("Query ran out of output slots in the result");
2780  }
2781  max_groups_buffer_entry_guess *= 2;
2782  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
2783  "groups buffer entry "
2784  "guess equal to "
2785  << max_groups_buffer_entry_guess;
2786  } else {
2787  handlePersistentError(e.getErrorCode());
2788  }
2789  continue;
2790  }
2791  result.setQueueTime(queue_time_ms);
2792  return result;
2793  }
2794  return result;
2795 }
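// Retry ladder sketch, summarizing the flow above: a GPU OOM during a
// multifragment launch first retries with multifragment kernels disabled,
// then falls back to CPU with max_groups_buffer_entry_guess reset to 0; an
// out-of-slots error on CPU doubles the guess, with at most two growth
// iterations (none under the watchdog) before the query is rejected.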
2796 
2797 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
2798  LOG(ERROR) << "Query execution failed with error "
2799  << getErrorMessageFromCode(error_code);
2800  if (error_code == Executor::ERR_SPECULATIVE_TOP_OOM) {
2801  throw SpeculativeTopNFailed();
2802  }
2803  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
2804  // We ran out of GPU memory, this doesn't count as an error if the query is
2805  // allowed to continue on CPU because retry on CPU is explicitly allowed through
2806  // --allow-cpu-retry.
2807  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
2808  if (!g_allow_cpu_retry) {
2809  throw std::runtime_error(
2810  "Query ran out of GPU memory, unable to automatically retry on CPU");
2811  }
2812  return;
2813  }
2814  throw std::runtime_error(getErrorMessageFromCode(error_code));
2815 }
2816 
2817 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
2818  if (error_code < 0) {
2819  return "Ran out of slots in the query output buffer";
2820  }
2821  switch (error_code) {
2822  case Executor::ERR_DIV_BY_ZERO:
2823  return "Division by zero";
2824  case Executor::ERR_OUT_OF_GPU_MEM:
2825  return "Query couldn't keep the entire working set of columns in GPU memory";
2826  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
2827  return "Self joins not supported yet";
2828  case Executor::ERR_OUT_OF_CPU_MEM:
2829  return "Not enough host memory to execute the query";
2830  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
2831  return "Overflow or underflow";
2832  case Executor::ERR_OUT_OF_TIME:
2833  return "Query execution has exceeded the time limit";
2834  case Executor::ERR_INTERRUPTED:
2835  return "Query execution has been interrupted";
2836  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
2837  return "Columnar conversion not supported for variable length types";
2838  case Executor::ERR_TOO_MANY_LITERALS:
2839  return "Too many literals in the query";
2840  case Executor::ERR_STRING_CONST_IN_RESULTSET:
2841  return "NONE ENCODED String types are not supported as input result set.";
2842  case Executor::ERR_OUT_OF_RENDER_MEM:
2843  return "Not enough OpenGL memory to render the query results";
2844  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
2845  return "Streaming-Top-N not supported in Render Query";
2846  case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
2847  return "Multiple distinct values encountered";
2848  }
2849  return "Other error: code " + std::to_string(error_code);
2850 }
2851 
2852 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
2853  const SortInfo& sort_info,
2854  const ExecutionOptions& eo) {
2855  const auto compound = dynamic_cast<const RelCompound*>(node);
2856  if (compound) {
2857  return createCompoundWorkUnit(compound, sort_info, eo);
2858  }
2859  const auto project = dynamic_cast<const RelProject*>(node);
2860  if (project) {
2861  return createProjectWorkUnit(project, sort_info, eo);
2862  }
2863  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
2864  if (aggregate) {
2865  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
2866  }
2867  const auto filter = dynamic_cast<const RelFilter*>(node);
2868  CHECK(filter);
2869  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
2870 }
2871 
2872 namespace {
2873 
2874 JoinType get_join_type(const RelAlgNode* ra) {
2875  auto sink = get_data_sink(ra);
2876  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
2877  return join->getJoinType();
2878  }
2879  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
2880  return JoinType::INNER;
2881  }
2882 
2883  return JoinType::INVALID;
2884 }
2885 
2886 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
2887  const auto condition = dynamic_cast<const RexOperator*>(scalar);
2888  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
2889  return nullptr;
2890  }
2891  const auto equi_join_condition =
2892  dynamic_cast<const RexOperator*>(condition->getOperand(0));
2893  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
2894  return nullptr;
2895  }
2896  const auto both_are_null_condition =
2897  dynamic_cast<const RexOperator*>(condition->getOperand(1));
2898  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
2899  both_are_null_condition->size() != 2) {
2900  return nullptr;
2901  }
2902  const auto lhs_is_null =
2903  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
2904  const auto rhs_is_null =
2905  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
2906  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
2907  rhs_is_null->getOperator() != kISNULL) {
2908  return nullptr;
2909  }
2910  CHECK_EQ(size_t(1), lhs_is_null->size());
2911  CHECK_EQ(size_t(1), rhs_is_null->size());
2912  CHECK_EQ(size_t(2), equi_join_condition->size());
2913  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
2914  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
2915  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
2916  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
2917  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
2918  return nullptr;
2919  }
2920  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
2921  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
2922  RexDeepCopyVisitor deep_copy_visitor;
2923  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
2924  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
2925  eq_operands.emplace_back(lhs_op_copy.release());
2926  eq_operands.emplace_back(rhs_op_copy.release());
2927  return boost::make_unique<const RexOperator>(
2928  kBW_EQ, eq_operands, equi_join_condition->getType());
2929  }
2930  return nullptr;
2931 }
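// Pattern sketch: Calcite emits the null-safe equality
//   a = b OR (a IS NULL AND b IS NULL)
// and the matcher above folds it into a single kBW_EQ operator, which the
// rest of the pipeline can treat as an ordinary equi-join qualifier (and
// thus as a hash join key).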
2932 
2933 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
2934  const RexScalar* scalar) {
2935  const auto condition = dynamic_cast<const RexOperator*>(scalar);
2936  if (condition && condition->getOperator() == kAND) {
2937  CHECK_GE(condition->size(), size_t(2));
2938  auto acc = get_bitwise_equals(condition->getOperand(0));
2939  if (!acc) {
2940  return nullptr;
2941  }
2942  for (size_t i = 1; i < condition->size(); ++i) {
2943  std::vector<std::unique_ptr<const RexScalar>> and_operands;
2944  and_operands.emplace_back(std::move(acc));
2945  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
2946  acc =
2947  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
2948  }
2949  return acc;
2950  }
2951  return get_bitwise_equals(scalar);
2952 }
2953 
2954 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
2955  CHECK_GE(left_deep_join->inputCount(), size_t(2));
2956  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
2957  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
2958  ++nesting_level) {
2959  if (left_deep_join->getOuterCondition(nesting_level)) {
2960  join_types[nesting_level - 1] = JoinType::LEFT;
2961  }
2962  }
2963  return join_types;
2964 }
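// Example: for FROM t1 JOIN t2 ON ... LEFT JOIN t3 ON ..., the left-deep
// node has three inputs and an outer condition only at nesting level 2, so
// this returns {JoinType::INNER, JoinType::LEFT}.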
2965 
2966 template <class RA>
2967 std::vector<size_t> do_table_reordering(
2968  std::vector<InputDescriptor>& input_descs,
2969  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
2970  const JoinQualsPerNestingLevel& left_deep_join_quals,
2971  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
2972  const RA* node,
2973  const std::vector<InputTableInfo>& query_infos,
2974  const Executor* executor) {
2975  if (g_cluster) {
2976  // Disable table reordering in distributed mode. The aggregator does not have enough
2977  // information to break ties
2978  return {};
2979  }
2980  const auto& cat = *executor->getCatalog();
2981  for (const auto& table_info : query_infos) {
2982  if (table_info.table_id < 0) {
2983  continue;
2984  }
2985  const auto td = cat.getMetadataForTable(table_info.table_id);
2986  CHECK(td);
2987  if (table_is_replicated(td)) {
2988  return {};
2989  }
2990  }
2991  const auto input_permutation =
2992  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
2993  input_to_nest_level = get_input_nest_levels(node, input_permutation);
2994  std::tie(input_descs, input_col_descs, std::ignore) =
2995  get_input_desc(node, input_to_nest_level, input_permutation, cat);
2996  return input_permutation;
2997 }
2998 
2999 std::vector<size_t> get_left_deep_join_input_sizes(
3000  const RelLeftDeepInnerJoin* left_deep_join) {
3001  std::vector<size_t> input_sizes;
3002  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
3003  const auto inputs = get_node_output(left_deep_join->getInput(i));
3004  input_sizes.push_back(inputs.size());
3005  }
3006  return input_sizes;
3007 }
3008 
3009 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
3010  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
3011  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
3012  for (const auto& qual : quals) {
3013  const auto rewritten_qual = rewrite_expr(qual.get());
3014  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
3015  }
3016  return rewritten_quals;
3017 }
3018 
3019 } // namespace
3020 
3021 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
3022  const RelCompound* compound,
3023  const SortInfo& sort_info,
3024  const ExecutionOptions& eo) {
3025  std::vector<InputDescriptor> input_descs;
3026  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3027  auto input_to_nest_level = get_input_nest_levels(compound, {});
3028  std::tie(input_descs, input_col_descs, std::ignore) =
3029  get_input_desc(compound, input_to_nest_level, {}, cat_);
3030  const auto query_infos = get_table_infos(input_descs, executor_);
3031  CHECK_EQ(size_t(1), compound->inputCount());
3032  const auto left_deep_join =
3033  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3034  JoinQualsPerNestingLevel left_deep_join_quals;
3035  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3036  : std::vector<JoinType>{get_join_type(compound)};
3037  std::vector<size_t> input_permutation;
3038  std::vector<size_t> left_deep_join_input_sizes;
3039  if (left_deep_join) {
3040  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3041  left_deep_join_quals = translateLeftDeepJoinFilter(
3042  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3043  if (g_from_table_reordering &&
3044  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3045  join_types.end()) {
3046  input_permutation = do_table_reordering(input_descs,
3047  input_col_descs,
3048  left_deep_join_quals,
3049  input_to_nest_level,
3050  compound,
3051  query_infos,
3052  executor_);
3053  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3054  std::tie(input_descs, input_col_descs, std::ignore) =
3055  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3056  left_deep_join_quals = translateLeftDeepJoinFilter(
3057  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3058  }
3059  }
3060  QueryFeatureDescriptor query_features;
3061  RelAlgTranslator translator(cat_,
3062  executor_,
3063  input_to_nest_level,
3064  join_types,
3065  now_,
3066  eo.just_explain,
3067  query_features);
3068  const auto scalar_sources =
3069  translate_scalar_sources(compound, translator, eo.executor_type);
3070  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3071  const auto quals_cf = translate_quals(compound, translator);
3072  const auto target_exprs = translate_targets(target_exprs_owned_,
3073  scalar_sources,
3074  groupby_exprs,
3075  compound,
3076  translator,
3077  eo.executor_type);
3078  CHECK_EQ(compound->size(), target_exprs.size());
3079  const RelAlgExecutionUnit exe_unit = {input_descs,
3080  input_col_descs,
3081  quals_cf.simple_quals,
3082  rewrite_quals(quals_cf.quals),
3083  left_deep_join_quals,
3084  groupby_exprs,
3085  target_exprs,
3086  nullptr,
3087  sort_info,
3088  0,
3089  query_features,
3090  false,
3091  query_state_};
3092  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3093  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3094  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3095  compound->setOutputMetainfo(targets_meta);
3096  return {rewritten_exe_unit,
3097  compound,
3098  max_groups_buffer_entry_default_guess,
3099  std::move(query_rewriter),
3100  input_permutation,
3101  left_deep_join_input_sizes};
3102 }
3103 
3104 namespace {
3105 
3106 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
3107  CHECK(qual_expr);
3108  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
3109  if (!bin_oper || bin_oper->getOperator() != kAND) {
3110  return {qual_expr};
3111  }
3112  CHECK_GE(bin_oper->size(), size_t(2));
3113  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
3114  for (size_t i = 1; i < bin_oper->size(); ++i) {
3115  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
3116  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
3117  }
3118  return lhs_cf;
3119 }
3120 
3121 std::shared_ptr<Analyzer::Expr> build_logical_expression(
3122  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
3123  const SQLOps sql_op) {
3124  CHECK(!factors.empty());
3125  auto acc = factors.front();
3126  for (size_t i = 1; i < factors.size(); ++i) {
3127  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
3128  }
3129  return acc;
3130 }
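// Example: build_logical_expression({a, b, c}, kAND) left-folds the factors
// through Parser::OperExpr::normalize, yielding ((a AND b) AND c); kONE is
// the non-quantified comparison qualifier (as opposed to kANY / kALL).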
3131 
3132 template <class QualsList>
3133 bool list_contains_expression(const QualsList& haystack,
3134  const std::shared_ptr<Analyzer::Expr>& needle) {
3135  for (const auto& qual : haystack) {
3136  if (*qual == *needle) {
3137  return true;
3138  }
3139  }
3140  return false;
3141 }
3142 
3143 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
3144 // evaluation of `p` and hoists `p` into a standalone conjunct, where it can
3145 // still be recognized and used for hash joins.
3146 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
3147  const std::shared_ptr<Analyzer::Expr>& expr) {
3148  const auto expr_terms = qual_to_disjunctive_form(expr);
3149  CHECK_GE(expr_terms.size(), size_t(1));
3150  const auto& first_term = expr_terms.front();
3151  const auto first_term_factors = qual_to_conjunctive_form(first_term);
3152  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
3153  // First, collect the conjunctive components common to all the disjunctive components.
3154  // Don't do it for simple qualifiers; we only care about expensive or join qualifiers.
3155  for (const auto& first_term_factor : first_term_factors.quals) {
3156  bool is_common =
3157  expr_terms.size() > 1; // Only report common factors for disjunction.
3158  for (size_t i = 1; i < expr_terms.size(); ++i) {
3159  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
3160  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
3161  is_common = false;
3162  break;
3163  }
3164  }
3165  if (is_common) {
3166  common_factors.push_back(first_term_factor);
3167  }
3168  }
3169  if (common_factors.empty()) {
3170  return expr;
3171  }
3172  // Now that the common expressions are known, collect the remaining expressions.
3173  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
3174  for (const auto& term : expr_terms) {
3175  const auto term_cf = qual_to_conjunctive_form(term);
3176  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
3177  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
3178  for (const auto& qual : term_cf.quals) {
3179  if (!list_contains_expression(common_factors, qual)) {
3180  remaining_quals.push_back(qual);
3181  }
3182  }
3183  if (!remaining_quals.empty()) {
3184  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
3185  }
3186  }
3187  // Reconstruct the expression with the transformation applied.
3188  const auto common_expr = build_logical_expression(common_factors, kAND);
3189  if (remaining_terms.empty()) {
3190  return common_expr;
3191  }
3192  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
3193  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
3194 }
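// Worked example: for
//   (t1.x = t2.x AND t1.y > 5) OR (t1.x = t2.x AND t1.z < 3)
// the two disjunctive terms share the conjunctive factor t1.x = t2.x, so the
// expression is rebuilt as
//   t1.x = t2.x AND (t1.y > 5 OR t1.z < 3)
// evaluating the equality once and keeping it available as a hash join
// qualifier. When the terms share no common factor, the input expression is
// returned unchanged.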
3195 
3196 } // namespace
3197 
3198 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
3199  const RexScalar* join_condition,
3200  const std::vector<JoinType>& join_types,
3201  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3202  const bool just_explain) const {
3203  QueryFeatureDescriptor query_features;
3204  RelAlgTranslator translator(cat_,
3205  executor_,
3206  input_to_nest_level,
3207  join_types,
3208  now_,
3209  just_explain,
3210  query_features);
3211  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
3212  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
3213  for (const auto rex_condition_component : rex_condition_cf) {
3214  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
3215  const auto join_condition =
3216  reverse_logical_distribution(translator.translateScalarRex(
3217  bw_equals ? bw_equals.get() : rex_condition_component));
3218  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
3219  join_condition_quals.insert(join_condition_quals.end(),
3220  join_condition_cf.quals.begin(),
3221  join_condition_cf.quals.end());
3222  join_condition_quals.insert(join_condition_quals.end(),
3223  join_condition_cf.simple_quals.begin(),
3224  join_condition_cf.simple_quals.end());
3225  }
3226  return combine_equi_join_conditions(join_condition_quals);
3227 }
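// Sketch of the flow above: the join condition is split into top-level
// conjuncts; each conjunct matching the bitwise-equals pattern (an equality
// spelled as `a = b OR (a IS NULL AND b IS NULL)`) is collapsed into a
// single null-safe comparison so it remains hash-joinable. The result is
// translated, normalized by reverse_logical_distribution, flattened into
// qualifier lists, and finally equi-join conditions are merged by
// combine_equi_join_conditions.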
3228 
3229 // Translate left deep join filter and separate the conjunctive form qualifiers
3230 // per nesting level. The code generated for hash table lookups on each level
3231 // must dominate its uses in deeper nesting levels.
3232 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
3233  const RelLeftDeepInnerJoin* join,
3234  const std::vector<InputDescriptor>& input_descs,
3235  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3236  const bool just_explain) {
3237  const auto join_types = left_deep_join_types(join);
3238  const auto join_condition_quals = makeJoinQuals(
3239  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
3240  MaxRangeTableIndexVisitor rte_idx_visitor;
3241  JoinQualsPerNestingLevel result(input_descs.size() - 1);
3242  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
3243  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
3244  const auto outer_condition = join->getOuterCondition(rte_idx);
3245  if (outer_condition) {
3246  result[rte_idx - 1].quals =
3247  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
3248  CHECK_LE(rte_idx, join_types.size());
3249  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
3250  result[rte_idx - 1].type = JoinType::LEFT;
3251  continue;
3252  }
3253  for (const auto qual : join_condition_quals) {
3254  if (visited_quals.count(qual)) {
3255  continue;
3256  }
3257  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
3258  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
3259  const auto it_ok = visited_quals.emplace(qual);
3260  CHECK(it_ok.second);
3261  result[rte_idx - 1].quals.push_back(qual);
3262  }
3263  }
3264  CHECK_LE(rte_idx, join_types.size());
3265  CHECK(join_types[rte_idx - 1] == JoinType::INNER);
3266  result[rte_idx - 1].type = JoinType::INNER;
3267  }
3268  return result;
3269 }
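// Example: for t1 JOIN t2 ON q12 JOIN t3 ON q23 the left-deep tree has three
// inputs and result holds two nesting levels. A qualifier whose maximum
// range-table index is 1 (references only t1/t2) is placed in result[0];
// one referencing t3 (index 2) goes to result[1]. Placing each qualifier at
// the shallowest level covering all of its inputs keeps the generated hash
// table lookup code dominating its uses in deeper levels.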
3270 
3271 namespace {
3272 
3273 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
3274  const RelAlgNode* ra_node,
3275  const size_t nest_level,
3276  const std::vector<TargetMetaInfo>& in_metainfo,
3277  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3278  CHECK_LE(size_t(1), ra_node->inputCount());
3279  CHECK_GE(size_t(2), ra_node->inputCount());
3280  const auto input = ra_node->getInput(nest_level);
3281  const auto it_rte_idx = input_to_nest_level.find(input);
3282  CHECK(it_rte_idx != input_to_nest_level.end());
3283  const int rte_idx = it_rte_idx->second;
3284  const int table_id = table_id_from_ra(input);
3285  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
3286  const auto scan_ra = dynamic_cast<const RelScan*>(input);
3287  int input_idx = 0;
3288  for (const auto& input_meta : in_metainfo) {
3289  inputs.push_back(
3290  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
3291  table_id,
3292  scan_ra ? input_idx + 1 : input_idx,
3293  rte_idx));
3294  ++input_idx;
3295  }
3296  return inputs;
3297 }
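// Note: the `scan_ra ? input_idx + 1 : input_idx` above reflects that
// physical table columns are numbered from 1 in the catalog, while columns
// of intermediate results are numbered from 0.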
3298 
3299 } // namespace
3300 
3301 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
3302  const RelAggregate* aggregate,
3303  const SortInfo& sort_info,
3304  const bool just_explain) {
3305  std::vector<InputDescriptor> input_descs;
3306  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3307  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3308  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
3309  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3310  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
3311  const auto join_type = get_join_type(aggregate);
3312  QueryFeatureDescriptor query_features;
3313  RelAlgTranslator translator(cat_,
3314  executor_,
3315  input_to_nest_level,
3316  {join_type},
3317  now_,
3318  just_explain,
3319  query_features);
3320  CHECK_EQ(size_t(1), aggregate->inputCount());
3321  const auto source = aggregate->getInput(0);
3322  const auto& in_metainfo = source->getOutputMetainfo();
3323  const auto scalar_sources =
3324  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
3325  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
3326  const auto target_exprs = translate_targets(
3327  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
3328  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
3329  aggregate->setOutputMetainfo(targets_meta);
3330  return {RelAlgExecutionUnit{input_descs,
3331  input_col_descs,
3332  {},
3333  {},
3334  {},
3335  groupby_exprs,
3336  target_exprs,
3337  nullptr,
3338  sort_info,
3339  0,
3340  query_features,
3341  false,
3342  query_state_},
3343  aggregate,
3344  max_groups_buffer_entry_default_guess,
3345  nullptr};
3346 }
3347 
3348 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
3349  const RelProject* project,
3350  const SortInfo& sort_info,
3351  const ExecutionOptions& eo) {
3352  std::vector<InputDescriptor> input_descs;
3353  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3354  auto input_to_nest_level = get_input_nest_levels(project, {});
3355  std::tie(input_descs, input_col_descs, std::ignore) =
3356  get_input_desc(project, input_to_nest_level, {}, cat_);
3357  const auto query_infos = get_table_infos(input_descs, executor_);
3358 
3359  const auto left_deep_join =
3360  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
3361  JoinQualsPerNestingLevel left_deep_join_quals;
3362  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3363  : std::vector<JoinType>{get_join_type(project)};
3364  std::vector<size_t> input_permutation;
3365  std::vector<size_t> left_deep_join_input_sizes;
3366  if (left_deep_join) {
3367  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3368  const auto query_infos = get_table_infos(input_descs, executor_);
3369  left_deep_join_quals = translateLeftDeepJoinFilter(
3370  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3371  if (g_from_table_reordering) {
3372  input_permutation = do_table_reordering(input_descs,
3373  input_col_descs,
3374  left_deep_join_quals,
3375  input_to_nest_level,
3376  project,
3377  query_infos,
3378  executor_);
3379  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3380  std::tie(input_descs, input_col_descs, std::ignore) =
3381  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3382  left_deep_join_quals = translateLeftDeepJoinFilter(
3383  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3384  }
3385  }
3386 
3387  QueryFeatureDescriptor query_features;
3388  RelAlgTranslator translator(cat_,
3389  executor_,
3390  input_to_nest_level,
3391  join_types,
3392  now_,
3393  eo.just_explain,
3394  query_features);
3395  const auto target_exprs_owned =
3396  translate_scalar_sources(project, translator, eo.executor_type);
3397  target_exprs_owned_.insert(
3398  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3399  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3400  const RelAlgExecutionUnit exe_unit = {input_descs,
3401  input_col_descs,
3402  {},
3403  {},
3404  left_deep_join_quals,
3405  {nullptr},
3406  target_exprs,
3407  nullptr,
3408  sort_info,
3409  0,
3410  query_features,
3411  false,
3412  query_state_};
3413  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3414  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3415  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3416  project->setOutputMetainfo(targets_meta);
3417  return {rewritten_exe_unit,
3418  project,
3419  max_groups_buffer_entry_default_guess,
3420  std::move(query_rewriter),
3421  input_permutation,
3422  left_deep_join_input_sizes};
3423 }
3424 
3425 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
3426  const RelTableFunction* table_func,
3427  const bool just_explain) {
3428  std::vector<InputDescriptor> input_descs;
3429  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3430  auto input_to_nest_level = get_input_nest_levels(table_func, {});
3431  std::tie(input_descs, input_col_descs, std::ignore) =
3432  get_input_desc(table_func, input_to_nest_level, {}, cat_);
3433  const auto query_infos = get_table_infos(input_descs, executor_);
3434  CHECK_EQ(size_t(1), table_func->inputCount());
3435 
3436  QueryFeatureDescriptor query_features; // TODO(adb): remove/make optional
3437  RelAlgTranslator translator(
3438  cat_, executor_, input_to_nest_level, {}, now_, just_explain, query_features);
3439  const auto input_exprs_owned =
3440  translate_scalar_sources(table_func, translator, ::ExecutorType::Native);
3441  target_exprs_owned_.insert(
3442  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3443  const auto input_exprs = get_exprs_not_owned(input_exprs_owned);
3444 
3445  std::vector<Analyzer::ColumnVar*> input_col_exprs;
3446  for (auto input_expr : input_exprs) {
3447  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
3448  input_col_exprs.push_back(col_var);
3449  }
3450  }
3451  CHECK_EQ(input_col_exprs.size(), table_func->getColInputsSize());
3452 
3453  const auto& table_function_impl =
3454  table_functions::TableFunctionsFactory::get(table_func->getFunctionName());
3455 
3456  std::vector<Analyzer::Expr*> table_func_outputs;
3457  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
3458  const auto ti = table_function_impl.getOutputSQLType(i);
3459  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
3460  table_func_outputs.push_back(target_exprs_owned_.back().get());
3461  }
3462 
3463  std::optional<size_t> output_row_multiplier;
3464  if (table_function_impl.hasUserSpecifiedOutputMultiplier()) {
3465  const auto parameter_index = table_function_impl.getOutputRowParameter();
3466  CHECK_GT(parameter_index, size_t(0));
3467  const auto parameter_expr = table_func->getTableFuncInputAt(parameter_index - 1);
3468  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
3469  if (!parameter_expr_literal) {
3470  throw std::runtime_error(
3471  "Provided output buffer multiplier parameter is not a literal. Only literal "
3472  "values are supported with output buffer multiplier configured table "
3473  "functions.");
3474  }
3475  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
3476  if (literal_val < 0) {
3477  throw std::runtime_error("Provided output row multiplier " +
3478  std::to_string(literal_val) +
3479  " is not valid for table functions.");
3480  }
3481  output_row_multiplier = static_cast<size_t>(literal_val);
3482  }
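// Example: for a table function whose output size is declared as a
// user-specified multiple of the input row count, a call such as the
// (hypothetical) row_copier below passes the multiplier as a literal:
//   SELECT * FROM TABLE(row_copier(CURSOR(SELECT x FROM t), 2));
// The literal 2 sizes the output buffer at twice the input rows; non-literal
// or negative multipliers are rejected by the checks above.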
3483 
3484  const TableFunctionExecutionUnit exe_unit = {
3485  input_descs,
3486  input_col_descs,
3487  input_exprs, // table function inputs
3488  input_col_exprs, // table function column inputs (duplicates w/ above)
3489  table_func_outputs, // table function projected exprs
3490  output_row_multiplier, // output buffer multiplier
3491  table_func->getFunctionName()};
3492  const auto targets_meta = get_targets_meta(table_func, exe_unit.target_exprs);
3493  table_func->setOutputMetainfo(targets_meta);
3494  return {exe_unit, table_func};
3495 }
3496 
3497 namespace {
3498 
3499 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
3500 get_inputs_meta(const RelFilter* filter,
3501  const RelAlgTranslator& translator,
3502  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
3503  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3504  std::vector<TargetMetaInfo> in_metainfo;
3505  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
3506  const auto data_sink_node = get_data_sink(filter);
3507  auto input_it = inputs_owned.begin();
3508  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
3509  const auto source = data_sink_node->getInput(nest_level);
3510  const auto scan_source = dynamic_cast<const RelScan*>(source);
3511  if (scan_source) {
3512  CHECK(source->getOutputMetainfo().empty());
3513  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
3514  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
3515  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
3516  }
3517  const auto source_metadata =
3518  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
3519  in_metainfo.insert(
3520  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3521  exprs_owned.insert(
3522  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3523  } else {
3524  const auto& source_metadata = source->getOutputMetainfo();
3525  input_it += source_metadata.size();
3526  in_metainfo.insert(
3527  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3528  const auto scalar_sources_owned = synthesize_inputs(
3529  data_sink_node, nest_level, source_metadata, input_to_nest_level);
3530  exprs_owned.insert(
3531  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3532  }
3533  }
3534  return std::make_pair(in_metainfo, exprs_owned);
3535 }
3536 
3537 } // namespace
3538 
3539 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
3540  const SortInfo& sort_info,
3541  const bool just_explain) {
3542  CHECK_EQ(size_t(1), filter->inputCount());
3543  std::vector<InputDescriptor> input_descs;
3544  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3545  std::vector<TargetMetaInfo> in_metainfo;
3546  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3547  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
3548 
3549  const auto input_to_nest_level = get_input_nest_levels(filter, {});
3550  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3551  get_input_desc(filter, input_to_nest_level, {}, cat_);
3552  const auto join_type = get_join_type(filter);
3553  QueryFeatureDescriptor query_features;
3554  RelAlgTranslator translator(cat_,
3555  executor_,
3556  input_to_nest_level,
3557  {join_type},
3558  now_,
3559  just_explain,
3560  query_features);
3561  std::tie(in_metainfo, target_exprs_owned) =
3562  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
3563  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
3564  const auto qual = fold_expr(filter_expr.get());
3565  target_exprs_owned_.insert(
3566  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3567  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3568  filter->setOutputMetainfo(in_metainfo);
3569  const auto rewritten_qual = rewrite_expr(qual.get());
3570  return {{input_descs,
3571  input_col_descs,
3572  {},
3573  {rewritten_qual ? rewritten_qual : qual},
3574  {},
3575  {nullptr},
3576  target_exprs,
3577  nullptr,
3578  sort_info,
3579  0},
3580  filter,
3581  max_groups_buffer_entry_default_guess,
3582  nullptr};
3583 }
3584 