RelAlgExecutor.cpp

/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "RelAlgExecutor.h"
#include "Parser/ParserNode.h"
#include "QueryEngine/RexVisitor.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/shard_key.h"

#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/reversed.hpp>

#include <algorithm>
#include <functional>
#include <numeric>

bool g_enable_interop{false};
bool g_enable_union{false};

extern bool g_enable_bump_allocator;
extern bool g_enable_system_tables;

namespace {

bool node_is_aggregate(const RelAlgNode* ra) {
  const auto compound = dynamic_cast<const RelCompound*>(ra);
  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
  return ((compound && compound->isAggregate()) || aggregate);
}

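// Wraps the single-argument get_physical_inputs() overload (declared elsewhere),
// presumably to remap each input's SPI (sequential positional index) to an actual
// column id via Catalog::getColumnIdBySpi().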
std::unordered_set<PhysicalInput> get_physical_inputs(
    const Catalog_Namespace::Catalog& cat,
    const RelAlgNode* ra) {
  auto phys_inputs = get_physical_inputs(ra);
  std::unordered_set<PhysicalInput> phys_inputs2;
  for (auto& phi : phys_inputs) {
    phys_inputs2.insert(
        PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
  }
  return phys_inputs2;
}

void set_parallelism_hints(const RelAlgNode& ra_node,
                           const Catalog_Namespace::Catalog& catalog) {
  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
      parallelism_hints_per_table;
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    int table_id = physical_input.table_id;
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE &&
        !table->is_system_table) {
      int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
      const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
      auto foreign_table = catalog.getForeignTable(table_id);
      for (const auto& fragment :
           foreign_table->fragmenter->getFragmentsForQuery().fragments) {
        Chunk_NS::Chunk chunk{col_desc};
        ChunkKey chunk_key = {
            catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};
        // only hint chunks that are not already resident in CPU memory
        if (!chunk.isChunkOnDevice(
                &catalog.getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
          parallelism_hints_per_table[{catalog.getDatabaseId(), table_id}].insert(
              foreign_storage::ForeignStorageMgr::ParallelismHint{col_id,
                                                                  fragment.fragmentId});
        }
      }
    }
  }
  if (!parallelism_hints_per_table.empty()) {
    auto foreign_storage_mgr =
        catalog.getDataMgr().getPersistentStorageMgr()->getForeignStorageMgr();
    CHECK(foreign_storage_mgr);
    foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
  }
}
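
// For foreign tables, dictionary-encoded columns may carry placeholder metadata until
// their chunks are first fetched; loading such chunks here populates the string
// dictionaries before the query runs.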

void prepare_string_dictionaries(const RelAlgNode& ra_node,
                                 const Catalog_Namespace::Catalog& catalog) {
  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
    int table_id = physical_input.table_id;
    auto table = catalog.getMetadataForTable(table_id, false);
    if (table && table->storageType == StorageType::FOREIGN_TABLE) {
      int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
      const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
      auto foreign_table = catalog.getForeignTable(table_id);
      if (col_desc->columnType.is_dict_encoded_type()) {
        CHECK(foreign_table->fragmenter != nullptr);
        for (const auto& fragment :
             foreign_table->fragmenter->getFragmentsForQuery().fragments) {
          ChunkKey chunk_key = {
              catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};
          const ChunkMetadataMap& metadata_map = fragment.getChunkMetadataMap();
          CHECK(metadata_map.find(col_id) != metadata_map.end());
          if (foreign_storage::is_metadata_placeholder(*(metadata_map.at(col_id)))) {
            // When this chunk goes out of scope it will stay in the CPU cache but
            // become evictable
            std::shared_ptr<Chunk_NS::Chunk> chunk =
                Chunk_NS::Chunk::getChunk(col_desc,
                                          &(catalog.getDataMgr()),
                                          chunk_key,
                                          Data_Namespace::CPU_LEVEL,
                                          0,
                                          0,
                                          0);
          }
        }
      }
    }
  }
}

void prepare_foreign_table_for_execution(const RelAlgNode& ra_node,
                                         const Catalog_Namespace::Catalog& catalog) {
  // Iterate through ra_node inputs for types that need to be loaded pre-execution.
  // If they do not have valid metadata, load them into CPU memory to generate
  // the metadata and leave them ready to be used by the query.
  set_parallelism_hints(ra_node, catalog);
  prepare_string_dictionaries(ra_node, catalog);
}

void prepare_for_system_table_execution(const RelAlgNode& ra_node,
                                        const Catalog_Namespace::Catalog& catalog,
                                        const CompilationOptions& co) {
  if (g_enable_system_tables) {
    std::set<int32_t> system_table_ids;
    for (const auto& physical_input : get_physical_inputs(&ra_node)) {
      int table_id = physical_input.table_id;
      auto table = catalog.getMetadataForTable(table_id, false);
      if (table && table->is_system_table) {
        system_table_ids.emplace(table_id);
      }
    }
    // Queries involving system tables must run on CPU.
    if (!system_table_ids.empty() && co.device_type != ExecutorDeviceType::CPU) {
      throw QueryMustRunOnCpu();
    }
    // Clear any previously cached data, since system tables depend on point in time
    // data snapshots.
    for (const auto table_id : system_table_ids) {
      catalog.getDataMgr().deleteChunksWithPrefix(
          ChunkKey{catalog.getDatabaseId(), table_id}, Data_Namespace::CPU_LEVEL);
    }
  }
}

bool is_extracted_dag_valid(ExtractedPlanDag& dag) {
  return !dag.contain_not_supported_rel_node &&
         dag.extracted_dag.compare(EMPTY_QUERY_PLAN) != 0;
}

class RelLeftDeepTreeIdsCollector : public RelAlgVisitor<std::vector<unsigned>> {
 public:
  std::vector<unsigned> visitLeftDeepInnerJoin(
      const RelLeftDeepInnerJoin* left_deep_join_tree) const override {
    return {left_deep_join_tree->getId()};
  }

 protected:
  std::vector<unsigned> aggregateResult(
      const std::vector<unsigned>& aggregate,
      const std::vector<unsigned>& next_result) const override {
    auto result = aggregate;
    std::copy(next_result.begin(), next_result.end(), std::back_inserter(result));
    return result;
  }
};

}  // namespace
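
// Returns the outer table's fragment count for simple single-step projections and
// non-aggregate compounds, and 0 whenever per-fragment dispatch does not apply
// (explain, filter push-down, subqueries, multi-step sequences, updates/deletes).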

size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
                                             const ExecutionOptions& eo) {
  if (eo.find_push_down_candidates) {
    return 0;
  }

  if (eo.just_explain) {
    return 0;
  }

  CHECK(query_dag_);

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  auto lock = executor_->acquireExecuteMutex();
  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  const auto phys_inputs = get_physical_inputs(cat_, &ra);
  const auto phys_table_ids = get_physical_table_inputs(&ra);
  executor_->setCatalog(&cat_);
  executor_->setupCaching(phys_inputs, phys_table_ids);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra);

  if (!getSubqueries().empty()) {
    return 0;
  }

  CHECK(!ed_seq.empty());
  if (ed_seq.size() > 1) {
    return 0;
  }

  decltype(temporary_tables_)().swap(temporary_tables_);
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  auto exec_desc_ptr = ed_seq.getDescriptor(0);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    return 0;
  }

  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    auto work_unit =
        createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);

    return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                   executor_);
  }

  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      return 0;
    } else if (compound->isUpdateViaSelect()) {
      return 0;
    } else {
      if (compound->isAggregate()) {
        return 0;
      }

      const auto work_unit =
          createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);

      return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
                                     executor_);
    }
  }

  return 0;
}

ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
                                                   const ExecutionOptions& eo,
                                                   const bool just_explain_plan,
                                                   RenderInfo* render_info) {
  CHECK(query_dag_);
  auto timer = DEBUG_TIMER(__func__);

  auto run_query = [&](const CompilationOptions& co_in) {
    auto execution_result =
        executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);
    if (post_execution_callback_) {
      VLOG(1) << "Running post execution callback.";
      (*post_execution_callback_)();
    }
    return execution_result;
  };

  try {
    return run_query(co);
  } catch (const QueryMustRunOnCpu&) {
    if (!g_allow_cpu_retry) {
      throw;
    }
  }
  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
  auto co_cpu = CompilationOptions::makeCpuOnly(co);

  if (render_info) {
    render_info->setForceNonInSituData();
  }
  return run_query(co_cpu);
}
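
// The no-retry path below performs the heavy lifting: query session bookkeeping for
// runtime interrupts, executor locking, optional plan-only explain output, and
// subquery execution ahead of the main RA sequence.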

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
                                                          const ExecutionOptions& eo,
                                                          const bool just_explain_plan,
                                                          RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryNoRetry);
  auto timer = DEBUG_TIMER(__func__);
  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");

  query_dag_->resetQueryExecutionState();
  const auto& ra = query_dag_->getRootNode();

  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  std::string query_session{""};
  std::string query_str{"N/A"};
  std::string query_submitted_time{""};
  // gather the necessary query info
  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
    query_session = query_state_->getConstSessionInfo()->get_session_id();
    query_str = query_state_->getQueryStr();
    query_submitted_time = query_state_->getQuerySubmittedTime();
  }

  auto validate_or_explain_query =
      just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
  auto interruptable = !render_info && !query_session.empty() &&
                       eo.allow_runtime_query_interrupt && !validate_or_explain_query;
  if (interruptable) {
    // If we reach here, the current query, which was waiting for an idle executor
    // in the dispatch queue, is now scheduled to a specific executor
    // (not UNITARY_EXECUTOR), so we update the query session's status with the
    // executor that takes this query.
    std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
        query_session, query_str, query_submitted_time);

    // The query is now going to be executed, so update its status to
    // "RUNNING_QUERY_KERNEL".
    executor_->updateQuerySessionStatus(
        query_session,
        query_submitted_time,
        QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
  }

  // Make sure to clean up the query session info after this query finishes execution.
  ScopeGuard clearQuerySessionInfo =
      [this, &query_session, &interruptable, &query_submitted_time] {
        // reset the runtime query interrupt status after the end of query execution
        if (interruptable) {
          // clean up the running session's info
          executor_->clearQuerySessionStatus(query_session, query_submitted_time);
        }
      };

  auto acquire_execute_mutex = [](Executor * executor) -> auto {
    auto ret = executor->acquireExecuteMutex();
    return ret;
  };
  // Acquire the executor lock here to make sure that this executor holds all
  // necessary resources, and at the same time protect them against other executors.
  auto lock = acquire_execute_mutex(executor_);

  if (interruptable) {
    // Check whether this query session was "already" interrupted. This can happen
    // when there is a very short gap between being interrupted and taking the
    // execute lock. If so, we have to remove "all" queries initiated by this
    // session, and we do that here without running the query.
    try {
      executor_->checkPendingQueryStatus(query_session);
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw std::runtime_error("Query execution has been interrupted (pending query)");
      }
      throw e;
    } catch (...) {
      throw std::runtime_error("Checking pending query status failed: unknown error");
    }
  }
  int64_t queue_time_ms = timer_stop(clock_begin);

  prepare_for_system_table_execution(ra, cat_, co);

  // Notify foreign tables to load prior to caching
  prepare_foreign_table_for_execution(ra, cat_);

  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
  const auto phys_inputs = get_physical_inputs(cat_, &ra);
  const auto phys_table_ids = get_physical_table_inputs(&ra);
  executor_->setCatalog(&cat_);
  executor_->setupCaching(phys_inputs, phys_table_ids);

  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
  auto ed_seq = RaExecutionSequence(&ra);

  if (just_explain_plan) {
    std::stringstream ss;
    std::vector<const RelAlgNode*> nodes;
    for (size_t i = 0; i < ed_seq.size(); i++) {
      nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
    }
    size_t ctr = nodes.size();
    size_t tab_ctr = 0;
    for (auto& body : boost::adaptors::reverse(nodes)) {
      const auto index = ctr--;
      const auto tabs = std::string(tab_ctr++, '\t');
      CHECK(body);
      ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
      if (auto sort = dynamic_cast<const RelSort*>(body)) {
        ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
      }
      if (dynamic_cast<const RelProject*>(body) ||
          dynamic_cast<const RelCompound*>(body)) {
        if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
          ss << tabs << " : " << join->toString() << "\n";
        }
      }
    }
    const auto& subqueries = getSubqueries();
    if (!subqueries.empty()) {
      ss << "Subqueries: "
         << "\n";
      for (const auto& subquery : subqueries) {
        const auto ra = subquery->getRelAlg();
        ss << "\t" << ra->toString() << "\n";
      }
    }
    auto rs = std::make_shared<ResultSet>(ss.str());
    return {rs, {}};
  }

  if (render_info) {
    // set render to be non-insitu in certain situations.
    if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
        ed_seq.size() > 1) {
      // old logic: disallow in-situ rendering if there is more than one
      // execution descriptor
      render_info->setInSituDataIfUnset(false);
    }
  }

  if (eo.find_push_down_candidates) {
    // this extra logic is mainly due to current limitations on multi-step queries
    // and/or subqueries.
    return executeRelAlgQueryWithFilterPushDown(
        ed_seq, co, eo, render_info, queue_time_ms);
  }
  timer_setup.stop();

  // Dispatch the subqueries first
  for (auto subquery : getSubqueries()) {
    const auto subquery_ra = subquery->getRelAlg();
    CHECK(subquery_ra);
    if (subquery_ra->hasContextData()) {
      continue;
    }
    // Execute the subquery and cache the result.
    RelAlgExecutor ra_executor(executor_, cat_, query_state_);
    RaExecutionSequence subquery_seq(subquery_ra);
    auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
    subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
  }
  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
}
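
// The three compute*() helpers below snapshot per-query caches (column ranges, string
// dictionary generations, table generations) from the physical inputs of the root
// node; they are presumably used to seed leaf executors in distributed execution.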

AggregatedColRange RelAlgExecutor::computeColRangesCache() {
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeColRangesCache(phys_inputs);
}

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
  return executor_->computeStringDictionaryGenerations(phys_inputs);
}

TableGenerations RelAlgExecutor::computeTableGenerations() {
  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
  return executor_->computeTableGenerations(phys_table_ids);
}

Executor* RelAlgExecutor::getExecutor() const {
  return executor_;
}

void RelAlgExecutor::cleanupPostExecution() {
  CHECK(executor_);
  executor_->row_set_mem_owner_ = nullptr;
}

std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
RelAlgExecutor::getJoinInfo(const RelAlgNode* root_node) {
  auto sort_node = dynamic_cast<const RelSort*>(root_node);
  if (sort_node) {
    // we assume that a test query needing join info does not contain a sort node
    return {};
  }
  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
  RelLeftDeepTreeIdsCollector visitor;
  auto left_deep_tree_ids = visitor.visit(root_node);
  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
}

namespace {

void check_sort_node_source_constraint(const RelSort* sort) {
  CHECK_EQ(size_t(1), sort->inputCount());
  const auto source = sort->getInput(0);
  if (dynamic_cast<const RelSort*>(source)) {
    throw std::runtime_error("Sort node not supported as input to another sort");
  }
}

}  // namespace

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
    const RaExecutionSequence& seq,
    const size_t step_idx,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info) {
  INJECT_TIMER(executeRelAlgQueryStep);

  auto exe_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exe_desc_ptr);
  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());

  size_t shard_count{0};
  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
    return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
  };

  if (sort) {
    check_sort_node_source_constraint(sort);
    const auto source_work_unit = createSortInputWorkUnit(sort, eo);
    shard_count = GroupByAndAggregate::shard_count_for_top_groups(
        source_work_unit.exe_unit, *executor_->getCatalog());
    if (!shard_count) {
      // No point in sorting on the leaf, only execute the input to the sort node.
      CHECK_EQ(size_t(1), sort->inputCount());
      const auto source = sort->getInput(0);
      if (sort->collationCount() || node_is_aggregate(source)) {
        auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
        CHECK_EQ(temp_seq.size(), size_t(1));
        ExecutionOptions eo_copy = {
            eo.output_columnar_hint,
            eo.allow_multifrag,
            eo.just_explain,
            eo.allow_loop_joins,
            eo.with_watchdog,
            eo.jit_debug,
            eo.just_validate || sort->isEmptyResult(),
            eo.with_dynamic_watchdog,
            eo.dynamic_watchdog_time_limit,
            eo.find_push_down_candidates,
            eo.just_calcite_explain,
            eo.gpu_input_mem_limit_percent,
            eo.allow_runtime_query_interrupt,
            eo.running_query_interrupt_freq,
            eo.pending_query_interrupt_freq,
            eo.executor_type,
        };
        // Use subseq to avoid clearing existing temporary tables
        return {
            executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
            merge_type(source),
            source->getId(),
            false};
      }
    }
  }
  QueryStepExecutionResult result{
      executeRelAlgSubSeq(seq,
                          std::make_pair(step_idx, step_idx + 1),
                          co,
                          eo,
                          render_info,
                          queue_time_ms_),
      merge_type(exe_desc_ptr->getBody()),
      exe_desc_ptr->getBody()->getId(),
      false};
  if (post_execution_callback_) {
    VLOG(1) << "Running post execution callback.";
    (*post_execution_callback_)();
  }
  return result;
}

void RelAlgExecutor::prepareLeafExecution(
    const AggregatedColRange& agg_col_range,
    const StringDictionaryGenerations& string_dictionary_generations,
    const TableGenerations& table_generations) {
  // capture the lock acquisition time
  auto clock_begin = timer_start();
  if (g_enable_dynamic_watchdog) {
    executor_->resetInterrupt();
  }
  queue_time_ms_ = timer_stop(clock_begin);
  executor_->row_set_mem_owner_ =
      std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
  executor_->table_generations_ = table_generations;
  executor_->agg_col_range_cache_ = agg_col_range;
}
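
// executeRelAlgSeq() walks the execution descriptors in order, retrying individual
// steps on CPU when QueryMustRunOnCpu escapes (if per-step retry is allowed) and
// through the external executor when interop is enabled and a NativeExecutionError
// escapes a non-aggregate step.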

ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
                                                 const CompilationOptions& co,
                                                 const ExecutionOptions& eo,
                                                 RenderInfo* render_info,
                                                 const int64_t queue_time_ms,
                                                 const bool with_existing_temp_tables) {
  INJECT_TIMER(executeRelAlgSeq);
  auto timer = DEBUG_TIMER(__func__);
  if (!with_existing_temp_tables) {
    decltype(temporary_tables_)().swap(temporary_tables_);
  }
  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
  decltype(left_deep_join_info_)().swap(left_deep_join_info_);
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;

  time(&now_);
  CHECK(!seq.empty());

  auto get_descriptor_count = [&seq, &eo]() -> size_t {
    if (eo.just_explain) {
      if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
        // run the logical values descriptor to generate the result set, then the next
        // descriptor to generate the explain
        CHECK_GE(seq.size(), size_t(2));
        return 2;
      } else {
        return 1;
      }
    } else {
      return seq.size();
    }
  };

  const auto exec_desc_count = get_descriptor_count();
  // this join info needs to be maintained throughout an entire query runtime
  for (size_t i = 0; i < exec_desc_count; i++) {
    VLOG(1) << "Executing query step " << i;
    // only render on the last step
    try {
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo,
                        (i == exec_desc_count - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo,
                        (i == exec_desc_count - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const NativeExecutionError&) {
      if (!g_enable_interop) {
        throw;
      }
      auto eo_extern = eo;
      eo_extern.executor_type = ::ExecutorType::Extern;
      auto exec_desc_ptr = seq.getDescriptor(i);
      const auto body = exec_desc_ptr->getBody();
      const auto compound = dynamic_cast<const RelCompound*>(body);
      if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
        LOG(INFO) << "Also failed to run the query using interoperability";
        throw;
      }
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo_extern,
                        (i == exec_desc_count - 1) ? render_info : nullptr,
                        queue_time_ms);
    }
  }

  return seq.getDescriptor(exec_desc_count - 1)->getResult();
}

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
    const RaExecutionSequence& seq,
    const std::pair<size_t, size_t> interval,
    const CompilationOptions& co,
    const ExecutionOptions& eo,
    RenderInfo* render_info,
    const int64_t queue_time_ms) {
  INJECT_TIMER(executeRelAlgSubSeq);
  executor_->setCatalog(&cat_);
  executor_->temporary_tables_ = &temporary_tables_;
  decltype(left_deep_join_info_)().swap(left_deep_join_info_);
  time(&now_);
  for (size_t i = interval.first; i < interval.second; i++) {
    // only render on the last step
    try {
      executeRelAlgStep(seq,
                        i,
                        co,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    } catch (const QueryMustRunOnCpu&) {
      // Do not allow per-step retry if flag is off or in distributed mode
      // TODO(todd): Determine if and when we can relax this restriction
      // for distributed
      if (!g_allow_query_step_cpu_retry || g_cluster) {
        throw;
      }
      LOG(INFO) << "Retrying current query step " << i << " on CPU";
      const auto co_cpu = CompilationOptions::makeCpuOnly(co);
      executeRelAlgStep(seq,
                        i,
                        co_cpu,
                        eo,
                        (i == interval.second - 1) ? render_info : nullptr,
                        queue_time_ms);
    }
  }

  return seq.getDescriptor(interval.second - 1)->getResult();
}
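
// executeRelAlgStep() recomputes per-step ExecutionOptions (watchdog only on the first
// step or on projections, outer fragment indices only on the first step), applies any
// registered query hints, and then dispatches on the concrete node type.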

void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
                                       const size_t step_idx,
                                       const CompilationOptions& co,
                                       const ExecutionOptions& eo,
                                       RenderInfo* render_info,
                                       const int64_t queue_time_ms) {
  INJECT_TIMER(executeRelAlgStep);
  auto timer = DEBUG_TIMER(__func__);
  WindowProjectNodeContext::reset(executor_);
  auto exec_desc_ptr = seq.getDescriptor(step_idx);
  CHECK(exec_desc_ptr);
  auto& exec_desc = *exec_desc_ptr;
  const auto body = exec_desc.getBody();
  if (body->isNop()) {
    handleNop(exec_desc);
    return;
  }

  const ExecutionOptions eo_work_unit{
      eo.output_columnar_hint,
      eo.allow_multifrag,
      eo.just_explain,
      eo.allow_loop_joins,
      eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
      eo.jit_debug,
      eo.just_validate,
      eo.with_dynamic_watchdog,
      eo.dynamic_watchdog_time_limit,
      eo.find_push_down_candidates,
      eo.just_calcite_explain,
      eo.gpu_input_mem_limit_percent,
      eo.allow_runtime_query_interrupt,
      eo.running_query_interrupt_freq,
      eo.pending_query_interrupt_freq,
      eo.executor_type,
      step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};

  auto handle_hint = [co,
                      eo_work_unit,
                      body,
                      this]() -> std::pair<CompilationOptions, ExecutionOptions> {
    ExecutionOptions eo_hint_applied = eo_work_unit;
    CompilationOptions co_hint_applied = co;
    auto target_node = body;
    if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
      target_node = sort_body->getInput(0);
    }
    auto query_hints = getParsedQueryHint(target_node);
    auto columnar_output_hint_enabled = false;
    auto rowwise_output_hint_enabled = false;
    if (query_hints) {
      if (query_hints->isHintRegistered(QueryHint::kCpuMode)) {
        VLOG(1) << "A user hint forces the query to run in CPU execution mode";
        co_hint_applied.device_type = ExecutorDeviceType::CPU;
      }
      if (query_hints->isHintRegistered(QueryHint::kColumnarOutput)) {
        VLOG(1) << "A user hint forces the query to run with columnar output";
        columnar_output_hint_enabled = true;
      } else if (query_hints->isHintRegistered(QueryHint::kRowwiseOutput)) {
        VLOG(1) << "A user hint forces the query to run with rowwise output";
        rowwise_output_hint_enabled = true;
      }
    }
    auto columnar_output_enabled = eo_work_unit.output_columnar_hint
                                       ? !rowwise_output_hint_enabled
                                       : columnar_output_hint_enabled;
    if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
      LOG(INFO) << "Currently, we do not support applying query hints to change query "
                   "output layout in distributed mode.";
    }
    eo_hint_applied.output_columnar_hint = columnar_output_enabled;
    return std::make_pair(co_hint_applied, eo_hint_applied);
  };

  // Notify foreign tables to load prior to execution
  prepare_foreign_table_for_execution(*body, cat_);

  auto hint_applied = handle_hint();
  const auto compound = dynamic_cast<const RelCompound*>(body);
  if (compound) {
    if (compound->isDeleteViaSelect()) {
      executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (compound->isUpdateViaSelect()) {
      executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      exec_desc.setResult(executeCompound(
          compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
      VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
              << static_cast<int>(-compound->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto project = dynamic_cast<const RelProject*>(body);
  if (project) {
    if (project->isDeleteViaSelect()) {
      executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else if (project->isUpdateViaSelect()) {
      executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
    } else {
      std::optional<size_t> prev_count;
      // Disabling the intermediate count optimization in distributed, as the previous
      // execution descriptor will likely not hold the aggregated result.
      if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
        auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
        CHECK(prev_exec_desc);
        RelAlgNode const* prev_body = prev_exec_desc->getBody();
        // This optimization needs to be restricted in its application for UNION, which
        // can have 2 input nodes in which neither should restrict the count of the
        // other. However some non-UNION queries are measurably slower with this
        // restriction, so it is only applied when g_enable_union is true.
        bool const parent_check =
            !g_enable_union || project->getInput(0)->getId() == prev_body->getId();
        // If the previous node produced a reliable count, skip the pre-flight count.
        if (parent_check && (dynamic_cast<const RelCompound*>(prev_body) ||
                             dynamic_cast<const RelLogicalValues*>(prev_body))) {
          const auto& prev_exe_result = prev_exec_desc->getResult();
          const auto prev_result = prev_exe_result.getRows();
          if (prev_result) {
            prev_count = prev_result->rowCount();
            VLOG(3) << "Setting output row count for projection node to previous node ("
                    << prev_exec_desc->getBody()->toString() << ") to " << *prev_count;
          }
        }
      }
      exec_desc.setResult(executeProject(project,
                                         hint_applied.first,
                                         hint_applied.second,
                                         render_info,
                                         queue_time_ms,
                                         prev_count));
      VLOG(3) << "Returned from executeProject(), addTemporaryTable("
              << static_cast<int>(-project->getId()) << ", ...)"
              << " exec_desc.getResult().getDataPtr()->rowCount()="
              << exec_desc.getResult().getDataPtr()->rowCount();
      if (exec_desc.getResult().isFilterPushDownEnabled()) {
        return;
      }
      addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
    }
    return;
  }
  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
  if (aggregate) {
    exec_desc.setResult(executeAggregate(
        aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto filter = dynamic_cast<const RelFilter*>(body);
  if (filter) {
    exec_desc.setResult(executeFilter(
        filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto sort = dynamic_cast<const RelSort*>(body);
  if (sort) {
    exec_desc.setResult(executeSort(
        sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
    if (exec_desc.getResult().isFilterPushDownEnabled()) {
      return;
    }
    addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
  if (logical_values) {
    exec_desc.setResult(executeLogicalValues(logical_values, hint_applied.second));
    addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto modify = dynamic_cast<const RelModify*>(body);
  if (modify) {
    exec_desc.setResult(executeModify(modify, hint_applied.second));
    return;
  }
  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
  if (logical_union) {
    exec_desc.setResult(executeUnion(logical_union,
                                     seq,
                                     hint_applied.first,
                                     hint_applied.second,
                                     render_info,
                                     queue_time_ms));
    addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
  if (table_func) {
    exec_desc.setResult(executeTableFunction(
        table_func, hint_applied.first, hint_applied.second, queue_time_ms));
    addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
    return;
  }
  LOG(FATAL) << "Unhandled body type: " << body->toString();
}

void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
  // just set the result of the previous node as the result of the no-op
  auto body = ed.getBody();
  CHECK(dynamic_cast<const RelAggregate*>(body));
  CHECK_EQ(size_t(1), body->inputCount());
  const auto input = body->getInput(0);
  body->setOutputMetainfo(input->getOutputMetainfo());
  const auto it = temporary_tables_.find(-input->getId());
  CHECK(it != temporary_tables_.end());
  // set up the temp table as it could be used by the outer query or the next step
  addTemporaryTable(-body->getId(), it->second);

  ed.setResult({it->second, input->getOutputMetainfo()});
}

namespace {

class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
 public:
  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : RexVisitor(), cat_(cat) {}

  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
    return synthesized_physical_inputs_owned;
  }

  std::unordered_set<const RexInput*> visitInput(
      const RexInput* rex_input) const override {
    const auto input_ra = rex_input->getSourceNode();
    CHECK(input_ra);
    const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
    if (scan_ra) {
      const auto td = scan_ra->getTableDescriptor();
      if (td) {
        const auto col_id = rex_input->getIndex();
        const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
        if (cd && cd->columnType.get_physical_cols() > 0) {
          CHECK(IS_GEO(cd->columnType.get_type()));
          std::unordered_set<const RexInput*> synthesized_physical_inputs;
          for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
            auto physical_input =
                new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
            synthesized_physical_inputs_owned.emplace_back(physical_input);
            synthesized_physical_inputs.insert(physical_input);
          }
          return synthesized_physical_inputs;
        }
      }
    }
    return {rex_input};
  }

 protected:
  std::unordered_set<const RexInput*> aggregateResult(
      const std::unordered_set<const RexInput*>& aggregate,
      const std::unordered_set<const RexInput*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }

 private:
  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
  const Catalog_Namespace::Catalog& cat_;
};
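
// Note: for geo columns on physical scans, visitInput() above synthesizes RexInputs
// for the hidden physical columns (e.g. coords) so they are fetched along with the
// logical geo column; the visitor retains ownership of the synthesized nodes.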

const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
    return table_func;
  }
  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
    CHECK_EQ(size_t(2), join->inputCount());
    return join;
  }
  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_EQ(size_t(1), ra_node->inputCount());
  }
  auto only_src = ra_node->getInput(0);
  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
                       dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
  return is_join ? only_src : ra_node;
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  const auto filter_expr = compound->getFilterExpr();
  std::unordered_set<const RexInput*> used_inputs =
      filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
  const auto sources_size = compound->getScalarSourcesSize();
  for (size_t i = 0; i < sources_size; ++i) {
    const auto source_inputs = visitor.visit(compound->getScalarSource(i));
    used_inputs.insert(source_inputs.begin(), source_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
  CHECK_EQ(size_t(1), aggregate->inputCount());
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto source = aggregate->getInput(0);
  const auto& in_metainfo = source->getOutputMetainfo();
  const auto group_count = aggregate->getGroupByCount();
  CHECK_GE(in_metainfo.size(), group_count);
  for (size_t i = 0; i < group_count; ++i) {
    auto synthesized_used_input = new RexInput(source, i);
    used_inputs_owned.emplace_back(synthesized_used_input);
    used_inputs.insert(synthesized_used_input);
  }
  for (const auto& agg_expr : aggregate->getAggExprs()) {
    for (size_t i = 0; i < agg_expr->size(); ++i) {
      const auto operand_idx = agg_expr->getOperand(i);
      CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
      auto synthesized_used_input = new RexInput(source, operand_idx);
      used_inputs_owned.emplace_back(synthesized_used_input);
      used_inputs.insert(synthesized_used_input);
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < project->size(); ++i) {
    const auto proj_inputs = visitor.visit(project->getProjectAt(i));
    used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelTableFunction* table_func,
                const Catalog_Namespace::Catalog& cat) {
  RexUsedInputsVisitor visitor(cat);
  std::unordered_set<const RexInput*> used_inputs;
  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
    const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
    used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
  }
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  const auto data_sink_node = get_data_sink(filter);
  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
    const auto source = data_sink_node->getInput(nest_level);
    const auto scan_source = dynamic_cast<const RelScan*>(source);
    if (scan_source) {
      CHECK(source->getOutputMetainfo().empty());
      for (size_t i = 0; i < scan_source->size(); ++i) {
        auto synthesized_used_input = new RexInput(scan_source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    } else {
      const auto& partial_in_metadata = source->getOutputMetainfo();
      for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
        auto synthesized_used_input = new RexInput(source, i);
        used_inputs_owned.emplace_back(synthesized_used_input);
        used_inputs.insert(synthesized_used_input);
      }
    }
  }
  return std::make_pair(used_inputs, used_inputs_owned);
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_used_inputs(const RelLogicalUnion* logical_union,
                const Catalog_Namespace::Catalog&) {
  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  used_inputs_owned.reserve(logical_union->inputCount());
  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
  auto const n_inputs = logical_union->inputCount();
  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
    auto input = logical_union->getInput(nest_level);
    for (size_t i = 0; i < input->size(); ++i) {
      used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
      used_inputs.insert(used_inputs_owned.back().get());
    }
  }
  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
}

int table_id_from_ra(const RelAlgNode* ra_node) {
  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
  if (scan_ra) {
    const auto td = scan_ra->getTableDescriptor();
    CHECK(td);
    return td->tableId;
  }
  return -ra_node->getId();
}
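
// Physical scans are keyed by their catalog table id; every other node is keyed by
// its negated node id, matching how intermediate results are registered via
// addTemporaryTable(-id, ...) in executeRelAlgStep().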

std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
    const RelAlgNode* ra_node,
    const std::vector<size_t>& input_permutation) {
  const auto data_sink_node = get_data_sink(ra_node);
  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    const auto input_ra = data_sink_node->getInput(input_node_idx);
    // Having a non-zero mapped value (input_idx) results in the query being interpreted
    // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
    // mapped value (input_idx) which originates here. This would be incorrect for UNION.
    size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
    const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
    CHECK(it_ok.second);
    LOG_IF(INFO, !input_permutation.empty())
        << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
  }
  return input_to_nest_level;
}

std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
get_join_source_used_inputs(const RelAlgNode* ra_node,
                            const Catalog_Namespace::Catalog& cat) {
  const auto data_sink_node = get_data_sink(ra_node);
  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
    CHECK_EQ(join->inputCount(), 2u);
    const auto condition = join->getCondition();
    RexUsedInputsVisitor visitor(cat);
    auto condition_inputs = visitor.visit(condition);
    std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
        visitor.get_inputs_owned());
    return std::make_pair(condition_inputs, condition_inputs_owned);
  }

  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
    CHECK_GE(left_deep_join->inputCount(), 2u);
    const auto condition = left_deep_join->getInnerCondition();
    RexUsedInputsVisitor visitor(cat);
    auto result = visitor.visit(condition);
    for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
         ++nesting_level) {
      const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
      if (outer_condition) {
        const auto outer_result = visitor.visit(outer_condition);
        result.insert(outer_result.begin(), outer_result.end());
      }
    }
    std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
    return std::make_pair(result, used_inputs_owned);
  }

  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
    CHECK_GT(ra_node->inputCount(), 1u) << ra_node->toString();
  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
    // no-op
    CHECK_GE(ra_node->inputCount(), 0u) << ra_node->toString();
  } else {
    CHECK_EQ(ra_node->inputCount(), 1u) << ra_node->toString();
  }
  return std::make_pair(std::unordered_set<const RexInput*>{},
                        std::vector<std::shared_ptr<RexInput>>{});
}

void collect_used_input_desc(
    std::vector<InputDescriptor>& input_descs,
    const Catalog_Namespace::Catalog& cat,
    std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
    const RelAlgNode* ra_node,
    const std::unordered_set<const RexInput*>& source_used_inputs,
    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
  VLOG(3) << "ra_node=" << ra_node->toString()
          << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
          << " source_used_inputs.size()=" << source_used_inputs.size();
  for (const auto used_input : source_used_inputs) {
    const auto input_ra = used_input->getSourceNode();
    const int table_id = table_id_from_ra(input_ra);
    const auto col_id = used_input->getIndex();
    auto it = input_to_nest_level.find(input_ra);
    if (it != input_to_nest_level.end()) {
      const int input_desc = it->second;
      input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
          dynamic_cast<const RelScan*>(input_ra)
              ? cat.getColumnIdBySpi(table_id, col_id + 1)
              : col_id,
          table_id,
          input_desc));
    } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
      throw std::runtime_error("Bushy joins not supported");
    }
  }
}

template <class RA>
std::pair<std::vector<InputDescriptor>,
          std::list<std::shared_ptr<const InputColDescriptor>>>
get_input_desc_impl(const RA* ra_node,
                    const std::unordered_set<const RexInput*>& used_inputs,
                    const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
                    const std::vector<size_t>& input_permutation,
                    const Catalog_Namespace::Catalog& cat) {
  std::vector<InputDescriptor> input_descs;
  const auto data_sink_node = get_data_sink(ra_node);
  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
    const auto input_node_idx =
        input_permutation.empty() ? input_idx : input_permutation[input_idx];
    auto input_ra = data_sink_node->getInput(input_node_idx);
    const int table_id = table_id_from_ra(input_ra);
    input_descs.emplace_back(table_id, input_idx);
  }
  std::sort(input_descs.begin(),
            input_descs.end(),
            [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
              return lhs.getNestLevel() < rhs.getNestLevel();
            });
  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          used_inputs,
                          input_to_nest_level);
  std::unordered_set<const RexInput*> join_source_used_inputs;
  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
      get_join_source_used_inputs(ra_node, cat);
  collect_used_input_desc(input_descs,
                          cat,
                          input_col_descs_unique,  // modified
                          ra_node,
                          join_source_used_inputs,
                          input_to_nest_level);
  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
      input_col_descs_unique.begin(), input_col_descs_unique.end());

  std::sort(input_col_descs.begin(),
            input_col_descs.end(),
            [](std::shared_ptr<const InputColDescriptor> const& lhs,
               std::shared_ptr<const InputColDescriptor> const& rhs) {
              return std::make_tuple(lhs->getScanDesc().getNestLevel(),
                                     lhs->getColId(),
                                     lhs->getScanDesc().getTableId()) <
                     std::make_tuple(rhs->getScanDesc().getNestLevel(),
                                     rhs->getColId(),
                                     rhs->getScanDesc().getTableId());
            });
  return {input_descs,
          std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
                                                               input_col_descs.end())};
}
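
// Input descriptors are sorted by nest level and column descriptors by
// (nest level, col id, table id) so downstream code sees a deterministic layout
// regardless of unordered_set iteration order.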

template <class RA>
std::tuple<std::vector<InputDescriptor>,
           std::list<std::shared_ptr<const InputColDescriptor>>,
           std::vector<std::shared_ptr<RexInput>>>
get_input_desc(const RA* ra_node,
               const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
               const std::vector<size_t>& input_permutation,
               const Catalog_Namespace::Catalog& cat) {
  std::unordered_set<const RexInput*> used_inputs;
  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
  auto input_desc_pair = get_input_desc_impl(
      ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
  return std::make_tuple(
      input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
}

size_t get_scalar_sources_size(const RelCompound* compound) {
  return compound->getScalarSourcesSize();
}

size_t get_scalar_sources_size(const RelProject* project) {
  return project->size();
}

size_t get_scalar_sources_size(const RelTableFunction* table_func) {
  return table_func->getTableFuncInputsSize();
}

const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
  return compound->getScalarSource(i);
}

const RexScalar* scalar_at(const size_t i, const RelProject* project) {
  return project->getProjectAt(i);
}

const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
  return table_func->getTableFuncInputAt(i);
}

std::shared_ptr<Analyzer::Expr> set_transient_dict(
    const std::shared_ptr<Analyzer::Expr> expr) {
  const auto& ti = expr->get_type_info();
  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
    return expr;
  }
  auto transient_dict_ti = ti;
  transient_dict_ti.set_compression(kENCODING_DICT);
  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
  transient_dict_ti.set_fixed_size();
  return expr->add_cast(transient_dict_ti);
}
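
// None-encoded strings are cast to a transient dictionary (TRANSIENT_DICT_ID) so they
// can participate in group-by and equality comparisons; expressions that cannot take
// the cast fall back to the plain folded form in set_transient_dict_maybe() below.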

void set_transient_dict_maybe(
    std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::shared_ptr<Analyzer::Expr>& expr) {
  try {
    scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
  } catch (...) {
    scalar_sources.push_back(fold_expr(expr.get()));
  }
}

std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
    const std::shared_ptr<Analyzer::Expr>& input) {
  const auto& input_ti = input->get_type_info();
  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
    return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
  }
  return input;
}

template <class RA>
std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
    const RA* ra_node,
    const RelAlgTranslator& translator,
    const ::ExecutorType executor_type) {
  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
  VLOG(3) << "get_scalar_sources_size(" << ra_node->toString()
          << ") = " << scalar_sources_size;
  for (size_t i = 0; i < scalar_sources_size; ++i) {
    const auto scalar_rex = scalar_at(i, ra_node);
    if (dynamic_cast<const RexRef*>(scalar_rex)) {
      // RexRefs are synthetic scalars we append at the end of the real ones
      // for the sake of taking memory ownership, so no real work is needed here.
      continue;
    }

    const auto scalar_expr =
        rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
    const auto rewritten_expr = rewrite_expr(scalar_expr.get());
    if (executor_type == ExecutorType::Native) {
      set_transient_dict_maybe(scalar_sources, rewritten_expr);
    } else if (executor_type == ExecutorType::TableFunctions) {
      scalar_sources.push_back(fold_expr(rewritten_expr.get()));
    } else {
      scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
    }
  }

  return scalar_sources;
}

template <class RA>
std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
    const RA* ra_node,
    const RelAlgTranslator& translator,
    int32_t tableId,
    const Catalog_Namespace::Catalog& cat,
    const ColumnNameList& colNames,
    size_t starting_projection_column_idx) {
  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
    const auto scalar_rex = scalar_at(i, ra_node);
    if (dynamic_cast<const RexRef*>(scalar_rex)) {
      // RexRefs are synthetic scalars we append at the end of the real ones
      // for the sake of taking memory ownership, so no real work is needed here.
      continue;
    }

    std::shared_ptr<Analyzer::Expr> translated_expr;
    if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
      translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
                                            tableId,
                                            cat,
                                            colNames[i - starting_projection_column_idx]);
    } else {
      translated_expr = translator.translateScalarRex(scalar_rex);
    }
    const auto scalar_expr = rewrite_array_elements(translated_expr.get());
    const auto rewritten_expr = rewrite_expr(scalar_expr.get());
    set_transient_dict_maybe(scalar_sources, rewritten_expr);
  }

  return scalar_sources;
}

std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
    const RelCompound* compound,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
  if (!compound->isAggregate()) {
    return {nullptr};
  }
  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
    groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
  }
  return groupby_exprs;
}

std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
    const RelAggregate* aggregate,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
    groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
  }
  return groupby_exprs;
}

std::list<std::shared_ptr<Analyzer::Expr>> translate_quals(
    const RelCompound* compound,
    const RelAlgTranslator& translator) {
  const auto filter_rex = compound->getFilterExpr();
  const auto filter_expr =
      filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
                     : std::list<std::shared_ptr<Analyzer::Expr>>{};
}

std::vector<Analyzer::Expr*> translate_targets(
    std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
    const RelCompound* compound,
    const RelAlgTranslator& translator,
    const ExecutorType executor_type) {
  std::vector<Analyzer::Expr*> target_exprs;
  for (size_t i = 0; i < compound->size(); ++i) {
    const auto target_rex = compound->getTargetExpr(i);
    const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
    std::shared_ptr<Analyzer::Expr> target_expr;
    if (target_rex_agg) {
      target_expr =
          RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
    } else {
      const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
      const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
      if (target_rex_ref) {
        const auto ref_idx = target_rex_ref->getIndex();
        CHECK_GE(ref_idx, size_t(1));
        CHECK_LE(ref_idx, groupby_exprs.size());
        const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
        target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
      } else {
        target_expr = translator.translateScalarRex(target_rex_scalar);
        auto rewritten_expr = rewrite_expr(target_expr.get());
        target_expr = fold_expr(rewritten_expr.get());
        if (executor_type == ExecutorType::Native) {
          try {
            target_expr = set_transient_dict(target_expr);
          } catch (...) {
            // noop
          }
        } else {
          target_expr = cast_dict_to_none(target_expr);
        }
      }
    }
    CHECK(target_expr);
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }
  return target_exprs;
}

std::vector<Analyzer::Expr*> translate_targets(
    std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
    const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
    const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
    const RelAggregate* aggregate,
    const RelAlgTranslator& translator) {
  std::vector<Analyzer::Expr*> target_exprs;
  size_t group_key_idx = 1;
  for (const auto& groupby_expr : groupby_exprs) {
    auto target_expr =
        var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }

  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
    auto target_expr =
        RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
    CHECK(target_expr);
    target_expr = fold_expr(target_expr.get());
    target_exprs_owned.push_back(target_expr);
    target_exprs.push_back(target_expr.get());
  }
  return target_exprs;
}
1557 
1558 bool is_count_distinct(const Analyzer::Expr* expr) {
1559  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1560  return agg_expr && agg_expr->get_is_distinct();
1561 }
1562 
1563 bool is_agg(const Analyzer::Expr* expr) {
1564  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1565  if (agg_expr && agg_expr->get_contains_agg()) {
1566  auto agg_type = agg_expr->get_aggtype();
1567  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1568  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1569  return true;
1570  }
1571  }
1572  return false;
1573 }
1574 
1575 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
1576  if (is_count_distinct(&expr)) {
1577  return SQLTypeInfo(kBIGINT, false);
1578  } else if (is_agg(&expr)) {
1579  return get_nullable_logical_type_info(expr.get_type_info());
1580  }
1581  return get_logical_type_info(expr.get_type_info());
1582 }
1583 
1584 template <class RA>
1585 std::vector<TargetMetaInfo> get_targets_meta(
1586  const RA* ra_node,
1587  const std::vector<Analyzer::Expr*>& target_exprs) {
1588  std::vector<TargetMetaInfo> targets_meta;
1589  CHECK_EQ(ra_node->size(), target_exprs.size());
1590  for (size_t i = 0; i < ra_node->size(); ++i) {
1591  CHECK(target_exprs[i]);
1592  // TODO(alex): remove the count distinct type fixup.
1593  targets_meta.emplace_back(ra_node->getFieldName(i),
1594  get_logical_type_for_expr(*target_exprs[i]),
1595  target_exprs[i]->get_type_info());
1596  }
1597  return targets_meta;
1598 }
1599 
1600 template <>
1601 std::vector<TargetMetaInfo> get_targets_meta(
1602  const RelFilter* filter,
1603  const std::vector<Analyzer::Expr*>& target_exprs) {
1604  RelAlgNode const* input0 = filter->getInput(0);
1605  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1606  return get_targets_meta(input, target_exprs);
1607  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1608  return get_targets_meta(input, target_exprs);
1609  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1610  return get_targets_meta(input, target_exprs);
1611  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1612  return get_targets_meta(input, target_exprs);
1613  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1614  return get_targets_meta(input, target_exprs);
1615  }
1616  UNREACHABLE() << "Unhandled node type: " << input0->toString();
1617  return {};
1618 }
1619 
1620 } // namespace
1621 
1622 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
1623  const CompilationOptions& co_in,
1624  const ExecutionOptions& eo_in,
1625  const int64_t queue_time_ms) {
1626  CHECK(node);
1627  auto timer = DEBUG_TIMER(__func__);
1628 
1629  UpdateTriggeredCacheInvalidator::invalidateCaches();
1630 
1631  auto co = co_in;
1632  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1633  // encoded string updates
1634 
1635  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
1636  auto& work_unit,
1637  const bool is_aggregate) {
1638  auto table_descriptor = node->getModifiedTableDescriptor();
1639  CHECK(table_descriptor);
1640  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
1641  throw std::runtime_error(
1642  "UPDATE queries involving variable length columns are only supported on tables "
1643  "with the vacuum attribute set to 'delayed'");
1644  }
1645  auto updated_table_desc = node->getModifiedTableDescriptor();
1646  dml_transaction_parameters_ =
1647  std::make_unique<UpdateTransactionParameters>(updated_table_desc,
1648  node->getTargetColumns(),
1649  node->getOutputMetainfo(),
1650  node->isVarlenUpdateRequired());
1651 
1652  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1653 
1654  auto execute_update_ra_exe_unit =
1655  [this, &co, &eo_in, &table_infos, &updated_table_desc](
1656  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
1657  CompilationOptions co_project = CompilationOptions::makeCpuOnly(co);
1658 
1659  auto eo = eo_in;
1660  if (dml_transaction_parameters_->tableIsTemporary()) {
1661  eo.output_columnar_hint = true;
1662  co_project.allow_lazy_fetch = false;
1663  co_project.filter_on_deleted_column =
1664  false; // project the entire delete column for columnar update
1665  }
1666 
1667  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
1668  dml_transaction_parameters_.get());
1669  CHECK(update_transaction_parameters);
1670  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
1671  try {
1672  auto table_update_metadata =
1673  executor_->executeUpdate(ra_exe_unit,
1674  table_infos,
1675  updated_table_desc,
1676  co_project,
1677  eo,
1678  cat_,
1679  executor_->row_set_mem_owner_,
1680  update_callback,
1681  is_aggregate);
1682  post_execution_callback_ = [table_update_metadata, this]() {
1683  dml_transaction_parameters_->finalizeTransaction(cat_);
1684  TableOptimizer table_optimizer{
1685  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1686  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1687  };
1688  } catch (const QueryExecutionError& e) {
1689  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1690  }
1691  };
1692 
1693  if (dml_transaction_parameters_->tableIsTemporary()) {
1694  // hold owned target exprs during execution if rewriting
1695  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1696  // Rewrite temp table updates to generate the full column by moving the WHERE
1697  // clause into a CASE expression. If such a rewrite is not possible, bail on
1698  // the update operation. Then build an expr for the update target.
1699  auto update_transaction_params =
1700  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
1701  CHECK(update_transaction_params);
1702  const auto td = update_transaction_params->getTableDescriptor();
1703  CHECK(td);
1704  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
1705  if (update_column_names.size() > 1) {
1706  throw std::runtime_error(
1707  "Multi-column update is not yet supported for temporary tables.");
1708  }
1709 
1710  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1711  CHECK(cd);
1712  auto projected_column_to_update =
1713  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1714  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1715  work_unit.exe_unit, projected_column_to_update);
1716  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1717  throw std::runtime_error(
1718  "Variable length updates not yet supported on temporary tables.");
1719  }
1720  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1721  } else {
1722  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1723  }
1724  };
1725 
1726  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1727  auto work_unit =
1728  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1729 
1730  execute_update_for_node(compound, work_unit, compound->isAggregate());
1731  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1732  auto work_unit =
1733  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1734 
1735  if (project->isSimple()) {
1736  CHECK_EQ(size_t(1), project->inputCount());
1737  const auto input_ra = project->getInput(0);
1738  if (dynamic_cast<const RelSort*>(input_ra)) {
1739  const auto& input_table =
1740  get_temporary_table(&temporary_tables_, -input_ra->getId());
1741  CHECK(input_table);
1742  work_unit.exe_unit.scan_limit = input_table->rowCount();
1743  }
1744  }
1745 
1746  execute_update_for_node(project, work_unit, false);
1747  } else {
1748  throw std::runtime_error("Unsupported parent node for update: " + node->toString());
1749  }
1750 }
1751 
1752 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
1753  const CompilationOptions& co,
1754  const ExecutionOptions& eo_in,
1755  const int64_t queue_time_ms) {
1756  CHECK(node);
1757  auto timer = DEBUG_TIMER(__func__);
1758 
1759  DeleteTriggeredCacheInvalidator::invalidateCaches();
1760 
1761  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1762  auto& work_unit,
1763  const bool is_aggregate) {
1764  auto* table_descriptor = node->getModifiedTableDescriptor();
1765  CHECK(table_descriptor);
1766  if (!table_descriptor->hasDeletedCol) {
1767  throw std::runtime_error(
1768  "DELETE queries are only supported on tables with the vacuum attribute set to "
1769  "'delayed'");
1770  }
1771 
1772  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1773 
1774  auto execute_delete_ra_exe_unit =
1775  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1776  const bool is_aggregate) {
1777  dml_transaction_parameters_ =
1778  std::make_unique<DeleteTransactionParameters>(table_descriptor);
1779  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
1780  dml_transaction_parameters_.get());
1781  CHECK(delete_params);
1782  auto delete_callback = yieldDeleteCallback(*delete_params);
1783  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
1784 
1785  auto eo = eo_in;
1786  if (dml_transaction_parameters_->tableIsTemporary()) {
1787  eo.output_columnar_hint = true;
1788  co_delete.filter_on_deleted_column =
1789  false; // project the entire delete column for columnar update
1790  } else {
1791  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1792  }
1793 
1794  try {
1795  auto table_update_metadata =
1796  executor_->executeUpdate(exe_unit,
1797  table_infos,
1798  table_descriptor,
1799  co_delete,
1800  eo,
1801  cat_,
1802  executor_->row_set_mem_owner_,
1803  delete_callback,
1804  is_aggregate);
1805  post_execution_callback_ = [table_update_metadata, this]() {
1806  dml_transaction_parameters_->finalizeTransaction(cat_);
1807  TableOptimizer table_optimizer{
1808  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1809  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1810  };
1811  } catch (const QueryExecutionError& e) {
1812  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1813  }
1814  };
1815 
1816  if (table_is_temporary(table_descriptor)) {
1817  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1818  auto cd = cat_.getDeletedColumn(table_descriptor);
1819  CHECK(cd);
1820  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1821  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1822  const auto rewritten_exe_unit =
1823  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1824  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1825  } else {
1826  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1827  }
1828  };
1829 
1830  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1831  const auto work_unit =
1832  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1833  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1834  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1835  auto work_unit =
1836  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1837  if (project->isSimple()) {
1838  CHECK_EQ(size_t(1), project->inputCount());
1839  const auto input_ra = project->getInput(0);
1840  if (dynamic_cast<const RelSort*>(input_ra)) {
1841  const auto& input_table =
1842  get_temporary_table(&temporary_tables_, -input_ra->getId());
1843  CHECK(input_table);
1844  work_unit.exe_unit.scan_limit = input_table->rowCount();
1845  }
1846  }
1847  execute_delete_for_node(project, work_unit, false);
1848  } else {
1849  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1850  }
1851 }
1852 
1853 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
1854  const CompilationOptions& co,
1855  const ExecutionOptions& eo,
1856  RenderInfo* render_info,
1857  const int64_t queue_time_ms) {
1858  auto timer = DEBUG_TIMER(__func__);
1859  const auto work_unit =
1860  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1861  CompilationOptions co_compound = co;
1862  return executeWorkUnit(work_unit,
1863  compound->getOutputMetainfo(),
1864  compound->isAggregate(),
1865  co_compound,
1866  eo,
1867  render_info,
1868  queue_time_ms);
1869 }
1870 
1871 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
1872  const CompilationOptions& co,
1873  const ExecutionOptions& eo,
1874  RenderInfo* render_info,
1875  const int64_t queue_time_ms) {
1876  auto timer = DEBUG_TIMER(__func__);
1877  const auto work_unit = createAggregateWorkUnit(
1878  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1879  return executeWorkUnit(work_unit,
1880  aggregate->getOutputMetainfo(),
1881  true,
1882  co,
1883  eo,
1884  render_info,
1885  queue_time_ms);
1886 }
1887 
1888 namespace {
1889 
1890 // Returns true iff the execution unit contains window functions.
1891 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
1892  return std::any_of(ra_exe_unit.target_exprs.begin(),
1893  ra_exe_unit.target_exprs.end(),
1894  [](const Analyzer::Expr* expr) {
1895  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1896  });
1897 }
1898 
1899 } // namespace
1900 
1901 ExecutionResult RelAlgExecutor::executeProject(
1902  const RelProject* project,
1903  const CompilationOptions& co,
1904  const ExecutionOptions& eo,
1905  RenderInfo* render_info,
1906  const int64_t queue_time_ms,
1907  const std::optional<size_t> previous_count) {
1908  auto timer = DEBUG_TIMER(__func__);
1909  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1910  CompilationOptions co_project = co;
1911  if (project->isSimple()) {
1912  CHECK_EQ(size_t(1), project->inputCount());
1913  const auto input_ra = project->getInput(0);
1914  if (dynamic_cast<const RelSort*>(input_ra)) {
1915  co_project.device_type = ExecutorDeviceType::CPU;
1916  const auto& input_table =
1917  get_temporary_table(&temporary_tables_, -input_ra->getId());
1918  CHECK(input_table);
1919  work_unit.exe_unit.scan_limit =
1920  std::min(input_table->getLimit(), input_table->rowCount());
1921  }
1922  }
1923  return executeWorkUnit(work_unit,
1924  project->getOutputMetainfo(),
1925  false,
1926  co_project,
1927  eo,
1928  render_info,
1929  queue_time_ms,
1930  previous_count);
1931 }
1932 
1933 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
1934  const CompilationOptions& co_in,
1935  const ExecutionOptions& eo,
1936  const int64_t queue_time_ms) {
1937  INJECT_TIMER(executeTableFunction);
1938  auto timer = DEBUG_TIMER(__func__);
1939 
1940  auto co = co_in;
1941 
1942  if (g_cluster) {
1943  throw std::runtime_error("Table functions not supported in distributed mode yet");
1944  }
1945  if (!g_enable_table_functions) {
1946  throw std::runtime_error("Table function support is disabled");
1947  }
1948  auto table_func_work_unit = createTableFunctionWorkUnit(
1949  table_func,
1950  eo.just_explain,
1951  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
1952  const auto body = table_func_work_unit.body;
1953  CHECK(body);
1954 
1955  const auto table_infos =
1956  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1957 
1958  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1959  co.device_type,
1960  QueryMemoryDescriptor(),
1961  nullptr,
1962  executor_->getCatalog(),
1963  executor_->blockSize(),
1964  executor_->gridSize()),
1965  {}};
1966 
1967  try {
1968  result = {executor_->executeTableFunction(
1969  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1970  body->getOutputMetainfo()};
1971  } catch (const QueryExecutionError& e) {
1972  handlePersistentError(e.getErrorCode());
1973  CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
1974  throw std::runtime_error("Table function ran out of memory during execution");
1975  }
1976  result.setQueueTime(queue_time_ms);
1977  return result;
1978 }
1979 
1980 namespace {
1981 
1982 // Creates a new expression which has the range table index set to 1. This is needed to
1983 // reuse the hash join construction helpers to generate a hash table for the window
1984 // function partition: create an equals expression with left and right sides identical
1985 // except for the range table index.
1986 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
1987  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
1988  if (tuple) {
1989  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
1990  for (const auto& element : tuple->getTuple()) {
1991  transformed_tuple.push_back(transform_to_inner(element.get()));
1992  }
1993  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
1994  }
1995  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
1996  if (!col) {
1997  throw std::runtime_error("Only columns supported in the window partition for now");
1998  }
1999  return makeExpr<Analyzer::ColumnVar>(
2000  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
2001 }
2002 
2003 } // namespace
2004 
2005 void RelAlgExecutor::computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
2006  const CompilationOptions& co,
2007  const ExecutionOptions& eo,
2008  ColumnCacheMap& column_cache_map,
2009  const int64_t queue_time_ms) {
2010  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
2011  CHECK_EQ(query_infos.size(), size_t(1));
2012  if (query_infos.front().info.fragments.size() != 1) {
2013  throw std::runtime_error(
2014  "Only single fragment tables supported for window functions for now");
2015  }
2016  if (eo.executor_type == ::ExecutorType::Extern) {
2017  return;
2018  }
2019  query_infos.push_back(query_infos.front());
2020  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2021  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
2022  ++target_index) {
2023  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
2024  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2025  if (!window_func) {
2026  continue;
2027  }
2028  // Always use baseline layout hash tables for now, make the expression a tuple.
2029  const auto& partition_keys = window_func->getPartitionKeys();
2030  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2031  if (partition_keys.size() >= 1) {
2032  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2033  if (partition_keys.size() > 1) {
2034  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2035  } else {
2036  CHECK_EQ(partition_keys.size(), size_t(1));
2037  partition_key_tuple = partition_keys.front();
2038  }
2039  // Creates a tautology equality with the partition expression on both sides.
2040  partition_key_cond =
2041  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2042  kBW_EQ,
2043  kONE,
2044  partition_key_tuple,
2045  transform_to_inner(partition_key_tuple.get()));
2046  }
2047  auto context =
2048  createWindowFunctionContext(window_func,
2049  partition_key_cond /*nullptr if no partition key*/,
2050  ra_exe_unit,
2051  query_infos,
2052  co,
2053  column_cache_map,
2054  executor_->getRowSetMemoryOwner());
2055  context->compute();
2056  window_project_node_context->addWindowFunctionContext(std::move(context),
2057  target_index);
2058  }
2059 }
2060 
2061 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
2062  const Analyzer::WindowFunction* window_func,
2063  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
2064  const RelAlgExecutionUnit& ra_exe_unit,
2065  const std::vector<InputTableInfo>& query_infos,
2066  const CompilationOptions& co,
2067  ColumnCacheMap& column_cache_map,
2068  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2069  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2070  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2071  ? Data_Namespace::GPU_LEVEL
2072  : Data_Namespace::CPU_LEVEL;
2073  std::unique_ptr<WindowFunctionContext> context;
2074  if (partition_key_cond) {
2075  const auto join_table_or_err =
2076  executor_->buildHashTableForQualifier(partition_key_cond,
2077  query_infos,
2078  memory_level,
2079  JoinType::INVALID, // for window function
2080  HashType::OneToMany,
2081  column_cache_map,
2082  ra_exe_unit.hash_table_build_plan_dag,
2083  ra_exe_unit.query_hint,
2084  ra_exe_unit.table_id_to_node_map);
2085  if (!join_table_or_err.fail_reason.empty()) {
2086  throw std::runtime_error(join_table_or_err.fail_reason);
2087  }
2088  CHECK(join_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2089  context = std::make_unique<WindowFunctionContext>(window_func,
2090  join_table_or_err.hash_table,
2091  elem_count,
2092  co.device_type,
2093  row_set_mem_owner);
2094  } else {
2095  context = std::make_unique<WindowFunctionContext>(
2096  window_func, elem_count, co.device_type, row_set_mem_owner);
2097  }
2098  const auto& order_keys = window_func->getOrderKeys();
2099  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2100  for (const auto& order_key : order_keys) {
2101  const auto order_col =
2102  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2103  if (!order_col) {
2104  throw std::runtime_error("Only order by columns supported for now");
2105  }
2106  const int8_t* column;
2107  size_t join_col_elem_count;
2108  std::tie(column, join_col_elem_count) =
2109  ColumnFetcher::getOneColumnFragment(executor_,
2110  *order_col,
2111  query_infos.front().info.fragments.front(),
2112  memory_level,
2113  0,
2114  nullptr,
2115  /*thread_idx=*/0,
2116  chunks_owner,
2117  column_cache_map);
2118 
2119  CHECK_EQ(join_col_elem_count, elem_count);
2120  context->addOrderColumn(column, order_col.get(), chunks_owner);
2121  }
2122  return context;
2123 }
2124 
2125 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
2126  const CompilationOptions& co,
2127  const ExecutionOptions& eo,
2128  RenderInfo* render_info,
2129  const int64_t queue_time_ms) {
2130  auto timer = DEBUG_TIMER(__func__);
2131  const auto work_unit =
2132  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2133  return executeWorkUnit(
2134  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2135 }
2136 
2137 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
2138  std::vector<TargetMetaInfo> const& rhs) {
2139  if (lhs.size() == rhs.size()) {
2140  for (size_t i = 0; i < lhs.size(); ++i) {
2141  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
2142  return false;
2143  }
2144  }
2145  return true;
2146  }
2147  return false;
2148 }
2149 
2150 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2151  return target_meta_info.get_type_info().is_geometry();
2152 }
2153 
2154 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
2155  const RaExecutionSequence& seq,
2156  const CompilationOptions& co,
2157  const ExecutionOptions& eo,
2158  RenderInfo* render_info,
2159  const int64_t queue_time_ms) {
2160  auto timer = DEBUG_TIMER(__func__);
2161  if (!logical_union->isAll()) {
2162  throw std::runtime_error("UNION without ALL is not supported yet.");
2163  }
2164  // Will throw a std::runtime_error if types don't match.
2165  logical_union->checkForMatchingMetaInfoTypes();
2166  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2167  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2168  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2169  }
2170  // Only Projections and Aggregates from a UNION are supported for now.
2171  query_dag_->eachNode([logical_union](RelAlgNode const* node) {
2172  if (node->hasInput(logical_union) &&
2173  !shared::dynamic_castable_to_any<RelProject, RelLogicalUnion, RelAggregate>(
2174  node)) {
2175  throw std::runtime_error("UNION ALL not yet supported in this context.");
2176  }
2177  });
2178 
2179  auto work_unit =
2180  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2181  return executeWorkUnit(work_unit,
2182  logical_union->getOutputMetainfo(),
2183  false,
2184  CompilationOptions::makeCpuOnly(co),
2185  eo,
2186  render_info,
2187  queue_time_ms);
2188 }
2189 
2190 ExecutionResult RelAlgExecutor::executeLogicalValues(
2191  const RelLogicalValues* logical_values,
2192  const ExecutionOptions& eo) {
2193  auto timer = DEBUG_TIMER(__func__);
2194  QueryMemoryDescriptor query_mem_desc(executor_,
2195  logical_values->getNumRows(),
2196  QueryDescriptionType::Projection,
2197  /*is_table_function=*/false);
2198 
2199  auto tuple_type = logical_values->getTupleType();
2200  for (size_t i = 0; i < tuple_type.size(); ++i) {
2201  auto& target_meta_info = tuple_type[i];
2202  if (target_meta_info.get_type_info().is_varlen()) {
2203  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2204  }
2205  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2206  // replace w/ bigint
2207  tuple_type[i] =
2208  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2209  }
2210  query_mem_desc.addColSlotInfo(
2211  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2212  }
2213  logical_values->setOutputMetainfo(tuple_type);
2214 
2215  std::vector<TargetInfo> target_infos;
2216  for (const auto& tuple_type_component : tuple_type) {
2217  target_infos.emplace_back(TargetInfo{false,
2218  kCOUNT,
2219  tuple_type_component.get_type_info(),
2220  SQLTypeInfo(kNULLT, false),
2221  false,
2222  false,
2223  /*is_varlen_projection=*/false});
2224  }
2225 
2226  std::shared_ptr<ResultSet> rs{
2227  ResultSetLogicalValuesBuilder{logical_values,
2228  target_infos,
2229  ExecutorDeviceType::CPU,
2230  query_mem_desc,
2231  executor_->getRowSetMemoryOwner(),
2232  executor_}
2233  .build()};
2234 
2235  return {rs, tuple_type};
2236 }
2237 
2238 namespace {
2239 
2240 template <class T>
2241 int64_t insert_one_dict_str(T* col_data,
2242  const std::string& columnName,
2243  const SQLTypeInfo& columnType,
2244  const Analyzer::Constant* col_cv,
2245  const Catalog_Namespace::Catalog& catalog) {
2246  if (col_cv->get_is_null()) {
2247  *col_data = inline_fixed_encoding_null_val(columnType);
2248  } else {
2249  const int dict_id = columnType.get_comp_param();
2250  const auto col_datum = col_cv->get_constval();
2251  const auto& str = *col_datum.stringval;
2252  const auto dd = catalog.getMetadataForDict(dict_id);
2253  CHECK(dd && dd->stringDict);
2254  int32_t str_id = dd->stringDict->getOrAdd(str);
2255  if (!dd->dictIsTemp) {
2256  const auto checkpoint_ok = dd->stringDict->checkpoint();
2257  if (!checkpoint_ok) {
2258  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2259  columnName);
2260  }
2261  }
2262  const bool invalid = str_id > max_valid_int_value<T>();
2263  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2264  if (invalid) {
2265  LOG(ERROR) << "Could not encode string: " << str
2266  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2267  << " bits. Will store NULL instead.";
2268  }
2269  str_id = inline_fixed_encoding_null_val(columnType);
2270  }
2271  *col_data = str_id;
2272  }
2273  return *col_data;
2274 }
2275 
2276 template <class T>
2277 int64_t insert_one_dict_str(T* col_data,
2278  const ColumnDescriptor* cd,
2279  const Analyzer::Constant* col_cv,
2280  const Catalog_Namespace::Catalog& catalog) {
2281  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2282 }
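// Descriptive note (added, not in the original source): for narrow dictionary
// encodings the helper above guards against id overflow. Assuming
// max_valid_int_value<T>() is the largest value representable in T, a 1-byte
// encoding (T = uint8_t) can only hold ids up to 255; a dictionary that has
// already grown past that yields str_id > 255 for a new string, which is
// logged and stored as NULL rather than silently truncated.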
2283 
2284 } // namespace
2285 
2286 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2287  const ExecutionOptions& eo) {
2288  auto timer = DEBUG_TIMER(__func__);
2289  if (eo.just_explain) {
2290  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2291  }
2292 
2293  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2294  ExecutorDeviceType::CPU,
2295  QueryMemoryDescriptor(),
2296  executor_->getRowSetMemoryOwner(),
2297  executor_->getCatalog(),
2298  executor_->blockSize(),
2299  executor_->gridSize());
2300 
2301  std::vector<TargetMetaInfo> empty_targets;
2302  return {rs, empty_targets};
2303 }
2304 
2305 namespace {
2306 int64_t int_value_from_numbers_ptr(const SQLTypeInfo& type_info, const int8_t* data) {
2307  size_t sz = 0;
2308  switch (type_info.get_type()) {
2309  case kTINYINT:
2310  case kSMALLINT:
2311  case kINT:
2312  case kBIGINT:
2313  case kTIMESTAMP:
2314  case kTIME:
2315  case kDATE:
2316  sz = type_info.get_logical_size();
2317  break;
2318  case kTEXT:
2319  case kVARCHAR:
2320  case kCHAR:
2321  CHECK(type_info.get_compression() == kENCODING_DICT);
2322  sz = type_info.get_size();
2323  break;
2324  default:
2325  CHECK(false) << "Unexpected sharding key datatype";
2326  }
2327 
2328  switch (sz) {
2329  case 1:
2330  return *(reinterpret_cast<const int8_t*>(data));
2331  case 2:
2332  return *(reinterpret_cast<const int16_t*>(data));
2333  case 4:
2334  return *(reinterpret_cast<const int32_t*>(data));
2335  case 8:
2336  return *(reinterpret_cast<const int64_t*>(data));
2337  default:
2338  CHECK(false);
2339  return 0;
2340  }
2341 }
2342 
2343 const TableDescriptor* get_shard_for_key(const TableDescriptor* td,
2344  const Catalog_Namespace::Catalog& cat,
2345  const Fragmenter_Namespace::InsertData& data) {
2346  auto shard_column_md = cat.getShardColumnMetadataForTable(td);
2347  CHECK(shard_column_md);
2348  auto sharded_column_id = shard_column_md->columnId;
2349  const TableDescriptor* shard{nullptr};
2350  for (size_t i = 0; i < data.columnIds.size(); ++i) {
2351  if (data.columnIds[i] == sharded_column_id) {
2352  const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
2353  const auto shard_count = shard_tables.size();
2354  CHECK(data.data[i].numbersPtr);
2355  auto value = int_value_from_numbers_ptr(shard_column_md->columnType,
2356  data.data[i].numbersPtr);
2357  const size_t shard_idx = SHARD_FOR_KEY(value, shard_count);
2358  shard = shard_tables[shard_idx];
2359  break;
2360  }
2361  }
2362  return shard;
2363 }
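// Worked example (illustrative; assumes SHARD_FOR_KEY reduces the key modulo
// the shard count): with 4 physical shards and a sharding-key value of 10,
// shard_idx == 10 % 4 == 2, so the row is routed to shard_tables[2].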
2364 
2365 } // namespace
2366 
2367 ExecutionResult RelAlgExecutor::executeSimpleInsert(const Analyzer::Query& query) {
2368  // Note: We currently obtain an executor for this method, but we do not need it.
2369  // Therefore, we skip the executor state setup in the regular execution path. In the
2370  // future, we will likely want to use the executor to evaluate expressions in the insert
2371  // statement.
2372 
2373  const auto& targets = query.get_targetlist();
2374  const int table_id = query.get_result_table_id();
2375  const auto& col_id_list = query.get_result_col_list();
2376 
2377  std::vector<const ColumnDescriptor*> col_descriptors;
2378  std::vector<int> col_ids;
2379  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2380  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2381  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2382 
2383  for (const int col_id : col_id_list) {
2384  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2385  const auto col_enc = cd->columnType.get_compression();
2386  if (cd->columnType.is_string()) {
2387  switch (col_enc) {
2388  case kENCODING_NONE: {
2389  auto it_ok =
2390  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2391  CHECK(it_ok.second);
2392  break;
2393  }
2394  case kENCODING_DICT: {
2395  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2396  CHECK(dd);
2397  const auto it_ok = col_buffers.emplace(
2398  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2399  CHECK(it_ok.second);
2400  break;
2401  }
2402  default:
2403  CHECK(false);
2404  }
2405  } else if (cd->columnType.is_geometry()) {
2406  auto it_ok =
2407  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2408  CHECK(it_ok.second);
2409  } else if (cd->columnType.is_array()) {
2410  auto it_ok =
2411  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2412  CHECK(it_ok.second);
2413  } else {
2414  const auto it_ok = col_buffers.emplace(
2415  col_id,
2416  std::unique_ptr<uint8_t[]>(
2417  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2418  // the buffer
2419  CHECK(it_ok.second);
2420  }
2421  col_descriptors.push_back(cd);
2422  col_ids.push_back(col_id);
2423  }
2424  size_t col_idx = 0;
2425  Fragmenter_Namespace::InsertData insert_data;
2426  insert_data.databaseId = cat_.getCurrentDB().dbId;
2427  insert_data.tableId = table_id;
2428  for (auto target_entry : targets) {
2429  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2430  if (!col_cv) {
2431  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2432  CHECK(col_cast);
2433  CHECK_EQ(kCAST, col_cast->get_optype());
2434  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2435  }
2436  CHECK(col_cv);
2437  const auto cd = col_descriptors[col_idx];
2438  auto col_datum = col_cv->get_constval();
2439  auto col_type = cd->columnType.get_type();
2440  uint8_t* col_data_bytes{nullptr};
2441  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2442  (!cd->columnType.is_string() ||
2443  cd->columnType.get_compression() == kENCODING_DICT)) {
2444  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2445  CHECK(col_data_bytes_it != col_buffers.end());
2446  col_data_bytes = col_data_bytes_it->second.get();
2447  }
2448  switch (col_type) {
2449  case kBOOLEAN: {
2450  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2451  auto null_bool_val =
2452  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2453  *col_data = col_cv->get_is_null() || null_bool_val
2454  ? inline_fixed_encoding_null_val(cd->columnType)
2455  : (col_datum.boolval ? 1 : 0);
2456  break;
2457  }
2458  case kTINYINT: {
2459  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2460  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2461  : col_datum.tinyintval;
2462  break;
2463  }
2464  case kSMALLINT: {
2465  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2466  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2467  : col_datum.smallintval;
2468  break;
2469  }
2470  case kINT: {
2471  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2472  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2473  : col_datum.intval;
2474  break;
2475  }
2476  case kBIGINT:
2477  case kDECIMAL:
2478  case kNUMERIC: {
2479  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2480  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2481  : col_datum.bigintval;
2482  break;
2483  }
2484  case kFLOAT: {
2485  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2486  *col_data = col_datum.floatval;
2487  break;
2488  }
2489  case kDOUBLE: {
2490  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2491  *col_data = col_datum.doubleval;
2492  break;
2493  }
2494  case kTEXT:
2495  case kVARCHAR:
2496  case kCHAR: {
2497  switch (cd->columnType.get_compression()) {
2498  case kENCODING_NONE:
2499  str_col_buffers[col_ids[col_idx]].push_back(
2500  col_datum.stringval ? *col_datum.stringval : "");
2501  break;
2502  case kENCODING_DICT: {
2503  switch (cd->columnType.get_size()) {
2504  case 1:
2505  insert_one_dict_str(
2506  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2507  break;
2508  case 2:
2509  insert_one_dict_str(
2510  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2511  break;
2512  case 4:
2513  insert_one_dict_str(
2514  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2515  break;
2516  default:
2517  CHECK(false);
2518  }
2519  break;
2520  }
2521  default:
2522  CHECK(false);
2523  }
2524  break;
2525  }
2526  case kTIME:
2527  case kTIMESTAMP:
2528  case kDATE: {
2529  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2530  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2531  : col_datum.bigintval;
2532  break;
2533  }
2534  case kARRAY: {
2535  const auto is_null = col_cv->get_is_null();
2536  const auto size = cd->columnType.get_size();
2537  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2538  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2539  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2540  if (is_null && !is_point_coords) {
2541  if (size > 0) {
2542  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2543  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2544  throw std::runtime_error("Column " + cd->columnName +
2545  " doesn't accept NULL values");
2546  }
2547  int8_t* buf = (int8_t*)checked_malloc(size);
2548  put_null_array(static_cast<void*>(buf), elem_ti, "");
2549  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2550  p += elem_ti.get_size()) {
2551  put_null(static_cast<void*>(p), elem_ti, "");
2552  }
2553  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2554  } else {
2555  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2556  }
2557  break;
2558  }
2559  const auto l = col_cv->get_value_list();
2560  size_t len = l.size() * elem_ti.get_size();
2561  if (size > 0 && static_cast<size_t>(size) != len) {
2562  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2563  std::to_string(size / elem_ti.get_size()) +
2564  " values, " + "received " + std::to_string(l.size()));
2565  }
2566  if (elem_ti.is_string()) {
2567  CHECK(kENCODING_DICT == elem_ti.get_compression());
2568  CHECK(4 == elem_ti.get_size());
2569 
2570  int8_t* buf = (int8_t*)checked_malloc(len);
2571  int32_t* p = reinterpret_cast<int32_t*>(buf);
2572 
2573  int elemIndex = 0;
2574  for (auto& e : l) {
2575  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2576  CHECK(c);
2577  insert_one_dict_str(&p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2578  elemIndex++;
2579  }
2580  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2581 
2582  } else {
2583  int8_t* buf = (int8_t*)checked_malloc(len);
2584  int8_t* p = buf;
2585  for (auto& e : l) {
2586  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2587  CHECK(c);
2588  p = appendDatum(p, c->get_constval(), elem_ti);
2589  }
2590  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2591  }
2592  break;
2593  }
2594  case kPOINT:
2595  case kLINESTRING:
2596  case kPOLYGON:
2597  case kMULTIPOLYGON:
2598  str_col_buffers[col_ids[col_idx]].push_back(
2599  col_datum.stringval ? *col_datum.stringval : "");
2600  break;
2601  default:
2602  CHECK(false);
2603  }
2604  ++col_idx;
2605  }
2606  for (const auto& kv : col_buffers) {
2607  insert_data.columnIds.push_back(kv.first);
2608  DataBlockPtr p;
2609  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2610  insert_data.data.push_back(p);
2611  }
2612  for (auto& kv : str_col_buffers) {
2613  insert_data.columnIds.push_back(kv.first);
2614  DataBlockPtr p;
2615  p.stringsPtr = &kv.second;
2616  insert_data.data.push_back(p);
2617  }
2618  for (auto& kv : arr_col_buffers) {
2619  insert_data.columnIds.push_back(kv.first);
2620  DataBlockPtr p;
2621  p.arraysPtr = &kv.second;
2622  insert_data.data.push_back(p);
2623  }
2624  insert_data.numRows = 1;
2625  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
2626  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2627  CHECK(table_descriptor);
2628  if (table_descriptor->nShards > 0) {
2629  auto shard = get_shard_for_key(table_descriptor, cat_, insert_data);
2630  CHECK(shard);
2631  shard->fragmenter->insertDataNoCheckpoint(insert_data);
2632  } else {
2633  table_descriptor->fragmenter->insertDataNoCheckpoint(insert_data);
2634  }
2635 
2636  // Ensure checkpoint happens across all shards, if not in distributed
2637  // mode (aggregator handles checkpointing in distributed mode)
2638  if (!g_cluster &&
2639  table_descriptor->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
2640  const_cast<Catalog_Namespace::Catalog&>(cat_).checkpointWithAutoRollback(table_id);
2641  }
2642 
2643  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2644  ExecutorDeviceType::CPU,
2645  QueryMemoryDescriptor(),
2646  executor_->getRowSetMemoryOwner(),
2647  nullptr,
2648  0,
2649  0);
2650  std::vector<TargetMetaInfo> empty_targets;
2651  return {rs, empty_targets};
2652 }
2653 
2654 namespace {
2655 
2656 // TODO(alex): Once we're fully migrated to the relational algebra model, change
2657 // the executor interface to use the collation directly and remove this conversion.
2658 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
2659  std::list<Analyzer::OrderEntry> result;
2660  for (size_t i = 0; i < sort->collationCount(); ++i) {
2661  const auto sort_field = sort->getCollation(i);
2662  result.emplace_back(sort_field.getField() + 1,
2663  sort_field.getSortDir() == SortDirection::Descending,
2664  sort_field.getNullsPosition() == NullSortedPosition::First);
2665  }
2666  return result;
2667 }
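// Example (illustrative): a SQL `ORDER BY 2 DESC NULLS FIRST` arrives as a
// collation on 0-based field index 1 and becomes
// Analyzer::OrderEntry{/*tle_no=*/2, /*is_desc=*/true, /*nulls_first=*/true};
// note the +1 shift from the 0-based field to the 1-based tle_no.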
2668 
2669 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
2670  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
2671  if (aggregate) {
2672  return 0;
2673  }
2674  const auto compound = dynamic_cast<const RelCompound*>(ra);
2675  return (compound && compound->isAggregate()) ? 0 : limit;
2676 }
2677 
2678 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
2679  return !order_entries.empty() && order_entries.front().is_desc;
2680 }
2681 
2682 } // namespace
2683 
2684 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
2685  const CompilationOptions& co,
2686  const ExecutionOptions& eo,
2687  RenderInfo* render_info,
2688  const int64_t queue_time_ms) {
2689  auto timer = DEBUG_TIMER(__func__);
2690  check_sort_node_source_constraint(sort);
2691  const auto source = sort->getInput(0);
2692  const bool is_aggregate = node_is_aggregate(source);
2693  auto it = leaf_results_.find(sort->getId());
2694  if (it != leaf_results_.end()) {
2695  // Add any transient string literals to the sdp (string dictionary proxy) on the aggregator
2696  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2697  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
2698  executor_->row_set_mem_owner_);
2699  // Handle push-down for LIMIT for multi-node
2700  auto& aggregated_result = it->second;
2701  auto& result_rows = aggregated_result.rs;
2702  const size_t limit = sort->getLimit();
2703  const size_t offset = sort->getOffset();
2704  const auto order_entries = get_order_entries(sort);
2705  if (limit || offset) {
2706  if (!order_entries.empty()) {
2707  result_rows->sort(order_entries, limit + offset, executor_);
2708  }
2709  result_rows->dropFirstN(offset);
2710  if (limit) {
2711  result_rows->keepFirstN(limit);
2712  }
2713  }
2714  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2715  sort->setOutputMetainfo(aggregated_result.targets_meta);
2716  return result;
2717  }
2718 
2719  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2720  bool is_desc{false};
2721 
2722  auto execute_sort_query = [this,
2723  sort,
2724  &source,
2725  &is_aggregate,
2726  &eo,
2727  &co,
2728  render_info,
2729  queue_time_ms,
2730  &groupby_exprs,
2731  &is_desc]() -> ExecutionResult {
2732  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2733  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2734  ExecutionOptions eo_copy = {
2735  eo.output_columnar_hint,
2736  eo.allow_multifrag,
2737  eo.just_explain,
2738  eo.allow_loop_joins,
2739  eo.with_watchdog,
2740  eo.jit_debug,
2741  eo.just_validate || sort->isEmptyResult(),
2742  eo.with_dynamic_watchdog,
2743  eo.dynamic_watchdog_time_limit,
2744  eo.find_push_down_candidates,
2745  eo.just_calcite_explain,
2746  eo.gpu_input_mem_limit_percent,
2747  eo.allow_runtime_query_interrupt,
2748  eo.running_query_interrupt_freq,
2749  eo.pending_query_interrupt_freq,
2750  eo.executor_type,
2751  };
2752 
2753  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2754  auto source_result = executeWorkUnit(source_work_unit,
2755  source->getOutputMetainfo(),
2756  is_aggregate,
2757  co,
2758  eo_copy,
2759  render_info,
2760  queue_time_ms);
2761  if (render_info && render_info->isPotentialInSituRender()) {
2762  return source_result;
2763  }
2764  if (source_result.isFilterPushDownEnabled()) {
2765  return source_result;
2766  }
2767  auto rows_to_sort = source_result.getRows();
2768  if (eo.just_explain) {
2769  return {rows_to_sort, {}};
2770  }
2771  const size_t limit = sort->getLimit();
2772  const size_t offset = sort->getOffset();
2773  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2774  !use_speculative_top_n(source_work_unit.exe_unit,
2775  rows_to_sort->getQueryMemDesc())) {
2776  const size_t top_n = limit == 0 ? 0 : limit + offset;
2777  rows_to_sort->sort(
2778  source_work_unit.exe_unit.sort_info.order_entries, top_n, executor_);
2779  }
2780  if (limit || offset) {
2781  if (g_cluster && sort->collationCount() == 0) {
2782  if (offset >= rows_to_sort->rowCount()) {
2783  rows_to_sort->dropFirstN(offset);
2784  } else {
2785  rows_to_sort->keepFirstN(limit + offset);
2786  }
2787  } else {
2788  rows_to_sort->dropFirstN(offset);
2789  if (limit) {
2790  rows_to_sort->keepFirstN(limit);
2791  }
2792  }
2793  }
2794  return {rows_to_sort, source_result.getTargetsMeta()};
2795  };
2796 
2797  try {
2798  return execute_sort_query();
2799  } catch (const SpeculativeTopNFailed& e) {
2800  CHECK_EQ(size_t(1), groupby_exprs.size());
2801  CHECK(groupby_exprs.front());
2802  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2803  return execute_sort_query();
2804  }
2805 }
2806 
2807 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
2808  const RelSort* sort,
2809  const ExecutionOptions& eo) {
2810  const auto source = sort->getInput(0);
2811  const size_t limit = sort->getLimit();
2812  const size_t offset = sort->getOffset();
2813  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2814  const size_t scan_total_limit =
2815  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2816  size_t max_groups_buffer_entry_guess{
2817  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
2818  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
2819  const auto order_entries = get_order_entries(sort);
2820  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2821  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2822  const auto& source_exe_unit = source_work_unit.exe_unit;
2823 
2824  // we do not allow sorting geometry or array types
2825  for (auto order_entry : order_entries) {
2826  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-based index
2827  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2828  const auto& ti = get_target_info(te, false);
2829  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2830  throw std::runtime_error(
2831  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2832  }
2833  }
2834 
2835  if (source_exe_unit.groupby_exprs.size() == 1) {
2836  if (!source_exe_unit.groupby_exprs.front()) {
2837  sort_algorithm = SortAlgorithm::StreamingTopN;
2838  } else {
2839  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2840  first_oe_is_desc(order_entries))) {
2841  sort_algorithm = SortAlgorithm::Default;
2842  }
2843  }
2844  }
2845 
2846  sort->setOutputMetainfo(source->getOutputMetainfo());
2847  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2848  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2849  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2850  std::move(source_exe_unit.input_col_descs),
2851  source_exe_unit.simple_quals,
2852  source_exe_unit.quals,
2853  source_exe_unit.join_quals,
2854  source_exe_unit.groupby_exprs,
2855  source_exe_unit.target_exprs,
2856  nullptr,
2857  {sort_info.order_entries, sort_algorithm, limit, offset},
2858  scan_total_limit,
2859  source_exe_unit.query_hint,
2860  source_exe_unit.query_plan_dag,
2861  source_exe_unit.hash_table_build_plan_dag,
2862  source_exe_unit.table_id_to_node_map,
2863  source_exe_unit.use_bump_allocator,
2864  source_exe_unit.union_all,
2865  source_exe_unit.query_state},
2866  source,
2867  max_groups_buffer_entry_guess,
2868  std::move(source_work_unit.query_rewriter),
2869  source_work_unit.input_permutation,
2870  source_work_unit.left_deep_join_input_sizes};
2871 }
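// Worked example (illustrative): for `LIMIT 10 OFFSET 5` with no ORDER BY over
// a non-aggregate source, collationCount() == 0, so scan_limit == 10 and
// scan_total_limit == 15; the scan may stop after 15 qualifying rows. Any
// collation forces scan_limit == 0, i.e. a full scan, since sorting needs to
// see every row first.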
2872 
2873 namespace {
2874 
2875 /**
2876  * Upper bound estimation for the number of groups. Not a tight bound, just a
2877  * gross estimation: the maximum row-count upper bound over all input tables,
2878  * clamped below by one so downstream buffer sizing never sees a zero guess.
2879  */
2880 
2881 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
2882  CHECK(!table_infos.empty());
2883  const auto& first_table = table_infos.front();
2884  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
2885  for (const auto& table_info : table_infos) {
2886  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
2887  max_num_groups = table_info.info.getNumTuplesUpperBound();
2888  }
2889  }
2890  return std::max(max_num_groups, size_t(1));
2891 }
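// Example (illustrative): for inputs with row-count upper bounds {1000, 250000}
// the guess is max(1000, 250000) == 250000; a zero bound is clamped to 1.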
2892 
2893 /**
2894  * Determines whether a query needs to compute the size of its output buffer. Returns
2895  * true for projection queries with no LIMIT or a LIMIT that exceeds the high scan limit
2896  * threshold (meaning it would be cheaper to compute the number of rows passing or use
2897  * the bump allocator than allocate the current scan limit per GPU)
2898  */
2899 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
2900  for (const auto target_expr : ra_exe_unit.target_exprs) {
2901  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
2902  return false;
2903  }
2904  }
2905  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
2906  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
2907  return true;
2908  }
2909  return false;
2910 }
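// Example (illustrative): `SELECT x FROM t;` (projection, no aggregates, no
// LIMIT) returns true, so the caller pre-computes the filtered row count or
// uses the bump allocator; adding a LIMIT at or below Executor::high_scan_limit
// sets scan_limit and makes this return false, since a fixed-size output
// buffer can simply be allocated up front.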
2911 
2912 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
2913  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
2914  ra_exe_unit.simple_quals.empty());
2915 }
2916 
2917 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
2918  const RelAlgExecutionUnit& ra_exe_unit_in,
2919  const std::vector<InputTableInfo>& table_infos,
2920  const Executor* executor,
2921  const ExecutorDeviceType device_type_in,
2922  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
2923  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
2924  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
2925  const auto target_expr = ra_exe_unit.target_exprs[i];
2926  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2927  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
2928  continue;
2929  }
2930  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
2931  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
2932  CHECK(arg);
2933  const auto& arg_ti = arg->get_type_info();
2934  // Avoid calling getExpressionRange for variable length types (string and array),
2935  // it'd trigger an assertion since that API expects to be called only for types
2936  // for which the notion of range is well-defined. A bit of a kludge, but the
2937  // logic to reject these types anyway is at lower levels in the stack and not
2938  // really worth pulling into a separate function for now.
2939  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
2940  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
2941  continue;
2942  }
2943  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
2944  if (arg_range.getType() != ExpressionRangeType::Integer) {
2945  continue;
2946  }
2947  // When running distributed, the threshold for using the precise implementation
2948  // must be consistent across all leaves, otherwise we could have a mix of precise
2949  // and approximate bitmaps and we cannot aggregate them.
2950  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
2951  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
2952  const auto sub_bitmap_count =
2953  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
2954  int64_t approx_bitmap_sz_bits{0};
2955  const auto error_rate = static_cast<Analyzer::AggExpr*>(target_expr)->get_arg1();
2956  if (error_rate) {
2957  CHECK(error_rate->get_type_info().get_type() == kINT);
2958  CHECK_GE(error_rate->get_constval().intval, 1);
2959  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
2960  } else {
2961  approx_bitmap_sz_bits = g_hll_precision_bits;
2962  }
2963  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
2964  arg_range.getIntMin(),
2965  approx_bitmap_sz_bits,
2966  true,
2967  device_type,
2968  sub_bitmap_count};
2969  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
2970  arg_range.getIntMin(),
2971  bitmap_sz_bits,
2972  false,
2973  device_type,
2974  sub_bitmap_count};
2975  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
2976  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
2977  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
2978  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
2979  target_exprs_owned.push_back(precise_count_distinct);
2980  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
2981  }
2982  }
2983  return ra_exe_unit;
2984 }
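// Worked example (illustrative, assuming the default g_hll_precision_bits of
// 11): for APPROX_COUNT_DISTINCT over an integer column with range [0, 4096),
// the precise bitmap needs 4096 bits (512 bytes) per group, while the HLL
// buffer uses on the order of 2^11 registers. When the approximate buffer is
// no smaller than the precise one, the target is rewritten into an exact
// COUNT(DISTINCT ...) at equal or lower memory cost.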
2985 
2986 void build_render_targets(RenderInfo& render_info,
2987  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
2988  const std::vector<TargetMetaInfo>& targets_meta) {
2989  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
2990  render_info.targets.clear();
2991  for (size_t i = 0; i < targets_meta.size(); ++i) {
2992  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
2993  targets_meta[i].get_resname(),
2994  work_unit_target_exprs[i]->get_shared_ptr(),
2995  false));
2996  }
2997 }
2998 
2999 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
3000  const CompilationOptions& co,
3001  const ExecutionOptions& eo) {
3002  return g_enable_bump_allocator && !g_cluster && co.device_type == ExecutorDeviceType::GPU &&
3003  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
3004 }
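// Descriptive note (added): the bump allocator lets projection kernels append
// output rows without a precomputed buffer size, so it is only used for
// single-node GPU execution with row-wise output and no ORDER BY, and only
// when g_enable_bump_allocator is set.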
3005 
3006 } // namespace
3007 
3008 ExecutionResult RelAlgExecutor::executeWorkUnit(
3009  const RelAlgExecutor::WorkUnit& work_unit,
3010  const std::vector<TargetMetaInfo>& targets_meta,
3011  const bool is_agg,
3012  const CompilationOptions& co_in,
3013  const ExecutionOptions& eo,
3014  RenderInfo* render_info,
3015  const int64_t queue_time_ms,
3016  const std::optional<size_t> previous_count) {
3017  INJECT_TIMER(executeWorkUnit);
3018  auto timer = DEBUG_TIMER(__func__);
3019 
3020  auto co = co_in;
3021  ColumnCacheMap column_cache;
3022  if (is_window_execution_unit(work_unit.exe_unit)) {
3023  if (!g_enable_window_functions) {
3024  throw std::runtime_error("Window functions support is disabled");
3025  }
3026  co.device_type = ExecutorDeviceType::CPU;
3027  co.allow_lazy_fetch = false;
3028  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
3029  }
3030  if (!eo.just_explain && eo.find_push_down_candidates) {
3031  // find potential candidates:
3032  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3033  if (!selected_filters.empty() || eo.just_calcite_explain) {
3034  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3035  }
3036  }
3037  if (render_info && render_info->isPotentialInSituRender()) {
3038  co.allow_lazy_fetch = false;
3039  }
3040  const auto body = work_unit.body;
3041  CHECK(body);
3042  auto it = leaf_results_.find(body->getId());
3043  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
3044  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3045  if (it != leaf_results_.end()) {
3046  executor_->addTransientStringLiterals(work_unit.exe_unit,
3047  executor_->row_set_mem_owner_);
3048  auto& aggregated_result = it->second;
3049  auto& result_rows = aggregated_result.rs;
3050  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3051  body->setOutputMetainfo(aggregated_result.targets_meta);
3052  if (render_info) {
3053  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3054  }
3055  return result;
3056  }
3057  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3058 
3059  auto ra_exe_unit = decide_approx_count_distinct_implementation(
3060  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3061 
3062  // register query hint if query_dag_ is valid
3063  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3064  if (query_dag_) {
3065  auto candidate = query_dag_->getQueryHint(body);
3066  if (candidate) {
3067  ra_exe_unit.query_hint = *candidate;
3068  }
3069  }
3070  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3071  if (is_window_execution_unit(ra_exe_unit)) {
3072  CHECK_EQ(table_infos.size(), size_t(1));
3073  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3074  max_groups_buffer_entry_guess =
3075  table_infos.front().info.fragments.front().getNumTuples();
3076  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3077  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3078  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3079  ra_exe_unit.scan_limit = *previous_count;
3080  } else {
3081  // TODO(adb): enable bump allocator path for render queries
3082  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3083  ra_exe_unit.scan_limit = 0;
3084  ra_exe_unit.use_bump_allocator = true;
3085  } else if (eo.executor_type == ::ExecutorType::Extern) {
3086  ra_exe_unit.scan_limit = 0;
3087  } else if (!eo.just_explain) {
3088  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
3089  if (filter_count_all) {
3090  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3091  }
3092  }
3093  }
3094  }
3095  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3096  co.device_type,
3097  QueryMemoryDescriptor(),
3098  nullptr,
3099  executor_->getCatalog(),
3100  executor_->blockSize(),
3101  executor_->gridSize()),
3102  {}};
3103 
3104  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
3105  const bool has_cardinality_estimation,
3106  const bool has_ndv_estimation) -> ExecutionResult {
3107  // Note that the groups buffer entry guess may be modified during query execution.
3108  // Create a local copy so we can track those changes if we need to attempt a retry
3109  // due to OOM
3110  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3111  try {
3112  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3113  is_agg,
3114  table_infos,
3115  ra_exe_unit,
3116  co,
3117  eo,
3118  cat_,
3119  render_info,
3120  has_cardinality_estimation,
3121  column_cache),
3122  targets_meta};
3123  } catch (const QueryExecutionError& e) {
3124  if (!has_ndv_estimation && e.getErrorCode() < 0) {
3125  throw CardinalityEstimationRequired(/*range=*/0);
3126  }
3127  handlePersistentError(e.getErrorCode());
3128  return handleOutOfMemoryRetry(
3129  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
3130  targets_meta,
3131  is_agg,
3132  co,
3133  eo,
3134  render_info,
3135  e.wasMultifragKernelLaunch(),
3136  queue_time_ms);
3137  }
3138  };
3139 
3140  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
3141  try {
3142  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3143  auto card = cached_cardinality.second;
3144  if (cached_cardinality.first && card >= 0) {
3145  result = execute_and_handle_errors(
3146  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3147  } else {
3148  result = execute_and_handle_errors(
3149  max_groups_buffer_entry_guess,
3150  /*has_cardinality_estimation=*/false,
3151  /*has_ndv_estimation=*/false);
3152  }
3153  } catch (const CardinalityEstimationRequired& e) {
3154  // check the cardinality cache
3155  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3156  auto card = cached_cardinality.second;
3157  if (cached_cardinality.first && card >= 0) {
3158  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3159  } else {
3160  const auto ndv_groups_estimation =
3161  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3162  const auto estimated_groups_buffer_entry_guess =
3163  ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3164  : std::min(groups_approx_upper_bound(table_infos),
3165  g_estimator_failure_max_groupby_size);
3166  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3167  result = execute_and_handle_errors(
3168  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3169  if (!(eo.just_validate || eo.just_explain)) {
3170  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3171  }
3172  }
3173  }
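  // The cardinality-estimation ladder implemented above: (1) reuse a cached
  // cardinality for this execution unit when available; (2) otherwise run with
  // the default guess; (3) if CardinalityEstimationRequired is thrown, obtain
  // an NDV estimation and retry with twice that value (or a capped upper
  // bound), caching the estimate for later queries. A minimal sketch of the
  // retry shape (names illustrative, not part of this file):
  //   size_t guess = default_guess;
  //   try {
  //     run(guess, /*has_cardinality_estimation=*/false);
  //   } catch (const CardinalityEstimationRequired& e) {
  //     guess = 2 * estimate_ndv(e.range());
  //     run(guess, /*has_cardinality_estimation=*/true);
  //   }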
3174 
3175  result.setQueueTime(queue_time_ms);
3176  if (render_info) {
3177  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3178  if (render_info->isPotentialInSituRender()) {
3179  // return an empty result (with the same queue time, and zero render time)
3180  return {std::make_shared<ResultSet>(
3181  queue_time_ms,
3182  0,
3183  executor_->row_set_mem_owner_
3184  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3185  : nullptr),
3186  {}};
3187  }
3188  }
3189  return result;
3190 }
3191 
3192 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
3193  const bool is_agg,
3194  const CompilationOptions& co,
3195  const ExecutionOptions& eo) {
3196  const auto count =
3197  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
3198  kCOUNT,
3199  nullptr,
3200  false,
3201  nullptr);
3202  const auto count_all_exe_unit =
3203  create_count_all_execution_unit(work_unit.exe_unit, count);
3204  size_t one{1};
3205  ResultSetPtr count_all_result;
3206  try {
3207  ColumnCacheMap column_cache;
3208  count_all_result =
3209  executor_->executeWorkUnit(one,
3210  is_agg,
3211  get_table_infos(work_unit.exe_unit, executor_),
3212  count_all_exe_unit,
3213  co,
3214  eo,
3215  cat_,
3216  nullptr,
3217  false,
3218  column_cache);
3219  } catch (const foreign_storage::ForeignStorageException&) {
3220  throw; // rethrow as-is to preserve the exact exception type
3221  } catch (const QueryMustRunOnCpu&) {
3222  // force a retry of the top level query on CPU
3223  throw;
3224  } catch (const std::exception& e) {
3225  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
3226  return std::nullopt;
3227  }
3228  const auto count_row = count_all_result->getNextRow(false, false);
3229  CHECK_EQ(size_t(1), count_row.size());
3230  const auto& count_tv = count_row.front();
3231  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
3232  CHECK(count_scalar_tv);
3233  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
3234  CHECK(count_ptr);
3235  CHECK_GE(*count_ptr, 0);
3236  auto count_upper_bound = static_cast<size_t>(*count_ptr);
3237  return std::max(count_upper_bound, size_t(1));
3238 }
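// Note on getFilteredCountAll above: the pre-flight count reuses the work
// unit's filters but replaces the target list with a single COUNT(*) (BIGINT
// when g_bigint_count is set), so the scan limit reflects the post-filter
// cardinality. The result is clamped to at least 1 so that an empty filter
// result still yields a valid output buffer.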
3239 
3240 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
3241  const auto& ra_exe_unit = work_unit.exe_unit;
3242  if (ra_exe_unit.input_descs.size() != 1) {
3243  return false;
3244  }
3245  const auto& table_desc = ra_exe_unit.input_descs.front();
3246  if (table_desc.getSourceType() != InputSourceType::TABLE) {
3247  return false;
3248  }
3249  const int table_id = table_desc.getTableId();
3250  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
3251  const auto comp_expr =
3252  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3253  if (!comp_expr || comp_expr->get_optype() != kEQ) {
3254  return false;
3255  }
3256  const auto lhs = comp_expr->get_left_operand();
3257  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3258  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3259  return false;
3260  }
3261  const auto rhs = comp_expr->get_right_operand();
3262  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3263  if (!rhs_const) {
3264  return false;
3265  }
3266  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
3267  if (cd->isVirtualCol) {
3268  CHECK_EQ("rowid", cd->columnName);
3269  return true;
3270  }
3271  }
3272  return false;
3273 }
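// isRowidLookup above recognizes point lookups on the virtual rowid column,
// e.g. (hypothetical query):
//   SELECT * FROM t WHERE rowid = 42;
// i.e. a single-table unit whose simple qualifiers are equalities between the
// virtual rowid column and constants; such lookups skip the output-buffer
// size estimation path.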
3274 
3275  ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
3276  const RelAlgExecutor::WorkUnit& work_unit,
3277  const std::vector<TargetMetaInfo>& targets_meta,
3278  const bool is_agg,
3279  const CompilationOptions& co,
3280  const ExecutionOptions& eo,
3281  RenderInfo* render_info,
3282  const bool was_multifrag_kernel_launch,
3283  const int64_t queue_time_ms) {
3284  // Disable the bump allocator
3285  // Note that this will have basically the same effect as using the bump allocator for
3286  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
3287  // path and the bump allocator path for kernel per fragment execution.
3288  auto ra_exe_unit_in = work_unit.exe_unit;
3289  ra_exe_unit_in.use_bump_allocator = false;
3290 
3291  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3292  co.device_type,
3293  QueryMemoryDescriptor(),
3294  nullptr,
3295  executor_->getCatalog(),
3296  executor_->blockSize(),
3297  executor_->gridSize()),
3298  {}};
3299 
3300  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
3301  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3302  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
3303  false,
3304  false,
3305  eo.allow_loop_joins,
3306  eo.with_watchdog,
3307  eo.jit_debug,
3308  false,
3309  eo.with_dynamic_watchdog,
3310  eo.dynamic_watchdog_time_limit,
3311  false,
3312  false,
3313  eo.gpu_input_mem_limit_percent,
3314  eo.allow_runtime_query_interrupt,
3315  eo.running_query_interrupt_freq,
3316  eo.pending_query_interrupt_freq,
3317  eo.executor_type,
3318  eo.outer_fragment_indices};
3319 
3320  if (was_multifrag_kernel_launch) {
3321  try {
3322  // Attempt to retry using the kernel per fragment path. The smaller input size
3323  // required may allow the entire kernel to execute in GPU memory.
3324  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
3325  "kernels disabled.";
3326  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3327  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
3328  ColumnCacheMap column_cache;
3329  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3330  is_agg,
3331  table_infos,
3332  ra_exe_unit,
3333  co,
3334  eo_no_multifrag,
3335  cat_,
3336  nullptr,
3337  true,
3338  column_cache),
3339  targets_meta};
3340  result.setQueueTime(queue_time_ms);
3341  } catch (const QueryExecutionError& e) {
3342  handlePersistentError(e.getErrorCode());
3343  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
3344  }
3345  }
3346 
3347  if (render_info) {
3348  render_info->setForceNonInSituData();
3349  }
3350 
3351  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
3352  // Only reset the group buffer entry guess if we ran out of slots, which
3353  // suggests a highly pathological input which prevented a good estimation of
3354  // distinct tuple count. For projection queries, this will force a
3355  // per-fragment scan limit, which is compatible with the CPU path.
3356 
3357  VLOG(1) << "Resetting max groups buffer entry guess.";
3358  max_groups_buffer_entry_guess = 0;
3359 
3360  int iteration_ctr = -1;
3361  while (true) {
3362  iteration_ctr++;
3363  auto ra_exe_unit = decide_approx_count_distinct_implementation(
3364  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
3365  ColumnCacheMap column_cache;
3366  try {
3367  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3368  is_agg,
3369  table_infos,
3370  ra_exe_unit,
3371  co_cpu,
3372  eo_no_multifrag,
3373  cat_,
3374  nullptr,
3375  true,
3376  column_cache),
3377  targets_meta};
3378  } catch (const QueryExecutionError& e) {
3379  // Ran out of slots
3380  if (e.getErrorCode() < 0) {
3381  // Even the conservative guess failed; it should only happen when we group
3382  // by a huge cardinality array. Maybe we should throw an exception instead?
3383  // Such a heavy query is entirely capable of exhausting all the host memory.
3384  CHECK(max_groups_buffer_entry_guess);
3385  // Only allow two iterations of increasingly large entry guesses up to a maximum
3386  // of 512MB per column per kernel
3387  if (g_enable_watchdog || iteration_ctr > 1) {
3388  throw std::runtime_error("Query ran out of output slots in the result");
3389  }
3390  max_groups_buffer_entry_guess *= 2;
3391  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
3392  "groups buffer entry "
3393  "guess equal to "
3394  << max_groups_buffer_entry_guess;
3395  } else {
3396  handlePersistentError(e.getErrorCode());
3397  }
3398  continue;
3399  }
3400  result.setQueueTime(queue_time_ms);
3401  return result;
3402  }
3403  return result;
3404 }
3405 
3406 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
3407  LOG(ERROR) << "Query execution failed with error "
3408  << getErrorMessageFromCode(error_code);
3409  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
3410  // We ran out of GPU memory, this doesn't count as an error if the query is
3411  // allowed to continue on CPU because retry on CPU is explicitly allowed through
3412  // --allow-cpu-retry.
3413  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
3414  if (!g_allow_cpu_retry) {
3415  throw std::runtime_error(
3416  "Query ran out of GPU memory, unable to automatically retry on CPU");
3417  }
3418  return;
3419  }
3420  throw std::runtime_error(getErrorMessageFromCode(error_code));
3421 }
3422 
3423 namespace {
3424 struct ErrorInfo {
3425  const char* code{nullptr};
3426  const char* description{nullptr};
3427 };
3428 ErrorInfo getErrorDescription(const int32_t error_code) {
3429  // 'designated initializers' don't compile on Windows under /std:c++17;
3430  // they require /std:c++20. They have been removed for the Windows port.
3431  switch (error_code) {
3432  case Executor::ERR_DIV_BY_ZERO:
3433  return {"ERR_DIV_BY_ZERO", "Division by zero"};
3434  case Executor::ERR_OUT_OF_GPU_MEM:
3435  return {"ERR_OUT_OF_GPU_MEM",
3437  "Query couldn't keep the entire working set of columns in GPU memory"};
3438  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
3439  return {"ERR_UNSUPPORTED_SELF_JOIN", "Self joins not supported yet"};
3440  case Executor::ERR_OUT_OF_CPU_MEM:
3441  return {"ERR_OUT_OF_CPU_MEM", "Not enough host memory to execute the query"};
3442  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
3443  return {"ERR_OVERFLOW_OR_UNDERFLOW", "Overflow or underflow"};
3444  case Executor::ERR_OUT_OF_TIME:
3445  return {"ERR_OUT_OF_TIME", "Query execution has exceeded the time limit"};
3446  case Executor::ERR_INTERRUPTED:
3447  return {"ERR_INTERRUPTED", "Query execution has been interrupted"};
3448  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
3449  return {"ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
3450  "Columnar conversion not supported for variable length types"};
3451  case Executor::ERR_TOO_MANY_LITERALS:
3452  return {"ERR_TOO_MANY_LITERALS", "Too many literals in the query"};
3453  case Executor::ERR_STRING_CONST_IN_RESULTSET:
3454  return {"ERR_STRING_CONST_IN_RESULTSET",
3456  "NONE ENCODED String types are not supported as input result set."};
3457  case Executor::ERR_OUT_OF_RENDER_MEM:
3458  return {"ERR_OUT_OF_RENDER_MEM",
3460  "Insufficient GPU memory for query results in render output buffer "
3461  "sized by render-mem-bytes"};
3462  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
3463  return {"ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
3464  "Streaming-Top-N not supported in Render Query"};
3465  case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
3466  return {"ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
3467  "Multiple distinct values encountered"};
3468  case Executor::ERR_GEOS:
3469  return {"ERR_GEOS", "ERR_GEOS"};
3470  case Executor::ERR_WIDTH_BUCKET_INVALID_ARGUMENT:
3471  return {"ERR_WIDTH_BUCKET_INVALID_ARGUMENT",
3473  "Arguments of WIDTH_BUCKET function do not satisfy the condition"};
3474  default:
3475  return {nullptr, nullptr};
3476  }
3477 }
3478 
3479 } // namespace
3480 
3481 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
3482  if (error_code < 0) {
3483  return "Ran out of slots in the query output buffer";
3484  }
3485  const auto errorInfo = getErrorDescription(error_code);
3486 
3487  if (errorInfo.code) {
3488  return errorInfo.code + ": "s + errorInfo.description;
3489  } else {
3490  return "Other error: code "s + std::to_string(error_code);
3491  }
3492 }
3493 
3494  void RelAlgExecutor::executePostExecutionCallback() {
3495  if (post_execution_callback_) {
3496  VLOG(1) << "Running post execution callback.";
3497  (*post_execution_callback_)();
3498  }
3499 }
3500 
3501  RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
3502  const SortInfo& sort_info,
3503  const ExecutionOptions& eo) {
3504  const auto compound = dynamic_cast<const RelCompound*>(node);
3505  if (compound) {
3506  return createCompoundWorkUnit(compound, sort_info, eo);
3507  }
3508  const auto project = dynamic_cast<const RelProject*>(node);
3509  if (project) {
3510  return createProjectWorkUnit(project, sort_info, eo);
3511  }
3512  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3513  if (aggregate) {
3514  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3515  }
3516  const auto filter = dynamic_cast<const RelFilter*>(node);
3517  if (filter) {
3518  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3519  }
3520  LOG(FATAL) << "Unhandled node type: " << node->toString();
3521  return {};
3522 }
3523 
3524 namespace {
3525 
3526  JoinType get_join_type(const RelAlgNode* ra) {
3527  auto sink = get_data_sink(ra);
3528  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
3529  return join->getJoinType();
3530  }
3531  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
3532  return JoinType::INNER;
3533  }
3534 
3535  return JoinType::INVALID;
3536 }
3537 
3538 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
3539  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3540  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
3541  return nullptr;
3542  }
3543  const auto equi_join_condition =
3544  dynamic_cast<const RexOperator*>(condition->getOperand(0));
3545  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
3546  return nullptr;
3547  }
3548  const auto both_are_null_condition =
3549  dynamic_cast<const RexOperator*>(condition->getOperand(1));
3550  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
3551  both_are_null_condition->size() != 2) {
3552  return nullptr;
3553  }
3554  const auto lhs_is_null =
3555  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
3556  const auto rhs_is_null =
3557  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
3558  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
3559  rhs_is_null->getOperator() != kISNULL) {
3560  return nullptr;
3561  }
3562  CHECK_EQ(size_t(1), lhs_is_null->size());
3563  CHECK_EQ(size_t(1), rhs_is_null->size());
3564  CHECK_EQ(size_t(2), equi_join_condition->size());
3565  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
3566  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
3567  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
3568  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
3569  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
3570  return nullptr;
3571  }
3572  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
3573  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
3574  RexDeepCopyVisitor deep_copy_visitor;
3575  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
3576  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
3577  eq_operands.emplace_back(lhs_op_copy.release());
3578  eq_operands.emplace_back(rhs_op_copy.release());
3579  return boost::make_unique<const RexOperator>(
3580  kBW_EQ, eq_operands, equi_join_condition->getType());
3581  }
3582  return nullptr;
3583 }
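// get_bitwise_equals above matches the null-safe equality idiom, e.g. a join
// condition of the form (hypothetical):
//   t1.x = t2.y OR (t1.x IS NULL AND t2.y IS NULL)
// and rewrites it to a single kBW_EQ operator (the semantics of SQL's
// IS NOT DISTINCT FROM), which can participate in hash joins like a plain
// equi-join qualifier.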
3584 
3585 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
3586  const RexScalar* scalar) {
3587  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3588  if (condition && condition->getOperator() == kAND) {
3589  CHECK_GE(condition->size(), size_t(2));
3590  auto acc = get_bitwise_equals(condition->getOperand(0));
3591  if (!acc) {
3592  return nullptr;
3593  }
3594  for (size_t i = 1; i < condition->size(); ++i) {
3595  std::vector<std::unique_ptr<const RexScalar>> and_operands;
3596  and_operands.emplace_back(std::move(acc));
3597  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
3598  acc =
3599  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
3600  }
3601  return acc;
3602  }
3603  return get_bitwise_equals(scalar);
3604 }
3605 
3606 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
3607  CHECK_GE(left_deep_join->inputCount(), size_t(2));
3608  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
3609  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
3610  ++nesting_level) {
3611  if (left_deep_join->getOuterCondition(nesting_level)) {
3612  join_types[nesting_level - 1] = JoinType::LEFT;
3613  }
3614  auto cur_level_join_type = left_deep_join->getJoinType(nesting_level);
3615  if (cur_level_join_type == JoinType::SEMI || cur_level_join_type == JoinType::ANTI) {
3616  join_types[nesting_level - 1] = cur_level_join_type;
3617  }
3618  }
3619  return join_types;
3620 }
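// Example: for a left-deep tree over inputs A, B, C where only the join to C
// carries an outer condition, left_deep_join_types returns {INNER, LEFT}: one
// entry per nesting level, LEFT wherever an outer condition exists, with SEMI
// and ANTI join types preserved from the node itself.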
3621 
3622 template <class RA>
3623 std::vector<size_t> do_table_reordering(
3624  std::vector<InputDescriptor>& input_descs,
3625  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
3626  const JoinQualsPerNestingLevel& left_deep_join_quals,
3627  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3628  const RA* node,
3629  const std::vector<InputTableInfo>& query_infos,
3630  const Executor* executor) {
3631  if (g_cluster) {
3632  // Disable table reordering in distributed mode. The aggregator does not have enough
3633  // information to break ties
3634  return {};
3635  }
3636  const auto& cat = *executor->getCatalog();
3637  for (const auto& table_info : query_infos) {
3638  if (table_info.table_id < 0) {
3639  continue;
3640  }
3641  const auto td = cat.getMetadataForTable(table_info.table_id);
3642  CHECK(td);
3643  if (table_is_replicated(td)) {
3644  return {};
3645  }
3646  }
3647  const auto input_permutation =
3648  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
3649  input_to_nest_level = get_input_nest_levels(node, input_permutation);
3650  std::tie(input_descs, input_col_descs, std::ignore) =
3651  get_input_desc(node, input_to_nest_level, input_permutation, cat);
3652  return input_permutation;
3653 }
3654 
3655  std::vector<size_t> get_left_deep_join_input_sizes(
3656  const RelLeftDeepInnerJoin* left_deep_join) {
3657  std::vector<size_t> input_sizes;
3658  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
3659  const auto inputs = get_node_output(left_deep_join->getInput(i));
3660  input_sizes.push_back(inputs.size());
3661  }
3662  return input_sizes;
3663 }
3664 
3665 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
3666  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
3667  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
3668  for (const auto& qual : quals) {
3669  const auto rewritten_qual = rewrite_expr(qual.get());
3670  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
3671  }
3672  return rewritten_quals;
3673 }
3674 
3675 } // namespace
3676 
3677  RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
3678  const RelCompound* compound,
3679  const SortInfo& sort_info,
3680  const ExecutionOptions& eo) {
3681  std::vector<InputDescriptor> input_descs;
3682  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3683  auto input_to_nest_level = get_input_nest_levels(compound, {});
3684  std::tie(input_descs, input_col_descs, std::ignore) =
3685  get_input_desc(compound, input_to_nest_level, {}, cat_);
3686  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
3687  const auto query_infos = get_table_infos(input_descs, executor_);
3688  CHECK_EQ(size_t(1), compound->inputCount());
3689  const auto left_deep_join =
3690  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3691  JoinQualsPerNestingLevel left_deep_join_quals;
3692  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3693  : std::vector<JoinType>{get_join_type(compound)};
3694  std::vector<size_t> input_permutation;
3695  std::vector<size_t> left_deep_join_input_sizes;
3696  std::optional<unsigned> left_deep_tree_id;
3697  if (left_deep_join) {
3698  left_deep_tree_id = left_deep_join->getId();
3699  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3700  left_deep_join_quals = translateLeftDeepJoinFilter(
3701  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3702  if (eo.table_reordering &&
3703  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3704  join_types.end()) {
3705  input_permutation = do_table_reordering(input_descs,
3706  input_col_descs,
3707  left_deep_join_quals,
3708  input_to_nest_level,
3709  compound,
3710  query_infos,
3711  executor_);
3712  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3713  std::tie(input_descs, input_col_descs, std::ignore) =
3714  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3715  left_deep_join_quals = translateLeftDeepJoinFilter(
3716  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3717  }
3718  }
3719  RelAlgTranslator translator(cat_,
3720  query_state_,
3721  executor_,
3722  input_to_nest_level,
3723  join_types,
3724  now_,
3725  eo.just_explain);
3726  const auto scalar_sources =
3727  translate_scalar_sources(compound, translator, eo.executor_type);
3728  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3729  const auto quals_cf = translate_quals(compound, translator);
3730  const auto target_exprs = translate_targets(target_exprs_owned_,
3731  scalar_sources,
3732  groupby_exprs,
3733  compound,
3734  translator,
3735  eo.executor_type);
3736  auto query_hint = RegisteredQueryHint::defaults();
3737  if (query_dag_) {
3738  auto candidate = query_dag_->getQueryHint(compound);
3739  if (candidate) {
3740  query_hint = *candidate;
3741  }
3742  }
3743  CHECK_EQ(compound->size(), target_exprs.size());
3744  const RelAlgExecutionUnit exe_unit = {input_descs,
3745  input_col_descs,
3746  quals_cf.simple_quals,
3747  rewrite_quals(quals_cf.quals),
3748  left_deep_join_quals,
3749  groupby_exprs,
3750  target_exprs,
3751  nullptr,
3752  sort_info,
3753  0,
3754  query_hint,
3755  EMPTY_QUERY_PLAN,
3756  {},
3757  {},
3758  false,
3759  std::nullopt,
3760  query_state_};
3761  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3762  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3763  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3764  compound->setOutputMetainfo(targets_meta);
3765  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
3766  if (left_deep_tree_id.has_value()) {
3767  left_deep_trees_info.emplace(left_deep_tree_id.value(),
3768  rewritten_exe_unit.join_quals);
3769  }
3770  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(compound,
3771  cat_,
3772  left_deep_tree_id,
3773  left_deep_trees_info,
3774  temporary_tables_,
3775  executor_,
3776  translator);
3777  if (is_extracted_dag_valid(dag_info)) {
3778  rewritten_exe_unit.query_plan_dag = dag_info.extracted_dag;
3779  rewritten_exe_unit.hash_table_build_plan_dag = dag_info.hash_table_plan_dag;
3780  rewritten_exe_unit.table_id_to_node_map = dag_info.table_id_to_node_map;
3781  }
3782  return {rewritten_exe_unit,
3783  compound,
3784  max_groups_buffer_entry_default_guess,
3785  std::move(query_rewriter),
3786  input_permutation,
3787  left_deep_join_input_sizes};
3788 }
3789 
3790 std::shared_ptr<RelAlgTranslator> RelAlgExecutor::getRelAlgTranslator(
3791  const RelAlgNode* node) {
3792  auto input_to_nest_level = get_input_nest_levels(node, {});
3793  const auto left_deep_join =
3794  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
3795  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3796  : std::vector<JoinType>{get_join_type(node)};
3797  return std::make_shared<RelAlgTranslator>(
3798  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, false);
3799 }
3800 
3801 namespace {
3802 
3803 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
3804  CHECK(qual_expr);
3805  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
3806  if (!bin_oper || bin_oper->getOperator() != kAND) {
3807  return {qual_expr};
3808  }
3809  CHECK_GE(bin_oper->size(), size_t(2));
3810  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
3811  for (size_t i = 1; i < bin_oper->size(); ++i) {
3812  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
3813  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
3814  }
3815  return lhs_cf;
3816 }
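// rex_to_conjunctive_form flattens nested conjunctions into a list of
// factors, e.g. (a AND b) AND (c AND d) becomes {a, b, c, d}; any non-AND
// expression is returned as a single-element list.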
3817 
3818 std::shared_ptr<Analyzer::Expr> build_logical_expression(
3819  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
3820  const SQLOps sql_op) {
3821  CHECK(!factors.empty());
3822  auto acc = factors.front();
3823  for (size_t i = 1; i < factors.size(); ++i) {
3824  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
3825  }
3826  return acc;
3827 }
3828 
3829 template <class QualsList>
3830 bool list_contains_expression(const QualsList& haystack,
3831  const std::shared_ptr<Analyzer::Expr>& needle) {
3832  for (const auto& qual : haystack) {
3833  if (*qual == *needle) {
3834  return true;
3835  }
3836  }
3837  return false;
3838 }
3839 
3840 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
3841 // evaluations of `p` and allows use of the original form in joins if `p`
3842 // can be used for hash joins.
3843 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
3844  const std::shared_ptr<Analyzer::Expr>& expr) {
3845  const auto expr_terms = qual_to_disjunctive_form(expr);
3846  CHECK_GE(expr_terms.size(), size_t(1));
3847  const auto& first_term = expr_terms.front();
3848  const auto first_term_factors = qual_to_conjunctive_form(first_term);
3849  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
3850  // First, collect the conjunctive components common to all the disjunctive components.
3851  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
3852  for (const auto& first_term_factor : first_term_factors.quals) {
3853  bool is_common =
3854  expr_terms.size() > 1; // Only report common factors for disjunction.
3855  for (size_t i = 1; i < expr_terms.size(); ++i) {
3856  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
3857  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
3858  is_common = false;
3859  break;
3860  }
3861  }
3862  if (is_common) {
3863  common_factors.push_back(first_term_factor);
3864  }
3865  }
3866  if (common_factors.empty()) {
3867  return expr;
3868  }
3869  // Now that the common expressions are known, collect the remaining expressions.
3870  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
3871  for (const auto& term : expr_terms) {
3872  const auto term_cf = qual_to_conjunctive_form(term);
3873  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
3874  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
3875  for (const auto& qual : term_cf.quals) {
3876  if (!list_contains_expression(common_factors, qual)) {
3877  remaining_quals.push_back(qual);
3878  }
3879  }
3880  if (!remaining_quals.empty()) {
3881  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
3882  }
3883  }
3884  // Reconstruct the expression with the transformation applied.
3885  const auto common_expr = build_logical_expression(common_factors, kAND);
3886  if (remaining_terms.empty()) {
3887  return common_expr;
3888  }
3889  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
3890  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
3891 }
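// Worked example for reverse_logical_distribution (hypothetical qualifiers):
//   (t1.a = t2.a AND t1.x > 5) OR (t1.a = t2.a AND t1.y < 3)
// shares the common factor t1.a = t2.a across both disjuncts, so it becomes
//   t1.a = t2.a AND (t1.x > 5 OR t1.y < 3)
// exposing the equi-join qualifier for hash-join consideration. If the
// disjuncts share no common factor, the expression is returned unchanged.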
3892 
3893 } // namespace
3894 
3895 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
3896  const RexScalar* join_condition,
3897  const std::vector<JoinType>& join_types,
3898  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3899  const bool just_explain) const {
3900  RelAlgTranslator translator(
3901  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
3902  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
3903  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
3904  for (const auto rex_condition_component : rex_condition_cf) {
3905  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
3906  const auto join_condition =
3907  reverse_logical_distribution(translator.translateScalarRex(
3908  bw_equals ? bw_equals.get() : rex_condition_component));
3909  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
3910  join_condition_quals.insert(join_condition_quals.end(),
3911  join_condition_cf.quals.begin(),
3912  join_condition_cf.quals.end());
3913  join_condition_quals.insert(join_condition_quals.end(),
3914  join_condition_cf.simple_quals.begin(),
3915  join_condition_cf.simple_quals.end());
3916  }
3917  return combine_equi_join_conditions(join_condition_quals);
3918 }
3919 
3920 // Translate left deep join filter and separate the conjunctive form qualifiers
3921 // per nesting level. The code generated for hash table lookups on each level
3922  // must dominate its uses in deeper nesting levels.
3923  JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
3924  const RelLeftDeepInnerJoin* join,
3925  const std::vector<InputDescriptor>& input_descs,
3926  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3927  const bool just_explain) {
3928  const auto join_types = left_deep_join_types(join);
3929  const auto join_condition_quals = makeJoinQuals(
3930  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
3931  MaxRangeTableIndexVisitor rte_idx_visitor;
3932  JoinQualsPerNestingLevel result(input_descs.size() - 1);
3933  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
3934  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
3935  const auto outer_condition = join->getOuterCondition(rte_idx);
3936  if (outer_condition) {
3937  result[rte_idx - 1].quals =
3938  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
3939  CHECK_LE(rte_idx, join_types.size());
3940  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
3941  result[rte_idx - 1].type = JoinType::LEFT;
3942  continue;
3943  }
3944  for (const auto& qual : join_condition_quals) {
3945  if (visited_quals.count(qual)) {
3946  continue;
3947  }
3948  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
3949  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
3950  const auto it_ok = visited_quals.emplace(qual);
3951  CHECK(it_ok.second);
3952  result[rte_idx - 1].quals.push_back(qual);
3953  }
3954  }
3955  CHECK_LE(rte_idx, join_types.size());
3956  CHECK(join_types[rte_idx - 1] == JoinType::INNER ||
3957  join_types[rte_idx - 1] == JoinType::SEMI ||
3958  join_types[rte_idx - 1] == JoinType::ANTI);
3959  result[rte_idx - 1].type = join_types[rte_idx - 1];
3960  }
3961  return result;
3962 }
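// Qualifier placement rule used above: each qualifier is attached to the
// first nesting level whose range-table index is >= the maximum index the
// qualifier references. For example, a qualifier touching nest levels 0 and 2
// is evaluated at level 2, after the hash-table lookups for both sides have
// been generated, preserving the dominance requirement described above.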
3963 
3964 namespace {
3965 
3966 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
3967  const RelAlgNode* ra_node,
3968  const size_t nest_level,
3969  const std::vector<TargetMetaInfo>& in_metainfo,
3970  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3971  CHECK_LE(size_t(1), ra_node->inputCount());
3972  CHECK_GE(size_t(2), ra_node->inputCount());
3973  const auto input = ra_node->getInput(nest_level);
3974  const auto it_rte_idx = input_to_nest_level.find(input);
3975  CHECK(it_rte_idx != input_to_nest_level.end());
3976  const int rte_idx = it_rte_idx->second;
3977  const int table_id = table_id_from_ra(input);
3978  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
3979  const auto scan_ra = dynamic_cast<const RelScan*>(input);
3980  int input_idx = 0;
3981  for (const auto& input_meta : in_metainfo) {
3982  inputs.push_back(
3983  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
3984  table_id,
3985  scan_ra ? input_idx + 1 : input_idx,
3986  rte_idx));
3987  ++input_idx;
3988  }
3989  return inputs;
3990 }
3991 
3992 } // namespace
3993 
3994  RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
3995  const RelAggregate* aggregate,
3996  const SortInfo& sort_info,
3997  const bool just_explain) {
3998  std::vector<InputDescriptor> input_descs;
3999  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4000  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4001  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4002  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4003  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
4004  const auto join_type = get_join_type(aggregate);
4005 
4006  RelAlgTranslator translator(cat_,
4007  query_state_,
4008  executor_,
4009  input_to_nest_level,
4010  {join_type},
4011  now_,
4012  just_explain);
4013  CHECK_EQ(size_t(1), aggregate->inputCount());
4014  const auto source = aggregate->getInput(0);
4015  const auto& in_metainfo = source->getOutputMetainfo();
4016  const auto scalar_sources =
4017  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4018  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4019  const auto target_exprs = translate_targets(
4020  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
4021  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4022  aggregate->setOutputMetainfo(targets_meta);
4023  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(aggregate,
4024  cat_,
4025  std::nullopt,
4026  getLeftDeepJoinTreesInfo(),
4027  temporary_tables_,
4028  executor_,
4029  translator);
4030  auto query_hint = RegisteredQueryHint::defaults();
4031  if (query_dag_) {
4032  auto candidate = query_dag_->getQueryHint(aggregate);
4033  if (candidate) {
4034  query_hint = *candidate;
4035  }
4036  }
4037  return {RelAlgExecutionUnit{input_descs,
4038  input_col_descs,
4039  {},
4040  {},
4041  {},
4042  groupby_exprs,
4043  target_exprs,
4044  nullptr,
4045  sort_info,
4046  0,
4047  query_hint,
4048  dag_info.extracted_dag,
4049  dag_info.hash_table_plan_dag,
4050  dag_info.table_id_to_node_map,
4051  false,
4052  std::nullopt,
4053  query_state_},
4054  aggregate,
4055  max_groups_buffer_entry_default_guess,
4056  nullptr};
4057 }
4058 
4059  RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
4060  const RelProject* project,
4061  const SortInfo& sort_info,
4062  const ExecutionOptions& eo) {
4063  std::vector<InputDescriptor> input_descs;
4064  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4065  auto input_to_nest_level = get_input_nest_levels(project, {});
4066  std::tie(input_descs, input_col_descs, std::ignore) =
4067  get_input_desc(project, input_to_nest_level, {}, cat_);
4068  const auto query_infos = get_table_infos(input_descs, executor_);
4069 
4070  const auto left_deep_join =
4071  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4072  JoinQualsPerNestingLevel left_deep_join_quals;
4073  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4074  : std::vector<JoinType>{get_join_type(project)};
4075  std::vector<size_t> input_permutation;
4076  std::vector<size_t> left_deep_join_input_sizes;
4077  std::optional<unsigned> left_deep_tree_id;
4078  if (left_deep_join) {
4079  left_deep_tree_id = left_deep_join->getId();
4080  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4081  const auto query_infos = get_table_infos(input_descs, executor_);
4082  left_deep_join_quals = translateLeftDeepJoinFilter(
4083  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4084  if (eo.table_reordering) {
4085  input_permutation = do_table_reordering(input_descs,
4086  input_col_descs,
4087  left_deep_join_quals,
4088  input_to_nest_level,
4089  project,
4090  query_infos,
4091  executor_);
4092  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4093  std::tie(input_descs, input_col_descs, std::ignore) =
4094  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
4095  left_deep_join_quals = translateLeftDeepJoinFilter(
4096  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4097  }
4098  }
4099 
4100  RelAlgTranslator translator(cat_,
4101  query_state_,
4102  executor_,
4103  input_to_nest_level,
4104  join_types,
4105  now_,
4106  eo.just_explain);
4107  const auto target_exprs_owned =
4108  translate_scalar_sources(project, translator, eo.executor_type);
4109  target_exprs_owned_.insert(
4110  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4111  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
4112  auto query_hint = RegisteredQueryHint::defaults();
4113  if (query_dag_) {
4114  auto candidate = query_dag_->getQueryHint(project);
4115  if (candidate) {
4116  query_hint = *candidate;
4117  }
4118  }
4119  const RelAlgExecutionUnit exe_unit = {input_descs,
4120  input_col_descs,
4121  {},
4122  {},
4123  left_deep_join_quals,
4124  {nullptr},
4125  target_exprs,
4126  nullptr,
4127  sort_info,
4128  0,
4129  query_hint,
4130  EMPTY_QUERY_PLAN,
4131  {},
4132  {},
4133  false,
4134  std::nullopt,
4135  query_state_};
4136  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4137  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4138  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
4139  project->setOutputMetainfo(targets_meta);
4140  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4141  if (left_deep_tree_id.has_value()) {
4142  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4143  rewritten_exe_unit.join_quals);
4144  }
4145  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(project,
4146  cat_,
4147  left_deep_tree_id,
4148  left_deep_trees_info,
4149  temporary_tables_,
4150  executor_,
4151  translator);
4152  if (is_extracted_dag_valid(dag_info)) {
4153  rewritten_exe_unit.query_plan_dag = dag_info.extracted_dag;
4154  rewritten_exe_unit.hash_table_build_plan_dag = dag_info.hash_table_plan_dag;
4155  rewritten_exe_unit.table_id_to_node_map = dag_info.table_id_to_node_map;
4156  }
4157  return {rewritten_exe_unit,
4158  project,
4159  max_groups_buffer_entry_default_guess,
4160  std::move(query_rewriter),
4161  input_permutation,
4162  left_deep_join_input_sizes};
4163 }
4164 
4165 namespace {
4166 
4167 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
4168  RelAlgNode const* input_node) {
4169  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
4170  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
4171  const int negative_node_id = -input_node->getId();
4172  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
4173  target_exprs.reserve(tmis.size());
4174  for (size_t i = 0; i < tmis.size(); ++i) {
4175  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
4176  tmis[i].get_type_info(), negative_node_id, i, 0));
4177  }
4178  return target_exprs;
4179 }
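// The negated node id marks these synthesized column references as reading
// from another query step rather than a physical table; e.g. (hypothetically)
// a union input node with id 7 yields ColumnVars with table_id -7, which are
// later resolved against that step's temporary result.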
4180 
4181 } // namespace
4182 
4183  RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
4184  const RelLogicalUnion* logical_union,
4185  const SortInfo& sort_info,
4186  const ExecutionOptions& eo) {
4187  std::vector<InputDescriptor> input_descs;
4188  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4189  // Map ra input ptr to index (0, 1).
4190  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
4191  std::tie(input_descs, input_col_descs, std::ignore) =
4192  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
4193  const auto query_infos = get_table_infos(input_descs, executor_);
4194  auto const max_num_tuples =
4195  std::accumulate(query_infos.cbegin(),
4196  query_infos.cend(),
4197  size_t(0),
4198  [](auto max, auto const& query_info) {
4199  return std::max(max, query_info.info.getNumTuples());
4200  });
4201 
4202  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
4203  for (auto& pair : input_to_nest_level) {
4204  VLOG(3) << " (" << pair.first->toString() << ", " << pair.second << ')';
4205  }
4206 
4207  RelAlgTranslator translator(
4208  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
4209 
4210  auto const input_exprs_owned = target_exprs_for_union(logical_union->getInput(0));
4211  CHECK(!input_exprs_owned.empty())
4212  << "No metainfo found for input node " << logical_union->getInput(0)->toString();
4213  VLOG(3) << "input_exprs_owned.size()=" << input_exprs_owned.size();
4214  for (auto& input_expr : input_exprs_owned) {
4215  VLOG(3) << " " << input_expr->toString();
4216  }
4217  target_exprs_owned_.insert(
4218  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
4219  const auto target_exprs = get_exprs_not_owned(input_exprs_owned);
4220 
4221  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
4222  << " input_col_descs=" << shared::printContainer(input_col_descs)
4223  << " target_exprs.size()=" << target_exprs.size()
4224  << " max_num_tuples=" << max_num_tuples;
4225 
4226  const RelAlgExecutionUnit exe_unit = {input_descs,
4227  input_col_descs,
4228  {}, // quals_cf.simple_quals,
4229  {}, // rewrite_quals(quals_cf.quals),
4230  {},
4231  {nullptr},
4232  target_exprs,
4233  nullptr,
4234  sort_info,
4235  max_num_tuples,
4236  RegisteredQueryHint::defaults(),
4237  EMPTY_QUERY_PLAN,
4238  {},
4239  {},
4240  false,
4241  logical_union->isAll(),
4242  query_state_};
4243  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4244  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4245 
4246  RelAlgNode const* input0 = logical_union->getInput(0);
4247  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
4248  logical_union->setOutputMetainfo(
4249  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4250  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
4251  logical_union->setOutputMetainfo(
4252  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4253  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
4254  logical_union->setOutputMetainfo(
4255  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4256  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4257  logical_union->setOutputMetainfo(
4258  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4259  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
4260  logical_union->setOutputMetainfo(
4261  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4262  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
4263  logical_union->setOutputMetainfo(
4264  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4265  } else if (dynamic_cast<const RelSort*>(input0)) {
4266  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
4267  } else {
4268  throw QueryNotSupported("Unsupported input type: " + input0->toString());
4269  }
4270  VLOG(3) << "logical_union->getOutputMetainfo()="
4271  << shared::printContainer(logical_union->getOutputMetainfo())
4272  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
4273  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
4274 
4275  return {rewritten_exe_unit,
4276  logical_union,
4277  max_groups_buffer_entry_default_guess,
4278  std::move(query_rewriter)};
4279 }
4280 
4281  RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
4282  const RelTableFunction* rel_table_func,
4283  const bool just_explain,
4284  const bool is_gpu) {
4285  std::vector<InputDescriptor> input_descs;
4286  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4287  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
4288  std::tie(input_descs, input_col_descs, std::ignore) =
4289  get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
4290  const auto query_infos = get_table_infos(input_descs, executor_);
4291  RelAlgTranslator translator(
4292  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
4293  const auto input_exprs_owned = translate_scalar_sources(
4294  rel_table_func, translator, ::ExecutorType::TableFunctions);
4295  target_exprs_owned_.insert(
4296  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
4297  auto input_exprs = get_exprs_not_owned(input_exprs_owned);
4298 
4299  const auto table_function_impl_and_type_infos = [=]() {
4300  if (is_gpu) {
4301  try {
4302  return bind_table_function(
4303  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4304  } catch (ExtensionFunctionBindingError& e) {
4305  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
4306  << " Redirecting " << rel_table_func->getFunctionName()
4307  << " step to run on CPU.";
4308  throw QueryMustRunOnCpu();
4309  }
4310  } else {
4311  try {
4312  return bind_table_function(
4313  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4314  } catch (ExtensionFunctionBindingError& e) {
4315  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
4316  throw;
4317  }
4318  }
4319  }();
4320  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4321  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4322 
4323  size_t output_row_sizing_param = 0;
4324  if (table_function_impl
4325  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
4326  const auto parameter_index =
4327  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
4328  CHECK_GT(parameter_index, size_t(0));
4329  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
4330  const auto parameter_expr =
4331  rel_table_func->getTableFuncInputAt(parameter_index - 1);
4332  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
4333  if (!parameter_expr_literal) {
4334  throw std::runtime_error(
4335  "Provided output buffer sizing parameter is not a literal. Only literal "
4336  "values are supported with output buffer sizing configured table "
4337  "functions.");
4338  }
4339  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
4340  if (literal_val < 0) {
4341  throw std::runtime_error("Provided output sizing parameter " +
4342  std::to_string(literal_val) +
4343  " must be a positive integer.");
4344  }
4345  output_row_sizing_param = static_cast<size_t>(literal_val);
4346  } else {
4347  // RowMultiplier not specified in the SQL query. Set it to 1
4348  output_row_sizing_param = 1; // default value for RowMultiplier
4349  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
4350  static auto DEFAULT_ROW_MULTIPLIER_EXPR =
4351  makeExpr<Analyzer::Constant>(kINT, false, d);
4352  // Push the constant 1 to input_exprs
4353  input_exprs.insert(input_exprs.begin() + parameter_index - 1,
4354  DEFAULT_ROW_MULTIPLIER_EXPR.get());
4355  }
4356  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
4357  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
4358  } else {
4359  UNREACHABLE();
4360  }
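  // Output-row sizing recap (hypothetical table function): for a UDTF declared
  // with a RowMultiplier parameter, e.g. my_func(Column<double>, RowMultiplier),
  // the call
  //   SELECT * FROM TABLE(my_func(CURSOR(SELECT x FROM t), 2));
  // sizes the output buffer to 2x the input row count. When the multiplier is
  // omitted, the code above inserts the constant 1, and functions with a
  // non-user-specified output size take it from their own declaration.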
4361 
4362  std::vector<Analyzer::ColumnVar*> input_col_exprs;
4363  size_t input_index = 0;
4364  size_t arg_index = 0;
4365  const auto table_func_args = table_function_impl.getInputArgs();
4366  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
4367  for (const auto& ti : table_function_type_infos) {
4368  if (ti.is_column_list()) {
4369  for (int i = 0; i < ti.get_dimension(); i++) {
4370  auto& input_expr = input_exprs[input_index];
4371  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4372  CHECK(col_var);
4373 
4374  // avoid setting type info to ti here since ti doesn't have all the
4375  // properties correctly set
4376  auto type_info = input_expr->get_type_info();
4377  type_info.set_subtype(type_info.get_type()); // set type to be subtype
4378  type_info.set_type(ti.get_type()); // set type to column list
4379  type_info.set_dimension(ti.get_dimension());
4380  input_expr->set_type_info(type_info);
4381 
4382  input_col_exprs.push_back(col_var);
4383  input_index++;
4384  }
4385  } else if (ti.is_column()) {
4386  auto& input_expr = input_exprs[input_index];
4387  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4388  CHECK(col_var);
4389 
4390  // same here! avoid setting type info to ti since it doesn't have all the
4391  // properties correctly set
4392  auto type_info = input_expr->get_type_info();
4393  type_info.set_subtype(type_info.get_type()); // set type to be subtype
4394  type_info.set_type(ti.get_type()); // set type to column
4395  input_expr->set_type_info(type_info);
4396 
4397  input_col_exprs.push_back(col_var);
4398  input_index++;
4399  } else {
4400  auto input_expr = input_exprs[input_index];
4401  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
4402  if (ext_func_arg_ti != input_expr->get_type_info()) {
4403  input_exprs[input_index] = input_expr->add_cast(ext_func_arg_ti).get();
4404  }
4405  input_index++;
4406  }
4407  arg_index++;
4408  }
4409  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
4410  std::vector<Analyzer::Expr*> table_func_outputs;
4411  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
4412  auto ti = table_function_impl.getOutputSQLType(i);
4413  if (ti.is_dict_encoded_string()) {
4414  auto p = table_function_impl.getInputID(i);
4415 
4416  int32_t input_pos = p.first;
4417  // Iterate over the list of arguments to compute the offset. Use this offset to
4418  // get the corresponding input
4419  int32_t offset = 0;
4420  for (int j = 0; j < input_pos; j++) {
4421  const auto ti = table_function_type_infos[j];
4422  offset += ti.is_column_list() ? ti.get_dimension() : 1;
4423  }
4424  input_pos = offset + p.second;
4425 
4426  CHECK_LT(input_pos, input_exprs.size());
4427  int32_t comp_param = input_exprs_owned[input_pos]->get_type_info().get_comp_param();
4428  ti.set_comp_param(comp_param);
4429  }
4430  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
4431  table_func_outputs.push_back(target_exprs_owned_.back().get());
4432  }
4433  const TableFunctionExecutionUnit exe_unit = {
4434  input_descs,
4435  input_col_descs,
4436  input_exprs, // table function inputs
4437  input_col_exprs, // table function column inputs (duplicates w/ above)
4438  table_func_outputs, // table function projected exprs
4439  output_row_sizing_param, // output buffer sizing param
4440  table_function_impl};
4441  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
4442  rel_table_func->setOutputMetainfo(targets_meta);
4443  return {exe_unit, rel_table_func};
4444 }
4445 
4446 namespace {
4447 
4448  std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
4449  get_inputs_meta(const RelFilter* filter,
4450  const RelAlgTranslator& translator,
4451  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
4452  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4453  std::vector<TargetMetaInfo> in_metainfo;
4454  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
4455  const auto data_sink_node = get_data_sink(filter);
4456  auto input_it = inputs_owned.begin();
4457  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
4458  const auto source = data_sink_node->getInput(nest_level);
4459  const auto scan_source = dynamic_cast<const RelScan*>(source);
4460  if (scan_source) {
4461  CHECK(source->getOutputMetainfo().empty());
4462  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
4463  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
4464  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
4465  }
4466  const auto source_metadata =
4467  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
4468  in_metainfo.insert(
4469  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4470  exprs_owned.insert(
4471  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4472  } else {
4473  const auto& source_metadata = source->getOutputMetainfo();
4474  input_it += source_metadata.size();
4475  in_metainfo.insert(
4476  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4477  const auto scalar_sources_owned = synthesize_inputs(
4478  data_sink_node, nest_level, source_metadata, input_to_nest_level);
4479  exprs_owned.insert(
4480  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4481  }
4482  }
4483  return std::make_pair(in_metainfo, exprs_owned);
4484 }
4485 
4486 } // namespace
4487 
4488  RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
4489  const SortInfo& sort_info,
4490  const bool just_explain) {
4491  CHECK_EQ(size_t(1), filter->inputCount());
4492  std::vector<InputDescriptor> input_descs;
4493  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4494  std::vector<TargetMetaInfo> in_metainfo;
4495  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4496  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
4497 
4498  const auto input_to_nest_level = get_input_nest_levels(filter, {});
4499  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4500  get_input_desc(filter, input_to_nest_level, {}, cat_);
4501  const auto join_type = get_join_type(filter);
4502  RelAlgTranslator translator(cat_,
4503  query_state_,
4504  executor_,
4505  input_to_nest_level,
4506  {join_type},
4507  now_,
4508  just_explain);
4509  std::tie(in_metainfo, target_exprs_owned) =
4510  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
4511  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
4512  const auto qual = fold_expr(filter_expr.get());
4513  target_exprs_owned_.insert(
4514  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4515  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
4516  filter->setOutputMetainfo(in_metainfo);
4517  const auto rewritten_qual = rewrite_expr(qual.get());
4518  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(filter,
4519  cat_,
4520  std::nullopt,
4521  getLeftDeepJoinTreesInfo(),
4522  temporary_tables_,
4523  executor_,
4524  translator);
4525  auto query_hint = RegisteredQueryHint::defaults();
4526  if (query_dag_) {
4527  auto candidate = query_dag_->getQueryHint(filter);
4528  if (candidate) {
4529  query_hint = *candidate;
4530  }
4531  }
4532  return {{input_descs,
4533  input_col_descs,
4534  {},
4535  {rewritten_qual ? rewritten_qual : qual},
4536  {},
4537  {nullptr},
4538  target_exprs,
4539  nullptr,
4540  sort_info,
4541  0,
4542  query_hint,
4543  dag_info.extracted_dag,
4544  dag_info.hash_table_plan_dag,
4545  dag_info.table_id_to_node_map},
4546  filter,
4547  max_groups_buffer_entry_default_guess,
4548  nullptr};
4549 }
4550 
static ExtractedPlanDag extractQueryPlanDag(const RelAlgNode *node, const Catalog_Namespace::Catalog &catalog, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > &left_deep_tree_infos, const TemporaryTables &temporary_tables, Executor *executor, const RelAlgTranslator &rel_alg_translator)
const RexScalar * getOuterCondition(const size_t nesting_level) const
TableGenerations computeTableGenerations()
SQLTypeInfo get_nullable_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1070
std::shared_ptr< Analyzer::Expr > translateScalarRex(const RexScalar *rex) const
#define LOG(tag)
Definition: Logger.h:203
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:92
size_t size() const override
std::vector< size_t > outer_fragment_indices
static SpeculativeTopNBlacklist speculative_topn_blacklist_
size_t get_scalar_sources_size(const RelCompound *compound)
RelAlgExecutionUnit exe_unit
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< unsigned > visitLeftDeepInnerJoin(const RelLeftDeepInnerJoin *left_deep_join_tree) const override
std::vector< std::shared_ptr< Analyzer::TargetEntry > > targets
Definition: RenderInfo.h:36
SQLOps
Definition: sqldefs.h:29
TemporaryTables temporary_tables_
size_t getNumRows() const
const std::list< Analyzer::OrderEntry > order_entries
PersistentStorageMgr * getPersistentStorageMgr() const
Definition: DataMgr.cpp:632
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
bool setInSituDataIfUnset(const bool is_in_situ_data)
Definition: RenderInfo.cpp:97
const RexScalar * getCondition() const
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5409
std::string join(T const &container, std::string const &delim)
const std::vector< TargetMetaInfo > getTupleType() const
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources_for_update(const RA *ra_node, const RelAlgTranslator &translator, int32_t tableId, const Catalog_Namespace::Catalog &cat, const ColumnNameList &colNames, size_t starting_projection_column_idx)
bool get_is_null() const
Definition: Analyzer.h:333
Definition: sqldefs.h:38
const QueryPlan extracted_dag
std::vector< InputDescriptor > input_descs
std::shared_ptr< Analyzer::Var > var_ref(const Analyzer::Expr *expr, const Analyzer::Var::WhichRow which_row, const int varno)
Definition: Analyzer.h:1976
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
#define UNREACHABLE()
Definition: Logger.h:253
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
#define CHECK_GE(x, y)
Definition: Logger.h:222
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
static WindowProjectNodeContext * create(Executor *executor)
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1049
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
Definition: sqldefs.h:49
Definition: sqldefs.h:30
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
int64_t int_value_from_numbers_ptr(const SQLTypeInfo &type_info, const int8_t *data)
void prepare_string_dictionaries(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
static const int32_t ERR_GEOS
Definition: Execute.h:1169
SQLTypeInfo get_agg_type(const SQLAgg agg_kind, const Analyzer::Expr *arg_expr)
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:1165
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
std::shared_ptr< Analyzer::Expr > set_transient_dict(const std::shared_ptr< Analyzer::Expr > expr)
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:249
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
static void invalidateCaches()
int g_hll_precision_bits
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
const SQLTypeInfo & get_type_info() const
void checkForMatchingMetaInfoTypes() const
const ColumnDescriptor * getShardColumnMetadataForTable(const TableDescriptor *td) const
Definition: Catalog.cpp:4086
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1615
#define CHECK_GT(x, y)
Definition: Logger.h:221
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
std::vector< const RexScalar * > rex_to_conjunctive_form(const RexScalar *qual_expr)
bool is_count_distinct(const Analyzer::Expr *expr)
QueryStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::string to_string(char const *&&v)
double running_query_interrupt_freq
foreign_storage::ForeignStorageMgr * getForeignStorageMgr() const
void handleNop(RaExecutionDesc &ed)
#define LOG_IF(severity, condition)
Definition: Logger.h:299
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::shared_ptr< Analyzer::Expr > cast_dict_to_none(const std::shared_ptr< Analyzer::Expr > &input)
bool disallow_in_situ_only_if_final_ED_is_aggregate
Definition: RenderInfo.h:40
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:479
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
bool list_contains_expression(const QualsList &haystack, const std::shared_ptr< Analyzer::Expr > &needle)
size_t get_count_distinct_sub_bitmap_count(const size_t bitmap_sz_bits, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:1166
Definition: sqldefs.h:73
ExecutorType
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:208
size_t getColInputsSize() const
std::map< int, std::shared_ptr< ChunkMetadata >> ChunkMetadataMap
bool g_enable_interop
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
virtual T visit(const RexScalar *rex_scalar) const
Definition: RexVisitor.h:27
const size_t getScalarSourcesSize() const
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:1167
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:1164
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:226
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
unsigned getIndex() const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1155
size_t getOuterFragmentCount(const CompilationOptions &co, const ExecutionOptions &eo)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool g_from_table_reordering
Definition: Execute.cpp:86
CONSTEXPR DEVICE bool is_null(const T &value)
unsigned getId() const
Classes representing a parse tree.
int get_logical_size() const
Definition: sqltypes.h:340
bool isGeometry(TargetMetaInfo const &target_meta_info)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:221
ExecutorType executor_type
bool g_enable_system_tables
Definition: SysCatalog.cpp:65
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1159
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
int count
const JoinQualsPerNestingLevel join_quals
static void reset(Executor *executor)
SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr &expr)
const TableDescriptor * get_shard_for_key(const TableDescriptor *td, const Catalog_Namespace::Catalog &cat, const Fragmenter_Namespace::InsertData &data)
std::vector< std::shared_ptr< RexInput > > synthesized_physical_inputs_owned
std::pair< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > > > get_input_desc_impl(const RA *ra_node, const std::unordered_set< const RexInput * > &used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
SortAlgorithm
void set_parallelism_hints(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
MergeType
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
A container for relational algebra descriptors defining the execution order for a relational algebra ...
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const Catalog_Namespace::Catalog & cat_
TableIdToNodeMap table_id_to_node_map
size_t g_big_group_threshold
Definition: Execute.cpp:106
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1161
bool is_extracted_dag_valid(ExtractedPlanDag &dag)
const std::shared_ptr< ResultSet > & getRows() const
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1162
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
bool g_bigint_count
size_t groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
const RelAlgNode * getInput(const size_t idx) const
Definition: sqldefs.h:37