OmniSciDB  04ee39c94c
RelAlgExecutor.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RelAlgExecutor.h"
18 #include "RelAlgTranslator.h"
19 
20 #include "CalciteDeserializerUtils.h"
21 #include "CardinalityEstimator.h"
22 #include "ColumnFetcher.h"
23 #include "EquiJoinCondition.h"
24 #include "ErrorHandling.h"
25 #include "ExpressionRewrite.h"
26 #include "FromTableReordering.h"
27 #include "InputMetadata.h"
28 #include "JoinFilterPushDown.h"
29 #include "QueryPhysicalInputsCollector.h"
30 #include "RangeTableIndexVisitor.h"
31 #include "RexVisitor.h"
32 #include "UsedColumnsVisitor.h"
33 #include "WindowContext.h"
34 
35 #include "../Parser/ParserNode.h"
36 #include "../Shared/measure.h"
37 
38 #include <algorithm>
39 #include <numeric>
40 
41 bool g_skip_intermediate_count{true};
42 extern bool g_enable_bump_allocator;
43 namespace {
44 
45 bool node_is_aggregate(const RelAlgNode* ra) {
46  const auto compound = dynamic_cast<const RelCompound*>(ra);
47  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
48  return ((compound && compound->isAggregate()) || aggregate);
49 }
50 
51 } // namespace
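// Illustrative sketch (assumed plan shapes, not lines from the original file):
// node_is_aggregate() accepts both ways an aggregation can appear in the DAG.
// For "SELECT x, COUNT(*) FROM t GROUP BY x" the final node is either
//   a RelCompound with isAggregate() == true, or
//   a standalone RelAggregate fed by a RelProject.
// A plain RelProject (no group-by) returns false. The predicate decides below
// whether a step merges via reduction or concatenation and whether a
// leaf-level sort can be skipped.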
52 
53 ExecutionResult RelAlgExecutor::executeRelAlgQuery(const std::string& query_ra,
54  const CompilationOptions& co,
55  const ExecutionOptions& eo,
56  RenderInfo* render_info) {
57  INJECT_TIMER(executeRelAlgQuery);
58  try {
59  return executeRelAlgQueryNoRetry(query_ra, co, eo, render_info);
60  } catch (const QueryMustRunOnCpu&) {
61  if (!g_allow_cpu_retry) {
62  throw;
63  }
64  }
65  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
66  CompilationOptions co_cpu{ExecutorDeviceType::CPU,
67  co.hoist_literals_,
68  co.opt_level_,
69  co.with_dynamic_watchdog_,
70  co.explain_type_,
71  co.register_intel_jit_listener_};
72  if (render_info) {
73  render_info->setForceNonInSituData();
74  }
75  return executeRelAlgQueryNoRetry(query_ra, co_cpu, eo, render_info);
76 }
77 
78 ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const std::string& query_ra,
79  const CompilationOptions& co,
80  const ExecutionOptions& eo,
81  RenderInfo* render_info) {
82  INJECT_TIMER(executeRelAlgQueryNoRetry);
83  decltype(subqueries_)().swap(subqueries_);
84 
85  const auto ra = deserialize_ra_dag(
86  query_ra, cat_, this, render_info ? render_info->getRenderQueryOptsPtr() : nullptr);
87 
88  // capture the lock acquisition time
89  auto clock_begin = timer_start();
90  std::lock_guard<std::mutex> lock(executor_->execute_mutex_);
91  int64_t queue_time_ms = timer_stop(clock_begin);
92  if (g_enable_dynamic_watchdog) {
93  executor_->resetInterrupt();
94  }
95  ScopeGuard row_set_holder = [this, &render_info] {
96  if (render_info) {
97  // need to hold onto the RowSetMemOwner for potential
98  // string id lookups during render vega validation
99  render_info->row_set_mem_owner = executor_->row_set_mem_owner_;
100  }
101  cleanupPostExecution();
102  };
103  executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
104  executor_->catalog_ = &cat_;
105  executor_->agg_col_range_cache_ = computeColRangesCache(ra.get());
106  executor_->string_dictionary_generations_ =
107  computeStringDictionaryGenerations(ra.get());
108  executor_->table_generations_ = computeTableGenerations(ra.get());
109 
110  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
111  auto ed_list = get_execution_descriptors(ra.get());
112 
113  if (render_info) {
114  // set render to be non-in-situ in certain situations.
115  if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
116  ed_list.size() > 1) {
117  // old logic
118  // disallow if more than one ED
119  render_info->setInSituDataIfUnset(false);
120  }
121  }
122 
123  if (eo.find_push_down_candidates) {
124  // this extra logic is mainly due to current limitations on multi-step queries
125  // and/or subqueries.
126  return executeRelAlgQueryWithFilterPushDown(
127  ed_list, co, eo, render_info, queue_time_ms);
128  }
129 
130  // Dispatch the subqueries first
131  for (auto subquery : subqueries_) {
132  const auto subquery_ra = subquery->getRelAlg();
133  CHECK(subquery_ra);
134  if (subquery_ra->hasContextData()) {
135  continue;
136  }
137  // Execute the subquery and cache the result.
138  RelAlgExecutor ra_executor(executor_, cat_);
139  auto result = ra_executor.executeRelAlgSubQuery(subquery.get(), co, eo);
140  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
141  }
142  return executeRelAlgSeq(ed_list, co, eo, render_info, queue_time_ms);
143 }
144 
145 namespace {
146 
147 std::unordered_set<int> get_physical_table_ids(
148  const std::unordered_set<PhysicalInput>& phys_inputs) {
149  std::unordered_set<int> physical_table_ids;
150  for (const auto& phys_input : phys_inputs) {
151  physical_table_ids.insert(phys_input.table_id);
152  }
153  return physical_table_ids;
154 }
155 
156 std::unordered_set<PhysicalInput> get_physical_inputs(
157  const Catalog_Namespace::Catalog& cat,
158  const RelAlgNode* ra) {
159  auto phys_inputs = get_physical_inputs(ra);
160  std::unordered_set<PhysicalInput> phys_inputs2;
161  for (auto& phi : phys_inputs) {
162  phys_inputs2.insert(
163  PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
164  }
165  return phys_inputs2;
166 }
167 
168 } // namespace
169 
170 AggregatedColRange RelAlgExecutor::computeColRangesCache(const RelAlgNode* ra) {
171  AggregatedColRange agg_col_range_cache;
172  const auto phys_inputs = get_physical_inputs(cat_, ra);
173  const auto phys_table_ids = get_physical_table_ids(phys_inputs);
174  std::vector<InputTableInfo> query_infos;
175  executor_->catalog_ = &cat_;
176  for (const int table_id : phys_table_ids) {
177  query_infos.emplace_back(InputTableInfo{table_id, executor_->getTableInfo(table_id)});
178  }
179  for (const auto& phys_input : phys_inputs) {
180  const auto cd = cat_.getMetadataForColumn(phys_input.table_id, phys_input.col_id);
181  CHECK(cd);
182  const auto& col_ti =
183  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
184  if (col_ti.is_number() || col_ti.is_boolean() || col_ti.is_time() ||
185  (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT)) {
186  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
187  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
188  const auto col_range =
189  getLeafColumnRange(col_var.get(), query_infos, executor_, false);
190  agg_col_range_cache.setColRange(phys_input, col_range);
191  }
192  }
193  return agg_col_range_cache;
194 }
195 
196 StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations(
197  const RelAlgNode* ra) {
198  StringDictionaryGenerations string_dictionary_generations;
199  const auto phys_inputs = get_physical_inputs(cat_, ra);
200  for (const auto& phys_input : phys_inputs) {
201  const auto cd = cat_.getMetadataForColumn(phys_input.table_id, phys_input.col_id);
202  CHECK(cd);
203  const auto& col_ti =
204  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
205  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
206  const int dict_id = col_ti.get_comp_param();
207  const auto dd = cat_.getMetadataForDict(dict_id);
208  CHECK(dd && dd->stringDict);
209  string_dictionary_generations.setGeneration(dict_id,
210  dd->stringDict->storageEntryCount());
211  }
212  }
213  return string_dictionary_generations;
214 }
215 
216 TableGenerations RelAlgExecutor::computeTableGenerations(const RelAlgNode* ra) {
217  const auto phys_table_ids = get_physical_table_inputs(ra);
218  TableGenerations table_generations;
219  for (const int table_id : phys_table_ids) {
220  const auto table_info = executor_->getTableInfo(table_id);
221  table_generations.setGeneration(
222  table_id, TableGeneration{table_info.getPhysicalNumTuples(), 0});
223  }
224  return table_generations;
225 }
226 
227 Executor* RelAlgExecutor::getExecutor() const {
228  return executor_;
229 }
230 
231 void RelAlgExecutor::cleanupPostExecution() {
232  CHECK(executor_);
233  executor_->row_set_mem_owner_ = nullptr;
234  executor_->lit_str_dict_proxy_ = nullptr;
235 }
236 
237 FirstStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
238  const RaExecutionDesc& exec_desc,
239  const CompilationOptions& co,
240  const ExecutionOptions& eo,
241  RenderInfo* render_info) {
242  INJECT_TIMER(executeRelAlgQueryStep);
243  auto first_exec_desc = exec_desc;
244  const auto sort = dynamic_cast<const RelSort*>(first_exec_desc.getBody());
245  size_t shard_count{0};
246  if (sort) {
247  const auto source_work_unit = createSortInputWorkUnit(sort, eo.just_explain);
248  shard_count = GroupByAndAggregate::shard_count_for_top_groups(
249  source_work_unit.exe_unit, *executor_->getCatalog());
250  if (!shard_count) {
251  // No point in sorting on the leaf, only execute the input to the sort node.
252  CHECK_EQ(size_t(1), sort->inputCount());
253  const auto source = sort->getInput(0);
254  if (sort->collationCount() || node_is_aggregate(source)) {
255  first_exec_desc = RaExecutionDesc(source);
256  }
257  }
258  }
259  std::vector<RaExecutionDesc> first_exec_desc_singleton_list{first_exec_desc};
260  const auto merge_type = (node_is_aggregate(first_exec_desc.getBody()) && !shard_count)
261  ? MergeType::Reduce
262  : MergeType::Union;
263  // Execute the current step. Keep the existing temporary table map intact, since we may
264  // need to use results from previous query steps.
265  return {executeRelAlgSeq(
266  first_exec_desc_singleton_list, co, eo, render_info, queue_time_ms_, true),
267  merge_type,
268  first_exec_desc.getBody()->getId(),
269  false};
270 }
271 
272 void RelAlgExecutor::prepareLeafExecution(
273  const AggregatedColRange& agg_col_range,
274  const StringDictionaryGenerations& string_dictionary_generations,
275  const TableGenerations& table_generations) {
276  // capture the lock acquisition time
277  auto clock_begin = timer_start();
278  if (g_enable_dynamic_watchdog) {
279  executor_->resetInterrupt();
280  }
281  queue_time_ms_ = timer_stop(clock_begin);
282  executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
283  executor_->table_generations_ = table_generations;
284  executor_->agg_col_range_cache_ = agg_col_range;
285  executor_->string_dictionary_generations_ = string_dictionary_generations;
286 }
287 
288 ExecutionResult RelAlgExecutor::executeRelAlgSubQuery(const RexSubQuery* subquery,
289  const CompilationOptions& co,
290  const ExecutionOptions& eo) {
291  INJECT_TIMER(executeRelAlgSubQuery);
292  auto ed_list = get_execution_descriptors(subquery->getRelAlg());
293  return executeRelAlgSeq(ed_list, co, eo, nullptr, 0);
294 }
295 
296 ExecutionResult RelAlgExecutor::executeRelAlgSeq(std::vector<RaExecutionDesc>& exec_descs,
297  const CompilationOptions& co,
298  const ExecutionOptions& eo,
299  RenderInfo* render_info,
300  const int64_t queue_time_ms,
301  const bool with_existing_temp_tables) {
302  INJECT_TIMER(executeRelAlgSeq);
303  if (!with_existing_temp_tables) {
304  decltype(temporary_tables_)().swap(temporary_tables_);
305  }
306  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
307  executor_->catalog_ = &cat_;
308  executor_->temporary_tables_ = &temporary_tables_;
309 
310  time(&now_);
311  CHECK(!exec_descs.empty());
312  const auto exec_desc_count = eo.just_explain ? size_t(1) : exec_descs.size();
313 
314  size_t i = 0;
315  for (auto it = exec_descs.begin(); it != exec_descs.end(); ++it, i++) {
316  // only render on the last step
317  executeRelAlgStep(i,
318  it,
319  co,
320  eo,
321  (it == std::prev(exec_descs.end()) ? render_info : nullptr),
322  queue_time_ms);
323  }
324 
325  return exec_descs[exec_desc_count - 1].getResult();
326 }
327 
328 ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
329  std::vector<RaExecutionDesc>::iterator start_desc,
330  std::vector<RaExecutionDesc>::iterator end_desc,
331  const CompilationOptions& co,
332  const ExecutionOptions& eo,
333  RenderInfo* render_info,
334  const int64_t queue_time_ms) {
335  INJECT_TIMER(executeRelAlgSubSeq);
336  executor_->catalog_ = &cat_;
337  executor_->temporary_tables_ = &temporary_tables_;
338 
339  time(&now_);
340  CHECK(!eo.just_explain);
341 
342  size_t i = 0;
343  for (auto it = start_desc; it != end_desc; ++it, i++) {
344  // only render on the last step
345  executeRelAlgStep(i,
346  it,
347  co,
348  eo,
349  (it == std::prev(end_desc) ? render_info : nullptr),
350  queue_time_ms);
351  }
352 
353  return std::prev(end_desc)->getResult();
354 }
355 
356 void RelAlgExecutor::executeRelAlgStep(
357  const size_t i,
358  std::vector<RaExecutionDesc>::iterator exec_desc_itr,
359  const CompilationOptions& co,
360  const ExecutionOptions& eo,
361  RenderInfo* render_info,
362  const int64_t queue_time_ms) {
363  INJECT_TIMER(executeRelAlgStep);
364  WindowProjectNodeContext::reset();
365  auto& exec_desc = *exec_desc_itr;
366  const auto body = exec_desc.getBody();
367  if (body->isNop()) {
368  handleNop(exec_desc);
369  return;
370  }
371  const ExecutionOptions eo_work_unit{
372  eo.output_columnar_hint,
373  eo.allow_multifrag,
374  eo.just_explain,
375  eo.allow_loop_joins,
376  eo.with_watchdog && (i == 0 || dynamic_cast<const RelProject*>(body)),
377  eo.jit_debug,
378  eo.just_validate,
379  eo.with_dynamic_watchdog,
380  eo.dynamic_watchdog_time_limit,
381  eo.find_push_down_candidates,
382  eo.just_calcite_explain,
383  eo.gpu_input_mem_limit_percent};
384 
385  const auto compound = dynamic_cast<const RelCompound*>(body);
386  if (compound) {
387  if (compound->isDeleteViaSelect()) {
388  executeDeleteViaCompound(compound, co, eo_work_unit, render_info, queue_time_ms);
389  } else if (compound->isUpdateViaSelect()) {
390  executeUpdateViaCompound(compound, co, eo_work_unit, render_info, queue_time_ms);
391  } else {
392  exec_desc.setResult(
393  executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
394  if (exec_desc.getResult().isFilterPushDownEnabled()) {
395  return;
396  }
397  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
398  }
399  return;
400  }
401  const auto project = dynamic_cast<const RelProject*>(body);
402  if (project) {
403  if (project->isDeleteViaSelect()) {
404  executeDeleteViaProject(project, co, eo_work_unit, render_info, queue_time_ms);
405  } else if (project->isUpdateViaSelect()) {
406  executeUpdateViaProject(project, co, eo_work_unit, render_info, queue_time_ms);
407  } else {
408  ssize_t prev_count = -1;
409  // Disabling the intermediate count optimization in distributed, as the previous
410  // execution descriptor will likely not hold the aggregated result.
411  if (g_skip_intermediate_count && i > 0 && !g_cluster) {
412  auto& prev_exec_desc = *(exec_desc_itr - 1);
413  if (dynamic_cast<const RelCompound*>(prev_exec_desc.getBody())) {
414  auto prev_desc = prev_exec_desc;
415  const auto& prev_exe_result = prev_desc.getResult();
416  const auto prev_result = prev_exe_result.getRows();
417  if (prev_result) {
418  prev_count = static_cast<ssize_t>(prev_result->rowCount());
419  }
420  }
421  }
422  exec_desc.setResult(executeProject(
423  project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
424  if (exec_desc.getResult().isFilterPushDownEnabled()) {
425  return;
426  }
427  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
428  }
429  return;
430  }
431  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
432  if (aggregate) {
433  exec_desc.setResult(
434  executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
435  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
436  return;
437  }
438  const auto filter = dynamic_cast<const RelFilter*>(body);
439  if (filter) {
440  exec_desc.setResult(
441  executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
442  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
443  return;
444  }
445  const auto sort = dynamic_cast<const RelSort*>(body);
446  if (sort) {
447  exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
448  if (exec_desc.getResult().isFilterPushDownEnabled()) {
449  return;
450  }
451  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
452  return;
453  }
454  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
455  if (logical_values) {
456  exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
457  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
458  return;
459  }
460  const auto modify = dynamic_cast<const RelModify*>(body);
461  if (modify) {
462  exec_desc.setResult(executeModify(modify, eo_work_unit));
463  return;
464  }
465  CHECK(false);
466 }
467 
468 void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
469  // just set the result of the previous node as the result of no op
470  auto body = ed.getBody();
471  CHECK(dynamic_cast<const RelAggregate*>(body));
472  CHECK_EQ(size_t(1), body->inputCount());
473  const auto input = body->getInput(0);
474  body->setOutputMetainfo(input->getOutputMetainfo());
475  const auto it = temporary_tables_.find(-input->getId());
476  CHECK(it != temporary_tables_.end());
477  // set up temp table as it could be used by the outer query or next step
478  addTemporaryTable(-body->getId(), it->second);
479 
480  ed.setResult({it->second, input->getOutputMetainfo()});
481 }
482 
483 namespace {
484 
485 class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
486  public:
487  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : RexVisitor(), cat_(cat) {}
488 
489  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
490  return synthesized_physical_inputs_owned;
491  }
492 
493  std::unordered_set<const RexInput*> visitInput(
494  const RexInput* rex_input) const override {
495  const auto input_ra = rex_input->getSourceNode();
496  const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
497  if (scan_ra) {
498  const auto td = scan_ra->getTableDescriptor();
499  if (td) {
500  const auto col_id = rex_input->getIndex();
501  const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
502  if (cd && cd->columnType.get_physical_cols() > 0) {
503  CHECK(IS_GEO(cd->columnType.get_type()));
504  std::unordered_set<const RexInput*> synthesized_physical_inputs;
505  for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
506  auto physical_input =
507  new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
508  synthesized_physical_inputs_owned.emplace_back(physical_input);
509  synthesized_physical_inputs.insert(physical_input);
510  }
511  return synthesized_physical_inputs;
512  }
513  }
514  }
515  return {rex_input};
516  }
517 
518  protected:
519  std::unordered_set<const RexInput*> aggregateResult(
520  const std::unordered_set<const RexInput*>& aggregate,
521  const std::unordered_set<const RexInput*>& next_result) const override {
522  auto result = aggregate;
523  result.insert(next_result.begin(), next_result.end());
524  return result;
525  }
526 
527  private:
528  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
529  const Catalog_Namespace::Catalog& cat_;
530 };
531 
532 const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
533  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
534  CHECK_EQ(size_t(2), join->inputCount());
535  return join;
536  }
537  CHECK_EQ(size_t(1), ra_node->inputCount());
538  auto only_src = ra_node->getInput(0);
539  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
540  dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
541  return is_join ? only_src : ra_node;
542 }
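// A minimal sketch of the shapes get_data_sink() distinguishes (assumed plans,
// not from the file). The "sink" is the node whose inputs define nest levels:
//
//   RelCompound -> RelLeftDeepInnerJoin -> {RelScan a, RelScan b}
//     => sink is the RelLeftDeepInnerJoin (its inputs become levels 0 and 1)
//   RelCompound -> RelScan a
//     => sink is the RelCompound itself (single input, single nest level)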
543 
544 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
545 get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
546  RexUsedInputsVisitor visitor(cat);
547  const auto filter_expr = compound->getFilterExpr();
548  std::unordered_set<const RexInput*> used_inputs =
549  filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
550  const auto sources_size = compound->getScalarSourcesSize();
551  for (size_t i = 0; i < sources_size; ++i) {
552  const auto source_inputs = visitor.visit(compound->getScalarSource(i));
553  used_inputs.insert(source_inputs.begin(), source_inputs.end());
554  }
555  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
556  return std::make_pair(used_inputs, used_inputs_owned);
557 }
558 
559 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
560 get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
561  CHECK_EQ(size_t(1), aggregate->inputCount());
562  std::unordered_set<const RexInput*> used_inputs;
563  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
564  const auto source = aggregate->getInput(0);
565  const auto& in_metainfo = source->getOutputMetainfo();
566  const auto group_count = aggregate->getGroupByCount();
567  CHECK_GE(in_metainfo.size(), group_count);
568  for (size_t i = 0; i < group_count; ++i) {
569  auto synthesized_used_input = new RexInput(source, i);
570  used_inputs_owned.emplace_back(synthesized_used_input);
571  used_inputs.insert(synthesized_used_input);
572  }
573  for (const auto& agg_expr : aggregate->getAggExprs()) {
574  for (size_t i = 0; i < agg_expr->size(); ++i) {
575  const auto operand_idx = agg_expr->getOperand(i);
576  CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
577  auto synthesized_used_input = new RexInput(source, operand_idx);
578  used_inputs_owned.emplace_back(synthesized_used_input);
579  used_inputs.insert(synthesized_used_input);
580  }
581  }
582  return std::make_pair(used_inputs, used_inputs_owned);
583 }
584 
585 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
586 get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
587  RexUsedInputsVisitor visitor(cat);
588  std::unordered_set<const RexInput*> used_inputs;
589  for (size_t i = 0; i < project->size(); ++i) {
590  const auto proj_inputs = visitor.visit(project->getProjectAt(i));
591  used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
592  }
593  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
594  return std::make_pair(used_inputs, used_inputs_owned);
595 }
596 
597 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
598 get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
599  std::unordered_set<const RexInput*> used_inputs;
600  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
601  const auto data_sink_node = get_data_sink(filter);
602  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
603  const auto source = data_sink_node->getInput(nest_level);
604  const auto scan_source = dynamic_cast<const RelScan*>(source);
605  if (scan_source) {
606  CHECK(source->getOutputMetainfo().empty());
607  for (size_t i = 0; i < scan_source->size(); ++i) {
608  auto synthesized_used_input = new RexInput(scan_source, i);
609  used_inputs_owned.emplace_back(synthesized_used_input);
610  used_inputs.insert(synthesized_used_input);
611  }
612  } else {
613  const auto& partial_in_metadata = source->getOutputMetainfo();
614  for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
615  auto synthesized_used_input = new RexInput(source, i);
616  used_inputs_owned.emplace_back(synthesized_used_input);
617  used_inputs.insert(synthesized_used_input);
618  }
619  }
620  }
621  return std::make_pair(used_inputs, used_inputs_owned);
622 }
623 
624 int table_id_from_ra(const RelAlgNode* ra_node) {
625  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
626  if (scan_ra) {
627  const auto td = scan_ra->getTableDescriptor();
628  CHECK(td);
629  return td->tableId;
630  }
631  return -ra_node->getId();
632 }
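// Sign convention worth noting (the example ids are assumptions): a RelScan
// maps to its positive catalog tableId, while any intermediate node is
// addressed as the negation of its node id. So a RelProject with getId() == 7
// is published with addTemporaryTable(-7, ...) and later resolved from
// temporary_tables_ under the same key -7 by the steps that consume it.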
633 
634 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
635  const RelAlgNode* ra_node,
636  const std::vector<size_t>& input_permutation) {
637  const auto data_sink_node = get_data_sink(ra_node);
638  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
639  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
640  const auto input_node_idx =
641  input_permutation.empty() ? input_idx : input_permutation[input_idx];
642  const auto input_ra = data_sink_node->getInput(input_node_idx);
643  const auto it_ok = input_to_nest_level.emplace(input_ra, input_idx);
644  CHECK(it_ok.second);
645  LOG_IF(INFO, !input_permutation.empty())
646  << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
647  }
648  return input_to_nest_level;
649 }
650 
651 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
652 get_join_source_used_inputs(const RelAlgNode* ra_node,
653  const Catalog_Namespace::Catalog& cat) {
654  const auto data_sink_node = get_data_sink(ra_node);
655  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
656  CHECK_EQ(join->inputCount(), 2u);
657  const auto condition = join->getCondition();
658  RexUsedInputsVisitor visitor(cat);
659  auto condition_inputs = visitor.visit(condition);
660  std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
661  visitor.get_inputs_owned());
662  return std::make_pair(condition_inputs, condition_inputs_owned);
663  }
664 
665  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
666  CHECK_GE(left_deep_join->inputCount(), 2u);
667  const auto condition = left_deep_join->getInnerCondition();
668  RexUsedInputsVisitor visitor(cat);
669  auto result = visitor.visit(condition);
670  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
671  ++nesting_level) {
672  const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
673  if (outer_condition) {
674  const auto outer_result = visitor.visit(outer_condition);
675  result.insert(outer_result.begin(), outer_result.end());
676  }
677  }
678  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
679  return std::make_pair(result, used_inputs_owned);
680  }
681 
682  CHECK_EQ(ra_node->inputCount(), 1u);
683  return std::make_pair(std::unordered_set<const RexInput*>{},
684  std::vector<std::shared_ptr<RexInput>>{});
685 }
686 
687 std::vector<const RelAlgNode*> get_non_join_sequence(const RelAlgNode* ra) {
688  std::vector<const RelAlgNode*> seq;
689  for (auto join = dynamic_cast<const RelJoin*>(ra); join;
690  join = static_cast<const RelJoin*>(join->getInput(0))) {
691  CHECK_EQ(size_t(2), join->inputCount());
692  seq.emplace_back(join->getInput(1));
693  auto lhs = join->getInput(0);
694  if (!dynamic_cast<const RelJoin*>(lhs)) {
695  seq.emplace_back(lhs);
696  break;
697  }
698  }
699  std::reverse(seq.begin(), seq.end());
700  return seq;
701 }
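// Illustrative walk-through (assumed join tree, not from the file): for
//   RelJoin(RelJoin(RelScan a, RelScan b), RelScan c)
// the loop visits the outer join first, collecting {c}, then the inner join,
// collecting {c, b, a}; the final reverse yields {a, b, c}, i.e. the sources
// in nest-level order 0, 1, 2.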
702 
703 void collect_used_input_desc(
704  std::vector<InputDescriptor>& input_descs,
705  const Catalog_Namespace::Catalog& cat,
706  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
707  const RelAlgNode* ra_node,
708  const std::unordered_set<const RexInput*>& source_used_inputs,
709  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
710  std::unordered_set<InputDescriptor> input_descs_unique(input_descs.begin(),
711  input_descs.end());
712  const auto non_join_src_seq = get_non_join_sequence(get_data_sink(ra_node));
713  std::unordered_map<const RelAlgNode*, int> non_join_to_nest_level;
714  for (const auto node : non_join_src_seq) {
715  non_join_to_nest_level.insert(std::make_pair(node, non_join_to_nest_level.size()));
716  }
717  for (const auto used_input : source_used_inputs) {
718  const auto input_ra = used_input->getSourceNode();
719  const int table_id = table_id_from_ra(input_ra);
720  const auto col_id = used_input->getIndex();
721  auto it = input_to_nest_level.find(input_ra);
722  if (it == input_to_nest_level.end()) {
723  throw std::runtime_error("Bushy joins not supported");
724  }
725  const int input_desc = it->second;
726  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
727  dynamic_cast<const RelScan*>(input_ra)
728  ? cat.getColumnIdBySpi(table_id, col_id + 1)
729  : col_id,
730  table_id,
731  input_desc));
732  }
733 }
734 
735 template <class RA>
736 std::pair<std::vector<InputDescriptor>,
737  std::list<std::shared_ptr<const InputColDescriptor>>>
738 get_input_desc_impl(const RA* ra_node,
739  const std::unordered_set<const RexInput*>& used_inputs,
740  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
741  const std::vector<size_t>& input_permutation,
742  const Catalog_Namespace::Catalog& cat) {
743  std::vector<InputDescriptor> input_descs;
744  const auto data_sink_node = get_data_sink(ra_node);
745  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
746  const auto input_node_idx =
747  input_permutation.empty() ? input_idx : input_permutation[input_idx];
748  const auto input_ra = data_sink_node->getInput(input_node_idx);
749  const int table_id = table_id_from_ra(input_ra);
750  input_descs.emplace_back(table_id, input_idx);
751  }
752  std::sort(input_descs.begin(),
753  input_descs.end(),
754  [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
755  return lhs.getNestLevel() < rhs.getNestLevel();
756  });
757  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
758  collect_used_input_desc(input_descs,
759  cat,
760  input_col_descs_unique,
761  ra_node,
762  used_inputs,
763  input_to_nest_level);
764  std::unordered_set<const RexInput*> join_source_used_inputs;
765  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
766  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
767  get_join_source_used_inputs(ra_node, cat);
768  collect_used_input_desc(input_descs,
769  cat,
770  input_col_descs_unique,
771  ra_node,
772  join_source_used_inputs,
773  input_to_nest_level);
774  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
775  input_col_descs_unique.begin(), input_col_descs_unique.end());
776 
777  std::sort(
778  input_col_descs.begin(),
779  input_col_descs.end(),
780  [](std::shared_ptr<const InputColDescriptor> const& lhs,
781  std::shared_ptr<const InputColDescriptor> const& rhs) {
782  if (lhs->getScanDesc().getNestLevel() == rhs->getScanDesc().getNestLevel()) {
783  return lhs->getColId() < rhs->getColId();
784  }
785  return lhs->getScanDesc().getNestLevel() < rhs->getScanDesc().getNestLevel();
786  });
787  return {input_descs,
788  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
789  input_col_descs.end())};
790 }
791 
792 template <class RA>
793 std::tuple<std::vector<InputDescriptor>,
794  std::list<std::shared_ptr<const InputColDescriptor>>,
795  std::vector<std::shared_ptr<RexInput>>>
796 get_input_desc(const RA* ra_node,
797  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
798  const std::vector<size_t>& input_permutation,
799  const Catalog_Namespace::Catalog& cat) {
800  std::unordered_set<const RexInput*> used_inputs;
801  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
802  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
803  auto input_desc_pair = get_input_desc_impl(
804  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
805  return std::make_tuple(
806  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
807 }
808 
809 size_t get_scalar_sources_size(const RelCompound* compound) {
810  return compound->getScalarSourcesSize();
811 }
812 
813 size_t get_scalar_sources_size(const RelProject* project) {
814  return project->size();
815 }
816 
817 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
818  return compound->getScalarSource(i);
819 }
820 
821 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
822  return project->getProjectAt(i);
823 }
824 
825 std::shared_ptr<Analyzer::Expr> set_transient_dict(
826  const std::shared_ptr<Analyzer::Expr> expr) {
827  const auto& ti = expr->get_type_info();
828  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
829  return expr;
830  }
831  auto transient_dict_ti = ti;
832  transient_dict_ti.set_compression(kENCODING_DICT);
833  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
834  transient_dict_ti.set_fixed_size();
835  return expr->add_cast(transient_dict_ti);
836 }
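// A minimal sketch of the effect (constants as used above): a none-encoded
// string expression gets a cast to a dictionary-encoded type whose comp_param
// is the transient dictionary id, so downstream group-by/sort can work on
// fixed-size ids rather than variable-length strings:
//
//   before: SQLTypeInfo(kTEXT, kENCODING_NONE)
//   after:  SQLTypeInfo(kTEXT, kENCODING_DICT, comp_param = TRANSIENT_DICT_ID)
//
// Non-strings and already-encoded strings pass through unchanged.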
837 
838 void set_transient_dict_maybe(
839  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
840  const std::shared_ptr<Analyzer::Expr>& expr) {
841  try {
842  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
843  } catch (...) {
844  scalar_sources.push_back(fold_expr(expr.get()));
845  }
846 }
847 
848 template <class RA>
849 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
850  const RA* ra_node,
851  const RelAlgTranslator& translator) {
852  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
853  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
854  const auto scalar_rex = scalar_at(i, ra_node);
855  if (dynamic_cast<const RexRef*>(scalar_rex)) {
856  // RexRefs are synthetic scalars we append at the end of the real ones
857  // for the sake of taking memory ownership; no real work is needed here.
858  continue;
859  }
860 
861  const auto scalar_expr =
862  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
863  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
864  set_transient_dict_maybe(scalar_sources, rewritten_expr);
865  }
866 
867  return scalar_sources;
868 }
869 
870 std::shared_ptr<Analyzer::Expr> cast_to_column_type(std::shared_ptr<Analyzer::Expr> expr,
871  int32_t tableId,
872  const Catalog_Namespace::Catalog& cat,
873  const std::string& colName) {
874  const auto cd = *cat.getMetadataForColumn(tableId, colName);
875 
876  auto cast_ti = cd.columnType;
877 
878  // Type needs to be scrubbed because otherwise NULL values could get cut off or
879  // truncated
880  auto cast_logical_ti = get_logical_type_info(cast_ti);
881  if (cast_logical_ti.is_varlen() && cast_logical_ti.is_array()) {
882  return expr;
883  }
884 
885  // CastIR.cpp Executor::codegenCast() doesn't know how to cast from a ColumnVar
886  // so it CHECK's unless casting is skipped here.
887  if (std::dynamic_pointer_cast<Analyzer::ColumnVar>(expr)) {
888  return expr;
889  }
890 
891  // Cast the expression to match the type of the output column.
892  return expr->add_cast(cast_logical_ti);
893 }
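// Hedged worked example (the column types are assumptions for illustration):
// for "UPDATE t SET small_col = 1" the literal may be translated as kINT while
// the target column is kSMALLINT; get_logical_type_info() scrubs the target
// type and add_cast() aligns the expression with it, so NULL sentinels are not
// cut off or truncated. Varlen arrays and bare ColumnVars skip the cast, per
// the comments above.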
894 
895 template <class RA>
896 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
897  const RA* ra_node,
898  const RelAlgTranslator& translator,
899  int32_t tableId,
900  const Catalog_Namespace::Catalog& cat,
901  const ColumnNameList& colNames,
902  size_t starting_projection_column_idx) {
903  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
904  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
905  const auto scalar_rex = scalar_at(i, ra_node);
906  if (dynamic_cast<const RexRef*>(scalar_rex)) {
907  // RexRefs are synthetic scalars we append at the end of the real ones
908  // for the sake of taking memory ownership; no real work is needed here.
909  continue;
910  }
911 
912  std::shared_ptr<Analyzer::Expr> translated_expr;
913  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
914  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
915  tableId,
916  cat,
917  colNames[i - starting_projection_column_idx]);
918  } else {
919  translated_expr = translator.translateScalarRex(scalar_rex);
920  }
921  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
922  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
923  set_transient_dict_maybe(scalar_sources, rewritten_expr);
924  }
925 
926  return scalar_sources;
927 }
928 
929 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
930  const RelCompound* compound,
931  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
932  if (!compound->isAggregate()) {
933  return {nullptr};
934  }
935  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
936  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
937  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
938  }
939  return groupby_exprs;
940 }
941 
942 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
943  const RelAggregate* aggregate,
944  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
945  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
946  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
947  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
948  }
949  return groupby_exprs;
950 }
951 
952 std::list<std::shared_ptr<Analyzer::Expr>> translate_quals(const RelCompound* compound,
953  const RelAlgTranslator& translator) {
954  const auto filter_rex = compound->getFilterExpr();
955  const auto filter_expr =
956  filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
957  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
958  : std::list<std::shared_ptr<Analyzer::Expr>>{};
959 }
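// For reference, a sketch of the conjunctive form produced above (the SQL is
// an assumed example): a filter "x > 1 AND y = 2" becomes the two-element
// list {x > 1, y = 2}; a disjunction such as "x > 1 OR y = 2" stays a single
// element; no filter at all yields the empty list from the else branch.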
960 
961 std::vector<Analyzer::Expr*> translate_targets(
962  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
963  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
964  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
965  const RelCompound* compound,
966  const RelAlgTranslator& translator) {
967  std::vector<Analyzer::Expr*> target_exprs;
968  for (size_t i = 0; i < compound->size(); ++i) {
969  const auto target_rex = compound->getTargetExpr(i);
970  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
971  std::shared_ptr<Analyzer::Expr> target_expr;
972  if (target_rex_agg) {
973  target_expr =
974  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
975  } else {
976  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
977  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
978  if (target_rex_ref) {
979  const auto ref_idx = target_rex_ref->getIndex();
980  CHECK_GE(ref_idx, size_t(1));
981  CHECK_LE(ref_idx, groupby_exprs.size());
982  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
983  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
984  } else {
985  target_expr = translator.translateScalarRex(target_rex_scalar);
986  auto rewritten_expr = rewrite_expr(target_expr.get());
987  target_expr = fold_expr(rewritten_expr.get());
988  try {
989  target_expr = set_transient_dict(target_expr);
990  } catch (...) {
991  // noop
992  }
993  }
994  }
995  CHECK(target_expr);
996  target_exprs_owned.push_back(target_expr);
997  target_exprs.push_back(target_expr.get());
998  }
999  return target_exprs;
1000 }
1001 
1002 std::vector<Analyzer::Expr*> translate_targets(
1003  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1004  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1005  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1006  const RelAggregate* aggregate,
1007  const RelAlgTranslator& translator) {
1008  std::vector<Analyzer::Expr*> target_exprs;
1009  size_t group_key_idx = 0;
1010  for (const auto& groupby_expr : groupby_exprs) {
1011  auto target_expr =
1012  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1013  target_exprs_owned.push_back(target_expr);
1014  target_exprs.push_back(target_expr.get());
1015  }
1016 
1017  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1018  auto target_expr =
1019  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1020  CHECK(target_expr);
1021  target_expr = fold_expr(target_expr.get());
1022  target_exprs_owned.push_back(target_expr);
1023  target_exprs.push_back(target_expr.get());
1024  }
1025  return target_exprs;
1026 }
1027 
1028 std::vector<Analyzer::Expr*> translate_targets_for_update(
1029  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1030  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1031  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1032  const RelCompound* compound,
1033  const RelAlgTranslator& translator,
1034  int32_t tableId,
1035  const Catalog_Namespace::Catalog& cat,
1036  const ColumnNameList& colNames,
1037  size_t starting_projection_column_idx) {
1038  std::vector<Analyzer::Expr*> target_exprs;
1039  for (size_t i = 0; i < compound->size(); ++i) {
1040  const auto target_rex = compound->getTargetExpr(i);
1041  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1042  std::shared_ptr<Analyzer::Expr> target_expr;
1043  if (target_rex_agg) {
1044  target_expr =
1045  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1046  } else {
1047  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1048  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1049  if (target_rex_ref) {
1050  const auto ref_idx = target_rex_ref->getIndex();
1051  CHECK_GE(ref_idx, size_t(1));
1052  CHECK_LE(ref_idx, groupby_exprs.size());
1053  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1054  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1055  } else {
1056  if (i >= starting_projection_column_idx &&
1057  i < get_scalar_sources_size(compound) - 1) {
1058  target_expr =
1059  cast_to_column_type(translator.translateScalarRex(target_rex_scalar),
1060  tableId,
1061  cat,
1062  colNames[i - starting_projection_column_idx]);
1063  } else {
1064  target_expr = translator.translateScalarRex(target_rex_scalar);
1065  }
1066  auto rewritten_expr = rewrite_expr(target_expr.get());
1067  target_expr = fold_expr(rewritten_expr.get());
1068  }
1069  }
1070  CHECK(target_expr);
1071  target_exprs_owned.push_back(target_expr);
1072  target_exprs.push_back(target_expr.get());
1073  }
1074  return target_exprs;
1075 }
1076 
1077 bool is_count_distinct(const Analyzer::Expr* expr) {
1078  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1079  return agg_expr && agg_expr->get_is_distinct();
1080 }
1081 
1082 std::vector<TargetMetaInfo> get_modify_manipulated_targets_meta(
1083  ModifyManipulationTarget const* manip_node,
1084  const std::vector<Analyzer::Expr*>& target_exprs) {
1085  std::vector<TargetMetaInfo> targets_meta;
1086 
1087  for (int i = 0; i < (manip_node->getTargetColumnCount()); ++i) {
1088  CHECK(target_exprs[i]);
1089  // TODO(alex): remove the count distinct type fixup.
1090  targets_meta.emplace_back(manip_node->getTargetColumns()[i],
1091  is_count_distinct(target_exprs[i])
1092  ? SQLTypeInfo(kBIGINT, false)
1093  : target_exprs[i]->get_type_info());
1094  }
1095 
1096  return targets_meta;
1097 }
1098 
1099 template <class RA>
1100 std::vector<TargetMetaInfo> get_targets_meta(
1101  const RA* ra_node,
1102  const std::vector<Analyzer::Expr*>& target_exprs) {
1103  std::vector<TargetMetaInfo> targets_meta;
1104  for (size_t i = 0; i < ra_node->size(); ++i) {
1105  CHECK(target_exprs[i]);
1106  // TODO(alex): remove the count distinct type fixup.
1107  targets_meta.emplace_back(
1108  ra_node->getFieldName(i),
1109  is_count_distinct(target_exprs[i])
1110  ? SQLTypeInfo(kBIGINT, false)
1111  : get_logical_type_info(target_exprs[i]->get_type_info()),
1112  target_exprs[i]->get_type_info());
1113  }
1114  return targets_meta;
1115 }
1116 
1117 } // namespace
1118 
1119 void RelAlgExecutor::executeUpdateViaCompound(const RelCompound* compound,
1120  const CompilationOptions& co,
1121  const ExecutionOptions& eo,
1122  RenderInfo* render_info,
1123  const int64_t queue_time_ms) {
1124  if (!compound->validateTargetColumns(
1125  yieldColumnValidator(compound->getModifiedTableDescriptor()))) {
1126  throw std::runtime_error(
1127  "Unsupported update operation encountered. (None-encoded string column updates "
1128  "are not supported.)");
1129  }
1130 
1131  const auto work_unit = createModifyCompoundWorkUnit(
1132  compound, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1133  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1134  CompilationOptions co_project = co;
1135  co_project.device_type_ = ExecutorDeviceType::CPU;
1136 
1137  try {
1138  UpdateTriggeredCacheInvalidator::invalidateCaches();
1139 
1140  UpdateTransactionParameters update_params(compound->getModifiedTableDescriptor(),
1141  compound->getTargetColumns(),
1142  compound->getOutputMetainfo(),
1143  compound->isVarlenUpdateRequired());
1144  auto update_callback = yieldUpdateCallback(update_params);
1145  executor_->executeUpdate(work_unit.exe_unit,
1146  table_infos.front(),
1147  co_project,
1148  eo,
1149  cat_,
1150  executor_->row_set_mem_owner_,
1151  update_callback);
1152  update_params.finalizeTransaction();
1153  } catch (...) {
1154  LOG(INFO) << "Update operation failed.";
1155  throw;
1156  }
1157 }
1158 
1159 void RelAlgExecutor::executeUpdateViaProject(const RelProject* project,
1160  const CompilationOptions& co,
1161  const ExecutionOptions& eo,
1162  RenderInfo* render_info,
1163  const int64_t queue_time_ms) {
1164  if (!project->validateTargetColumns(
1165  yieldColumnValidator(project->getModifiedTableDescriptor()))) {
1166  throw std::runtime_error(
1167  "Unsupported update operation encountered. (None-encoded string column updates "
1168  "are not supported.)");
1169  }
1170 
1171  auto work_unit = createModifyProjectWorkUnit(
1172  project, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1173  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1174  CompilationOptions co_project = co;
1175  co_project.device_type_ = ExecutorDeviceType::CPU;
1176 
1177  if (project->isSimple()) {
1178  CHECK_EQ(size_t(1), project->inputCount());
1179  const auto input_ra = project->getInput(0);
1180  if (dynamic_cast<const RelSort*>(input_ra)) {
1181  const auto& input_table =
1182  get_temporary_table(&temporary_tables_, -input_ra->getId());
1183  CHECK(input_table);
1184  work_unit.exe_unit.scan_limit = input_table->rowCount();
1185  }
1186  }
1187 
1188  try {
1189  UpdateTriggeredCacheInvalidator::invalidateCaches();
1190 
1191  UpdateTransactionParameters update_params(project->getModifiedTableDescriptor(),
1192  project->getTargetColumns(),
1193  project->getOutputMetainfo(),
1194  project->isVarlenUpdateRequired());
1195  auto update_callback = yieldUpdateCallback(update_params);
1196  executor_->executeUpdate(work_unit.exe_unit,
1197  table_infos.front(),
1198  co_project,
1199  eo,
1200  cat_,
1201  executor_->row_set_mem_owner_,
1202  update_callback);
1203  update_params.finalizeTransaction();
1204  } catch (...) {
1205  LOG(INFO) << "Update operation failed.";
1206  throw;
1207  }
1208 }
1209 
1210 void RelAlgExecutor::executeDeleteViaCompound(const RelCompound* compound,
1211  const CompilationOptions& co,
1212  const ExecutionOptions& eo,
1213  RenderInfo* render_info,
1214  const int64_t queue_time_ms) {
1215  auto* table_descriptor = compound->getModifiedTableDescriptor();
1216  if (!table_descriptor->hasDeletedCol) {
1217  throw std::runtime_error(
1218  "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
1219  }
1220 
1221  const auto work_unit = createModifyCompoundWorkUnit(
1222  compound, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1223  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1224  CompilationOptions co_project = co;
1225  co_project.device_type_ = ExecutorDeviceType::CPU;
1226 
1227  try {
1228  DeleteTriggeredCacheInvalidator::invalidateCaches();
1229 
1230  DeleteTransactionParameters delete_params;
1231  auto delete_callback = yieldDeleteCallback(delete_params);
1232 
1233  executor_->executeUpdate(work_unit.exe_unit,
1234  table_infos.front(),
1235  co_project,
1236  eo,
1237  cat_,
1238  executor_->row_set_mem_owner_,
1239  delete_callback);
1240  delete_params.finalizeTransaction();
1241  } catch (...) {
1242  LOG(INFO) << "Delete operation failed.";
1243  throw;
1244  }
1245 }
1246 
1247 void RelAlgExecutor::executeDeleteViaProject(const RelProject* project,
1248  const CompilationOptions& co,
1249  const ExecutionOptions& eo,
1250  RenderInfo* render_info,
1251  const int64_t queue_time_ms) {
1252  auto* table_descriptor = project->getModifiedTableDescriptor();
1253  if (!table_descriptor->hasDeletedCol) {
1254  throw std::runtime_error(
1255  "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
1256  }
1257 
1258  auto work_unit = createModifyProjectWorkUnit(
1259  project, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1260  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1261  CompilationOptions co_project = co;
1262  co_project.device_type_ = ExecutorDeviceType::CPU;
1263 
1264  if (project->isSimple()) {
1265  CHECK_EQ(size_t(1), project->inputCount());
1266  const auto input_ra = project->getInput(0);
1267  if (dynamic_cast<const RelSort*>(input_ra)) {
1268  const auto& input_table =
1269  get_temporary_table(&temporary_tables_, -input_ra->getId());
1270  CHECK(input_table);
1271  work_unit.exe_unit.scan_limit = input_table->rowCount();
1272  }
1273  }
1274 
1275  try {
1276  DeleteTriggeredCacheInvalidator::invalidateCaches();
1277 
1278  DeleteTransactionParameters delete_params;
1279  auto delete_callback = yieldDeleteCallback(delete_params);
1280 
1281  executor_->executeUpdate(work_unit.exe_unit,
1282  table_infos.front(),
1283  co_project,
1284  eo,
1285  cat_,
1286  executor_->row_set_mem_owner_,
1287  delete_callback);
1288  delete_params.finalizeTransaction();
1289  } catch (...) {
1290  LOG(INFO) << "Delete operation failed.";
1291  throw;
1292  }
1293 }
1294 
1295 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
1296  const CompilationOptions& co,
1297  const ExecutionOptions& eo,
1298  RenderInfo* render_info,
1299  const int64_t queue_time_ms) {
1300  const auto work_unit = createCompoundWorkUnit(
1301  compound, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1302  CompilationOptions co_compound = co;
1303  return executeWorkUnit(work_unit,
1304  compound->getOutputMetainfo(),
1305  compound->isAggregate(),
1306  co_compound,
1307  eo,
1308  render_info,
1309  queue_time_ms);
1310 }
1311 
1312 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
1313  const CompilationOptions& co,
1314  const ExecutionOptions& eo,
1315  RenderInfo* render_info,
1316  const int64_t queue_time_ms) {
1317  const auto work_unit = createAggregateWorkUnit(
1318  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1319  return executeWorkUnit(work_unit,
1320  aggregate->getOutputMetainfo(),
1321  true,
1322  co,
1323  eo,
1324  render_info,
1325  queue_time_ms);
1326 }
1327 
1328 namespace {
1329 
1330 // Returns true iff the execution unit contains window functions.
1331 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
1332  return std::any_of(ra_exe_unit.target_exprs.begin(),
1333  ra_exe_unit.target_exprs.end(),
1334  [](const Analyzer::Expr* expr) {
1335  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1336  });
1337 }
1338 
1339 } // namespace
1340 
1341 ExecutionResult RelAlgExecutor::executeProject(const RelProject* project,
1342  const CompilationOptions& co,
1343  const ExecutionOptions& eo,
1344  RenderInfo* render_info,
1345  const int64_t queue_time_ms,
1346  const ssize_t previous_count) {
1347  auto work_unit =
1348  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1349  CompilationOptions co_project = co;
1350  if (project->isSimple()) {
1351  CHECK_EQ(size_t(1), project->inputCount());
1352  const auto input_ra = project->getInput(0);
1353  if (dynamic_cast<const RelSort*>(input_ra)) {
1354  co_project.device_type_ = ExecutorDeviceType::CPU;
1355  const auto& input_table =
1356  get_temporary_table(&temporary_tables_, -input_ra->getId());
1357  CHECK(input_table);
1358  work_unit.exe_unit.scan_limit =
1359  std::min(input_table->getLimit(), input_table->rowCount());
1360  }
1361  }
1362  return executeWorkUnit(work_unit,
1363  project->getOutputMetainfo(),
1364  false,
1365  co_project,
1366  eo,
1367  render_info,
1368  queue_time_ms,
1369  previous_count);
1370 }
1371 
1372 namespace {
1373 
1374 // Creates a new expression which has the range table index set to 1. This is needed to
1375 // reuse the hash join construction helpers to generate a hash table for the window
1376 // function partition: create an equals expression with left and right sides identical
1377 // except for the range table index.
1378 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
1379  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
1380  if (tuple) {
1381  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
1382  for (const auto& element : tuple->getTuple()) {
1383  transformed_tuple.push_back(transform_to_inner(element.get()));
1384  }
1385  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
1386  }
1387  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
1388  if (!col) {
1389  throw std::runtime_error("Only columns supported in the window partition for now");
1390  }
1391  return makeExpr<Analyzer::ColumnVar>(
1392  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
1393 }
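// Illustrative tautology (single partition key assumed): for PARTITION BY x,
// the caller below builds
//   BinOper(kBW_EQ, ColumnVar{x, rte_idx 0}, ColumnVar{x, rte_idx 1})
// i.e. "x = x" with the right-hand side relocated to range table index 1,
// which is the shape the hash join helpers expect when building the
// one-to-many table over the partition keys.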
1394 
1395 } // namespace
1396 
1397 void RelAlgExecutor::computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
1398  const CompilationOptions& co,
1399  const ExecutionOptions& eo,
1400  ColumnCacheMap& column_cache_map,
1401  const int64_t queue_time_ms) {
1402  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1403  CHECK_EQ(query_infos.size(), size_t(1));
1404  if (query_infos.front().info.fragments.size() != 1) {
1405  throw std::runtime_error(
1406  "Only single fragment tables supported for window functions for now");
1407  }
1408  query_infos.push_back(query_infos.front());
1409  auto window_project_node_context = WindowProjectNodeContext::create();
1410  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1411  ++target_index) {
1412  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1413  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1414  if (!window_func) {
1415  continue;
1416  }
1417  // Always use baseline layout hash tables for now; make the expression a tuple.
1418  const auto& partition_keys = window_func->getPartitionKeys();
1419  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1420  if (partition_keys.size() > 1) {
1421  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1422  } else {
1423  if (partition_keys.empty()) {
1424  throw std::runtime_error(
1425  "Empty window function partitions are not supported yet");
1426  }
1427  CHECK_EQ(partition_keys.size(), size_t(1));
1428  partition_key_tuple = partition_keys.front();
1429  }
1430  // Creates a tautology equality with the partition expression on both sides.
1431  const auto partition_key_cond =
1432  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1433  kBW_EQ,
1434  kONE,
1435  partition_key_tuple,
1436  transform_to_inner(partition_key_tuple.get()));
1437  auto context = createWindowFunctionContext(
1438  window_func, partition_key_cond, ra_exe_unit, query_infos, co, column_cache_map);
1439  context->compute();
1440  window_project_node_context->addWindowFunctionContext(std::move(context),
1441  target_index);
1442  }
1443 }
1444 
1445 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
1446  const Analyzer::WindowFunction* window_func,
1447  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
1448  const RelAlgExecutionUnit& ra_exe_unit,
1449  const std::vector<InputTableInfo>& query_infos,
1450  const CompilationOptions& co,
1451  ColumnCacheMap& column_cache_map) {
1452  const auto memory_level = co.device_type_ == ExecutorDeviceType::GPU
1453  ? MemoryLevel::GPU_LEVEL
1454  : MemoryLevel::CPU_LEVEL;
1455  const auto join_table_or_err =
1456  executor_->buildHashTableForQualifier(partition_key_cond,
1457  query_infos,
1458  memory_level,
1459  JoinHashTableInterface::HashType::OneToMany,
1460  column_cache_map);
1461  if (!join_table_or_err.fail_reason.empty()) {
1462  throw std::runtime_error(join_table_or_err.fail_reason);
1463  }
1464  CHECK(join_table_or_err.hash_table->getHashType() ==
1465  JoinHashTableInterface::HashType::OneToMany);
1466  const auto& order_keys = window_func->getOrderKeys();
1467  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
1468  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
1469  auto context = std::make_unique<WindowFunctionContext>(
1470  window_func, join_table_or_err.hash_table, elem_count, co.device_type_);
1471  for (const auto& order_key : order_keys) {
1472  const auto order_col =
1473  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
1474  if (!order_col) {
1475  throw std::runtime_error("Only order by columns supported for now");
1476  }
1477  const int8_t* column;
1478  size_t join_col_elem_count;
1479  std::tie(column, join_col_elem_count) =
1480  ColumnFetcher::getOneColumnFragment(executor_,
1481  *order_col,
1482  query_infos.front().info.fragments.front(),
1483  memory_level,
1484  0,
1485  chunks_owner,
1486  column_cache_map);
1487  CHECK_EQ(join_col_elem_count, elem_count);
1488  context->addOrderColumn(column, order_col.get(), chunks_owner);
1489  }
1490  return context;
1491 }
1492 
1493 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
1494  const CompilationOptions& co,
1495  const ExecutionOptions& eo,
1496  RenderInfo* render_info,
1497  const int64_t queue_time_ms) {
1498  const auto work_unit =
1499  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1500  return executeWorkUnit(
1501  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
1502 }
1503 
1504 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
1505  const ExecutionOptions& eo) {
1506  if (eo.just_explain) {
1507  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
1508  }
1509 
1510  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
1511  ExecutorDeviceType::CPU,
1512  QueryMemoryDescriptor(),
1513  executor_->getRowSetMemoryOwner(),
1514  executor_);
1515 
1516  std::vector<TargetMetaInfo> empty_targets;
1517  return {rs, empty_targets};
1518 }
1519 
1520 ExecutionResult RelAlgExecutor::executeLogicalValues(
1521  const RelLogicalValues* logical_values,
1522  const ExecutionOptions& eo) {
1523  if (eo.just_explain) {
1524  throw std::runtime_error("EXPLAIN not supported for LogicalValues");
1525  }
1526  QueryMemoryDescriptor query_mem_desc(
1527  executor_, 1, QueryDescriptionType::Projection);
1528 
1529  const auto& tuple_type = logical_values->getTupleType();
1530  for (size_t i = 0; i < tuple_type.size(); ++i) {
1531  query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
1532  }
1533  logical_values->setOutputMetainfo(tuple_type);
1534  std::vector<std::unique_ptr<Analyzer::ColumnVar>> owned_column_expressions;
1535  std::vector<Analyzer::Expr*> target_expressions;
1536  for (const auto& tuple_component : tuple_type) {
1537  const auto column_var =
1538  new Analyzer::ColumnVar(tuple_component.get_type_info(), 0, 0, 0);
1539  target_expressions.push_back(column_var);
1540  owned_column_expressions.emplace_back(column_var);
1541  }
1542  std::vector<TargetInfo> target_infos;
1543  for (const auto& tuple_type_component : tuple_type) {
1544  target_infos.emplace_back(TargetInfo{false,
1545  kCOUNT,
1546  tuple_type_component.get_type_info(),
1547  SQLTypeInfo(kNULLT, false),
1548  false,
1549  false});
1550  }
1551  auto rs = std::make_shared<ResultSet>(target_infos,
1552  ExecutorDeviceType::CPU,
1553  query_mem_desc,
1554  executor_->getRowSetMemoryOwner(),
1555  executor_);
1556  return {rs, tuple_type};
1557 }
1558 
1559 namespace {
1560 
1561 // TODO(alex): Once we're fully migrated to the relational algebra model, change
1562 // the executor interface to use the collation directly and remove this conversion.
1563 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
1564  std::list<Analyzer::OrderEntry> result;
1565  for (size_t i = 0; i < sort->collationCount(); ++i) {
1566  const auto sort_field = sort->getCollation(i);
1567  result.emplace_back(sort_field.getField() + 1,
1568  sort_field.getSortDir() == SortDirection::Descending,
1569  sort_field.getNullsPosition() == NullSortedPosition::First);
1570  }
1571  return result;
1572 }
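// Example (hypothetical collation): ORDER BY c2 DESC NULLS FIRST, where c2 is
// the second projected field (getField() == 1), becomes
//   Analyzer::OrderEntry{2, /*is_desc=*/true, /*nulls_first=*/true}
// since the executor expects 1-based column references.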
1573 
1574 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
1575  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
1576  if (aggregate) {
1577  return 0;
1578  }
1579  const auto compound = dynamic_cast<const RelCompound*>(ra);
1580  return (compound && compound->isAggregate()) ? 0 : limit;
1581 }
1582 
1583 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
1584  return !order_entries.empty() && order_entries.front().is_desc;
1585 }
1586 
1587 } // namespace
1588 
1589 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
1590  const CompilationOptions& co,
1591  const ExecutionOptions& eo,
1592  RenderInfo* render_info,
1593  const int64_t queue_time_ms) {
1594  CHECK_EQ(size_t(1), sort->inputCount());
1595  const auto source = sort->getInput(0);
1596  if (dynamic_cast<const RelSort*>(source)) {
1597  throw std::runtime_error("Sort node not supported as input to another sort");
1598  }
1599  const bool is_aggregate = node_is_aggregate(source);
1600  auto it = leaf_results_.find(sort->getId());
1601  if (it != leaf_results_.end()) {
1602  // Add any transient string literals to the string dictionary proxy on the aggregator
1603  const auto source_work_unit = createSortInputWorkUnit(sort, eo.just_explain);
1604  add_transient_string_literals(
1605  source_work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
1606 
1607  // Handle LIMIT push-down for multi-node execution
1608  auto& aggregated_result = it->second;
1609  auto& result_rows = aggregated_result.rs;
1610  const size_t limit = sort->getLimit();
1611  const size_t offset = sort->getOffset();
1612  const auto order_entries = get_order_entries(sort);
1613  if (limit || offset) {
1614  if (!order_entries.empty()) {
1615  result_rows->sort(order_entries, limit + offset);
1616  }
1617  result_rows->dropFirstN(offset);
1618  if (limit) {
1619  result_rows->keepFirstN(limit);
1620  }
1621  }
1622  ExecutionResult result(result_rows, aggregated_result.targets_meta);
1623  sort->setOutputMetainfo(aggregated_result.targets_meta);
1624 
1625  return result;
1626  }
1627  while (true) {
1628  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1629  bool is_desc{false};
1630  try {
1631  const auto source_work_unit = createSortInputWorkUnit(sort, eo.just_explain);
1632  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
1633  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
1634  auto source_result = executeWorkUnit(source_work_unit,
1635  source->getOutputMetainfo(),
1636  is_aggregate,
1637  co,
1638  eo,
1639  render_info,
1640  queue_time_ms);
1641  if (render_info && render_info->isPotentialInSituRender()) {
1642  return source_result;
1643  }
1644  if (source_result.isFilterPushDownEnabled()) {
1645  return source_result;
1646  }
1647  auto rows_to_sort = source_result.getRows();
1648  if (eo.just_explain) {
1649  return {rows_to_sort, {}};
1650  }
1651  const size_t limit = sort->getLimit();
1652  const size_t offset = sort->getOffset();
1653  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
1654  !use_speculative_top_n(source_work_unit.exe_unit,
1655  rows_to_sort->getQueryMemDesc())) {
1656  rows_to_sort->sort(source_work_unit.exe_unit.sort_info.order_entries,
1657  limit + offset);
1658  }
1659  if (limit || offset) {
1660  if (g_cluster && sort->collationCount() == 0) {
1661  if (offset >= rows_to_sort->rowCount()) {
1662  rows_to_sort->dropFirstN(offset);
1663  } else {
1664  rows_to_sort->keepFirstN(limit + offset);
1665  }
1666  } else {
1667  rows_to_sort->dropFirstN(offset);
1668  if (limit) {
1669  rows_to_sort->keepFirstN(limit);
1670  }
1671  }
1672  }
1673  return {rows_to_sort, source_result.getTargetsMeta()};
1674  } catch (const SpeculativeTopNFailed&) {
1675  CHECK_EQ(size_t(1), groupby_exprs.size());
1676  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
1677  }
1678  }
1679  CHECK(false);
1680  return {std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1681  co.device_type_,
1682  QueryMemoryDescriptor(),
1683  nullptr,
1684  executor_),
1685  {}};
1686 }
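// Example (hypothetical values): for ORDER BY ... LIMIT 10 OFFSET 5, only the
// top limit + offset = 15 rows need to be sorted; dropFirstN(5) then discards
// the offset rows and keepFirstN(10) trims the result to the limit.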
1687 
1688 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
1689  const RelSort* sort,
1690  const bool just_explain) {
1691  const auto source = sort->getInput(0);
1692  const size_t limit = sort->getLimit();
1693  const size_t offset = sort->getOffset();
1694  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
1695  const size_t scan_total_limit =
1696  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
1697  size_t max_groups_buffer_entry_guess{
1698  scan_total_limit ? scan_total_limit : max_groups_buffer_entry_default_guess};
1699  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
1700  const auto order_entries = get_order_entries(sort);
1701  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
1702  auto source_work_unit = createWorkUnit(source, sort_info, just_explain);
1703  const auto& source_exe_unit = source_work_unit.exe_unit;
1704  if (source_exe_unit.groupby_exprs.size() == 1) {
1705  if (!source_exe_unit.groupby_exprs.front()) {
1706  sort_algorithm = SortAlgorithm::StreamingTopN;
1707  } else {
1708  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
1709  first_oe_is_desc(order_entries))) {
1710  sort_algorithm = SortAlgorithm::Default;
1711  }
1712  }
1713  }
1714 
1715  sort->setOutputMetainfo(source->getOutputMetainfo());
1716  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
1717  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
1718  return {{source_exe_unit.input_descs,
1719  std::move(source_exe_unit.input_col_descs),
1720  source_exe_unit.simple_quals,
1721  source_exe_unit.quals,
1722  source_exe_unit.join_quals,
1723  source_exe_unit.groupby_exprs,
1724  source_exe_unit.target_exprs,
1725  nullptr,
1726  {sort_info.order_entries, sort_algorithm, limit, offset},
1727  scan_total_limit,
1728  source_exe_unit.query_features},
1729  source,
1730  max_groups_buffer_entry_guess,
1731  std::move(source_work_unit.query_rewriter),
1732  source_work_unit.input_permutation,
1733  source_work_unit.left_deep_join_input_sizes};
1734 }
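// Note: sort_algorithm starts out as SortAlgorithm::SpeculativeTopN. A single
// nullptr group-by expression (i.e. no GROUP BY) switches it to StreamingTopN,
// while a (group-by expression, sort direction) pair previously recorded by
// the SpeculativeTopNFailed handler in executeSort forces the Default path.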
1735 
1736 namespace {
1737 
1738 /**
1739  * Upper bound estimation for the number of groups. Not strictly correct and not
1740  * tight, but if the tables involved are really small we shouldn't waste time doing
1741  * the NDV estimation. We don't account for cross joins and / or group by unnested
1742  * array, which is the reason why this estimation isn't entirely reliable.
1743  */
1744 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
1745  CHECK(!table_infos.empty());
1746  const auto& first_table = table_infos.front();
1747  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
1748  for (const auto& table_info : table_infos) {
1749  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
1750  max_num_groups = table_info.info.getNumTuplesUpperBound();
1751  }
1752  }
1753  return std::max(max_num_groups, size_t(1));
1754 }
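// Example (hypothetical sizes): for inputs of 1,000 and 1,000,000 rows this
// returns max(1000, 1000000, 1) = 1000000 -- an upper bound on the group
// count that ignores cross-join blowup.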
1755 
1756 /**
1757  * Determines whether a query needs to compute the size of its output buffer. Returns
1758  * true for projection queries with no limit or a limit which exceeds the high scan
1759  * limit threshold (meaning it would be cheaper to compute the number of rows passing
1760  * the filter than to use the scan limit as the output buffer size per fragment).
1761  */
1762 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
1763  for (const auto target_expr : ra_exe_unit.target_exprs) {
1764  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1765  return false;
1766  }
1767  }
1768  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1769  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
1770  return true;
1771  }
1772  return false;
1773 }
1774 
1775 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
1776  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
1777  ra_exe_unit.simple_quals.empty());
1778 }
1779 
1780 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
1781  const RelAlgExecutionUnit& ra_exe_unit_in,
1782  const std::vector<InputTableInfo>& table_infos,
1783  const Executor* executor,
1784  const ExecutorDeviceType device_type_in,
1785  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
1786  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
1787  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
1788  const auto target_expr = ra_exe_unit.target_exprs[i];
1789  const auto agg_info = get_target_info(target_expr, g_bigint_count);
1790  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
1791  continue;
1792  }
1793  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
1794  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
1795  CHECK(arg);
1796  const auto& arg_ti = arg->get_type_info();
1797  // Avoid calling getExpressionRange for variable length types (string and array),
1798  // it'd trigger an assertion since that API expects to be called only for types
1799  // for which the notion of range is well-defined. A bit of a kludge, but the
1800  // logic to reject these types anyway is at lower levels in the stack and not
1801  // really worth pulling into a separate function for now.
1802  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
1803  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
1804  continue;
1805  }
1806  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
1807  if (arg_range.getType() != ExpressionRangeType::Integer) {
1808  continue;
1809  }
1810  // When running distributed, the threshold for using the precise implementation
1811  // must be consistent across all leaves, otherwise we could have a mix of precise
1812  // and approximate bitmaps and we cannot aggregate them.
1813  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
1814  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
1815  const auto sub_bitmap_count =
1816  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
1817  int64_t approx_bitmap_sz_bits{0};
1818  const auto error_rate =
1819  static_cast<Analyzer::AggExpr*>(target_expr)->get_error_rate();
1820  if (error_rate) {
1821  CHECK(error_rate->get_type_info().get_type() == kINT);
1822  CHECK_GE(error_rate->get_constval().intval, 1);
1823  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
1824  } else {
1825  approx_bitmap_sz_bits = g_hll_precision_bits;
1826  }
1827  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
1828  arg_range.getIntMin(),
1829  approx_bitmap_sz_bits,
1830  true,
1831  device_type,
1832  sub_bitmap_count};
1833  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
1834  arg_range.getIntMin(),
1835  bitmap_sz_bits,
1836  false,
1837  device_type,
1838  sub_bitmap_count};
1839  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
1840  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
1841  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
1842  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
1843  target_exprs_owned.push_back(precise_count_distinct);
1844  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
1845  }
1846  }
1847  return ra_exe_unit;
1848 }
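// Example (hypothetical range): for APPROX_COUNT_DISTINCT(x) with x spanning
// [0, 999], the precise bitmap needs bitmap_sz_bits = 999 - 0 + 1 = 1000 bits.
// If that padded bitmap is no larger than the HLL buffer implied by
// g_hll_precision_bits, the rewrite above silently substitutes an exact
// COUNT(DISTINCT x) -- same memory, exact answer.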
1849 
1850 void build_render_targets(
1851  RenderInfo& render_info,
1852  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
1853  const std::vector<std::shared_ptr<Analyzer::Expr>>& owned_target_exprs,
1854  const std::vector<TargetMetaInfo>& targets_meta) {
1855  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
1856  render_info.targets.clear();
1857  for (size_t i = 0; i < targets_meta.size(); ++i) {
1858  // TODO(croot): find a better way to iterate through these or a better data
1859  // structure for faster lookup to avoid the double for-loop. These vectors should
1860  // be small, though, and have no real impact on performance.
1861  size_t j{0};
1862  for (j = 0; j < owned_target_exprs.size(); ++j) {
1863  if (owned_target_exprs[j].get() == work_unit_target_exprs[i]) {
1864  break;
1865  }
1866  }
1867  CHECK_LT(j, owned_target_exprs.size());
1868 
1869  const auto& meta_ti = targets_meta[i].get_physical_type_info();
1870  const auto& expr_ti = owned_target_exprs[j]->get_type_info();
1871  CHECK(meta_ti == expr_ti) << targets_meta[i].get_resname() << " " << i << "," << j
1872  << ", targets meta: " << meta_ti.get_type_name() << "("
1873  << meta_ti.get_compression_name()
1874  << "), target_expr: " << expr_ti.get_type_name() << "("
1875  << expr_ti.get_compression_name() << ")";
1876  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
1877  targets_meta[i].get_resname(), owned_target_exprs[j], false));
1878  }
1879 }
1880 
1881 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
1882  const CompilationOptions& co,
1883  const ExecutionOptions& eo) {
1884  return g_enable_bump_allocator && co.device_type_ == ExecutorDeviceType::GPU &&
1885  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
1886 }
1887 
1888 } // namespace
1889 
1890 ExecutionResult RelAlgExecutor::executeWorkUnit(
1891  const RelAlgExecutor::WorkUnit& work_unit,
1892  const std::vector<TargetMetaInfo>& targets_meta,
1893  const bool is_agg,
1894  const CompilationOptions& co_in,
1895  const ExecutionOptions& eo,
1896  RenderInfo* render_info,
1897  const int64_t queue_time_ms,
1898  const ssize_t previous_count) {
1899  INJECT_TIMER(executeWorkUnit);
1900 
1901  auto co = co_in;
1902  ColumnCacheMap column_cache;
1903  if (is_window_execution_unit(work_unit.exe_unit)) {
1904  if (g_cluster) {
1905  throw std::runtime_error(
1906  "Window functions support not supported in distributed mode");
1907  }
1909  throw std::runtime_error("Window functions support is disabled");
1910  }
1911  co.device_type_ = ExecutorDeviceType::CPU;
1912  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
1913  }
1914  if (!eo.just_explain && eo.find_push_down_candidates) {
1915  // find potential candidates:
1916  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
1917  if (!selected_filters.empty() || eo.just_calcite_explain) {
1918  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
1919  }
1920  }
1921  const auto body = work_unit.body;
1922  CHECK(body);
1923  auto it = leaf_results_.find(body->getId());
1924  if (it != leaf_results_.end()) {
1925  add_transient_string_literals(
1926  work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
1927  auto& aggregated_result = it->second;
1928  auto& result_rows = aggregated_result.rs;
1929  ExecutionResult result(result_rows, aggregated_result.targets_meta);
1930  body->setOutputMetainfo(aggregated_result.targets_meta);
1931  if (render_info) {
1932  build_render_targets(*render_info,
1933  work_unit.exe_unit.target_exprs,
1934  target_exprs_owned_,
1935  targets_meta);
1936  }
1937  return result;
1938  }
1939  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1940 
1941  auto ra_exe_unit = decide_approx_count_distinct_implementation(
1942  work_unit.exe_unit, table_infos, executor_, co.device_type_, target_exprs_owned_);
1943  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
1944  if (is_window_execution_unit(ra_exe_unit)) {
1945  CHECK_EQ(table_infos.size(), size_t(1));
1946  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
1947  max_groups_buffer_entry_guess =
1948  table_infos.front().info.fragments.front().getNumTuples();
1949  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
1950  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
1951  if (previous_count > 0 && !exe_unit_has_quals(ra_exe_unit)) {
1952  ra_exe_unit.scan_limit = static_cast<size_t>(previous_count);
1953  } else {
1954  // TODO(adb): enable bump allocator path for render queries
1955  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
1956  ra_exe_unit.scan_limit = 0;
1957  ra_exe_unit.use_bump_allocator = true;
1958  } else if (!eo.just_explain) {
1959  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
1960  if (filter_count_all >= 0) {
1961  ra_exe_unit.scan_limit = std::max(filter_count_all, ssize_t(1));
1962  }
1963  }
1964  }
1965  }
1966 
1967  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1968  co.device_type_,
1969  QueryMemoryDescriptor(),
1970  nullptr,
1971  executor_),
1972  {}};
1973 
1974  auto execute_and_handle_errors =
1975  [&](const auto max_groups_buffer_entry_guess_in,
1976  const bool has_cardinality_estimation) -> ExecutionResult {
1977  // Note that the groups buffer entry guess may be modified during query execution.
1978  // Create a local copy so we can track those changes if we need to attempt a retry due
1979  // to OOM
1980  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
1981  try {
1982  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
1983  is_agg,
1984  table_infos,
1985  ra_exe_unit,
1986  co,
1987  eo,
1988  cat_,
1989  executor_->row_set_mem_owner_,
1990  render_info,
1991  has_cardinality_estimation,
1992  column_cache),
1993  targets_meta};
1994  } catch (const QueryExecutionError& e) {
1995  handlePersistentError(e.getErrorCode());
1996  return handleOutOfMemoryRetry(
1997  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
1998  targets_meta,
1999  is_agg,
2000  co,
2001  eo,
2002  render_info,
2003  e.wasMultifragKernelLaunch(),
2004  queue_time_ms);
2005  }
2006  };
2007 
2008  try {
2009  result = execute_and_handle_errors(
2010  max_groups_buffer_entry_guess,
2011  groups_approx_upper_bound(table_infos) <= g_big_group_threshold);
2012  } catch (const CardinalityEstimationRequired&) {
2013  const auto estimated_groups_buffer_entry_guess =
2014  2 * std::min(groups_approx_upper_bound(table_infos),
2015  getNDVEstimation(work_unit, is_agg, co, eo));
2016  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
2017  result = execute_and_handle_errors(estimated_groups_buffer_entry_guess, true);
2018  }
2019 
2020  result.setQueueTime(queue_time_ms);
2021  if (render_info) {
2022  build_render_targets(
2023  *render_info, work_unit.exe_unit.target_exprs, target_exprs_owned_, targets_meta);
2024  if (render_info->isPotentialInSituRender()) {
2025  // return an empty result (with the same queue time, and zero render time)
2026  return {
2027  std::make_shared<ResultSet>(queue_time_ms, 0, executor_->row_set_mem_owner_),
2028  {}};
2029  }
2030  }
2031  return result;
2032 }
2033 
2034 ssize_t RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
2035  const bool is_agg,
2036  const CompilationOptions& co,
2037  const ExecutionOptions& eo) {
2038  const auto count =
2039  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
2040  kCOUNT,
2041  nullptr,
2042  false,
2043  nullptr);
2044  const auto count_all_exe_unit =
2045  create_count_all_execution_unit(work_unit.exe_unit, count);
2046  size_t one{1};
2047  ResultSetPtr count_all_result;
2048  try {
2049  ColumnCacheMap column_cache;
2050  count_all_result =
2051  executor_->executeWorkUnit(one,
2052  is_agg,
2053  get_table_infos(work_unit.exe_unit, executor_),
2054  count_all_exe_unit,
2055  co,
2056  eo,
2057  cat_,
2058  executor_->row_set_mem_owner_,
2059  nullptr,
2060  false,
2061  column_cache);
2062  } catch (const std::exception& e) {
2063  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
2064  return -1;
2065  }
2066  const auto count_row = count_all_result->getNextRow(false, false);
2067  CHECK_EQ(size_t(1), count_row.size());
2068  const auto& count_tv = count_row.front();
2069  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
2070  CHECK(count_scalar_tv);
2071  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
2072  CHECK(count_ptr);
2073  CHECK_GE(*count_ptr, 0);
2074  auto count_upper_bound = static_cast<size_t>(*count_ptr);
2075  return std::max(count_upper_bound, size_t(1));
2076 }
2077 
2078 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
2079  const auto& ra_exe_unit = work_unit.exe_unit;
2080  if (ra_exe_unit.input_descs.size() != 1) {
2081  return false;
2082  }
2083  const auto& table_desc = ra_exe_unit.input_descs.front();
2084  if (table_desc.getSourceType() != InputSourceType::TABLE) {
2085  return false;
2086  }
2087  const int table_id = table_desc.getTableId();
2088  for (const auto simple_qual : ra_exe_unit.simple_quals) {
2089  const auto comp_expr =
2090  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
2091  if (!comp_expr || comp_expr->get_optype() != kEQ) {
2092  return false;
2093  }
2094  const auto lhs = comp_expr->get_left_operand();
2095  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
2096  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
2097  return false;
2098  }
2099  const auto rhs = comp_expr->get_right_operand();
2100  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
2101  if (!rhs_const) {
2102  return false;
2103  }
2104  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
2105  if (cd->isVirtualCol) {
2106  CHECK_EQ("rowid", cd->columnName);
2107  return true;
2108  }
2109  }
2110  return false;
2111 }
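// Example (hypothetical filter): SELECT * FROM t WHERE rowid = 42. The lone
// equality on the virtual rowid column makes this a point lookup, letting the
// caller skip the pre-flight filtered COUNT(*) used to size the output buffer.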
2112 
2113 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
2114  const RelAlgExecutor::WorkUnit& work_unit,
2115  const std::vector<TargetMetaInfo>& targets_meta,
2116  const bool is_agg,
2117  const CompilationOptions& co,
2118  const ExecutionOptions& eo,
2119  RenderInfo* render_info,
2120  const bool was_multifrag_kernel_launch,
2121  const int64_t queue_time_ms) {
2122  // Disable the bump allocator
2123  // Note that this will have basically the same effect as using the bump allocator for
2124  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
2125  // path and the bump allocator path for kernel per fragment execution.
2126  auto ra_exe_unit_in = work_unit.exe_unit;
2127  ra_exe_unit_in.use_bump_allocator = false;
2128 
2129  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2130  co.device_type_,
2131  QueryMemoryDescriptor(),
2132  nullptr,
2133  executor_),
2134  {}};
2135 
2136  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
2137  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2138  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
2139  false,
2140  false,
2141  eo.allow_loop_joins,
2142  eo.with_watchdog,
2143  eo.jit_debug,
2144  false,
2145  eo.with_dynamic_watchdog,
2146  eo.dynamic_watchdog_time_limit,
2147  false,
2148  false,
2149  eo.gpu_input_mem_limit_percent};
2150 
2151  if (was_multifrag_kernel_launch) {
2152  try {
2153  // Attempt to retry using the kernel per fragment path. The smaller input size
2154  // required may allow the entire kernel to execute in GPU memory.
2155  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
2156  "kernels disabled.";
2157  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2158  ra_exe_unit_in, table_infos, executor_, co.device_type_, target_exprs_owned_);
2159  ColumnCacheMap column_cache;
2160  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2161  is_agg,
2162  table_infos,
2163  ra_exe_unit,
2164  co,
2165  eo_no_multifrag,
2166  cat_,
2167  executor_->row_set_mem_owner_,
2168  nullptr,
2169  true,
2170  column_cache),
2171  targets_meta};
2172  result.setQueueTime(queue_time_ms);
2173  } catch (const QueryExecutionError& e) {
2174  handlePersistentError(e.getErrorCode());
2175  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
2176  }
2177  }
2178 
2179  if (render_info) {
2180  render_info->setForceNonInSituData();
2181  }
2182 
2183  CompilationOptions co_cpu{ExecutorDeviceType::CPU,
2184  co.hoist_literals_,
2185  co.opt_level_,
2186  co.with_dynamic_watchdog_};
2187 
2188  // Only reset the group buffer entry guess if we ran out of slots, which
2189  // suggests a highly pathological input which prevented a good estimation of
2190  // distinct tuple count. For projection queries, this will force a
2191  // per-fragment scan limit, which is compatible with the CPU path.
2192 
2193  VLOG(1) << "Resetting max groups buffer entry guess.";
2194  max_groups_buffer_entry_guess = 0;
2195 
2196  int iteration_ctr = -1;
2197  while (true) {
2198  iteration_ctr++;
2199  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2200  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type_, target_exprs_owned_);
2201  ColumnCacheMap column_cache;
2202  try {
2203  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2204  is_agg,
2205  table_infos,
2206  ra_exe_unit,
2207  co_cpu,
2208  eo_no_multifrag,
2209  cat_,
2210  executor_->row_set_mem_owner_,
2211  nullptr,
2212  true,
2213  column_cache),
2214  targets_meta};
2215  } catch (const QueryExecutionError& e) {
2216  // Ran out of slots
2217  if (e.getErrorCode() < 0) {
2218  // Even the conservative guess failed; it should only happen when we group
2219  // by a huge cardinality array. Maybe we should throw an exception instead?
2220  // Such a heavy query is entirely capable of exhausting all the host memory.
2221  CHECK(max_groups_buffer_entry_guess);
2222  // Only allow two iterations of increasingly large entry guesses up to a maximum
2223  // of 512MB per column per kernel
2224  if (g_enable_watchdog || iteration_ctr > 1) {
2225  throw std::runtime_error("Query ran out of output slots in the result");
2226  }
2227  max_groups_buffer_entry_guess *= 2;
2228  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
2229  "groups buffer entry "
2230  "guess equal to "
2231  << max_groups_buffer_entry_guess;
2232  } else {
2233  handlePersistentError(e.getErrorCode());
2234  }
2235  continue;
2236  }
2237  result.setQueueTime(queue_time_ms);
2238  return result;
2239  }
2240  return result;
2241 }
2242 
2243 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
2244  LOG(ERROR) << "Query execution failed with error "
2245  << getErrorMessageFromCode(error_code);
2246  if (error_code == Executor::ERR_SPECULATIVE_TOP_OOM) {
2247  throw SpeculativeTopNFailed();
2248  }
2249  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
2250  // We ran out of GPU memory, this doesn't count as an error if the query is
2251  // allowed to continue on CPU because retry on CPU is explicitly allowed through
2252  // --allow-cpu-retry.
2253  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
2254  if (!g_allow_cpu_retry) {
2255  throw std::runtime_error(
2256  "Query ran out of GPU memory, unable to automatically retry on CPU");
2257  }
2258  return;
2259  }
2260  throw std::runtime_error(getErrorMessageFromCode(error_code));
2261 }
2262 
2263 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
2264  if (error_code < 0) {
2265  return "Ran out of slots in the query output buffer";
2266  }
2267  switch (error_code) {
2268  case Executor::ERR_DIV_BY_ZERO:
2269  return "Division by zero";
2270  case Executor::ERR_OUT_OF_GPU_MEM:
2271  return "Query couldn't keep the entire working set of columns in GPU memory";
2272  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
2273  return "Self joins not supported yet";
2274  case Executor::ERR_OUT_OF_CPU_MEM:
2275  return "Not enough host memory to execute the query";
2276  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
2277  return "Overflow or underflow";
2278  case Executor::ERR_OUT_OF_TIME:
2279  return "Query execution has exceeded the time limit";
2280  case Executor::ERR_INTERRUPTED:
2281  return "Query execution has been interrupted";
2282  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
2283  return "Columnar conversion not supported for variable length types";
2284  case Executor::ERR_TOO_MANY_LITERALS:
2285  return "Too many literals in the query";
2286  case Executor::ERR_STRING_CONST_IN_RESULTSET:
2287  return "NONE ENCODED String types are not supported as input result set.";
2288  case Executor::ERR_OUT_OF_RENDER_MEM:
2289  return "Not enough OpenGL memory to render the query results";
2290  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
2291  return "Streaming-Top-N not supported in Render Query";
2292  }
2293  return "Other error: code " + std::to_string(error_code);
2294 }
2295 
2296 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
2297  const SortInfo& sort_info,
2298  const bool just_explain) {
2299  const auto compound = dynamic_cast<const RelCompound*>(node);
2300  if (compound) {
2301  return createCompoundWorkUnit(compound, sort_info, just_explain);
2302  }
2303  const auto project = dynamic_cast<const RelProject*>(node);
2304  if (project) {
2305  return createProjectWorkUnit(project, sort_info, just_explain);
2306  }
2307  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
2308  if (aggregate) {
2309  return createAggregateWorkUnit(aggregate, sort_info, just_explain);
2310  }
2311  const auto filter = dynamic_cast<const RelFilter*>(node);
2312  CHECK(filter);
2313  return createFilterWorkUnit(filter, sort_info, just_explain);
2314 }
2315 
2316 namespace {
2317 
2318 JoinType get_join_type(const RelAlgNode* ra) {
2319  auto sink = get_data_sink(ra);
2320  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
2321  return join->getJoinType();
2322  }
2323  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
2324  return JoinType::INNER;
2325  }
2326 
2327  return JoinType::INVALID;
2328 }
2329 
2330 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
2331  const auto condition = dynamic_cast<const RexOperator*>(scalar);
2332  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
2333  return nullptr;
2334  }
2335  const auto equi_join_condition =
2336  dynamic_cast<const RexOperator*>(condition->getOperand(0));
2337  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
2338  return nullptr;
2339  }
2340  const auto both_are_null_condition =
2341  dynamic_cast<const RexOperator*>(condition->getOperand(1));
2342  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
2343  both_are_null_condition->size() != 2) {
2344  return nullptr;
2345  }
2346  const auto lhs_is_null =
2347  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
2348  const auto rhs_is_null =
2349  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
2350  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
2351  rhs_is_null->getOperator() != kISNULL) {
2352  return nullptr;
2353  }
2354  CHECK_EQ(size_t(1), lhs_is_null->size());
2355  CHECK_EQ(size_t(1), rhs_is_null->size());
2356  CHECK_EQ(size_t(2), equi_join_condition->size());
2357  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
2358  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
2359  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
2360  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
2361  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
2362  return nullptr;
2363  }
2364  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
2365  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
2366  RexDeepCopyVisitor deep_copy_visitor;
2367  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
2368  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
2369  eq_operands.emplace_back(lhs_op_copy.release());
2370  eq_operands.emplace_back(rhs_op_copy.release());
2371  return boost::make_unique<const RexOperator>(
2372  kBW_EQ, eq_operands, equi_join_condition->getType());
2373  }
2374  return nullptr;
2375 }
2376 
2377 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
2378  const RexScalar* scalar) {
2379  const auto condition = dynamic_cast<const RexOperator*>(scalar);
2380  if (condition && condition->getOperator() == kAND) {
2381  CHECK_GE(condition->size(), size_t(2));
2382  auto acc = get_bitwise_equals(condition->getOperand(0));
2383  if (!acc) {
2384  return nullptr;
2385  }
2386  for (size_t i = 1; i < condition->size(); ++i) {
2387  std::vector<std::unique_ptr<const RexScalar>> and_operands;
2388  and_operands.emplace_back(std::move(acc));
2389  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
2390  acc =
2391  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
2392  }
2393  return acc;
2394  }
2395  return get_bitwise_equals(scalar);
2396 }
2397 
2398 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
2399  CHECK_GE(left_deep_join->inputCount(), size_t(2));
2400  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
2401  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
2402  ++nesting_level) {
2403  if (left_deep_join->getOuterCondition(nesting_level)) {
2404  join_types[nesting_level - 1] = JoinType::LEFT;
2405  }
2406  }
2407  return join_types;
2408 }
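// Example (hypothetical plan): a left-deep join over inputs (a, b, c) has two
// nesting levels; if only level 2 carries an outer condition, the result is
// {JoinType::INNER, JoinType::LEFT}.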
2409 
2410 template <class RA>
2411 std::vector<size_t> do_table_reordering(
2412  std::vector<InputDescriptor>& input_descs,
2413  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
2414  const JoinQualsPerNestingLevel& left_deep_join_quals,
2415  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
2416  const RA* node,
2417  const std::vector<InputTableInfo>& query_infos,
2418  const Executor* executor) {
2419  if (g_cluster) {
2420  // Disable table reordering in distributed mode. The aggregator does not have enough
2421  // information to break ties
2422  return {};
2423  }
2424  const auto& cat = *executor->getCatalog();
2425  for (const auto& table_info : query_infos) {
2426  if (table_info.table_id < 0) {
2427  continue;
2428  }
2429  const auto td = cat.getMetadataForTable(table_info.table_id);
2430  CHECK(td);
2431  if (table_is_replicated(td)) {
2432  return {};
2433  }
2434  }
2435  const auto input_permutation =
2436  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
2437  input_to_nest_level = get_input_nest_levels(node, input_permutation);
2438  std::tie(input_descs, input_col_descs, std::ignore) =
2439  get_input_desc(node, input_to_nest_level, input_permutation, cat);
2440  return input_permutation;
2441 }
2442 
2443 std::vector<size_t> get_left_deep_join_input_sizes(
2444  const RelLeftDeepInnerJoin* left_deep_join) {
2445  std::vector<size_t> input_sizes;
2446  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
2447  const auto inputs = get_node_output(left_deep_join->getInput(i));
2448  input_sizes.push_back(inputs.size());
2449  }
2450  return input_sizes;
2451 }
2452 
2453 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
2454  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
2455  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
2456  for (const auto& qual : quals) {
2457  const auto rewritten_qual = rewrite_expr(qual.get());
2458  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
2459  }
2460  return rewritten_quals;
2461 }
2462 
2463 } // namespace
2464 
2465 RelAlgExecutor::WorkUnit RelAlgExecutor::createModifyCompoundWorkUnit(
2466  const RelCompound* compound,
2467  const SortInfo& sort_info,
2468  const bool just_explain) {
2469  std::vector<InputDescriptor> input_descs;
2470  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
2471  auto input_to_nest_level = get_input_nest_levels(compound, {});
2472  std::tie(input_descs, input_col_descs, std::ignore) =
2473  get_input_desc(compound, input_to_nest_level, {}, cat_);
2474  const auto query_infos = get_table_infos(input_descs, executor_);
2475  CHECK_EQ(size_t(1), compound->inputCount());
2476  const auto left_deep_join =
2477  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
2478  JoinQualsPerNestingLevel left_deep_join_quals;
2479  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
2480  : std::vector<JoinType>{get_join_type(compound)};
2481  if (left_deep_join) {
2482  left_deep_join_quals = translateLeftDeepJoinFilter(
2483  left_deep_join, input_descs, input_to_nest_level, just_explain);
2484  }
2485  QueryFeatureDescriptor query_features;
2486  RelAlgTranslator translator(cat_,
2487  executor_,
2488  input_to_nest_level,
2489  join_types,
2490  now_,
2491  just_explain,
2492  query_features);
2493  size_t starting_projection_column_idx =
2494  get_scalar_sources_size(compound) - compound->getTargetColumnCount() - 1;
2495  CHECK_GT(starting_projection_column_idx, 0u);
2496  const auto scalar_sources =
2497  translate_scalar_sources_for_update(compound,
2498  translator,
2499  compound->getModifiedTableDescriptor()->tableId,
2500  cat_,
2501  compound->getTargetColumns(),
2502  starting_projection_column_idx);
2503  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
2504  const auto quals_cf = translate_quals(compound, translator);
2505  decltype(target_exprs_owned_) target_exprs_owned;
2506  translate_targets_for_update(target_exprs_owned,
2507  scalar_sources,
2508  groupby_exprs,
2509  compound,
2510  translator,
2511  compound->getModifiedTableDescriptor()->tableId,
2512  cat_,
2513  compound->getTargetColumns(),
2514  starting_projection_column_idx);
2515  target_exprs_owned_.insert(
2516  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
2517  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
2518  CHECK_EQ(compound->size(), target_exprs.size());
2519 
2520  const auto update_expr_iter =
2521  std::next(target_exprs.cbegin(), starting_projection_column_idx);
2522  decltype(target_exprs) filtered_target_exprs(update_expr_iter, target_exprs.end());
2523 
2524  UsedColumnsVisitor used_columns_visitor;
2525  std::unordered_set<int> id_accumulator;
2526 
2527  for (auto const& expr :
2528  boost::make_iterator_range(update_expr_iter, target_exprs.end())) {
2529  auto used_column_ids = used_columns_visitor.visit(expr);
2530  id_accumulator.insert(used_column_ids.begin(), used_column_ids.end());
2531  }
2532  for (auto const& expr : quals_cf.simple_quals) {
2533  auto simple_quals_used_column_ids = used_columns_visitor.visit(expr.get());
2534  id_accumulator.insert(simple_quals_used_column_ids.begin(),
2535  simple_quals_used_column_ids.end());
2536  }
2537  for (auto const& expr : quals_cf.quals) {
2538  auto quals_used_column_ids = used_columns_visitor.visit(expr.get());
2539  id_accumulator.insert(quals_used_column_ids.begin(), quals_used_column_ids.end());
2540  }
2541 
2542  decltype(input_col_descs) filtered_input_col_descs;
2543  for (auto col_desc : input_col_descs) {
2544  if (id_accumulator.find(col_desc->getColId()) != id_accumulator.end()) {
2545  filtered_input_col_descs.push_back(col_desc);
2546  }
2547  }
2548 
2549  const RelAlgExecutionUnit exe_unit = {input_descs,
2550  filtered_input_col_descs,
2551  quals_cf.simple_quals,
2552  rewrite_quals(quals_cf.quals),
2553  left_deep_join_quals,
2554  groupby_exprs,
2555  filtered_target_exprs,
2556  nullptr,
2557  sort_info,
2558  0,
2559  query_features,
2560  false};
2561  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
2562  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
2563  const auto targets_meta =
2564  get_modify_manipulated_targets_meta(compound, rewritten_exe_unit.target_exprs);
2565  compound->setOutputMetainfo(targets_meta);
2566  return {rewritten_exe_unit,
2567  compound,
2568  max_groups_buffer_entry_default_guess,
2569  std::move(query_rewriter)};
2570 }
2571 
2572 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
2573  const RelCompound* compound,
2574  const SortInfo& sort_info,
2575  const bool just_explain) {
2576  std::vector<InputDescriptor> input_descs;
2577  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
2578  auto input_to_nest_level = get_input_nest_levels(compound, {});
2579  std::tie(input_descs, input_col_descs, std::ignore) =
2580  get_input_desc(compound, input_to_nest_level, {}, cat_);
2581  const auto query_infos = get_table_infos(input_descs, executor_);
2582  CHECK_EQ(size_t(1), compound->inputCount());
2583  const auto left_deep_join =
2584  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
2585  JoinQualsPerNestingLevel left_deep_join_quals;
2586  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
2587  : std::vector<JoinType>{get_join_type(compound)};
2588  std::vector<size_t> input_permutation;
2589  std::vector<size_t> left_deep_join_input_sizes;
2590  if (left_deep_join) {
2591  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
2592  left_deep_join_quals = translateLeftDeepJoinFilter(
2593  left_deep_join, input_descs, input_to_nest_level, just_explain);
2594  if (g_from_table_reordering &&
2595  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
2596  join_types.end()) {
2597  input_permutation = do_table_reordering(input_descs,
2598  input_col_descs,
2599  left_deep_join_quals,
2600  input_to_nest_level,
2601  compound,
2602  query_infos,
2603  executor_);
2604  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
2605  std::tie(input_descs, input_col_descs, std::ignore) =
2606  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
2607  left_deep_join_quals = translateLeftDeepJoinFilter(
2608  left_deep_join, input_descs, input_to_nest_level, just_explain);
2609  }
2610  }
2611  QueryFeatureDescriptor query_features;
2612  RelAlgTranslator translator(cat_,
2613  executor_,
2614  input_to_nest_level,
2615  join_types,
2616  now_,
2617  just_explain,
2618  query_features);
2619  const auto scalar_sources = translate_scalar_sources(compound, translator);
2620  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
2621  const auto quals_cf = translate_quals(compound, translator);
2622  const auto target_exprs = translate_targets(
2623  target_exprs_owned_, scalar_sources, groupby_exprs, compound, translator);
2624  CHECK_EQ(compound->size(), target_exprs.size());
2625  const RelAlgExecutionUnit exe_unit = {input_descs,
2626  input_col_descs,
2627  quals_cf.simple_quals,
2628  rewrite_quals(quals_cf.quals),
2629  left_deep_join_quals,
2630  groupby_exprs,
2631  target_exprs,
2632  nullptr,
2633  sort_info,
2634  0,
2635  query_features,
2636  false};
2637  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
2638  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
2639  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
2640  compound->setOutputMetainfo(targets_meta);
2641  return {rewritten_exe_unit,
2642  compound,
2643  max_groups_buffer_entry_default_guess,
2644  std::move(query_rewriter),
2645  input_permutation,
2646  left_deep_join_input_sizes};
2647 }
2648 
2649 namespace {
2650 
2651 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
2652  CHECK(qual_expr);
2653  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
2654  if (!bin_oper || bin_oper->getOperator() != kAND) {
2655  return {qual_expr};
2656  }
2657  CHECK_GE(bin_oper->size(), size_t(2));
2658  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
2659  for (size_t i = 1; i < bin_oper->size(); ++i) {
2660  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
2661  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
2662  }
2663  return lhs_cf;
2664 }
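// Example (hypothetical condition): a = 1 AND (b > 2 OR c < 3) AND d = e
// flattens to the three conjuncts {a = 1, b > 2 OR c < 3, d = e}; the OR
// subtree stays intact because only top-level ANDs are split.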
2665 
2666 std::shared_ptr<Analyzer::Expr> build_logical_expression(
2667  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
2668  const SQLOps sql_op) {
2669  CHECK(!factors.empty());
2670  auto acc = factors.front();
2671  for (size_t i = 1; i < factors.size(); ++i) {
2672  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
2673  }
2674  return acc;
2675 }
2676 
2677 template <class QualsList>
2678 bool list_contains_expression(const QualsList& haystack,
2679  const std::shared_ptr<Analyzer::Expr>& needle) {
2680  for (const auto& qual : haystack) {
2681  if (*qual == *needle) {
2682  return true;
2683  }
2684  }
2685  return false;
2686 }
2687 
2688 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
2689 // evaluations of `p` and allows use of the original form in joins if `p`
2690 // can be used for hash joins.
2691 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
2692  const std::shared_ptr<Analyzer::Expr>& expr) {
2693  const auto expr_terms = qual_to_disjunctive_form(expr);
2694  CHECK_GE(expr_terms.size(), size_t(1));
2695  const auto& first_term = expr_terms.front();
2696  const auto first_term_factors = qual_to_conjunctive_form(first_term);
2697  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
2698  // First, collect the conjunctive components common to all the disjunctive components.
2699  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
2700  for (const auto& first_term_factor : first_term_factors.quals) {
2701  bool is_common =
2702  expr_terms.size() > 1; // Only report common factors for disjunction.
2703  for (size_t i = 1; i < expr_terms.size(); ++i) {
2704  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
2705  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
2706  is_common = false;
2707  break;
2708  }
2709  }
2710  if (is_common) {
2711  common_factors.push_back(first_term_factor);
2712  }
2713  }
2714  if (common_factors.empty()) {
2715  return expr;
2716  }
2717  // Now that the common expressions are known, collect the remaining expressions.
2718  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
2719  for (const auto& term : expr_terms) {
2720  const auto term_cf = qual_to_conjunctive_form(term);
2721  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
2722  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
2723  for (const auto& qual : term_cf.quals) {
2724  if (!list_contains_expression(common_factors, qual)) {
2725  remaining_quals.push_back(qual);
2726  }
2727  }
2728  if (!remaining_quals.empty()) {
2729  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
2730  }
2731  }
2732  // Reconstruct the expression with the transformation applied.
2733  const auto common_expr = build_logical_expression(common_factors, kAND);
2734  if (remaining_terms.empty()) {
2735  return common_expr;
2736  }
2737  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
2738  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
2739 }
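// Example (hypothetical join condition):
//   (a.x = b.x AND a.y > 5) OR (a.x = b.x AND a.z < 3)
// is rewritten to
//   a.x = b.x AND (a.y > 5 OR a.z < 3)
// so the common equality factor can drive a hash join instead of being buried
// inside a disjunction.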
2740 
2741 } // namespace
2742 
2743 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
2744  const RexScalar* join_condition,
2745  const std::vector<JoinType>& join_types,
2746  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
2747  const bool just_explain) const {
2748  QueryFeatureDescriptor query_features;
2749  RelAlgTranslator translator(cat_,
2750  executor_,
2751  input_to_nest_level,
2752  join_types,
2753  now_,
2754  just_explain,
2755  query_features);
2756  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
2757  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
2758  for (const auto rex_condition_component : rex_condition_cf) {
2759  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
2760  const auto join_condition =
2761  reverse_logical_distribution(translator.translateScalarRex(
2762  bw_equals ? bw_equals.get() : rex_condition_component));
2763  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
2764  join_condition_quals.insert(join_condition_quals.end(),
2765  join_condition_cf.quals.begin(),
2766  join_condition_cf.quals.end());
2767  join_condition_quals.insert(join_condition_quals.end(),
2768  join_condition_cf.simple_quals.begin(),
2769  join_condition_cf.simple_quals.end());
2770  }
2771  return combine_equi_join_conditions(join_condition_quals);
2772 }
2773 
2774 // Translate left deep join filter and separate the conjunctive form qualifiers
2775 // per nesting level. The code generated for hash table lookups on each level
2776 // must dominate its uses in deeper nesting levels.
2777 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
2778  const RelLeftDeepInnerJoin* join,
2779  const std::vector<InputDescriptor>& input_descs,
2780  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
2781  const bool just_explain) {
2782  const auto join_types = left_deep_join_types(join);
2783  const auto join_condition_quals = makeJoinQuals(
2784  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
2785  MaxRangeTableIndexVisitor rte_idx_visitor;
2786  JoinQualsPerNestingLevel result(input_descs.size() - 1);
2787  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
2788  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
2789  const auto outer_condition = join->getOuterCondition(rte_idx);
2790  if (outer_condition) {
2791  result[rte_idx - 1].quals =
2792  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
2793  CHECK_LE(rte_idx, join_types.size());
2794  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
2795  result[rte_idx - 1].type = JoinType::LEFT;
2796  continue;
2797  }
2798  for (const auto qual : join_condition_quals) {
2799  if (visited_quals.count(qual)) {
2800  continue;
2801  }
2802  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
2803  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
2804  const auto it_ok = visited_quals.emplace(qual);
2805  CHECK(it_ok.second);
2806  result[rte_idx - 1].quals.push_back(qual);
2807  }
2808  }
2809  CHECK_LE(rte_idx, join_types.size());
2810  CHECK(join_types[rte_idx - 1] == JoinType::INNER);
2811  result[rte_idx - 1].type = JoinType::INNER;
2812  }
2813  return result;
2814 }
2815 
2816 namespace {
2817 
2818 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
2819  const RelAlgNode* ra_node,
2820  const size_t nest_level,
2821  const std::vector<TargetMetaInfo>& in_metainfo,
2822  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
2823  CHECK_LE(size_t(1), ra_node->inputCount());
2824  CHECK_GE(size_t(2), ra_node->inputCount());
2825  const auto input = ra_node->getInput(nest_level);
2826  const auto it_rte_idx = input_to_nest_level.find(input);
2827  CHECK(it_rte_idx != input_to_nest_level.end());
2828  const int rte_idx = it_rte_idx->second;
2829  const int table_id = table_id_from_ra(input);
2830  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
2831  const auto scan_ra = dynamic_cast<const RelScan*>(input);
2832  int input_idx = 0;
2833  for (const auto& input_meta : in_metainfo) {
2834  inputs.push_back(
2835  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
2836  table_id,
2837  scan_ra ? input_idx + 1 : input_idx,
2838  rte_idx));
2839  ++input_idx;
2840  }
2841  return inputs;
2842 }
2843 
2844 } // namespace
2845 
2846 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
2847  const RelAggregate* aggregate,
2848  const SortInfo& sort_info,
2849  const bool just_explain) {
2850  std::vector<InputDescriptor> input_descs;
2851  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
2852  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
2853  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
2854  std::tie(input_descs, input_col_descs, used_inputs_owned) =
2855  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
2856  const auto join_type = get_join_type(aggregate);
2857  QueryFeatureDescriptor query_features;
2858  RelAlgTranslator translator(cat_,
2859  executor_,
2860  input_to_nest_level,
2861  {join_type},
2862  now_,
2863  just_explain,
2864  query_features);
2865  CHECK_EQ(size_t(1), aggregate->inputCount());
2866  const auto source = aggregate->getInput(0);
2867  const auto& in_metainfo = source->getOutputMetainfo();
2868  const auto scalar_sources =
2869  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
2870  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
2871  const auto target_exprs = translate_targets(
2872  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
2873  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
2874  aggregate->setOutputMetainfo(targets_meta);
2875  return {{input_descs,
2876  input_col_descs,
2877  {},
2878  {},
2879  {},
2880  groupby_exprs,
2881  target_exprs,
2882  nullptr,
2883  sort_info,
2884  0,
2885  query_features,
2886  false},
2887  aggregate,
2888  max_groups_buffer_entry_default_guess,
2889  nullptr};
2890 }
2891 
2892 RelAlgExecutor::WorkUnit RelAlgExecutor::createModifyProjectWorkUnit(
2893  const RelProject* project,
2894  const SortInfo& sort_info,
2895  const bool just_explain) {
2896  std::vector<InputDescriptor> input_descs;
2897  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
2898  auto input_to_nest_level = get_input_nest_levels(project, {});
2899  std::tie(input_descs, input_col_descs, std::ignore) =
2900  get_input_desc(project, input_to_nest_level, {}, cat_);
2901  const auto left_deep_join =
2902  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
2903  JoinQualsPerNestingLevel left_deep_join_quals;
2904  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
2905  : std::vector<JoinType>{get_join_type(project)};
2906  if (left_deep_join) {
2907  left_deep_join_quals = translateLeftDeepJoinFilter(
2908  left_deep_join, input_descs, input_to_nest_level, just_explain);
2909  }
2910  QueryFeatureDescriptor query_features;
2911  RelAlgTranslator translator(cat_,
2912  executor_,
2913  input_to_nest_level,
2914  join_types,
2915  now_,
2916  just_explain,
2917  query_features);
2918  size_t starting_projection_column_idx =
2919  get_scalar_sources_size(project) - project->getTargetColumnCount() - 1;
2920  CHECK_GT(starting_projection_column_idx, 0u);
2921  auto target_exprs_owned =
2922  translate_scalar_sources_for_update(project,
2923  translator,
2924  project->getModifiedTableDescriptor()->tableId,
2925  cat_,
2926  project->getTargetColumns(),
2927  starting_projection_column_idx);
2928  target_exprs_owned_.insert(
2929  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
2930  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
2931  CHECK_EQ(project->size(), target_exprs.size());
2932 
2933  const auto update_expr_iter =
2934  std::next(target_exprs.cbegin(), starting_projection_column_idx);
2935  decltype(target_exprs) filtered_target_exprs(update_expr_iter, target_exprs.end());
2936 
2937  UsedColumnsVisitor used_columns_visitor;
2938  std::unordered_set<int> id_accumulator;
2939 
2940  for (auto const& expr :
2941  boost::make_iterator_range(update_expr_iter, target_exprs.end())) {
2942  auto used_column_ids = used_columns_visitor.visit(expr);
2943  id_accumulator.insert(used_column_ids.begin(), used_column_ids.end());
2944  }
2945 
2946  decltype(input_col_descs) filtered_input_col_descs;
2947  for (auto col_desc : input_col_descs) {
2948  if (id_accumulator.find(col_desc->getColId()) != id_accumulator.end()) {
2949  filtered_input_col_descs.push_back(col_desc);
2950  }
2951  }
2952 
2953  const auto targets_meta =
2954  get_modify_manipulated_targets_meta(project, filtered_target_exprs);
2955  project->setOutputMetainfo(targets_meta);
2956  return {{input_descs,
2957  filtered_input_col_descs,
2958  {},
2959  {},
2960  left_deep_join_quals,
2961  {nullptr},
2962  filtered_target_exprs,
2963  nullptr,
2964  sort_info,
2965  0,
2966  query_features,
2967  false},
2968  project,
2969  max_groups_buffer_entry_default_guess,
2970  nullptr};
2971 }
2972 
2973 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(const RelProject* project,
2974  const SortInfo& sort_info,
2975  const bool just_explain) {
2976  std::vector<InputDescriptor> input_descs;
2977  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
2978  auto input_to_nest_level = get_input_nest_levels(project, {});
2979  std::tie(input_descs, input_col_descs, std::ignore) =
2980  get_input_desc(project, input_to_nest_level, {}, cat_);
2981  const auto query_infos = get_table_infos(input_descs, executor_);
2982 
2983  const auto left_deep_join =
2984  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
2985  JoinQualsPerNestingLevel left_deep_join_quals;
2986  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
2987  : std::vector<JoinType>{get_join_type(project)};
2988  std::vector<size_t> input_permutation;
2989  std::vector<size_t> left_deep_join_input_sizes;
2990  if (left_deep_join) {
2991  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
2992  const auto query_infos = get_table_infos(input_descs, executor_);
2993  left_deep_join_quals = translateLeftDeepJoinFilter(
2994  left_deep_join, input_descs, input_to_nest_level, just_explain);
2995  if (g_from_table_reordering) {
2996  input_permutation = do_table_reordering(input_descs,
2997  input_col_descs,
2998  left_deep_join_quals,
2999  input_to_nest_level,
3000  project,
3001  query_infos,
3002  executor_);
3003  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3004  std::tie(input_descs, input_col_descs, std::ignore) =
3005  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3006  left_deep_join_quals = translateLeftDeepJoinFilter(
3007  left_deep_join, input_descs, input_to_nest_level, just_explain);
3008  }
3009  }
3010 
3011  QueryFeatureDescriptor query_features;
3012  RelAlgTranslator translator(cat_,
3013  executor_,
3014  input_to_nest_level,
3015  join_types,
3016  now_,
3017  just_explain,
3018  query_features);
3019  const auto target_exprs_owned = translate_scalar_sources(project, translator);
3020  target_exprs_owned_.insert(
3021  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3022  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3023  const RelAlgExecutionUnit exe_unit = {input_descs,
3024  input_col_descs,
3025  {},
3026  {},
3027  left_deep_join_quals,
3028  {nullptr},
3029  target_exprs,
3030  nullptr,
3031  sort_info,
3032  0,
3033  query_features,
3034  false};
3035  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3036  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3037  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3038  project->setOutputMetainfo(targets_meta);
3039  return {rewritten_exe_unit,
3040  project,
3041  max_groups_buffer_entry_default_guess,
3042  std::move(query_rewriter),
3043  input_permutation,
3044  left_deep_join_input_sizes};
3045 }
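
When createProjectWorkUnit sees a left-deep inner join and table reordering is enabled, it translates the join quals once, computes an input permutation from them, then re-derives the nest levels and re-translates the quals against the permuted inputs. A simplified sketch of just the permutation plumbing, under the assumption of a size-based heuristic; InputDesc, size_based_permutation, and apply_permutation are illustrative, and the real do_table_reordering also weighs the join quals, not only cardinalities.

#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Illustrative input descriptor: a table id plus its cardinality.
struct InputDesc {
  int table_id;
  size_t row_count;
};

// One plausible heuristic: place larger inputs at lower nest levels. This
// sketch only captures the permutation mechanics, not the qual analysis.
std::vector<size_t> size_based_permutation(const std::vector<InputDesc>& inputs) {
  std::vector<size_t> perm(inputs.size());
  std::iota(perm.begin(), perm.end(), size_t(0));
  std::stable_sort(perm.begin(), perm.end(), [&](size_t lhs, size_t rhs) {
    return inputs[lhs].row_count > inputs[rhs].row_count;
  });
  return perm;
}

// After computing a permutation, the descriptors (and, in the engine, the
// nest levels and join quals) must be rebuilt in permuted order.
std::vector<InputDesc> apply_permutation(const std::vector<InputDesc>& inputs,
                                         const std::vector<size_t>& perm) {
  std::vector<InputDesc> reordered;
  reordered.reserve(inputs.size());
  for (const size_t idx : perm) {
    reordered.push_back(inputs[idx]);
  }
  return reordered;
}
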
3046 
3047 namespace {
3048 
3049 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
3050 get_inputs_meta(const RelFilter* filter,
3051  const RelAlgTranslator& translator,
3052  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
3053  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3054  std::vector<TargetMetaInfo> in_metainfo;
3055  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
3056  const auto data_sink_node = get_data_sink(filter);
3057  auto input_it = inputs_owned.begin();
3058  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
3059  const auto source = data_sink_node->getInput(nest_level);
3060  const auto scan_source = dynamic_cast<const RelScan*>(source);
3061  if (scan_source) {
3062  CHECK(source->getOutputMetainfo().empty());
3063  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
3064  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
3065  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
3066  }
3067  const auto source_metadata =
3068  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
3069  in_metainfo.insert(
3070  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3071  exprs_owned.insert(
3072  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3073  } else {
3074  const auto& source_metadata = source->getOutputMetainfo();
3075  input_it += source_metadata.size();
3076  in_metainfo.insert(
3077  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
3078  const auto scalar_sources_owned = synthesize_inputs(
3079  data_sink_node, nest_level, source_metadata, input_to_nest_level);
3080  exprs_owned.insert(
3081  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
3082  }
3083  }
3084  return std::make_pair(in_metainfo, exprs_owned);
3085 }
3086 
3087 } // namespace
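
get_inputs_meta above walks each input of the filter's data sink: a RelScan source carries no cached output metadata, so its columns are translated afresh, while any other source reuses the metadata recorded when that node was executed. A compressed sketch of that branch structure, with Source and gather_inputs_meta as illustrative placeholders rather than the engine's node types:

#include <string>
#include <vector>

// Illustrative source node: either a base-table scan (metadata must be
// derived) or an intermediate result that already carries output metadata.
struct Source {
  bool is_scan;
  std::vector<std::string> cached_meta;   // populated when !is_scan
  std::vector<std::string> column_names;  // populated when is_scan
};

// Mirrors the loop in get_inputs_meta: concatenate per-source metadata,
// deriving it for scans and reusing it for everything else.
std::vector<std::string> gather_inputs_meta(const std::vector<Source>& sources) {
  std::vector<std::string> in_metainfo;
  for (const auto& source : sources) {
    if (source.is_scan) {
      // Scan source: metadata comes from translating its column references.
      for (const auto& name : source.column_names) {
        in_metainfo.push_back("scan:" + name);
      }
    } else {
      // Non-scan source: output metadata was set by a previous execution step.
      in_metainfo.insert(in_metainfo.end(),
                         source.cached_meta.begin(),
                         source.cached_meta.end());
    }
  }
  return in_metainfo;
}
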
3088 
3089 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
3090  const SortInfo& sort_info,
3091  const bool just_explain) {
3092  CHECK_EQ(size_t(1), filter->inputCount());
3093  std::vector<InputDescriptor> input_descs;
3094  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3095  std::vector<TargetMetaInfo> in_metainfo;
3096  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3097  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
3098 
3099  const auto input_to_nest_level = get_input_nest_levels(filter, {});
3100  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3101  get_input_desc(filter, input_to_nest_level, {}, cat_);
3102  const auto join_type = get_join_type(filter);
3103  QueryFeatureDescriptor query_features;
3104  RelAlgTranslator translator(cat_,
3105  executor_,
3106  input_to_nest_level,
3107  {join_type},
3108  now_,
3109  just_explain,
3110  query_features);
3111  std::tie(in_metainfo, target_exprs_owned) =
3112  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
3113  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
3114  const auto qual = fold_expr(filter_expr.get());
3115  target_exprs_owned_.insert(
3116  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3117  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3118  filter->setOutputMetainfo(in_metainfo);
3119  const auto rewritten_qual = rewrite_expr(qual.get());
3120  return {{input_descs,
3121  input_col_descs,
3122  {},
3123  {rewritten_qual ? rewritten_qual : qual},
3124  {},
3125  {nullptr},
3126  target_exprs,
3127  nullptr,
3128  sort_info,
3129  0},
3130  filter,
3131  max_groups_buffer_entry_default_guess,
3132  nullptr};
3133 }
3134 
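
createFilterWorkUnit folds the translated filter condition and then prefers the rewritten qual whenever rewrite_expr yields one, falling back to the folded original otherwise. A tiny sketch of that fold-then-rewrite fallback idiom; fold_constants and try_rewrite are illustrative stand-ins for fold_expr and rewrite_expr, and the string representation is purely for demonstration.

#include <memory>
#include <string>

// Illustrative expression node.
struct Expr {
  std::string repr;
};

// Stand-in for fold_expr: constant-fold and return a new owned expression.
std::shared_ptr<Expr> fold_constants(const Expr& e) {
  return std::make_shared<Expr>(Expr{"folded(" + e.repr + ")"});
}

// Stand-in for rewrite_expr: may return null when no rewrite applies.
std::shared_ptr<Expr> try_rewrite(const Expr& e) {
  if (e.repr.find("IN") == std::string::npos) {
    return nullptr;  // nothing to rewrite
  }
  return std::make_shared<Expr>(Expr{"rewritten(" + e.repr + ")"});
}

// Mirrors `rewritten_qual ? rewritten_qual : qual`: use the rewrite when it
// exists, otherwise keep the folded qual.
std::shared_ptr<Expr> make_filter_qual(const Expr& condition) {
  const auto qual = fold_constants(condition);
  const auto rewritten = try_rewrite(*qual);
  return rewritten ? rewritten : qual;
}
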