RelAlgExecutor.cpp (OmniSciDB b24e664e58)
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "RelAlgExecutor.h"
#include "RelAlgTranslator.h"

#include "CalciteDeserializerUtils.h"
#include "CardinalityEstimator.h"
#include "ColumnFetcher.h"
#include "EquiJoinCondition.h"
#include "ErrorHandling.h"
#include "ExpressionRewrite.h"
#include "FromTableReordering.h"
#include "InputMetadata.h"
#include "JoinFilterPushDown.h"
#include "QueryPhysicalInputsCollector.h"
#include "RangeTableIndexVisitor.h"
#include "RexVisitor.h"
#include "TableFunctions/TableFunctionsFactory.h"
#include "UsedColumnsVisitor.h"
#include "WindowContext.h"

#include "../Parser/ParserNode.h"
#include "../Shared/measure.h"

#include <algorithm>
#include <numeric>

bool g_skip_intermediate_count{true};
extern bool g_enable_bump_allocator;
namespace {

bool node_is_aggregate(const RelAlgNode* ra) {
  const auto compound = dynamic_cast<const RelCompound*>(ra);
  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
  return ((compound && compound->isAggregate()) || aggregate);
}

std::unordered_set<PhysicalInput> get_physical_inputs(
    const Catalog_Namespace::Catalog& cat,
    const RelAlgNode* ra) {
  auto phys_inputs = get_physical_inputs(ra);
  std::unordered_set<PhysicalInput> phys_inputs2;
  for (auto& phi : phys_inputs) {
    phys_inputs2.insert(
        PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
  }
  return phys_inputs2;
}

} // namespace
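
// Note: the get_physical_inputs overload above maps each input column's SPI
// (sequential position index) to the catalog's actual column id via
// getColumnIdBySpi(), so the downstream range/generation caches key on
// physical column ids.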

ExecutionResult RelAlgExecutor::executeRelAlgQuery(const std::string& query_ra,
                                                   const CompilationOptions& co,
                                                   const ExecutionOptions& eo,
                                                   RenderInfo* render_info) {
  auto timer = DEBUG_TIMER(__func__);
  try {
    return executeRelAlgQueryNoRetry(query_ra, co, eo, render_info);
  } catch (const QueryMustRunOnCpu&) {
    if (!g_allow_cpu_retry) {
      throw;
    }
  }
  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
  CompilationOptions co_cpu{ExecutorDeviceType::CPU,
                            co.hoist_literals_,
                            co.opt_level_,
                            co.with_dynamic_watchdog_,
                            co.explain_type_,
                            co.register_intel_jit_listener_};
  if (render_info) {
    render_info->setForceNonInSituData();
  }
  return executeRelAlgQueryNoRetry(query_ra, co_cpu, eo, render_info);
}
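
// High-level flow (summary of this file): the Calcite-serialized plan string
// is deserialized into a RelAlgNode DAG, split into an RaExecutionSequence of
// execution descriptors, and each descriptor is dispatched by
// executeRelAlgStep() to the node-specific execute*() methods below.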

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const std::string& query_ra,
                                                          const CompilationOptions& co,
                                                          const ExecutionOptions& eo,
                                                          RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryNoRetry);
  decltype(subqueries_)().swap(subqueries_);

  const auto ra = deserialize_ra_dag(
      query_ra, cat_, this, render_info ? render_info->getRenderQueryOptsPtr() : nullptr);

  // capture the lock acquisition time
  auto clock_begin = timer_start();
  std::lock_guard<std::mutex> lock(executor_->execute_mutex_);
  int64_t queue_time_ms = timer_stop(clock_begin);
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  ScopeGuard row_set_holder = [this, &render_info] {
    if (render_info) {
      // need to hold onto the RowSetMemOwner for potential
      // string id lookups during render vega validation
      render_info->row_set_mem_owner = executor_->row_set_mem_owner_;
    }
    cleanupPostExecution();
  };
  const auto phys_inputs = get_physical_inputs(cat_, ra.get());
  const auto phys_table_ids = get_physical_table_inputs(ra.get());
  executor_->setCatalog(&cat_);
  executor_->setupCaching(phys_inputs, phys_table_ids);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(ra.get());

  if (render_info) {
    // set render to be non-insitu in certain situations.
    if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
        ed_seq.size() > 1) {
      // old logic: disallow if more than one execution descriptor
      render_info->setInSituDataIfUnset(false);
    }
  }

  if (eo.find_push_down_candidates) {
    // this extra logic is mainly due to current limitations on multi-step queries
    // and/or subqueries.
    return executeRelAlgQueryWithFilterPushDown(
        ed_seq, co, eo, render_info, queue_time_ms);
  }

  // Dispatch the subqueries first
  for (auto subquery : subqueries_) {
    const auto subquery_ra = subquery->getRelAlg();
    CHECK(subquery_ra);
    if (subquery_ra->hasContextData()) {
      continue;
    }
    // Execute the subquery and cache the result.
    RelAlgExecutor ra_executor(executor_, cat_);
    RaExecutionSequence subquery_seq(subquery_ra);
    auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
    subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
  }
  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
}

AggregatedColRange RelAlgExecutor::computeColRangesCache(const RelAlgNode* ra) {
  AggregatedColRange agg_col_range_cache;
  const auto phys_inputs = get_physical_inputs(cat_, ra);
  return executor_->computeColRangesCache(phys_inputs);
}

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations(
    const RelAlgNode* ra) {
  const auto phys_inputs = get_physical_inputs(cat_, ra);
  return executor_->computeStringDictionaryGenerations(phys_inputs);
}

TableGenerations RelAlgExecutor::computeTableGenerations(const RelAlgNode* ra) {
  const auto phys_table_ids = get_physical_table_inputs(ra);
  return executor_->computeTableGenerations(phys_table_ids);
}

Executor* RelAlgExecutor::getExecutor() const {
  return executor_;
}

void RelAlgExecutor::cleanupPostExecution() {
  CHECK(executor_);
  executor_->row_set_mem_owner_ = nullptr;
  executor_->lit_str_dict_proxy_ = nullptr;
}

FirstStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
    const RaExecutionSequence& seq,
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryStep);
  auto exe_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exe_desc_ptr);
  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());

  size_t shard_count{0};
  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
    return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
  };

  if (sort) {
    const auto source_work_unit = createSortInputWorkUnit(sort, eo.just_explain);
    shard_count = GroupByAndAggregate::shard_count_for_top_groups(
        source_work_unit.exe_unit, *executor_->getCatalog());
    if (!shard_count) {
      // No point in sorting on the leaf, only execute the input to the sort node.
      CHECK_EQ(size_t(1), sort->inputCount());
      const auto source = sort->getInput(0);
      if (sort->collationCount() || node_is_aggregate(source)) {
        auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
        CHECK_EQ(temp_seq.size(), size_t(1));
        // Use subseq to avoid clearing existing temporary tables
        return {executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo, nullptr, 0),
                merge_type(source),
                source->getId(),
                false};
      }
    }
  }
  return {executeRelAlgSubSeq(seq,
                              std::make_pair(step_idx, step_idx + 1),
                              co,
                              eo,
                              render_info,
                              queue_time_ms_),
          merge_type(exe_desc_ptr->getBody()),
          exe_desc_ptr->getBody()->getId(),
          false};
}
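
// Distributed note (sketch): MergeType::Reduce asks the aggregator to reduce
// per-leaf aggregation states, while MergeType::Union simply concatenates leaf
// results. When the top groups are sharded (shard_count != 0) the sort stays
// on the leaf; otherwise only the sort's input is executed here.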

void RelAlgExecutor::prepareLeafExecution(
    const AggregatedColRange& agg_col_range,
    const StringDictionaryGenerations& string_dictionary_generations,
    const TableGenerations& table_generations) {
  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  queue_time_ms_ = timer_stop(clock_begin);
  executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
  executor_->table_generations_ = table_generations;
  executor_->agg_col_range_cache_ = agg_col_range;
  executor_->string_dictionary_generations_ = string_dictionary_generations;
}

ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
                                                 const CompilationOptions& co,
                                                 const ExecutionOptions& eo,
                                                 RenderInfo* render_info,
                                                 const int64_t queue_time_ms,
                                                 const bool with_existing_temp_tables) {
  INJECT_TIMER(executeRelAlgSeq);
  if (!with_existing_temp_tables) {
    decltype(temporary_tables_)().swap(temporary_tables_);
  }
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  executor_->catalog_ = &cat_;
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!seq.empty());
  const auto exec_desc_count = eo.just_explain ? size_t(1) : seq.size();

  for (size_t i = 0; i < exec_desc_count; i++) {
    // only render on the last step
    executeRelAlgStep(seq,
                      i,
                      co,
                      eo,
                      (i == exec_desc_count - 1) ? render_info : nullptr,
                      queue_time_ms);
  }

  return seq.getDescriptor(exec_desc_count - 1)->getResult();
}

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
    const RaExecutionSequence& seq,
    const std::pair<size_t, size_t> interval,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  INJECT_TIMER(executeRelAlgSubSeq);
  executor_->catalog_ = &cat_;
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!eo.just_explain);

  for (size_t i = interval.first; i < interval.second; i++) {
    // only render on the last step
    executeRelAlgStep(seq,
                      i,
                      co,
                      eo,
                      (i == interval.second - 1) ? render_info : nullptr,
                      queue_time_ms);
  }

  return seq.getDescriptor(interval.second - 1)->getResult();
}

void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
                                       const size_t step_idx,
                                       const CompilationOptions& co,
                                       const ExecutionOptions& eo,
                                       RenderInfo* render_info,
                                       const int64_t queue_time_ms) {
  INJECT_TIMER(executeRelAlgStep);
  WindowProjectNodeContext::reset();
  auto exec_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    handleNop(exec_desc);
    return;
  }
  const ExecutionOptions eo_work_unit{
      eo.output_columnar_hint,
      eo.allow_multifrag,
      eo.just_explain,
      eo.allow_loop_joins,
      eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
      eo.jit_debug,
      eo.just_validate,
      eo.with_dynamic_watchdog,
      eo.dynamic_watchdog_time_limit,
      eo.find_push_down_candidates,
      eo.just_calcite_explain,
      eo.gpu_input_mem_limit_percent};

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      executeDeleteViaCompound(compound, co, eo_work_unit, render_info, queue_time_ms);
    } else if (compound->isUpdateViaSelect()) {
      executeUpdateViaCompound(compound, co, eo_work_unit, render_info, queue_time_ms);
    } else {
      exec_desc.setResult(
          executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    if (project->isDeleteViaSelect()) {
      executeDeleteViaProject(project, co, eo_work_unit, render_info, queue_time_ms);
    } else if (project->isUpdateViaSelect()) {
      executeUpdateViaProject(project, co, eo_work_unit, render_info, queue_time_ms);
    } else {
      ssize_t prev_count = -1;
      // Disabling the intermediate count optimization in distributed, as the previous
      // execution descriptor will likely not hold the aggregated result.
      if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
        auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
        CHECK(prev_exec_desc);
        if (dynamic_cast<const RelCompound*>(prev_exec_desc->getBody())) {
          const auto& prev_exe_result = prev_exec_desc->getResult();
          const auto prev_result = prev_exe_result.getRows();
          if (prev_result) {
            prev_count = static_cast<ssize_t>(prev_result->rowCount());
          }
        }
      }
      exec_desc.setResult(executeProject(
          project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
  if (aggregate) {
    exec_desc.setResult(
        executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
    addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto filter = dynamic_cast<const RelFilter*>(body);
  if (filter) {
    exec_desc.setResult(
        executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
    addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto sort = dynamic_cast<const RelSort*>(body);
  if (sort) {
    exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
    if (exec_desc.getResult().isFilterPushDownEnabled()) {
      return;
    }
    addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
  if (logical_values) {
    exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
    addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto modify = dynamic_cast<const RelModify*>(body);
  if (modify) {
    exec_desc.setResult(executeModify(modify, eo_work_unit));
    return;
  }
  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
  if (table_func) {
    exec_desc.setResult(
        executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
    addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  CHECK(false);
}
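
// Convention: the result of node N is registered as temporary table -N (note
// the negated node id); later steps and handleNop() below look their inputs up
// under that key.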

void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
  // just set the result of the previous node as the result of no op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up temp table as it could be used by the outer query or next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}

namespace {

class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:
  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : RexVisitor(), cat_(cat) {}

  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
  const Catalog_Namespace::Catalog& cat_;
};

const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
    CHECK_EQ(size_t(2), join->inputCount());
    return join;
  }
  CHECK_EQ(size_t(1), ra_node->inputCount());
  auto only_src = ra_node->getInput(0);
  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
                       dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
  return is_join ? only_src : ra_node;
}
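
// A node's "data sink" is where its used inputs are collected from: for a join
// (or a node directly fed by a join) it is the join itself, so input
// enumeration spans all join sources rather than a single input node.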

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelTableFunction* table_func,
                const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

int table_id_from_ra(const RelAlgNode* ra_node) {
  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
  if (scan_ra) {
    const auto td = scan_ra->getTableDescriptor();
    CHECK(td);
    return td->tableId;
  }
  return -ra_node->getId();
}

std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
    const RelAlgNode* ra_node,
    const std::vector<size_t>& input_permutation) {
  const auto data_sink_node = get_data_sink(ra_node);
  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    const auto input_ra = data_sink_node->getInput(input_node_idx);
    const auto it_ok = input_to_nest_level.emplace(input_ra, input_idx);
    CHECK(it_ok.second);
    LOG_IF(INFO, !input_permutation.empty())
        << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
  }
  return input_to_nest_level;
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_join_source_used_inputs(const RelAlgNode* ra_node,
                            const Catalog_Namespace::Catalog& cat) {
  const auto data_sink_node = get_data_sink(ra_node);
  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
    CHECK_EQ(join->inputCount(), 2u);
    const auto condition = join->getCondition();
    RexUsedInputsVisitor visitor(cat);
    auto condition_inputs = visitor.visit(condition);
    std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
        visitor.get_inputs_owned());
    return std::make_pair(condition_inputs, condition_inputs_owned);
  }

  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
    CHECK_GE(left_deep_join->inputCount(), 2u);
    const auto condition = left_deep_join->getInnerCondition();
    RexUsedInputsVisitor visitor(cat);
    auto result = visitor.visit(condition);
    for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
         ++nesting_level) {
      const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
      if (outer_condition) {
        const auto outer_result = visitor.visit(outer_condition);
        result.insert(outer_result.begin(), outer_result.end());
      }
    }
    std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
    return std::make_pair(result, used_inputs_owned);
  }

  CHECK_EQ(ra_node->inputCount(), 1u);
  return std::make_pair(std::unordered_set<const RexInput*>{},
                        std::vector<std::shared_ptr<RexInput>>{});
}

std::vector<const RelAlgNode*> get_non_join_sequence(const RelAlgNode* ra) {
  std::vector<const RelAlgNode*> seq;
  for (auto join = dynamic_cast<const RelJoin*>(ra); join;
       join = static_cast<const RelJoin*>(join->getInput(0))) {
    CHECK_EQ(size_t(2), join->inputCount());
    seq.emplace_back(join->getInput(1));
    auto lhs = join->getInput(0);
    if (!dynamic_cast<const RelJoin*>(lhs)) {
      seq.emplace_back(lhs);
      break;
    }
  }
  std::reverse(seq.begin(), seq.end());
  return seq;
}
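
// Example (illustrative): for a left-deep tree Join(Join(A, B), C) the loop
// above visits the outer join first, pushing C, then B and A, and the final
// reverse yields {A, B, C}, i.e. the non-join sources ordered by nest level.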

void collect_used_input_desc(
    std::vector<InputDescriptor>& input_descs,
    const Catalog_Namespace::Catalog& cat,
    std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
    const RelAlgNode* ra_node,
    const std::unordered_set<const RexInput*>& source_used_inputs,
    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
  std::unordered_set<InputDescriptor> input_descs_unique(input_descs.begin(),
                                                         input_descs.end());
  const auto non_join_src_seq = get_non_join_sequence(get_data_sink(ra_node));
  std::unordered_map<const RelAlgNode*, int> non_join_to_nest_level;
  for (const auto node : non_join_src_seq) {
    non_join_to_nest_level.insert(std::make_pair(node, non_join_to_nest_level.size()));
  }
  for (const auto used_input : source_used_inputs) {
    const auto input_ra = used_input->getSourceNode();
    const int table_id = table_id_from_ra(input_ra);
    const auto col_id = used_input->getIndex();
    auto it = input_to_nest_level.find(input_ra);
    if (it == input_to_nest_level.end()) {
      throw std::runtime_error("Bushy joins not supported");
    }
    const int input_desc = it->second;
    input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
        dynamic_cast<const RelScan*>(input_ra)
            ? cat.getColumnIdBySpi(table_id, col_id + 1)
            : col_id,
        table_id,
        input_desc));
  }
}

template <class RA>
std::pair<std::vector<InputDescriptor>,
          std::list<std::shared_ptr<const InputColDescriptor>>>
get_input_desc_impl(const RA* ra_node,
                    const std::unordered_set<const RexInput*>& used_inputs,
                    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
                    const std::vector<size_t>& input_permutation,
                    const Catalog_Namespace::Catalog& cat) {
  std::vector<InputDescriptor> input_descs;
  const auto data_sink_node = get_data_sink(ra_node);
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    const auto input_ra = data_sink_node->getInput(input_node_idx);
    const int table_id = table_id_from_ra(input_ra);
    input_descs.emplace_back(table_id, input_idx);
  }
  std::sort(input_descs.begin(),
            input_descs.end(),
            [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
              return lhs.getNestLevel() < rhs.getNestLevel();
            });
  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,
                          ra_node,
                          used_inputs,
                          input_to_nest_level);
  std::unordered_set<const RexInput*> join_source_used_inputs;
  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
      get_join_source_used_inputs(ra_node, cat);
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,
                          ra_node,
                          join_source_used_inputs,
                          input_to_nest_level);
  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
      input_col_descs_unique.begin(), input_col_descs_unique.end());

  std::sort(
      input_col_descs.begin(),
      input_col_descs.end(),
      [](std::shared_ptr<const InputColDescriptor> const& lhs,
         std::shared_ptr<const InputColDescriptor> const& rhs) {
        if (lhs->getScanDesc().getNestLevel() == rhs->getScanDesc().getNestLevel()) {
          return lhs->getColId() < rhs->getColId();
        }
        return lhs->getScanDesc().getNestLevel() < rhs->getScanDesc().getNestLevel();
      });
  return {input_descs,
          std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
                                                               input_col_descs.end())};
}

template <class RA>
std::tuple<std::vector<InputDescriptor>,
           std::list<std::shared_ptr<const InputColDescriptor>>,
           std::vector<std::shared_ptr<RexInput>>>
get_input_desc(const RA* ra_node,
               const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
               const std::vector<size_t>& input_permutation,
               const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
  auto input_desc_pair = get_input_desc_impl(
      ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
  return std::make_tuple(
      input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
}

size_t get_scalar_sources_size(const RelCompound* compound) {
  return compound->getScalarSourcesSize();
}

size_t get_scalar_sources_size(const RelProject* project) {
  return project->size();
}

size_t get_scalar_sources_size(const RelTableFunction* table_func) {
  return table_func->getTableFuncInputsSize();
}

const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
  return compound->getScalarSource(i);
}

const RexScalar* scalar_at(const size_t i, const RelProject* project) {
  return project->getProjectAt(i);
}

const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
  return table_func->getTableFuncInputAt(i);
}

std::shared_ptr<Analyzer::Expr> set_transient_dict(
    const std::shared_ptr<Analyzer::Expr> expr) {
  const auto& ti = expr->get_type_info();
  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
    return expr;
  }
  auto transient_dict_ti = ti;
  transient_dict_ti.set_compression(kENCODING_DICT);
  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
  transient_dict_ti.set_fixed_size();
  return expr->add_cast(transient_dict_ti);
}
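
// Rationale: none-encoded strings cannot be grouped by or hashed directly, so
// the cast above routes them through a query-lifetime "transient" dictionary
// (TRANSIENT_DICT_ID); set_transient_dict_maybe() below falls back to the
// plain folded expression when the cast is not possible.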

void set_transient_dict_maybe(
    std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::shared_ptr<Analyzer::Expr>& expr) {
  try {
    scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
  } catch (...) {
    scalar_sources.push_back(fold_expr(expr.get()));
  }
}

template <class RA>
std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
    const RA* ra_node,
    const RelAlgTranslator& translator) {
  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
    const auto scalar_rex = scalar_at(i, ra_node);
    if (dynamic_cast<const RexRef*>(scalar_rex)) {
      // RexRefs are synthetic scalars we append at the end of the real ones
      // for the sake of taking memory ownership, no real work needed here.
      continue;
    }

    const auto scalar_expr =
        rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
    const auto rewritten_expr = rewrite_expr(scalar_expr.get());
    set_transient_dict_maybe(scalar_sources, rewritten_expr);
  }

  return scalar_sources;
}

std::shared_ptr<Analyzer::Expr> cast_to_column_type(std::shared_ptr<Analyzer::Expr> expr,
                                                    int32_t tableId,
                                                    const Catalog_Namespace::Catalog& cat,
                                                    const std::string& colName) {
  const auto cd = *cat.getMetadataForColumn(tableId, colName);

  auto cast_ti = cd.columnType;

  // Type needs to be scrubbed because otherwise NULL values could get cut off or
  // truncated
  auto cast_logical_ti = get_logical_type_info(cast_ti);
  if (cast_logical_ti.is_varlen() && cast_logical_ti.is_array()) {
    return expr;
  }

  // CastIR.cpp Executor::codegenCast() doesn't know how to cast from a ColumnVar,
  // so it CHECKs unless casting is skipped here.
  if (std::dynamic_pointer_cast<Analyzer::ColumnVar>(expr)) {
    return expr;
  }

  // Cast the expression to match the type of the output column.
  return expr->add_cast(cast_logical_ti);
}

template <class RA>
std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
    const RA* ra_node,
    const RelAlgTranslator& translator,
    int32_t tableId,
    const Catalog_Namespace::Catalog& cat,
    const ColumnNameList& colNames,
    size_t starting_projection_column_idx) {
  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
    const auto scalar_rex = scalar_at(i, ra_node);
    if (dynamic_cast<const RexRef*>(scalar_rex)) {
      // RexRefs are synthetic scalars we append at the end of the real ones
      // for the sake of taking memory ownership, no real work needed here.
      continue;
    }

    std::shared_ptr<Analyzer::Expr> translated_expr;
    if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
      translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
                                            tableId,
                                            cat,
                                            colNames[i - starting_projection_column_idx]);
    } else {
      translated_expr = translator.translateScalarRex(scalar_rex);
    }
    const auto scalar_expr = rewrite_array_elements(translated_expr.get());
    const auto rewritten_expr = rewrite_expr(scalar_expr.get());
    set_transient_dict_maybe(scalar_sources, rewritten_expr);
  }

  return scalar_sources;
}

std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
    const RelCompound* compound,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
  if (!compound->isAggregate()) {
    return {nullptr};
  }
  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
    groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
  }
  return groupby_exprs;
}

std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
    const RelAggregate* aggregate,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
    groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
  }
  return groupby_exprs;
}

QualsConjunctiveForm translate_quals(const RelCompound* compound,
                                     const RelAlgTranslator& translator) {
  const auto filter_rex = compound->getFilterExpr();
  const auto filter_expr =
      filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
                     : QualsConjunctiveForm{};
}

std::vector<Analyzer::Expr*> translate_targets(
    std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
    const RelCompound* compound,
    const RelAlgTranslator& translator) {
  std::vector<Analyzer::Expr*> target_exprs;
  for (size_t i = 0; i < compound->size(); ++i) {
    const auto target_rex = compound->getTargetExpr(i);
    const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
    std::shared_ptr<Analyzer::Expr> target_expr;
    if (target_rex_agg) {
      target_expr =
          RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
    } else {
      const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
      const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
      if (target_rex_ref) {
        const auto ref_idx = target_rex_ref->getIndex();
        CHECK_GE(ref_idx, size_t(1));
        CHECK_LE(ref_idx, groupby_exprs.size());
        const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
        target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
      } else {
        target_expr = translator.translateScalarRex(target_rex_scalar);
        auto rewritten_expr = rewrite_expr(target_expr.get());
        target_expr = fold_expr(rewritten_expr.get());
        try {
          target_expr = set_transient_dict(target_expr);
        } catch (...) {
          // noop
        }
      }
    }
    CHECK(target_expr);
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }
  return target_exprs;
}
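
// Note: RexRef group-by references are 1-based here, hence the
// CHECK_GE(ref_idx, size_t(1)) and the ref_idx - 1 when indexing into
// groupby_exprs.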

std::vector<Analyzer::Expr*> translate_targets(
    std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
    const RelAggregate* aggregate,
    const RelAlgTranslator& translator) {
  std::vector<Analyzer::Expr*> target_exprs;
  size_t group_key_idx = 0;
  for (const auto& groupby_expr : groupby_exprs) {
    auto target_expr =
        var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }

  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
    auto target_expr =
        RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
    CHECK(target_expr);
    target_expr = fold_expr(target_expr.get());
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }
  return target_exprs;
}

std::vector<Analyzer::Expr*> translate_targets_for_update(
    std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
    const RelCompound* compound,
    const RelAlgTranslator& translator,
    int32_t tableId,
    const Catalog_Namespace::Catalog& cat,
    const ColumnNameList& colNames,
    size_t starting_projection_column_idx) {
  std::vector<Analyzer::Expr*> target_exprs;
  for (size_t i = 0; i < compound->size(); ++i) {
    const auto target_rex = compound->getTargetExpr(i);
    const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
    std::shared_ptr<Analyzer::Expr> target_expr;
    if (target_rex_agg) {
      target_expr =
          RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
    } else {
      const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
      const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
      if (target_rex_ref) {
        const auto ref_idx = target_rex_ref->getIndex();
        CHECK_GE(ref_idx, size_t(1));
        CHECK_LE(ref_idx, groupby_exprs.size());
        const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
        target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
      } else {
        if (i >= starting_projection_column_idx &&
            i < get_scalar_sources_size(compound) - 1) {
          target_expr =
              cast_to_column_type(translator.translateScalarRex(target_rex_scalar),
                                  tableId,
                                  cat,
                                  colNames[i - starting_projection_column_idx]);
        } else {
          target_expr = translator.translateScalarRex(target_rex_scalar);
        }
        auto rewritten_expr = rewrite_expr(target_expr.get());
        target_expr = fold_expr(rewritten_expr.get());
      }
    }
    CHECK(target_expr);
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }
  return target_exprs;
}

bool is_count_distinct(const Analyzer::Expr* expr) {
  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
  return agg_expr && agg_expr->get_is_distinct();
}

bool is_agg(const Analyzer::Expr* expr) {
  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
  if (agg_expr && agg_expr->get_contains_agg()) {
    auto agg_type = agg_expr->get_aggtype();
    if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
        agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
      return true;
    }
  }
  return false;
}
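
// Note: is_agg() deliberately matches only MIN/MAX/SUM/AVG; those are the
// aggregates whose logical result type is made nullable in
// get_logical_type_for_expr() below, while COUNT never produces NULL.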

std::vector<TargetMetaInfo> get_modify_manipulated_targets_meta(
    ModifyManipulationTarget const* manip_node,
    const std::vector<Analyzer::Expr*>& target_exprs) {
  std::vector<TargetMetaInfo> targets_meta;

  for (int i = 0; i < (manip_node->getTargetColumnCount()); ++i) {
    CHECK(target_exprs[i]);
    // TODO(alex): remove the count distinct type fixup.
    targets_meta.emplace_back(manip_node->getTargetColumns()[i],
                              is_count_distinct(target_exprs[i])
                                  ? SQLTypeInfo(kBIGINT, false)
                                  : target_exprs[i]->get_type_info());
  }

  return targets_meta;
}

SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
  if (is_count_distinct(&expr)) {
    return SQLTypeInfo(kBIGINT, false);
  } else if (is_agg(&expr)) {
    return get_nullable_logical_type_info(expr.get_type_info());
  }
  return get_logical_type_info(expr.get_type_info());
}

template <class RA>
std::vector<TargetMetaInfo> get_targets_meta(
    const RA* ra_node,
    const std::vector<Analyzer::Expr*>& target_exprs) {
  std::vector<TargetMetaInfo> targets_meta;
  for (size_t i = 0; i < ra_node->size(); ++i) {
    CHECK(target_exprs[i]);
    // TODO(alex): remove the count distinct type fixup.
    targets_meta.emplace_back(ra_node->getFieldName(i),
                              get_logical_type_for_expr(*target_exprs[i]),
                              target_exprs[i]->get_type_info());
  }
  return targets_meta;
}

} // namespace

void RelAlgExecutor::executeUpdateViaCompound(const RelCompound* compound,
                                              const CompilationOptions& co,
                                              const ExecutionOptions& eo,
                                              RenderInfo* render_info,
                                              const int64_t queue_time_ms) {
  if (!compound->validateTargetColumns(
          yieldColumnValidator(compound->getModifiedTableDescriptor()))) {
    throw std::runtime_error(
        "Unsupported update operation encountered. (None-encoded string column updates "
        "are not supported.)");
  }

  const auto work_unit = createCompoundWorkUnit(
      compound, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
  CompilationOptions co_project = co;
  co_project.device_type_ = ExecutorDeviceType::CPU;

  try {
    UpdateTriggeredCacheInvalidator::invalidateCaches();

    UpdateTransactionParameters update_params(compound->getModifiedTableDescriptor(),
                                              compound->getTargetColumns(),
                                              compound->getOutputMetainfo(),
                                              compound->isVarlenUpdateRequired());
    auto update_callback = yieldUpdateCallback(update_params);
    executor_->executeUpdate(work_unit.exe_unit,
                             table_infos,
                             co_project,
                             eo,
                             cat_,
                             executor_->row_set_mem_owner_,
                             update_callback,
                             compound->isAggregate());
    update_params.finalizeTransaction();
  } catch (...) {
    LOG(INFO) << "Update operation failed.";
    throw;
  }
}
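
// Note: the update/delete paths pin compilation to CPU (co_project.device_type_
// above) and call finalizeTransaction() only if executeUpdate() succeeds; on
// any exception the transaction is left unfinalized and the error is rethrown.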

void RelAlgExecutor::executeUpdateViaProject(const RelProject* project,
                                             const CompilationOptions& co,
                                             const ExecutionOptions& eo,
                                             RenderInfo* render_info,
                                             const int64_t queue_time_ms) {
  if (!project->validateTargetColumns(
          yieldColumnValidator(project->getModifiedTableDescriptor()))) {
    throw std::runtime_error(
        "Unsupported update operation encountered. (None-encoded string column updates "
        "are not supported.)");
  }

  auto work_unit =
      createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
  CompilationOptions co_project = co;
  co_project.device_type_ = ExecutorDeviceType::CPU;

  if (project->isSimple()) {
    CHECK_EQ(size_t(1), project->inputCount());
    const auto input_ra = project->getInput(0);
    if (dynamic_cast<const RelSort*>(input_ra)) {
      const auto& input_table =
          get_temporary_table(&temporary_tables_, -input_ra->getId());
      CHECK(input_table);
      work_unit.exe_unit.scan_limit = input_table->rowCount();
    }
  }

  try {
    UpdateTriggeredCacheInvalidator::invalidateCaches();

    UpdateTransactionParameters update_params(project->getModifiedTableDescriptor(),
                                              project->getTargetColumns(),
                                              project->getOutputMetainfo(),
                                              project->isVarlenUpdateRequired());
    auto update_callback = yieldUpdateCallback(update_params);
    executor_->executeUpdate(work_unit.exe_unit,
                             table_infos,
                             co_project,
                             eo,
                             cat_,
                             executor_->row_set_mem_owner_,
                             update_callback);
    update_params.finalizeTransaction();
  } catch (...) {
    LOG(INFO) << "Update operation failed.";
    throw;
  }
}

void RelAlgExecutor::executeDeleteViaCompound(const RelCompound* compound,
                                              const CompilationOptions& co,
                                              const ExecutionOptions& eo,
                                              RenderInfo* render_info,
                                              const int64_t queue_time_ms) {
  auto* table_descriptor = compound->getModifiedTableDescriptor();
  if (!table_descriptor->hasDeletedCol) {
    throw std::runtime_error(
        "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
  }

  const auto work_unit = createModifyCompoundWorkUnit(
      compound, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
  CompilationOptions co_project = co;
  co_project.device_type_ = ExecutorDeviceType::CPU;

  try {
    DeleteTriggeredCacheInvalidator::invalidateCaches();

    DeleteTransactionParameters delete_params;
    auto delete_callback = yieldDeleteCallback(delete_params);

    executor_->executeUpdate(work_unit.exe_unit,
                             table_infos,
                             co_project,
                             eo,
                             cat_,
                             executor_->row_set_mem_owner_,
                             delete_callback);
    delete_params.finalizeTransaction();
  } catch (...) {
    LOG(INFO) << "Delete operation failed.";
    throw;
  }
}

void RelAlgExecutor::executeDeleteViaProject(const RelProject* project,
                                             const CompilationOptions& co,
                                             const ExecutionOptions& eo,
                                             RenderInfo* render_info,
                                             const int64_t queue_time_ms) {
  auto* table_descriptor = project->getModifiedTableDescriptor();
  if (!table_descriptor->hasDeletedCol) {
    throw std::runtime_error(
        "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
  }

  auto work_unit = createModifyProjectWorkUnit(
      project, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
  CompilationOptions co_project = co;
  co_project.device_type_ = ExecutorDeviceType::CPU;

  if (project->isSimple()) {
    CHECK_EQ(size_t(1), project->inputCount());
    const auto input_ra = project->getInput(0);
    if (dynamic_cast<const RelSort*>(input_ra)) {
      const auto& input_table =
          get_temporary_table(&temporary_tables_, -input_ra->getId());
      CHECK(input_table);
      work_unit.exe_unit.scan_limit = input_table->rowCount();
    }
  }

  try {
    DeleteTriggeredCacheInvalidator::invalidateCaches();

    DeleteTransactionParameters delete_params;
    auto delete_callback = yieldDeleteCallback(delete_params);

    executor_->executeUpdate(work_unit.exe_unit,
                             table_infos,
                             co_project,
                             eo,
                             cat_,
                             executor_->row_set_mem_owner_,
                             delete_callback);
    delete_params.finalizeTransaction();
  } catch (...) {
    LOG(INFO) << "Delete operation failed.";
    throw;
  }
}

ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
                                                const CompilationOptions& co,
                                                const ExecutionOptions& eo,
                                                RenderInfo* render_info,
                                                const int64_t queue_time_ms) {
  const auto work_unit = createCompoundWorkUnit(
      compound, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
  CompilationOptions co_compound = co;
  return executeWorkUnit(work_unit,
                         compound->getOutputMetainfo(),
                         compound->isAggregate(),
                         co_compound,
                         eo,
                         render_info,
                         queue_time_ms);
}

ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
                                                 const CompilationOptions& co,
                                                 const ExecutionOptions& eo,
                                                 RenderInfo* render_info,
                                                 const int64_t queue_time_ms) {
  const auto work_unit = createAggregateWorkUnit(
      aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
  return executeWorkUnit(work_unit,
                         aggregate->getOutputMetainfo(),
                         true,
                         co,
                         eo,
                         render_info,
                         queue_time_ms);
}

namespace {

// Returns true iff the execution unit contains window functions.
bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
  return std::any_of(ra_exe_unit.target_exprs.begin(),
                     ra_exe_unit.target_exprs.end(),
                     [](const Analyzer::Expr* expr) {
                       return dynamic_cast<const Analyzer::WindowFunction*>(expr);
                     });
}

} // namespace

ExecutionResult RelAlgExecutor::executeProject(const RelProject* project,
                                               const CompilationOptions& co,
                                               const ExecutionOptions& eo,
                                               RenderInfo* render_info,
                                               const int64_t queue_time_ms,
                                               const ssize_t previous_count) {
  auto work_unit =
      createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
  CompilationOptions co_project = co;
  if (project->isSimple()) {
    CHECK_EQ(size_t(1), project->inputCount());
    const auto input_ra = project->getInput(0);
    if (dynamic_cast<const RelSort*>(input_ra)) {
      co_project.device_type_ = ExecutorDeviceType::CPU;
      const auto& input_table =
          get_temporary_table(&temporary_tables_, -input_ra->getId());
      CHECK(input_table);
      work_unit.exe_unit.scan_limit =
          std::min(input_table->getLimit(), input_table->rowCount());
    }
  }
  return executeWorkUnit(work_unit,
                         project->getOutputMetainfo(),
                         false,
                         co_project,
                         eo,
                         render_info,
                         queue_time_ms,
                         previous_count);
}

ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
                                                     const CompilationOptions& co_in,
                                                     const ExecutionOptions& eo,
                                                     const int64_t queue_time_ms) {
  INJECT_TIMER(executeTableFunction);

  auto co = co_in;

  if (g_cluster) {
    throw std::runtime_error("Table functions not supported in distributed mode yet");
  }
  if (!g_enable_table_functions) {
    throw std::runtime_error("Table function support is disabled");
  }

  auto table_func_work_unit = createTableFunctionWorkUnit(table_func, eo.just_explain);
  const auto body = table_func_work_unit.body;
  CHECK(body);

  const auto table_infos =
      get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);

  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
                                                     co.device_type_,
                                                     QueryMemoryDescriptor(),
                                                     nullptr,
                                                     executor_),
                         {}};

  try {
    result = {executor_->executeTableFunction(
                  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
              body->getOutputMetainfo()};
  } catch (const QueryExecutionError& e) {
    handlePersistentError(e.getErrorCode());
    CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
    throw std::runtime_error("Table function ran out of memory during execution");
  }
  result.setQueueTime(queue_time_ms);
  return result;
}

namespace {

// Creates a new expression which has the range table index set to 1. This is needed to
// reuse the hash join construction helpers to generate a hash table for the window
// function partition: create an equals expression with left and right sides identical
// except for the range table index.
std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
  if (tuple) {
    std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
    for (const auto& element : tuple->getTuple()) {
      transformed_tuple.push_back(transform_to_inner(element.get()));
    }
    return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
  }
  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
  if (!col) {
    throw std::runtime_error("Only columns supported in the window partition for now");
  }
  return makeExpr<Analyzer::ColumnVar>(
      col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
}

} // namespace

void RelAlgExecutor::computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
                                   const CompilationOptions& co,
                                   const ExecutionOptions& eo,
                                   ColumnCacheMap& column_cache_map,
                                   const int64_t queue_time_ms) {
  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
  CHECK_EQ(query_infos.size(), size_t(1));
  if (query_infos.front().info.fragments.size() != 1) {
    throw std::runtime_error(
        "Only single fragment tables supported for window functions for now");
  }
  query_infos.push_back(query_infos.front());
  auto window_project_node_context = WindowProjectNodeContext::create();
  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
       ++target_index) {
    const auto& target_expr = ra_exe_unit.target_exprs[target_index];
    const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
    if (!window_func) {
      continue;
    }
    // Always use baseline layout hash tables for now, make the expression a tuple.
    const auto& partition_keys = window_func->getPartitionKeys();
    std::shared_ptr<Analyzer::Expr> partition_key_tuple;
    if (partition_keys.size() > 1) {
      partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
    } else {
      if (partition_keys.empty()) {
        throw std::runtime_error(
            "Empty window function partitions are not supported yet");
      }
      CHECK_EQ(partition_keys.size(), size_t(1));
      partition_key_tuple = partition_keys.front();
    }
    // Creates a tautology equality with the partition expression on both sides.
    const auto partition_key_cond =
        makeExpr<Analyzer::BinOper>(kBOOLEAN,
                                    kBW_EQ,
                                    kONE,
                                    partition_key_tuple,
                                    transform_to_inner(partition_key_tuple.get()));
    auto context = createWindowFunctionContext(
        window_func, partition_key_cond, ra_exe_unit, query_infos, co, column_cache_map);
    context->compute();
    window_project_node_context->addWindowFunctionContext(std::move(context),
                                                          target_index);
  }
}
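
// Illustrative example: for SUM(x) OVER (PARTITION BY y ORDER BY z), the loop
// above builds the tautological condition y = y, with the right-hand side
// rewritten to range table index 1 by transform_to_inner(), so the ordinary
// hash join machinery can bucket rows by partition key.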

std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
    const Analyzer::WindowFunction* window_func,
    const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos,
    const CompilationOptions& co,
    ColumnCacheMap& column_cache_map) {
  const auto memory_level = co.device_type_ == ExecutorDeviceType::GPU
                                ? MemoryLevel::GPU_LEVEL
                                : MemoryLevel::CPU_LEVEL;
  const auto join_table_or_err =
      executor_->buildHashTableForQualifier(partition_key_cond,
                                            query_infos,
                                            memory_level,
                                            JoinHashTableInterface::HashType::OneToMany,
                                            column_cache_map);
  if (!join_table_or_err.fail_reason.empty()) {
    throw std::runtime_error(join_table_or_err.fail_reason);
  }
  CHECK(join_table_or_err.hash_table->getHashType() ==
        JoinHashTableInterface::HashType::OneToMany);
  const auto& order_keys = window_func->getOrderKeys();
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
  auto context = std::make_unique<WindowFunctionContext>(
      window_func, join_table_or_err.hash_table, elem_count, co.device_type_);
  for (const auto& order_key : order_keys) {
    const auto order_col =
        std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
    if (!order_col) {
      throw std::runtime_error("Only order by columns supported for now");
    }
    const int8_t* column;
    size_t join_col_elem_count;
    std::tie(column, join_col_elem_count) =
        ColumnFetcher::getOneColumnFragment(executor_,
                                            *order_col,
                                            query_infos.front().info.fragments.front(),
                                            memory_level,
                                            0,
                                            chunks_owner,
                                            column_cache_map);
    CHECK_EQ(join_col_elem_count, elem_count);
    context->addOrderColumn(column, order_col.get(), chunks_owner);
  }
  return context;
}

ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
                                              const CompilationOptions& co,
                                              const ExecutionOptions& eo,
                                              RenderInfo* render_info,
                                              const int64_t queue_time_ms) {
  const auto work_unit =
      createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
  return executeWorkUnit(
      work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
}

ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
                                              const ExecutionOptions& eo) {
  if (eo.just_explain) {
    throw std::runtime_error("EXPLAIN not supported for ModifyTable");
  }

  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
                                        ExecutorDeviceType::CPU,
                                        QueryMemoryDescriptor(),
                                        executor_->getRowSetMemoryOwner(),
                                        executor_);

  std::vector<TargetMetaInfo> empty_targets;
  return {rs, empty_targets};
}
1559 
1561  const RelLogicalValues* logical_values,
1562  const ExecutionOptions& eo) {
1563  if (eo.just_explain) {
1564  throw std::runtime_error("EXPLAIN not supported for LogicalValues");
1565  }
1567  1,
1569  /*is_table_function=*/false);
1570 
1571  const auto& tuple_type = logical_values->getTupleType();
1572  for (size_t i = 0; i < tuple_type.size(); ++i) {
1573  query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
1574  }
1575  logical_values->setOutputMetainfo(tuple_type);
1576  std::vector<std::unique_ptr<Analyzer::ColumnVar>> owned_column_expressions;
1577  std::vector<Analyzer::Expr*> target_expressions;
1578  for (const auto& tuple_component : tuple_type) {
1579  const auto column_var =
1580  new Analyzer::ColumnVar(tuple_component.get_type_info(), 0, 0, 0);
1581  target_expressions.push_back(column_var);
1582  owned_column_expressions.emplace_back(column_var);
1583  }
1584  std::vector<TargetInfo> target_infos;
1585  for (const auto& tuple_type_component : tuple_type) {
1586  target_infos.emplace_back(TargetInfo{false,
1587  kCOUNT,
1588  tuple_type_component.get_type_info(),
1589  SQLTypeInfo(kNULLT, false),
1590  false,
1591  false});
1592  }
1593  auto rs = std::make_shared<ResultSet>(target_infos,
1594  ExecutorDeviceType::CPU,
1595  query_mem_desc,
1596  executor_->getRowSetMemoryOwner(),
1597  executor_);
1598  return {rs, tuple_type};
1599 }
1600 
1601 namespace {
1602 
1603 // TODO(alex): Once we're fully migrated to the relational algebra model, change
1604 // the executor interface to use the collation directly and remove this conversion.
1605 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
1606  std::list<Analyzer::OrderEntry> result;
1607  for (size_t i = 0; i < sort->collationCount(); ++i) {
1608  const auto sort_field = sort->getCollation(i);
1609  result.emplace_back(sort_field.getField() + 1,
1610  sort_field.getSortDir() == SortDirection::Descending,
1611  sort_field.getNullsPosition() == NullSortedPosition::First);
1612  }
1613  return result;
1614 }
1615 
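// The limit can only be pushed into the scan for non-aggregating sources: an
// aggregate must still see every input row to build its groups, so the scan
// limit is zeroed out when the node aggregates.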
1616 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
1617  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
1618  if (aggregate) {
1619  return 0;
1620  }
1621  const auto compound = dynamic_cast<const RelCompound*>(ra);
1622  return (compound && compound->isAggregate()) ? 0 : limit;
1623 }
1624 
1625 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
1626  return !order_entries.empty() && order_entries.front().is_desc;
1627 }
1628 
1629 } // namespace
1630 
1631 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
1632  const CompilationOptions& co,
1633  const ExecutionOptions& eo,
1634  RenderInfo* render_info,
1635  const int64_t queue_time_ms) {
1636  CHECK_EQ(size_t(1), sort->inputCount());
1637  const auto source = sort->getInput(0);
1638  if (dynamic_cast<const RelSort*>(source)) {
1639  throw std::runtime_error("Sort node not supported as input to another sort");
1640  }
1641  const bool is_aggregate = node_is_aggregate(source);
1642  auto it = leaf_results_.find(sort->getId());
1643  if (it != leaf_results_.end()) {
1644  // Add any transient string literals to the string dictionary proxy on the aggregator
1645  const auto source_work_unit = createSortInputWorkUnit(sort, eo.just_explain);
1646  GroupByAndAggregate::addTransientStringLiterals(
1647  source_work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
1648 
1649  // Handle LIMIT push-down for multi-node
1650  auto& aggregated_result = it->second;
1651  auto& result_rows = aggregated_result.rs;
1652  const size_t limit = sort->getLimit();
1653  const size_t offset = sort->getOffset();
1654  const auto order_entries = get_order_entries(sort);
1655  if (limit || offset) {
1656  if (!order_entries.empty()) {
1657  result_rows->sort(order_entries, limit + offset);
1658  }
1659  result_rows->dropFirstN(offset);
1660  if (limit) {
1661  result_rows->keepFirstN(limit);
1662  }
1663  }
1664  ExecutionResult result(result_rows, aggregated_result.targets_meta);
1665  sort->setOutputMetainfo(aggregated_result.targets_meta);
1666 
1667  return result;
1668  }
1669  while (true) {
1670  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1671  bool is_desc{false};
1672  try {
1673  const auto source_work_unit = createSortInputWorkUnit(sort, eo.just_explain);
1674  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
1675  ExecutionOptions eo_copy = {
1676  eo.output_columnar_hint,
1677  eo.allow_multifrag,
1678  eo.just_explain,
1679  eo.allow_loop_joins,
1680  eo.with_watchdog,
1681  eo.jit_debug,
1682  eo.just_validate || sort->isEmptyResult(),
1683  eo.with_dynamic_watchdog,
1684  eo.dynamic_watchdog_time_limit,
1685  eo.find_push_down_candidates,
1686  eo.just_calcite_explain,
1687  eo.gpu_input_mem_limit_percent,
1688  };
1689 
1690  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
1691  auto source_result = executeWorkUnit(source_work_unit,
1692  source->getOutputMetainfo(),
1693  is_aggregate,
1694  co,
1695  eo_copy,
1696  render_info,
1697  queue_time_ms);
1698  if (render_info && render_info->isPotentialInSituRender()) {
1699  return source_result;
1700  }
1701  if (source_result.isFilterPushDownEnabled()) {
1702  return source_result;
1703  }
1704  auto rows_to_sort = source_result.getRows();
1705  if (eo.just_explain) {
1706  return {rows_to_sort, {}};
1707  }
1708  const size_t limit = sort->getLimit();
1709  const size_t offset = sort->getOffset();
1710  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
1711  !use_speculative_top_n(source_work_unit.exe_unit,
1712  rows_to_sort->getQueryMemDesc())) {
1713  rows_to_sort->sort(source_work_unit.exe_unit.sort_info.order_entries,
1714  limit + offset);
1715  }
1716  if (limit || offset) {
1717  if (g_cluster && sort->collationCount() == 0) {
1718  if (offset >= rows_to_sort->rowCount()) {
1719  rows_to_sort->dropFirstN(offset);
1720  } else {
1721  rows_to_sort->keepFirstN(limit + offset);
1722  }
1723  } else {
1724  rows_to_sort->dropFirstN(offset);
1725  if (limit) {
1726  rows_to_sort->keepFirstN(limit);
1727  }
1728  }
1729  }
1730  return {rows_to_sort, source_result.getTargetsMeta()};
1731  } catch (const SpeculativeTopNFailed&) {
1732  CHECK_EQ(size_t(1), groupby_exprs.size());
1733  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
1734  }
1735  }
1736  CHECK(false);
1737  return {std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1738  co.device_type_,
1739  QueryMemoryDescriptor(),
1740  nullptr,
1741  executor_),
1742  {}};
1743 }
1744 
1745 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
1746  const RelSort* sort,
1747  const bool just_explain) {
1748  const auto source = sort->getInput(0);
1749  const size_t limit = sort->getLimit();
1750  const size_t offset = sort->getOffset();
1751  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
1752  const size_t scan_total_limit =
1753  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
1754  size_t max_groups_buffer_entry_guess{
1755  scan_total_limit ? scan_total_limit : max_groups_buffer_entry_default_guess};
1756  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
1757  const auto order_entries = get_order_entries(sort);
1758  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
1759  auto source_work_unit = createWorkUnit(source, sort_info, just_explain);
1760  const auto& source_exe_unit = source_work_unit.exe_unit;
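  // Pick the sort strategy: streaming top-n for a LIMIT over a plain
  // projection (a single, null group-by expression); otherwise keep the
  // speculative top-n default unless this group-by expression already landed
  // on the blacklist via SpeculativeTopNFailed, in which case fall back to
  // the default sort.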
1761  if (source_exe_unit.groupby_exprs.size() == 1) {
1762  if (!source_exe_unit.groupby_exprs.front()) {
1763  sort_algorithm = SortAlgorithm::StreamingTopN;
1764  } else {
1765  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
1766  first_oe_is_desc(order_entries))) {
1767  sort_algorithm = SortAlgorithm::Default;
1768  }
1769  }
1770  }
1771 
1772  sort->setOutputMetainfo(source->getOutputMetainfo());
1773  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
1774  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
1775  return {{source_exe_unit.input_descs,
1776  std::move(source_exe_unit.input_col_descs),
1777  source_exe_unit.simple_quals,
1778  source_exe_unit.quals,
1779  source_exe_unit.join_quals,
1780  source_exe_unit.groupby_exprs,
1781  source_exe_unit.target_exprs,
1782  nullptr,
1783  {sort_info.order_entries, sort_algorithm, limit, offset},
1784  scan_total_limit,
1785  source_exe_unit.query_features},
1786  source,
1787  max_groups_buffer_entry_guess,
1788  std::move(source_work_unit.query_rewriter),
1789  source_work_unit.input_permutation,
1790  source_work_unit.left_deep_join_input_sizes};
1791 }
1792 
1793 namespace {
1794 
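// Gross upper bound for the number of groups: the tuple count of the largest
// input table, clamped below at one. Used as a fallback cardinality guess
// when no tighter estimate (e.g. from the NDV estimator) is available.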
1801 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
1802  CHECK(!table_infos.empty());
1803  const auto& first_table = table_infos.front();
1804  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
1805  for (const auto& table_info : table_infos) {
1806  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
1807  max_num_groups = table_info.info.getNumTuplesUpperBound();
1808  }
1809  }
1810  return std::max(max_num_groups, size_t(1));
1811 }
1812 
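// Returns true when the output buffer size for a projection must be computed
// or bounded before execution: no aggregate targets, no group by, and either
// no scan limit or one above Executor::high_scan_limit, where preallocating
// the full limit per fragment would be wasteful.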
1819 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
1820  for (const auto target_expr : ra_exe_unit.target_exprs) {
1821  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1822  return false;
1823  }
1824  }
1825  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1826  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
1827  return true;
1828  }
1829  return false;
1830 }
1831 
1832 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
1833  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
1834  ra_exe_unit.simple_quals.empty());
1835 }
1836 
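// Swaps APPROX_COUNT_DISTINCT for a precise COUNT(DISTINCT ...) bitmap when
// the exact bitmap would be no bigger than the approximate (HLL) one, so the
// exact answer comes at no extra memory cost.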
1837 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
1838  const RelAlgExecutionUnit& ra_exe_unit_in,
1839  const std::vector<InputTableInfo>& table_infos,
1840  const Executor* executor,
1841  const ExecutorDeviceType device_type_in,
1842  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
1843  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
1844  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
1845  const auto target_expr = ra_exe_unit.target_exprs[i];
1846  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1847  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
1848  continue;
1849  }
1850  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
1851  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
1852  CHECK(arg);
1853  const auto& arg_ti = arg->get_type_info();
1854  // Avoid calling getExpressionRange for variable length types (string and array),
1855  // it'd trigger an assertion since that API expects to be called only for types
1856  // for which the notion of range is well-defined. A bit of a kludge, but the
1857  // logic to reject these types anyway is at lower levels in the stack and not
1858  // really worth pulling into a separate function for now.
1859  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
1860  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
1861  continue;
1862  }
1863  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
1864  if (arg_range.getType() != ExpressionRangeType::Integer) {
1865  continue;
1866  }
1867  // When running distributed, the threshold for using the precise implementation
1868  // must be consistent across all leaves, otherwise we could have a mix of precise
1869  // and approximate bitmaps and we cannot aggregate them.
1870  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
1871  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
1872  const auto sub_bitmap_count =
1873  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
1874  int64_t approx_bitmap_sz_bits{0};
1875  const auto error_rate =
1876  static_cast<Analyzer::AggExpr*>(target_expr)->get_error_rate();
1877  if (error_rate) {
1878  CHECK(error_rate->get_type_info().get_type() == kINT);
1879  CHECK_GE(error_rate->get_constval().intval, 1);
1880  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
1881  } else {
1882  approx_bitmap_sz_bits = g_hll_precision_bits;
1883  }
1884  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
1885  arg_range.getIntMin(),
1886  approx_bitmap_sz_bits,
1887  true,
1888  device_type,
1889  sub_bitmap_count};
1890  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
1891  arg_range.getIntMin(),
1892  bitmap_sz_bits,
1893  false,
1894  device_type,
1895  sub_bitmap_count};
1896  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
1897  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
1898  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
1899  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
1900  target_exprs_owned.push_back(precise_count_distinct);
1901  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
1902  }
1903  }
1904  return ra_exe_unit;
1905 }
1906 
1907 void build_render_targets(RenderInfo& render_info,
1908  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
1909  const std::vector<TargetMetaInfo>& targets_meta) {
1910  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
1911  render_info.targets.clear();
1912  for (size_t i = 0; i < targets_meta.size(); ++i) {
1913  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
1914  targets_meta[i].get_resname(),
1915  work_unit_target_exprs[i]->get_shared_ptr(),
1916  false));
1917  }
1918 }
1919 
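// The bump allocator lets a projection kernel claim output slots as rows are
// produced instead of preallocating scan_limit rows per fragment; it only
// applies to single-node GPU execution with row-wise, unsorted output.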
1920 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
1921  const CompilationOptions& co,
1922  const ExecutionOptions& eo) {
1923  return g_enable_bump_allocator && !g_cluster && co.device_type_ == ExecutorDeviceType::GPU &&
1924  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
1925 }
1926 
1927 } // namespace
1928 
1929 ExecutionResult RelAlgExecutor::executeWorkUnit(
1930  const RelAlgExecutor::WorkUnit& work_unit,
1931  const std::vector<TargetMetaInfo>& targets_meta,
1932  const bool is_agg,
1933  const CompilationOptions& co_in,
1934  const ExecutionOptions& eo,
1935  RenderInfo* render_info,
1936  const int64_t queue_time_ms,
1937  const ssize_t previous_count) {
1938  INJECT_TIMER(executeWorkUnit);
1939  auto timer = DEBUG_TIMER(__func__);
1940 
1941  auto co = co_in;
1942  ColumnCacheMap column_cache;
1943  if (is_window_execution_unit(work_unit.exe_unit)) {
1944  if (g_cluster) {
1945  throw std::runtime_error(
1946  "Window functions not supported in distributed mode");
1947  }
1948  if (!g_enable_window_functions) {
1949  throw std::runtime_error("Window functions support is disabled");
1950  }
1951  co.device_type_ = ExecutorDeviceType::CPU;
1952  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
1953  }
1954  if (!eo.just_explain && eo.find_push_down_candidates) {
1955  // find potential candidates:
1956  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
1957  if (!selected_filters.empty() || eo.just_calcite_explain) {
1958  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
1959  }
1960  }
1961  const auto body = work_unit.body;
1962  CHECK(body);
1963  auto it = leaf_results_.find(body->getId());
1964  if (it != leaf_results_.end()) {
1965  GroupByAndAggregate::addTransientStringLiterals(
1966  work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
1967  auto& aggregated_result = it->second;
1968  auto& result_rows = aggregated_result.rs;
1969  ExecutionResult result(result_rows, aggregated_result.targets_meta);
1970  body->setOutputMetainfo(aggregated_result.targets_meta);
1971  if (render_info) {
1972  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
1973  }
1974  return result;
1975  }
1976  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1977 
1978  auto ra_exe_unit = decide_approx_count_distinct_implementation(
1979  work_unit.exe_unit, table_infos, executor_, co.device_type_, target_exprs_owned_);
1980  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
1981  if (is_window_execution_unit(ra_exe_unit)) {
1982  CHECK_EQ(table_infos.size(), size_t(1));
1983  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
1984  max_groups_buffer_entry_guess =
1985  table_infos.front().info.fragments.front().getNumTuples();
1986  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
1987  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
1988  if (previous_count > 0 && !exe_unit_has_quals(ra_exe_unit)) {
1989  ra_exe_unit.scan_limit = static_cast<size_t>(previous_count);
1990  } else {
1991  // TODO(adb): enable bump allocator path for render queries
1992  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
1993  ra_exe_unit.scan_limit = 0;
1994  ra_exe_unit.use_bump_allocator = true;
1995  } else if (!eo.just_explain) {
1996  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
1997  if (filter_count_all >= 0) {
1998  ra_exe_unit.scan_limit = std::max(filter_count_all, ssize_t(1));
1999  }
2000  }
2001  }
2002  }
2003 
2004  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2005  co.device_type_,
2006  QueryMemoryDescriptor(),
2007  nullptr,
2008  executor_),
2009  {}};
2010 
2011  auto execute_and_handle_errors =
2012  [&](const auto max_groups_buffer_entry_guess_in,
2013  const bool has_cardinality_estimation) -> ExecutionResult {
2014  // Note that the groups buffer entry guess may be modified during query execution.
2015  // Create a local copy so we can track those changes if we need to attempt a retry due
2016  // to OOM
2017  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
2018  try {
2019  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
2020  is_agg,
2021  table_infos,
2022  ra_exe_unit,
2023  co,
2024  eo,
2025  cat_,
2026  executor_->row_set_mem_owner_,
2027  render_info,
2028  has_cardinality_estimation,
2029  column_cache),
2030  targets_meta};
2031  } catch (const QueryExecutionError& e) {
2032  handlePersistentError(e.getErrorCode());
2033  return handleOutOfMemoryRetry(
2034  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
2035  targets_meta,
2036  is_agg,
2037  co,
2038  eo,
2039  render_info,
2040  e.wasMultifragKernelLaunch(),
2041  queue_time_ms);
2042  }
2043  };
2044 
2045  try {
2046  result = execute_and_handle_errors(
2047  max_groups_buffer_entry_guess,
2048  groups_approx_upper_bound(table_infos) <= g_big_group_threshold);
2049  } catch (const CardinalityEstimationRequired&) {
2050  const auto estimated_groups_buffer_entry_guess =
2051  2 * std::min(groups_approx_upper_bound(table_infos),
2052  getNDVEstimation(work_unit, is_agg, co, eo));
2053  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
2054  result = execute_and_handle_errors(estimated_groups_buffer_entry_guess, true);
2055  }
2056 
2057  result.setQueueTime(queue_time_ms);
2058  if (render_info) {
2059  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2060  if (render_info->isPotentialInSituRender()) {
2061  // return an empty result (with the same queue time, and zero render time)
2062  return {
2063  std::make_shared<ResultSet>(queue_time_ms, 0, executor_->row_set_mem_owner_),
2064  {}};
2065  }
2066  }
2067  return result;
2068 }
2069 
2070 ssize_t RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
2071  const bool is_agg,
2072  const CompilationOptions& co,
2073  const ExecutionOptions& eo) {
2074  const auto count =
2075  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
2076  kCOUNT,
2077  nullptr,
2078  false,
2079  nullptr);
2080  const auto count_all_exe_unit =
2081  create_count_all_execution_unit(work_unit.exe_unit, count);
2082  size_t one{1};
2083  ResultSetPtr count_all_result;
2084  try {
2085  ColumnCacheMap column_cache;
2086  count_all_result =
2087  executor_->executeWorkUnit(one,
2088  is_agg,
2089  get_table_infos(work_unit.exe_unit, executor_),
2090  count_all_exe_unit,
2091  co,
2092  eo,
2093  cat_,
2094  executor_->row_set_mem_owner_,
2095  nullptr,
2096  false,
2097  column_cache);
2098  } catch (const std::exception& e) {
2099  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
2100  return -1;
2101  }
2102  const auto count_row = count_all_result->getNextRow(false, false);
2103  CHECK_EQ(size_t(1), count_row.size());
2104  const auto& count_tv = count_row.front();
2105  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
2106  CHECK(count_scalar_tv);
2107  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
2108  CHECK(count_ptr);
2109  CHECK_GE(*count_ptr, 0);
2110  auto count_upper_bound = static_cast<size_t>(*count_ptr);
2111  return std::max(count_upper_bound, size_t(1));
2112 }
2113 
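// A rowid lookup filters a single physical table on equality with the virtual
// rowid column, so it selects at most one row and the pre-flight output
// sizing above is unnecessary for it.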
2114 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
2115  const auto& ra_exe_unit = work_unit.exe_unit;
2116  if (ra_exe_unit.input_descs.size() != 1) {
2117  return false;
2118  }
2119  const auto& table_desc = ra_exe_unit.input_descs.front();
2120  if (table_desc.getSourceType() != InputSourceType::TABLE) {
2121  return false;
2122  }
2123  const int table_id = table_desc.getTableId();
2124  for (const auto simple_qual : ra_exe_unit.simple_quals) {
2125  const auto comp_expr =
2126  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
2127  if (!comp_expr || comp_expr->get_optype() != kEQ) {
2128  return false;
2129  }
2130  const auto lhs = comp_expr->get_left_operand();
2131  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
2132  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
2133  return false;
2134  }
2135  const auto rhs = comp_expr->get_right_operand();
2136  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
2137  if (!rhs_const) {
2138  return false;
2139  }
2140  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
2141  if (cd->isVirtualCol) {
2142  CHECK_EQ("rowid", cd->columnName);
2143  return true;
2144  }
2145  }
2146  return false;
2147 }
2148 
2149 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
2150  const RelAlgExecutor::WorkUnit& work_unit,
2151  const std::vector<TargetMetaInfo>& targets_meta,
2152  const bool is_agg,
2153  const CompilationOptions& co,
2154  const ExecutionOptions& eo,
2155  RenderInfo* render_info,
2156  const bool was_multifrag_kernel_launch,
2157  const int64_t queue_time_ms) {
2158  // Disable the bump allocator
2159  // Note that this will have basically the same effect as using the bump allocator for
2160  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
2161  // path and the bump allocator path for kernel per fragment execution.
2162  auto ra_exe_unit_in = work_unit.exe_unit;
2163  ra_exe_unit_in.use_bump_allocator = false;
2164 
2165  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2166  co.device_type_,
2167  QueryMemoryDescriptor(),
2168  nullptr,
2169  executor_),
2170  {}};
2171 
2172  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
2173  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2174  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
2175  false,
2176  false,
2177  eo.allow_loop_joins,
2178  eo.with_watchdog,
2179  eo.jit_debug,
2180  false,
2181  eo.with_dynamic_watchdog,
2182  eo.dynamic_watchdog_time_limit,
2183  false,
2184  false,
2185  eo.gpu_input_mem_limit_percent};
2186 
2187  if (was_multifrag_kernel_launch) {
2188  try {
2189  // Attempt to retry using the kernel per fragment path. The smaller input size
2190  // required may allow the entire kernel to execute in GPU memory.
2191  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
2192  "kernels disabled.";
2193  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2194  ra_exe_unit_in, table_infos, executor_, co.device_type_, target_exprs_owned_);
2195  ColumnCacheMap column_cache;
2196  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2197  is_agg,
2198  table_infos,
2199  ra_exe_unit,
2200  co,
2201  eo_no_multifrag,
2202  cat_,
2203  executor_->row_set_mem_owner_,
2204  nullptr,
2205  true,
2206  column_cache),
2207  targets_meta};
2208  result.setQueueTime(queue_time_ms);
2209  } catch (const QueryExecutionError& e) {
2210  handlePersistentError(e.getErrorCode());
2211  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
2212  }
2213  }
2214 
2215  if (render_info) {
2216  render_info->setForceNonInSituData();
2217  }
2218 
2219  CompilationOptions co_cpu{ExecutorDeviceType::CPU,
2220  co.hoist_literals_,
2221  co.opt_level_,
2222  co.with_dynamic_watchdog_};
2223 
2224  // Only reset the group buffer entry guess if we ran out of slots, which
2225  // suggests a highly pathological input which prevented a good estimation
2226  // of distinct tuple count. For projection queries, this will force a
2227  // per-fragment scan limit, which is compatible with the CPU path.
2228 
2229  VLOG(1) << "Resetting max groups buffer entry guess.";
2230  max_groups_buffer_entry_guess = 0;
2231 
2232  int iteration_ctr = -1;
2233  while (true) {
2234  iteration_ctr++;
2235  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2236  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type_, target_exprs_owned_);
2237  ColumnCacheMap column_cache;
2238  try {
2239  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2240  is_agg,
2241  table_infos,
2242  ra_exe_unit,
2243  co_cpu,
2244  eo_no_multifrag,
2245  cat_,
2246  executor_->row_set_mem_owner_,
2247  nullptr,
2248  true,
2249  column_cache),
2250  targets_meta};
2251  } catch (const QueryExecutionError& e) {
2252  // Ran out of slots
2253  if (e.getErrorCode() < 0) {
2254  // Even the conservative guess failed; it should only happen when we group
2255  // by a huge cardinality array. Maybe we should throw an exception instead?
2256  // Such a heavy query is entirely capable of exhausting all the host memory.
2257  CHECK(max_groups_buffer_entry_guess);
2258  // Only allow two iterations of increasingly large entry guesses up to a maximum
2259  // of 512MB per column per kernel
2260  if (g_enable_watchdog || iteration_ctr > 1) {
2261  throw std::runtime_error("Query ran out of output slots in the result");
2262  }
2263  max_groups_buffer_entry_guess *= 2;
2264  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
2265  "groups buffer entry "
2266  "guess equal to "
2267  << max_groups_buffer_entry_guess;
2268  } else {
2269  handlePersistentError(e.getErrorCode());
2270  }
2271  continue;
2272  }
2273  result.setQueueTime(queue_time_ms);
2274  return result;
2275  }
2276  return result;
2277 }
2278 
2279 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
2280  LOG(ERROR) << "Query execution failed with error "
2281  << getErrorMessageFromCode(error_code);
2282  if (error_code == Executor::ERR_SPECULATIVE_TOP_OOM) {
2283  throw SpeculativeTopNFailed();
2284  }
2285  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
2286  // We ran out of GPU memory; this doesn't count as an error if the query is
2287  // allowed to continue on CPU because retry on CPU is explicitly allowed through
2288  // --allow-cpu-retry.
2289  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
2290  if (!g_allow_cpu_retry) {
2291  throw std::runtime_error(
2292  "Query ran out of GPU memory, unable to automatically retry on CPU");
2293  }
2294  return;
2295  }
2296  throw std::runtime_error(getErrorMessageFromCode(error_code));
2297 }
2298 
2299 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
2300  if (error_code < 0) {
2301  return "Ran out of slots in the query output buffer";
2302  }
2303  switch (error_code) {
2304  case Executor::ERR_DIV_BY_ZERO:
2305  return "Division by zero";
2306  case Executor::ERR_OUT_OF_GPU_MEM:
2307  return "Query couldn't keep the entire working set of columns in GPU memory";
2308  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
2309  return "Self joins not supported yet";
2310  case Executor::ERR_OUT_OF_CPU_MEM:
2311  return "Not enough host memory to execute the query";
2312  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
2313  return "Overflow or underflow";
2314  case Executor::ERR_OUT_OF_TIME:
2315  return "Query execution has exceeded the time limit";
2316  case Executor::ERR_INTERRUPTED:
2317  return "Query execution has been interrupted";
2318  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
2319  return "Columnar conversion not supported for variable length types";
2320  case Executor::ERR_TOO_MANY_LITERALS:
2321  return "Too many literals in the query";
2322  case Executor::ERR_STRING_CONST_IN_RESULT_SET:
2323  return "NONE ENCODED String types are not supported as input result set.";
2324  case Executor::ERR_OUT_OF_RENDER_MEM:
2325  return "Not enough OpenGL memory to render the query results";
2326  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
2327  return "Streaming-Top-N not supported in Render Query";
2328  }
2329  return "Other error: code " + std::to_string(error_code);
2330 }
2331 
2332 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
2333  const SortInfo& sort_info,
2334  const bool just_explain) {
2335  const auto compound = dynamic_cast<const RelCompound*>(node);
2336  if (compound) {
2337  return createCompoundWorkUnit(compound, sort_info, just_explain);
2338  }
2339  const auto project = dynamic_cast<const RelProject*>(node);
2340  if (project) {
2341  return createProjectWorkUnit(project, sort_info, just_explain);
2342  }
2343  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
2344  if (aggregate) {
2345  return createAggregateWorkUnit(aggregate, sort_info, just_explain);
2346  }
2347  const auto filter = dynamic_cast<const RelFilter*>(node);
2348  CHECK(filter);
2349  return createFilterWorkUnit(filter, sort_info, just_explain);
2350 }
2351 
2352 namespace {
2353 
2354 JoinType get_join_type(const RelAlgNode* ra) {
2355  auto sink = get_data_sink(ra);
2356  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
2357  return join->getJoinType();
2358  }
2359  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
2360  return JoinType::INNER;
2361  }
2362 
2363  return JoinType::INVALID;
2364 }
2365 
2366 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
2367  const auto condition = dynamic_cast<const RexOperator*>(scalar);
2368  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
2369  return nullptr;
2370  }
2371  const auto equi_join_condition =
2372  dynamic_cast<const RexOperator*>(condition->getOperand(0));
2373  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
2374  return nullptr;
2375  }
2376  const auto both_are_null_condition =
2377  dynamic_cast<const RexOperator*>(condition->getOperand(1));
2378  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
2379  both_are_null_condition->size() != 2) {
2380  return nullptr;
2381  }
2382  const auto lhs_is_null =
2383  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
2384  const auto rhs_is_null =
2385  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
2386  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
2387  rhs_is_null->getOperator() != kISNULL) {
2388  return nullptr;
2389  }
2390  CHECK_EQ(size_t(1), lhs_is_null->size());
2391  CHECK_EQ(size_t(1), rhs_is_null->size());
2392  CHECK_EQ(size_t(2), equi_join_condition->size());
2393  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
2394  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
2395  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
2396  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
2397  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
2398  return nullptr;
2399  }
2400  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
2401  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
2402  RexDeepCopyVisitor deep_copy_visitor;
2403  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
2404  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
2405  eq_operands.emplace_back(lhs_op_copy.release());
2406  eq_operands.emplace_back(rhs_op_copy.release());
2407  return boost::make_unique<const RexOperator>(
2408  kBW_EQ, eq_operands, equi_join_condition->getType());
2409  }
2410  return nullptr;
2411 }
2412 
2413 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
2414  const RexScalar* scalar) {
2415  const auto condition = dynamic_cast<const RexOperator*>(scalar);
2416  if (condition && condition->getOperator() == kAND) {
2417  CHECK_GE(condition->size(), size_t(2));
2418  auto acc = get_bitwise_equals(condition->getOperand(0));
2419  if (!acc) {
2420  return nullptr;
2421  }
2422  for (size_t i = 1; i < condition->size(); ++i) {
2423  std::vector<std::unique_ptr<const RexScalar>> and_operands;
2424  and_operands.emplace_back(std::move(acc));
2425  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
2426  acc =
2427  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
2428  }
2429  return acc;
2430  }
2431  return get_bitwise_equals(scalar);
2432 }
2433 
2434 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
2435  CHECK_GE(left_deep_join->inputCount(), size_t(2));
2436  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
2437  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
2438  ++nesting_level) {
2439  if (left_deep_join->getOuterCondition(nesting_level)) {
2440  join_types[nesting_level - 1] = JoinType::LEFT;
2441  }
2442  }
2443  return join_types;
2444 }
2445 
2446 template <class RA>
2447 std::vector<size_t> do_table_reordering(
2448  std::vector<InputDescriptor>& input_descs,
2449  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
2450  const JoinQualsPerNestingLevel& left_deep_join_quals,
2451  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
2452  const RA* node,
2453  const std::vector<InputTableInfo>& query_infos,
2454  const Executor* executor) {
2455  if (g_cluster) {
2456  // Disable table reordering in distributed mode. The aggregator does not have enough
2457  // information to break ties
2458  return {};
2459  }
2460  const auto& cat = *executor->getCatalog();
2461  for (const auto& table_info : query_infos) {
2462  if (table_info.table_id < 0) {
2463  continue;
2464  }
2465  const auto td = cat.getMetadataForTable(table_info.table_id);
2466  CHECK(td);
2467  if (table_is_replicated(td)) {
2468  return {};
2469  }
2470  }
2471  const auto input_permutation =
2472  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
2473  input_to_nest_level = get_input_nest_levels(node, input_permutation);
2474  std::tie(input_descs, input_col_descs, std::ignore) =
2475  get_input_desc(node, input_to_nest_level, input_permutation, cat);
2476  return input_permutation;
2477 }
2478 
2479 std::vector<size_t> get_left_deep_join_input_sizes(
2480  const RelLeftDeepInnerJoin* left_deep_join) {
2481  std::vector<size_t> input_sizes;
2482  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
2483  const auto inputs = get_node_output(left_deep_join->getInput(i));
2484  input_sizes.push_back(inputs.size());
2485  }
2486  return input_sizes;
2487 }
2488 
2489 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
2490  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
2491  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
2492  for (const auto& qual : quals) {
2493  const auto rewritten_qual = rewrite_expr(qual.get());
2494  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
2495  }
2496  return rewritten_quals;
2497 }
2498 
2499 } // namespace
2500 
2501 RelAlgExecutor::WorkUnit RelAlgExecutor::createModifyCompoundWorkUnit(
2502  const RelCompound* compound,
2503  const SortInfo& sort_info,
2504  const bool just_explain) {
2505  std::vector<InputDescriptor> input_descs;
2506  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
2507  auto input_to_nest_level = get_input_nest_levels(compound, {});
2508  std::tie(input_descs, input_col_descs, std::ignore) =
2509  get_input_desc(compound, input_to_nest_level, {}, cat_);
2510  const auto query_infos = get_table_infos(input_descs, executor_);
2511  CHECK_EQ(size_t(1), compound->inputCount());
2512  const auto left_deep_join =
2513  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
2514  JoinQualsPerNestingLevel left_deep_join_quals;
2515  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
2516  : std::vector<JoinType>{get_join_type(compound)};
2517  if (left_deep_join) {
2518  left_deep_join_quals = translateLeftDeepJoinFilter(
2519  left_deep_join, input_descs, input_to_nest_level, just_explain);
2520  }
2521  QueryFeatureDescriptor query_features;
2522  RelAlgTranslator translator(cat_,
2523  executor_,
2524  input_to_nest_level,
2525  join_types,
2526  now_,
2527  just_explain,
2528  query_features);
2529  size_t starting_projection_column_idx =
2530  get_scalar_sources_size(compound) - compound->getTargetColumnCount() - 1;
2531  CHECK_GT(starting_projection_column_idx, 0u);
2532  const auto scalar_sources =
2533  translate_scalar_sources_for_update(compound,
2534  translator,
2535  compound->getModifiedTableDescriptor()->tableId,
2536  cat_,
2537  compound->getTargetColumns(),
2538  starting_projection_column_idx);
2539  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
2540  const auto quals_cf = translate_quals(compound, translator);
2541  decltype(target_exprs_owned_) target_exprs_owned;
2542  translate_targets_for_update(target_exprs_owned,
2543  scalar_sources,
2544  groupby_exprs,
2545  compound,
2546  translator,
2547  compound->getModifiedTableDescriptor()->tableId,
2548  cat_,
2549  compound->getTargetColumns(),
2550  starting_projection_column_idx);
2551  target_exprs_owned_.insert(
2552  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
2553  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
2554  CHECK_EQ(compound->size(), target_exprs.size());
2555 
2556  const auto update_expr_iter =
2557  std::next(target_exprs.cbegin(), starting_projection_column_idx);
2558  decltype(target_exprs) filtered_target_exprs(update_expr_iter, target_exprs.end());
2559 
2560  UsedColumnsVisitor used_columns_visitor;
2561  std::unordered_set<int> id_accumulator;
2562 
2563  for (auto const& expr :
2564  boost::make_iterator_range(update_expr_iter, target_exprs.end())) {
2565  auto used_column_ids = used_columns_visitor.visit(expr);
2566  id_accumulator.insert(used_column_ids.begin(), used_column_ids.end());
2567  }
2568  for (auto const& expr : quals_cf.simple_quals) {
2569  auto simple_quals_used_column_ids = used_columns_visitor.visit(expr.get());
2570  id_accumulator.insert(simple_quals_used_column_ids.begin(),
2571  simple_quals_used_column_ids.end());
2572  }
2573  for (auto const& expr : quals_cf.quals) {
2574  auto quals_used_column_ids = used_columns_visitor.visit(expr.get());
2575  id_accumulator.insert(quals_used_column_ids.begin(), quals_used_column_ids.end());
2576  }
2577 
2578  decltype(input_col_descs) filtered_input_col_descs;
2579  for (auto col_desc : input_col_descs) {
2580  if (id_accumulator.find(col_desc->getColId()) != id_accumulator.end()) {
2581  filtered_input_col_descs.push_back(col_desc);
2582  }
2583  }
2584 
2585  const RelAlgExecutionUnit exe_unit = {input_descs,
2586  filtered_input_col_descs,
2587  quals_cf.simple_quals,
2588  rewrite_quals(quals_cf.quals),
2589  left_deep_join_quals,
2590  groupby_exprs,
2591  filtered_target_exprs,
2592  nullptr,
2593  sort_info,
2594  0,
2595  query_features,
2596  false};
2597  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
2598  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
2599  const auto targets_meta =
2600  get_modify_manipulated_targets_meta(compound, rewritten_exe_unit.target_exprs);
2601  compound->setOutputMetainfo(targets_meta);
2602  return {rewritten_exe_unit,
2603  compound,
2604  max_groups_buffer_entry_default_guess,
2605  std::move(query_rewriter)};
2606 }
2607 
2608 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
2609  const RelCompound* compound,
2610  const SortInfo& sort_info,
2611  const bool just_explain) {
2612  std::vector<InputDescriptor> input_descs;
2613  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
2614  auto input_to_nest_level = get_input_nest_levels(compound, {});
2615  std::tie(input_descs, input_col_descs, std::ignore) =
2616  get_input_desc(compound, input_to_nest_level, {}, cat_);
2617  const auto query_infos = get_table_infos(input_descs, executor_);
2618  CHECK_EQ(size_t(1), compound->inputCount());
2619  const auto left_deep_join =
2620  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
2621  JoinQualsPerNestingLevel left_deep_join_quals;
2622  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
2623  : std::vector<JoinType>{get_join_type(compound)};
2624  std::vector<size_t> input_permutation;
2625  std::vector<size_t> left_deep_join_input_sizes;
2626  if (left_deep_join) {
2627  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
2628  left_deep_join_quals = translateLeftDeepJoinFilter(
2629  left_deep_join, input_descs, input_to_nest_level, just_explain);
2630  if (g_from_table_reordering &&
2631  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
2632  join_types.end()) {
2633  input_permutation = do_table_reordering(input_descs,
2634  input_col_descs,
2635  left_deep_join_quals,
2636  input_to_nest_level,
2637  compound,
2638  query_infos,
2639  executor_);
2640  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
2641  std::tie(input_descs, input_col_descs, std::ignore) =
2642  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
2643  left_deep_join_quals = translateLeftDeepJoinFilter(
2644  left_deep_join, input_descs, input_to_nest_level, just_explain);
2645  }
2646  }
2647  QueryFeatureDescriptor query_features;
2648  RelAlgTranslator translator(cat_,
2649  executor_,
2650  input_to_nest_level,
2651  join_types,
2652  now_,
2653  just_explain,
2654  query_features);
2655  const auto scalar_sources = translate_scalar_sources(compound, translator);
2656  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
2657  const auto quals_cf = translate_quals(compound, translator);
2658  const auto target_exprs = translate_targets(
2659  target_exprs_owned_, scalar_sources, groupby_exprs, compound, translator);
2660  CHECK_EQ(compound->size(), target_exprs.size());
2661  const RelAlgExecutionUnit exe_unit = {input_descs,
2662  input_col_descs,
2663  quals_cf.simple_quals,
2664  rewrite_quals(quals_cf.quals),
2665  left_deep_join_quals,
2666  groupby_exprs,
2667  target_exprs,
2668  nullptr,
2669  sort_info,
2670  0,
2671  query_features,
2672  false};
2673  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
2674  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
2675  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
2676  compound->setOutputMetainfo(targets_meta);
2677  return {rewritten_exe_unit,
2678  compound,
2679  max_groups_buffer_entry_default_guess,
2680  std::move(query_rewriter),
2681  input_permutation,
2682  left_deep_join_input_sizes};
2683 }
2684 
2685 namespace {
2686 
2687 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
2688  CHECK(qual_expr);
2689  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
2690  if (!bin_oper || bin_oper->getOperator() != kAND) {
2691  return {qual_expr};
2692  }
2693  CHECK_GE(bin_oper->size(), size_t(2));
2694  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
2695  for (size_t i = 1; i < bin_oper->size(); ++i) {
2696  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
2697  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
2698  }
2699  return lhs_cf;
2700 }
2701 
2702 std::shared_ptr<Analyzer::Expr> build_logical_expression(
2703  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
2704  const SQLOps sql_op) {
2705  CHECK(!factors.empty());
2706  auto acc = factors.front();
2707  for (size_t i = 1; i < factors.size(); ++i) {
2708  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
2709  }
2710  return acc;
2711 }
2712 
2713 template <class QualsList>
2714 bool list_contains_expression(const QualsList& haystack,
2715  const std::shared_ptr<Analyzer::Expr>& needle) {
2716  for (const auto& qual : haystack) {
2717  if (*qual == *needle) {
2718  return true;
2719  }
2720  }
2721  return false;
2722 }
2723 
2724 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
2725 // evaluations of `p` and allows use of the original form in joins if `p`
2726 // can be used for hash joins.
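// For example, `(x = 7 AND y > 1) OR (x = 7 AND z < 3)` becomes
// `x = 7 AND (y > 1 OR z < 3)`: the common factor `x = 7` is evaluated once
// and stays visible as a potential hash join qualifier.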
2727 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
2728  const std::shared_ptr<Analyzer::Expr>& expr) {
2729  const auto expr_terms = qual_to_disjunctive_form(expr);
2730  CHECK_GE(expr_terms.size(), size_t(1));
2731  const auto& first_term = expr_terms.front();
2732  const auto first_term_factors = qual_to_conjunctive_form(first_term);
2733  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
2734  // First, collect the conjunctive components common to all the disjunctive components.
2735  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
2736  for (const auto& first_term_factor : first_term_factors.quals) {
2737  bool is_common =
2738  expr_terms.size() > 1; // Only report common factors for disjunction.
2739  for (size_t i = 1; i < expr_terms.size(); ++i) {
2740  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
2741  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
2742  is_common = false;
2743  break;
2744  }
2745  }
2746  if (is_common) {
2747  common_factors.push_back(first_term_factor);
2748  }
2749  }
2750  if (common_factors.empty()) {
2751  return expr;
2752  }
2753  // Now that the common expressions are known, collect the remaining expressions.
2754  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
2755  for (const auto& term : expr_terms) {
2756  const auto term_cf = qual_to_conjunctive_form(term);
2757  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
2758  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
2759  for (const auto& qual : term_cf.quals) {
2760  if (!list_contains_expression(common_factors, qual)) {
2761  remaining_quals.push_back(qual);
2762  }
2763  }
2764  if (!remaining_quals.empty()) {
2765  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
2766  }
2767  }
2768  // Reconstruct the expression with the transformation applied.
2769  const auto common_expr = build_logical_expression(common_factors, kAND);
2770  if (remaining_terms.empty()) {
2771  return common_expr;
2772  }
2773  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
2774  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
2775 }
2776 
2777 } // namespace
2778 
2779 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
2780  const RexScalar* join_condition,
2781  const std::vector<JoinType>& join_types,
2782  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
2783  const bool just_explain) const {
2784  QueryFeatureDescriptor query_features;
2785  RelAlgTranslator translator(cat_,
2786  executor_,
2787  input_to_nest_level,
2788  join_types,
2789  now_,
2790  just_explain,
2791  query_features);
2792  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
2793  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
2794  for (const auto rex_condition_component : rex_condition_cf) {
2795  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
2796  const auto join_condition =
2797  reverse_logical_distribution(translator.translateScalarRex(
2798  bw_equals ? bw_equals.get() : rex_condition_component));
2799  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
2800  join_condition_quals.insert(join_condition_quals.end(),
2801  join_condition_cf.quals.begin(),
2802  join_condition_cf.quals.end());
2803  join_condition_quals.insert(join_condition_quals.end(),
2804  join_condition_cf.simple_quals.begin(),
2805  join_condition_cf.simple_quals.end());
2806  }
2807  return combine_equi_join_conditions(join_condition_quals);
2808 }
2809 
2810 // Translate left deep join filter and separate the conjunctive form qualifiers
2811 // per nesting level. The code generated for hash table lookups on each level
2812 // must dominate its uses in deeper nesting levels.
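// For example, in `t1 JOIN t2 ON t1.a = t2.a JOIN t3 ON t2.b = t3.b`, the
// qualifier `t1.a = t2.a` is attached to the first nesting level and
// `t2.b = t3.b` to the second: each qualifier lands at the highest range
// table index it references, so all of its inputs are available there.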
2813 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
2814  const RelLeftDeepInnerJoin* join,
2815  const std::vector<InputDescriptor>& input_descs,
2816  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
2817  const bool just_explain) {
2818  const auto join_types = left_deep_join_types(join);
2819  const auto join_condition_quals = makeJoinQuals(
2820  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
2821  MaxRangeTableIndexVisitor rte_idx_visitor;
2822  JoinQualsPerNestingLevel result(input_descs.size() - 1);
2823  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
2824  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
2825  const auto outer_condition = join->getOuterCondition(rte_idx);
2826  if (outer_condition) {
2827  result[rte_idx - 1].quals =
2828  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
2829  CHECK_LE(rte_idx, join_types.size());
2830  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
2831  result[rte_idx - 1].type = JoinType::LEFT;
2832  continue;
2833  }
2834  for (const auto qual : join_condition_quals) {
2835  if (visited_quals.count(qual)) {
2836  continue;
2837  }
2838  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
2839  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
2840  const auto it_ok = visited_quals.emplace(qual);
2841  CHECK(it_ok.second);
2842  result[rte_idx - 1].quals.push_back(qual);
2843  }
2844  }
2845  CHECK_LE(rte_idx, join_types.size());
2846  CHECK(join_types[rte_idx - 1] == JoinType::INNER);
2847  result[rte_idx - 1].type = JoinType::INNER;
2848  }
2849  return result;
2850 }
2851 
2852 namespace {
2853 
2854 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
2855  const RelAlgNode* ra_node,
2856  const size_t nest_level,
2857  const std::vector<TargetMetaInfo>& in_metainfo,
2858  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
2859  CHECK_LE(size_t(1), ra_node->inputCount());
2860  CHECK_GE(size_t(2), ra_node->inputCount());
2861  const auto input = ra_node->getInput(nest_level);
2862  const auto it_rte_idx = input_to_nest_level.find(input);
2863  CHECK(it_rte_idx != input_to_nest_level.end());
2864  const int rte_idx = it_rte_idx->second;
2865  const int table_id = table_id_from_ra(input);
2866  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
2867  const auto scan_ra = dynamic_cast<const RelScan*>(input);
2868  int input_idx = 0;
2869  for (const auto& input_meta : in_metainfo) {
2870  inputs.push_back(
2871  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
2872  table_id,
2873  scan_ra ? input_idx + 1 : input_idx,
2874  rte_idx));
2875  ++input_idx;
2876  }
2877  return inputs;
2878 }
2879 
2880 } // namespace
2881 
2882 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
2883  const RelAggregate* aggregate,
2884  const SortInfo& sort_info,
2885  const bool just_explain) {
2886  std::vector<InputDescriptor> input_descs;
2887  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
2888  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
2889  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
2890  std::tie(input_descs, input_col_descs, used_inputs_owned) =
2891  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
2892  const auto join_type = get_join_type(aggregate);
2893  QueryFeatureDescriptor query_features;
2894  RelAlgTranslator translator(cat_,
2895  executor_,
2896  input_to_nest_level,
2897  {join_type},
2898  now_,
2899  just_explain,
2900  query_features);
2901  CHECK_EQ(size_t(1), aggregate->inputCount());
2902  const auto source = aggregate->getInput(0);
2903  const auto& in_metainfo = source->getOutputMetainfo();
2904  const auto scalar_sources =
2905  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
2906  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
2907  const auto target_exprs = translate_targets(
2908  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
2909  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
2910  aggregate->setOutputMetainfo(targets_meta);
2911  return {{input_descs,
2912  input_col_descs,
2913  {},
2914  {},
2915  {},
2916  groupby_exprs,
2917  target_exprs,
2918  nullptr,
2919  sort_info,
2920  0,
2921  query_features,
2922  false},
2923  aggregate,
2924  max_groups_buffer_entry_default_guess,
2925  nullptr};
2926 }
2927 
2928 RelAlgExecutor::WorkUnit RelAlgExecutor::createModifyProjectWorkUnit(
2929  const RelProject* project,
2930  const SortInfo& sort_info,
2931  const bool just_explain) {
2932  std::vector<InputDescriptor> input_descs;
2933  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
2934  auto input_to_nest_level = get_input_nest_levels(project, {});
2935  std::tie(input_descs, input_col_descs, std::ignore) =
2936  get_input_desc(project, input_to_nest_level, {}, cat_);
2937  const auto left_deep_join =
2938  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
2939  JoinQualsPerNestingLevel left_deep_join_quals;
2940  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
2941  : std::vector<JoinType>{get_join_type(project)};
2942  if (left_deep_join) {
2943  left_deep_join_quals = translateLeftDeepJoinFilter(
2944  left_deep_join, input_descs, input_to_nest_level, just_explain);
2945  }
2946  QueryFeatureDescriptor query_features;
2947  RelAlgTranslator translator(cat_,
2948  executor_,
2949  input_to_nest_level,
2950  join_types,
2951  now_,
2952  just_explain,
2953  query_features);
2954  size_t starting_projection_column_idx =
2955  get_scalar_sources_size(project) - project->getTargetColumnCount() - 1;
2956  CHECK_GT(starting_projection_column_idx, 0u);
2957  auto target_exprs_owned =
2958  translate_scalar_sources_for_update(project,
2959  translator,
2960  project->getModifiedTableDescriptor()->tableId,
2961  cat_,
2962  project->getTargetColumns(),
2963  starting_projection_column_idx);
2964  target_exprs_owned_.insert(
2965  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
2966  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
2967  CHECK_EQ(project->size(), target_exprs.size());
2968 
2969  const auto update_expr_iter =
2970  std::next(target_exprs.cbegin(), starting_projection_column_idx);
2971  decltype(target_exprs) filtered_target_exprs(update_expr_iter, target_exprs.end());
2972 
2973  UsedColumnsVisitor used_columns_visitor;
2974  std::unordered_set<int> id_accumulator;
2975 
2976  for (auto const& expr :
2977  boost::make_iterator_range(update_expr_iter, target_exprs.end())) {
2978  auto used_column_ids = used_columns_visitor.visit(expr);
2979  id_accumulator.insert(used_column_ids.begin(), used_column_ids.end());
2980  }
2981 
2982  decltype(input_col_descs) filtered_input_col_descs;
2983  for (auto col_desc : input_col_descs) {
2984  if (id_accumulator.find(col_desc->getColId()) != id_accumulator.end()) {
2985  filtered_input_col_descs.push_back(col_desc);
2986  }
2987  }
2988 
2989  const auto targets_meta =
2990  get_modify_manipulated_targets_meta(project, filtered_target_exprs);
2991  project->setOutputMetainfo(targets_meta);
2992  return {{input_descs,
2993  filtered_input_col_descs,
2994  {},
2995  {},
2996  left_deep_join_quals,
2997  {nullptr},
2998  filtered_target_exprs,
2999  nullptr,
3000  sort_info,
3001  0,
3002  query_features,
3003  false},
3004  project,
3005  max_groups_buffer_entry_default_guess,
3006  nullptr};
3007 }
3008 
3009 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(const RelProject* project,
3010  const SortInfo& sort_info,
3011  const bool just_explain) {
3012  std::vector<InputDescriptor> input_descs;
3013  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3014  auto input_to_nest_level = get_input_nest_levels(project, {});
3015  std::tie(input_descs, input_col_descs, std::ignore) =
3016  get_input_desc(project, input_to_nest_level, {}, cat_);
3017  const auto query_infos = get_table_infos(input_descs, executor_);
3018 
3019  const auto left_deep_join =
3020  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
3021  JoinQualsPerNestingLevel left_deep_join_quals;
3022  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3023  : std::vector<JoinType>{get_join_type(project)};
3024  std::vector<size_t> input_permutation;
3025  std::vector<size_t> left_deep_join_input_sizes;
3026  if (left_deep_join) {
3027  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3028  const auto query_infos = get_table_infos(input_descs, executor_);
3029  left_deep_join_quals = translateLeftDeepJoinFilter(
3030  left_deep_join, input_descs, input_to_nest_level, just_explain);
3031  if (g_from_table_reordering) {
3032  input_permutation = do_table_reordering(input_descs,
3033  input_col_descs,
3034  left_deep_join_quals,
3035  input_to_nest_level,
3036  project,
3037  query_infos,
3038  executor_);
3039  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3040  std::tie(input_descs, input_col_descs, std::ignore) =
3041  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3042  left_deep_join_quals = translateLeftDeepJoinFilter(
3043  left_deep_join, input_descs, input_to_nest_level, just_explain);
3044  }
3045  }
3046 
3047  QueryFeatureDescriptor query_features;
3048  RelAlgTranslator translator(cat_,
3049  executor_,
3050  input_to_nest_level,
3051  join_types,
3052  now_,
3053  just_explain,
3054  query_features);
3055  const auto target_exprs_owned = translate_scalar_sources(project, translator);
3056  target_exprs_owned_.insert(
3057  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3058  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3059  const RelAlgExecutionUnit exe_unit = {input_descs,
3060  input_col_descs,
3061  {},
3062  {},
3063  left_deep_join_quals,
3064  {nullptr},
3065  target_exprs,
3066  nullptr,
3067  sort_info,
3068  0,
3069  query_features,
3070  false};
3071  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3072  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3073  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3074  project->setOutputMetainfo(targets_meta);
3075  return {rewritten_exe_unit,
3076  project,
3077  max_groups_buffer_entry_default_guess,
3078  std::move(query_rewriter),
3079  input_permutation,
3080  left_deep_join_input_sizes};
3081 }
3082 
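// Builds the execution unit for a table function: translates the scalar inputs,
// splits out the column inputs, synthesizes output ColumnVars from the function's
// registered output types, and validates the optional user-specified output row
// multiplier.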
3083 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
3084  const RelTableFunction* table_func,
3085  const bool just_explain) {
3086  std::vector<InputDescriptor> input_descs;
3087  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3088  auto input_to_nest_level = get_input_nest_levels(table_func, {});
3089  std::tie(input_descs, input_col_descs, std::ignore) =
3090  get_input_desc(table_func, input_to_nest_level, {}, cat_);
3091  const auto query_infos = get_table_infos(input_descs, executor_);
3092  CHECK_EQ(size_t(1), table_func->inputCount());
3093 
3094  QueryFeatureDescriptor query_features; // TODO(adb): remove/make optional
3095  RelAlgTranslator translator(
3096  cat_, executor_, input_to_nest_level, {}, now_, just_explain, query_features);
3097  const auto input_exprs_owned = translate_scalar_sources(table_func, translator);
3098  target_exprs_owned_.insert(
3099  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3100  const auto input_exprs = get_exprs_not_owned(input_exprs_owned);
3101 
3102  std::vector<Analyzer::ColumnVar*> input_col_exprs;
3103  for (auto input_expr : input_exprs) {
3104  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
3105  input_col_exprs.push_back(col_var);
3106  }
3107  }
3108  CHECK_EQ(input_col_exprs.size(), table_func->getColInputsSize());
3109 
3110  const auto& table_function_impl =
3111  table_functions::TableFunctionsFactory::get(table_func->getFunctionName());
3112 
3113  std::vector<Analyzer::Expr*> table_func_outputs;
3114  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
3115  const auto ti = table_function_impl.getOutputSQLType(i);
3116  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
3117  table_func_outputs.push_back(target_exprs_owned_.back().get());
3118  }
3119 
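// A user-specified output row multiplier is referenced by a 1-indexed parameter
// position; the argument at that position must be an integer literal, and negative
// values are rejected before the value is widened to size_t.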
3120  std::optional<size_t> output_row_multiplier;
3121  if (table_function_impl.hasUserSpecifiedOutputMultiplier()) {
3122  const auto parameter_index = table_function_impl.getOutputRowParameter();
3123  CHECK_GT(parameter_index, size_t(0));
3124  const auto parameter_expr = table_func->getTableFuncInputAt(parameter_index - 1);
3125  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
3126  if (!parameter_expr_literal) {
3127  throw std::runtime_error(
3128  "Provided output buffer multiplier parameter is not a literal. Only literal "
3129  "values are supported with output buffer multiplier configured table "
3130  "functions.");
3131  }
3132  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
3133  if (literal_val < 0) {
3134  throw std::runtime_error("Provided output row multiplier " +
3135  std::to_string(literal_val) +
3136  " is not valid for table functions.");
3137  }
3138  output_row_multiplier = static_cast<size_t>(literal_val);
3139  }
3140 
3141  const TableFunctionExecutionUnit exe_unit = {
3142  input_descs,
3143  input_col_descs,
3144  input_exprs, // table function inputs
3145  input_col_exprs, // table function column inputs (duplicates w/ above)
3146  table_func_outputs, // table function projected exprs
3147  output_row_multiplier, // output buffer multiplier
3148  table_func->getFunctionName()};
3149  const auto targets_meta = get_targets_meta(table_func, exe_unit.target_exprs);
3150  table_func->setOutputMetainfo(targets_meta);
3151  return {exe_unit, table_func};
3152 }
3153 
3154 namespace {
3155 
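// Gathers the filter's input metadata and expressions per nest level: scan sources
// are translated directly from their RexInputs, while non-scan sources reuse their
// previously computed output metadata and get synthesized input references instead.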
3156 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
3157 get_inputs_meta(const RelFilter* filter,
3158  const RelAlgTranslator& translator,
3159  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
3160  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3161  std::vector<TargetMetaInfo> in_metainfo;
3162  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
3163  const auto data_sink_node = get_data_sink(filter);
3164  auto input_it = inputs_owned.begin();
3165  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
3166  const auto source = data_sink_node->getInput(nest_level);
3167  const auto scan_source = dynamic_cast<const RelScan*>(source);
3168  if (scan_source) {
3169  CHECK(source->getOutputMetainfo().empty());
3170  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
3171  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
3172  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
3173  }
3174  const auto source_metadata =
3175  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
3176  in_metainfo.insert(
3177  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3178  exprs_owned.insert(
3179  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3180  } else {
3181  const auto& source_metadata = source->getOutputMetainfo();
3182  input_it += source_metadata.size();
3183  in_metainfo.insert(
3184  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3185  const auto scalar_sources_owned = synthesize_inputs(
3186  data_sink_node, nest_level, source_metadata, input_to_nest_level);
3187  exprs_owned.insert(
3188  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3189  }
3190  }
3191  return std::make_pair(in_metainfo, exprs_owned);
3192 }
3193 
3194 } // namespace
3195 
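// Builds the work unit for a standalone filter node: the condition is translated,
// constant-folded and rewritten into the execution unit's quals, while the targets
// simply forward the input tuple collected by get_inputs_meta.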
3196 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
3197  const SortInfo& sort_info,
3198  const bool just_explain) {
3199  CHECK_EQ(size_t(1), filter->inputCount());
3200  std::vector<InputDescriptor> input_descs;
3201  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3202  std::vector<TargetMetaInfo> in_metainfo;
3203  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3204  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
3205 
3206  const auto input_to_nest_level = get_input_nest_levels(filter, {});
3207  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3208  get_input_desc(filter, input_to_nest_level, {}, cat_);
3209  const auto join_type = get_join_type(filter);
3210  QueryFeatureDescriptor query_features;
3211  RelAlgTranslator translator(cat_,
3212  executor_,
3213  input_to_nest_level,
3214  {join_type},
3215  now_,
3216  just_explain,
3217  query_features);
3218  std::tie(in_metainfo, target_exprs_owned) =
3219  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
3220  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
3221  const auto qual = fold_expr(filter_expr.get());
3222  target_exprs_owned_.insert(
3223  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3224  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3225  filter->setOutputMetainfo(in_metainfo);
3226  const auto rewritten_qual = rewrite_expr(qual.get());
3227  return {{input_descs,
3228  input_col_descs,
3229  {},
3230  {rewritten_qual ? rewritten_qual : qual},
3231  {},
3232  {nullptr},
3233  target_exprs,
3234  nullptr,
3235  sort_info,
3236  0},
3237  filter,
3238  max_groups_buffer_entry_default_guess,
3239  nullptr};
3240 }
3241 