OmniSciDB  8fa3bf436f
RelAlgExecutor.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RelAlgExecutor.h"
20 #include "Parser/ParserNode.h"
35 #include "QueryEngine/RexVisitor.h"
39 #include "Shared/measure.h"
40 #include "Shared/misc.h"
41 #include "Shared/shard_key.h"
42 
43 #include <boost/algorithm/cxx11/any_of.hpp>
44 #include <boost/range/adaptor/reversed.hpp>
45 
46 #include <algorithm>
47 #include <functional>
48 #include <numeric>
49 
51 bool g_enable_interop{false};
52 bool g_enable_union{false};
53 
54 extern bool g_enable_bump_allocator;
55 
56 namespace {
57 
58 bool node_is_aggregate(const RelAlgNode* ra) {
59  const auto compound = dynamic_cast<const RelCompound*>(ra);
60  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
61  return ((compound && compound->isAggregate()) || aggregate);
62 }
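// Illustrative example (hypothetical query, added for clarity): a statement
// such as SELECT deptno, COUNT(*) FROM emp GROUP BY deptno arrives here as a
// RelCompound with isAggregate() == true, so node_is_aggregate() returns
// true; a pure projection node returns false.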
63 
64 std::unordered_set<PhysicalInput> get_physical_inputs(
65  const Catalog_Namespace::Catalog& cat,
66  const RelAlgNode* ra) {
67  auto phys_inputs = get_physical_inputs(ra);
68  std::unordered_set<PhysicalInput> phys_inputs2;
69  for (auto& phi : phys_inputs) {
70  phys_inputs2.insert(
71  PhysicalInput{cat.getColumnIdBySpi(phi.table_id, phi.col_id), phi.table_id});
72  }
73  return phys_inputs2;
74 }
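// Note: this two-argument overload remaps every physical input collected by
// the single-argument version through Catalog::getColumnIdBySpi(), so that
// SPI (System-Provided Id) column references, e.g. for geo physical columns,
// are resolved to actual column ids before caching and metadata lookups.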
75 
76 void set_parallelism_hints(const RelAlgNode& ra_node,
77  const Catalog_Namespace::Catalog& catalog) {
78  std::map<ChunkKey, std::set<foreign_storage::ForeignStorageMgr::ParallelismHint>>
79  parallelism_hints_per_table;
80  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
81  int table_id = physical_input.table_id;
82  auto table = catalog.getMetadataForTable(table_id, false);
83  if (table && table->storageType == StorageType::FOREIGN_TABLE) {
84  int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
85  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
86  auto foreign_table = catalog.getForeignTable(table_id);
87  for (const auto& fragment :
88  foreign_table->fragmenter->getFragmentsForQuery().fragments) {
89  Chunk_NS::Chunk chunk{col_desc};
90  ChunkKey chunk_key = {
91  catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};
92  // do not include hints for chunks that are already in CPU memory
93  if (!chunk.isChunkOnDevice(
94  &catalog.getDataMgr(), chunk_key, Data_Namespace::CPU_LEVEL, 0)) {
95  parallelism_hints_per_table[{catalog.getDatabaseId(), table_id}].insert(
96  foreign_storage::ForeignStorageMgr::ParallelismHint{col_id,
97  fragment.fragmentId});
98  }
99  }
100  }
101  }
102  if (!parallelism_hints_per_table.empty()) {
103  auto foreign_storage_mgr =
104  catalog.getDataMgr().getPersistentStorageMgr()->getForeignStorageMgr();
105  CHECK(foreign_storage_mgr);
106  foreign_storage_mgr->setParallelismHints(parallelism_hints_per_table);
107  }
108 }
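// Only chunks that are not already resident at CPU level are hinted, so the
// foreign storage manager can prefetch, in parallel, exactly the
// (column, fragment) pairs the query still needs to load.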
109 
110 void prepare_string_dictionaries(const RelAlgNode& ra_node,
111  const Catalog_Namespace::Catalog& catalog) {
112  for (const auto& physical_input : get_physical_inputs(&ra_node)) {
113  int table_id = physical_input.table_id;
114  auto table = catalog.getMetadataForTable(table_id, false);
115  if (table && table->storageType == StorageType::FOREIGN_TABLE) {
116  int col_id = catalog.getColumnIdBySpi(table_id, physical_input.col_id);
117  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
118  auto foreign_table = catalog.getForeignTable(table_id);
119  if (col_desc->columnType.is_dict_encoded_type()) {
120  CHECK(foreign_table->fragmenter != nullptr);
121  for (const auto& fragment :
122  foreign_table->fragmenter->getFragmentsForQuery().fragments) {
123  ChunkKey chunk_key = {
124  catalog.getDatabaseId(), table_id, col_id, fragment.fragmentId};
125  const ChunkMetadataMap& metadata_map = fragment.getChunkMetadataMap();
126  CHECK(metadata_map.find(col_id) != metadata_map.end());
127  if (foreign_storage::is_metadata_placeholder(*(metadata_map.at(col_id)))) {
128  // When this goes out of scope it will stay in CPU cache but become
129  // evictable
130  std::shared_ptr<Chunk_NS::Chunk> chunk =
131  Chunk_NS::Chunk::getChunk(col_desc,
132  &(catalog.getDataMgr()),
133  chunk_key,
134  Data_Namespace::CPU_LEVEL,
135  0,
136  0,
137  0);
138  }
139  }
140  }
141  }
142  }
143 }
144 
145 void prepare_foreign_table_for_execution(const RelAlgNode& ra_node,
146  const Catalog_Namespace::Catalog& catalog) {
147  // Iterate through ra_node inputs for types that need to be loaded pre-execution
148  // If they do not have valid metadata, load them into CPU memory to generate
149  // the metadata and leave them ready to be used by the query
150  set_parallelism_hints(ra_node, catalog);
151  prepare_string_dictionaries(ra_node, catalog);
152 }
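// In short: parallelism hints tell the foreign storage layer which chunks a
// query will touch, and loading placeholder metadata for dict-encoded columns
// ensures the string dictionaries are materialized before the query relies on
// their metadata.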
153 
154 } // namespace
155 
156 size_t RelAlgExecutor::getOuterFragmentCount(const CompilationOptions& co,
157  const ExecutionOptions& eo) {
158  if (eo.find_push_down_candidates) {
159  return 0;
160  }
161 
162  if (eo.just_explain) {
163  return 0;
164  }
165 
166  CHECK(query_dag_);
167 
168  query_dag_->resetQueryExecutionState();
169  const auto& ra = query_dag_->getRootNode();
170 
171  auto lock = executor_->acquireExecuteMutex();
172  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
173  const auto phys_inputs = get_physical_inputs(cat_, &ra);
174  const auto phys_table_ids = get_physical_table_inputs(&ra);
175  executor_->setCatalog(&cat_);
176  executor_->setupCaching(phys_inputs, phys_table_ids);
177 
178  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
179  auto ed_seq = RaExecutionSequence(&ra);
180 
181  if (!getSubqueries().empty()) {
182  return 0;
183  }
184 
185  CHECK(!ed_seq.empty());
186  if (ed_seq.size() > 1) {
187  return 0;
188  }
189 
190  decltype(temporary_tables_)().swap(temporary_tables_);
191  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
192  executor_->catalog_ = &cat_;
193  executor_->temporary_tables_ = &temporary_tables_;
194 
196  auto exec_desc_ptr = ed_seq.getDescriptor(0);
197  CHECK(exec_desc_ptr);
198  auto& exec_desc = *exec_desc_ptr;
199  const auto body = exec_desc.getBody();
200  if (body->isNop()) {
201  return 0;
202  }
203 
204  const auto project = dynamic_cast<const RelProject*>(body);
205  if (project) {
206  auto work_unit =
207  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
208 
209  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
210  executor_);
211  }
212 
213  const auto compound = dynamic_cast<const RelCompound*>(body);
214  if (compound) {
215  if (compound->isDeleteViaSelect()) {
216  return 0;
217  } else if (compound->isUpdateViaSelect()) {
218  return 0;
219  } else {
220  if (compound->isAggregate()) {
221  return 0;
222  }
223 
224  const auto work_unit =
225  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
226 
227  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
228  executor_);
229  }
230  }
231 
232  return 0;
233 }
234 
235 ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co,
236  const ExecutionOptions& eo,
237  const bool just_explain_plan,
238  RenderInfo* render_info) {
239  CHECK(query_dag_);
240  auto timer = DEBUG_TIMER(__func__);
241  INJECT_TIMER(executeRelAlgQuery);
242 
243  auto run_query = [&](const CompilationOptions& co_in) {
244  auto execution_result =
245  executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);
246  if (post_execution_callback_) {
247  VLOG(1) << "Running post execution callback.";
248  (*post_execution_callback_)();
249  }
250  return execution_result;
251  };
252 
253  try {
254  return run_query(co);
255  } catch (const QueryMustRunOnCpu&) {
256  if (!g_allow_cpu_retry) {
257  throw;
258  }
259  }
260  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
261  auto co_cpu = CompilationOptions::makeCpuOnly(co);
262 
263  if (render_info) {
264  render_info->setForceNonInSituData();
265  }
266  return run_query(co_cpu);
267 }
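// The pattern above is the engine's standard GPU-to-CPU fallback: the first
// run_query() attempt may throw QueryMustRunOnCpu, and if CPU retry is
// allowed the query is recompiled via CompilationOptions::makeCpuOnly() and
// rerun; any pending render is forced to non-in-situ, since in-situ rendering
// assumes GPU-resident results.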
268 
269 ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co,
270  const ExecutionOptions& eo,
271  const bool just_explain_plan,
272  RenderInfo* render_info) {
273  INJECT_TIMER(executeRelAlgQueryNoRetry);
274  auto timer = DEBUG_TIMER(__func__);
275  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
276 
277  query_dag_->resetQueryExecutionState();
278  const auto& ra = query_dag_->getRootNode();
279 
280  // capture the lock acquisition time
281  auto clock_begin = timer_start();
282  if (g_enable_dynamic_watchdog) {
283  executor_->resetInterrupt();
284  }
285  std::string query_session{""};
286  std::string query_str{"N/A"};
287  std::string query_submitted_time{""};
288  // gather necessary query's info
289  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
290  query_session = query_state_->getConstSessionInfo()->get_session_id();
291  query_str = query_state_->getQueryStr();
292  query_submitted_time = query_state_->getQuerySubmittedTime();
293  }
294 
295  bool acquire_spin_lock = false;
296  auto validate_or_explain_query =
297  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
298  ScopeGuard clearRuntimeInterruptStatus = [this,
299  &query_session,
300  &render_info,
301  &eo,
302  &acquire_spin_lock,
303  &query_submitted_time,
304  &validate_or_explain_query] {
305  // reset the runtime query interrupt status after the end of query execution
306  if (!render_info && !query_session.empty() && eo.allow_runtime_query_interrupt &&
307  !validate_or_explain_query) {
308  executor_->clearQuerySessionStatus(
309  query_session, query_submitted_time, acquire_spin_lock);
310  }
311  };
312 
313  if (!render_info && eo.allow_runtime_query_interrupt && !validate_or_explain_query) {
314  // if we reach here, the current query, which was waiting for an idle executor
315  // in the dispatch queue, is now scheduled to a specific executor
316  // (not UNITARY_EXECUTOR),
317  // so we update the query session's status with the executor that takes this query
318  std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
319  query_session, query_str, query_submitted_time);
320 
321  // For now we can run only a single query at a time, so if another query is
322  // already being executed by a different executor (i.e., when the parallel
323  // executor is enabled), we check whether this query can run via the spinlock.
324  // If we fail to acquire the lock, we sleep {g_pending_query_interrupt_freq} ms
325  // and then retry, repeatedly, until the lock is acquired.
326  if (!query_session.empty()) {
327  while (executor_->execute_spin_lock_.test_and_set(std::memory_order_acquire)) {
328  try {
329  executor_->checkPendingQueryStatus(query_session);
330  } catch (QueryExecutionError& e) {
331  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
332  throw std::runtime_error(
333  "Query execution has been interrupted (pending query)");
334  }
335  throw e;
336  } catch (...) {
337  throw std::runtime_error("Checking pending query status failed: unknown error");
338  }
339  // we failed to acquire the lock, so sleep before retrying...
340  std::this_thread::sleep_for(
341  std::chrono::milliseconds(g_pending_query_interrupt_freq));
342  }
343  acquire_spin_lock = true;
344  // now the query is going to be executed, so update the status as "RUNNING"
345  executor_->updateQuerySessionStatus(query_state_,
346  QuerySessionStatus::QueryStatus::RUNNING);
347  }
348  }
349  auto acquire_execute_mutex = [](Executor * executor) -> auto {
350  auto ret = executor->acquireExecuteMutex();
351  return ret;
352  };
353  // now we hold the spinlock, which means this query is ready to be run by the
354  // executor, so we acquire the executor lock here to make sure this executor
355  // holds all necessary resources, and to protect them against other executors
356  auto lock = acquire_execute_mutex(executor_);
357 
358  if (!render_info && !query_session.empty() && eo.allow_runtime_query_interrupt &&
359  !validate_or_explain_query) {
360  // check whether this query session has "already" been interrupted;
361  // this case occurs when there is a very short gap between being interrupted and
362  // taking the execute lock
363  // for instance, the session can fire multiple queries, with later ones waiting
364  // on the spinlock, while the user requests interruption of the session;
365  // if so, we have to remove "all" queries initiated by this session, and we do
366  // that here without running the query
367  try {
368  executor_->checkPendingQueryStatus(query_session);
369  } catch (QueryExecutionError& e) {
371  throw std::runtime_error("Query execution has been interrupted (pending query)");
372  }
373  throw e;
374  } catch (...) {
375  throw std::runtime_error("Checking pending query status failed: unknown error");
376  }
377  }
378 
379  // Notify foreign tables to load prior to caching
380  prepare_foreign_table_for_execution(ra, cat_);
381 
382  int64_t queue_time_ms = timer_stop(clock_begin);
383  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
384  const auto phys_inputs = get_physical_inputs(cat_, &ra);
385  const auto phys_table_ids = get_physical_table_inputs(&ra);
386  executor_->setCatalog(&cat_);
387  executor_->setupCaching(phys_inputs, phys_table_ids);
388 
389  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
390  auto ed_seq = RaExecutionSequence(&ra);
391 
392  if (just_explain_plan) {
393  std::stringstream ss;
394  std::vector<const RelAlgNode*> nodes;
395  for (size_t i = 0; i < ed_seq.size(); i++) {
396  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
397  }
398  size_t ctr = nodes.size();
399  size_t tab_ctr = 0;
400  for (auto& body : boost::adaptors::reverse(nodes)) {
401  const auto index = ctr--;
402  const auto tabs = std::string(tab_ctr++, '\t');
403  CHECK(body);
404  ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
405  if (auto sort = dynamic_cast<const RelSort*>(body)) {
406  ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
407  }
408  if (dynamic_cast<const RelProject*>(body) ||
409  dynamic_cast<const RelCompound*>(body)) {
410  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
411  ss << tabs << " : " << join->toString() << "\n";
412  }
413  }
414  }
415  const auto& subqueries = getSubqueries();
416  if (!subqueries.empty()) {
417  ss << "Subqueries: "
418  << "\n";
419  for (const auto& subquery : subqueries) {
420  const auto ra = subquery->getRelAlg();
421  ss << "\t" << ra->toString() << "\n";
422  }
423  }
424  auto rs = std::make_shared<ResultSet>(ss.str());
425  return {rs, {}};
426  }
427 
428  if (render_info) {
429  // set render to be non-in-situ in certain situations.
430  if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
431  ed_seq.size() > 1) {
432  // old logic
433  // disallow if more than one ED
434  render_info->setInSituDataIfUnset(false);
435  }
436  }
437 
438  if (eo.find_push_down_candidates) {
439  // this extra logic is mainly due to current limitations on multi-step queries
440  // and/or subqueries.
441  return executeRelAlgQueryWithFilterPushDown(
442  ed_seq, co, eo, render_info, queue_time_ms);
443  }
444  timer_setup.stop();
445 
446  // Dispatch the subqueries first
447  for (auto subquery : getSubqueries()) {
448  const auto subquery_ra = subquery->getRelAlg();
449  CHECK(subquery_ra);
450  if (subquery_ra->hasContextData()) {
451  continue;
452  }
453  // Execute the subquery and cache the result.
454  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
455  RaExecutionSequence subquery_seq(subquery_ra);
456  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
457  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
458  }
459  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
460 }
461 
462 AggregatedColRange RelAlgExecutor::computeColRangesCache() {
463  AggregatedColRange agg_col_range_cache;
464  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
465  return executor_->computeColRangesCache(phys_inputs);
466 }
467 
468 StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations() {
469  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
470  return executor_->computeStringDictionaryGenerations(phys_inputs);
471 }
472 
473 TableGenerations RelAlgExecutor::computeTableGenerations() {
474  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
475  return executor_->computeTableGenerations(phys_table_ids);
476 }
477 
478 Executor* RelAlgExecutor::getExecutor() const {
479  return executor_;
480 }
481 
482 void RelAlgExecutor::cleanupPostExecution() {
483  CHECK(executor_);
484  executor_->row_set_mem_owner_ = nullptr;
485 }
486 
487 namespace {
488 
489 void check_sort_node_source_constraint(const RelSort* sort) {
490  CHECK_EQ(size_t(1), sort->inputCount());
491  const auto source = sort->getInput(0);
492  if (dynamic_cast<const RelSort*>(source)) {
493  throw std::runtime_error("Sort node not supported as input to another sort");
494  }
495 }
496 
497 } // namespace
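// check_sort_node_source_constraint() enforces that a RelSort never feeds
// directly into another RelSort, a shape the plan builder is not expected to
// produce.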
498 
499 QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep(
500  const RaExecutionSequence& seq,
501  const size_t step_idx,
502  const CompilationOptions& co,
503  const ExecutionOptions& eo,
504  RenderInfo* render_info) {
505  INJECT_TIMER(executeRelAlgQueryStep);
506 
507  auto exe_desc_ptr = seq.getDescriptor(step_idx);
508  CHECK(exe_desc_ptr);
509  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
510 
511  size_t shard_count{0};
512  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
513  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
514  };
515 
516  if (sort) {
517  check_sort_node_source_constraint(sort);
518  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
519  shard_count = GroupByAndAggregate::shard_count_for_top_groups(
520  source_work_unit.exe_unit, *executor_->getCatalog());
521  if (!shard_count) {
522  // No point in sorting on the leaf, only execute the input to the sort node.
523  CHECK_EQ(size_t(1), sort->inputCount());
524  const auto source = sort->getInput(0);
525  if (sort->collationCount() || node_is_aggregate(source)) {
526  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
527  CHECK_EQ(temp_seq.size(), size_t(1));
528  ExecutionOptions eo_copy = {
529  eo.output_columnar_hint,
530  eo.allow_multifrag,
531  eo.just_explain,
532  eo.allow_loop_joins,
533  eo.with_watchdog,
534  eo.jit_debug,
535  eo.just_validate || sort->isEmptyResult(),
536  eo.with_dynamic_watchdog,
537  eo.dynamic_watchdog_time_limit,
538  eo.find_push_down_candidates,
539  eo.just_calcite_explain,
540  eo.gpu_input_mem_limit_percent,
541  eo.allow_runtime_query_interrupt,
542  eo.running_query_interrupt_freq,
543  eo.pending_query_interrupt_freq,
544  eo.executor_type,
545  };
546  // Use subseq to avoid clearing existing temporary tables
547  return {
548  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
549  merge_type(source),
550  source->getId(),
551  false};
552  }
553  }
554  }
555  QueryStepExecutionResult result{
556  executeRelAlgSubSeq(seq,
557  std::make_pair(step_idx, step_idx + 1),
558  co,
559  eo,
560  render_info,
561  queue_time_ms_),
562  merge_type(exe_desc_ptr->getBody()),
563  exe_desc_ptr->getBody()->getId(),
564  false};
565  if (post_execution_callback_) {
566  VLOG(1) << "Running post execution callback.";
567  (*post_execution_callback_)();
568  }
569  return result;
570 }
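// Leaf behavior for distributed sorts: when the step is a RelSort over
// unsharded input, only the sort's source is executed here (via a one-step
// subsequence), and the merge type (Reduce for aggregates, Union otherwise)
// tells the aggregator how to combine per-leaf results; the final ORDER BY is
// then applied once after merging instead of redundantly on every leaf.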
571 
572 void RelAlgExecutor::prepareLeafExecution(
573  const AggregatedColRange& agg_col_range,
574  const StringDictionaryGenerations& string_dictionary_generations,
575  const TableGenerations& table_generations) {
576  // capture the lock acquisition time
577  auto clock_begin = timer_start();
578  if (g_enable_dynamic_watchdog) {
579  executor_->resetInterrupt();
580  }
581  queue_time_ms_ = timer_stop(clock_begin);
582  executor_->row_set_mem_owner_ =
583  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), cpu_threads());
584  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
585  executor_->table_generations_ = table_generations;
586  executor_->agg_col_range_cache_ = agg_col_range;
587 }
588 
589 ExecutionResult RelAlgExecutor::executeRelAlgSeq(const RaExecutionSequence& seq,
590  const CompilationOptions& co,
591  const ExecutionOptions& eo,
592  RenderInfo* render_info,
593  const int64_t queue_time_ms,
594  const bool with_existing_temp_tables) {
595  INJECT_TIMER(executeRelAlgSeq);
596  auto timer = DEBUG_TIMER(__func__);
597  if (!with_existing_temp_tables) {
598  decltype(temporary_tables_)().swap(temporary_tables_);
599  }
600  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
601  executor_->catalog_ = &cat_;
602  executor_->temporary_tables_ = &temporary_tables_;
603 
604  time(&now_);
605  CHECK(!seq.empty());
606 
607  auto get_descriptor_count = [&seq, &eo]() -> size_t {
608  if (eo.just_explain) {
609  if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
610  // run the logical values descriptor to generate the result set, then the next
611  // descriptor to generate the explain
612  CHECK_GE(seq.size(), size_t(2));
613  return 2;
614  } else {
615  return 1;
616  }
617  } else {
618  return seq.size();
619  }
620  };
621 
622  const auto exec_desc_count = get_descriptor_count();
623 
624  for (size_t i = 0; i < exec_desc_count; i++) {
625  VLOG(1) << "Executing query step " << i;
626  // only render on the last step
627  try {
628  executeRelAlgStep(seq,
629  i,
630  co,
631  eo,
632  (i == exec_desc_count - 1) ? render_info : nullptr,
633  queue_time_ms);
634  } catch (const NativeExecutionError&) {
635  if (!g_enable_interop) {
636  throw;
637  }
638  auto eo_extern = eo;
639  eo_extern.executor_type = ::ExecutorType::Extern;
640  auto exec_desc_ptr = seq.getDescriptor(i);
641  const auto body = exec_desc_ptr->getBody();
642  const auto compound = dynamic_cast<const RelCompound*>(body);
643  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
644  LOG(INFO) << "Also failed to run the query using interoperability";
645  throw;
646  }
647  executeRelAlgStep(seq,
648  i,
649  co,
650  eo_extern,
651  (i == exec_desc_count - 1) ? render_info : nullptr,
652  queue_time_ms);
653  }
654  }
655 
656  return seq.getDescriptor(exec_desc_count - 1)->getResult();
657 }
658 
659 ExecutionResult RelAlgExecutor::executeRelAlgSubSeq(
660  const RaExecutionSequence& seq,
661  const std::pair<size_t, size_t> interval,
662  const CompilationOptions& co,
663  const ExecutionOptions& eo,
664  RenderInfo* render_info,
665  const int64_t queue_time_ms) {
666  INJECT_TIMER(executeRelAlgSubSeq);
667  executor_->catalog_ = &cat_;
668  executor_->temporary_tables_ = &temporary_tables_;
669 
670  time(&now_);
671  for (size_t i = interval.first; i < interval.second; i++) {
672  // only render on the last step
673  executeRelAlgStep(seq,
674  i,
675  co,
676  eo,
677  (i == interval.second - 1) ? render_info : nullptr,
678  queue_time_ms);
679  }
680 
681  return seq.getDescriptor(interval.second - 1)->getResult();
682 }
683 
684 void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq,
685  const size_t step_idx,
686  const CompilationOptions& co,
687  const ExecutionOptions& eo,
688  RenderInfo* render_info,
689  const int64_t queue_time_ms) {
690  INJECT_TIMER(executeRelAlgStep);
691  auto timer = DEBUG_TIMER(__func__);
692  WindowProjectNodeContext::reset(executor_);
693  auto exec_desc_ptr = seq.getDescriptor(step_idx);
694  CHECK(exec_desc_ptr);
695  auto& exec_desc = *exec_desc_ptr;
696  const auto body = exec_desc.getBody();
697  if (body->isNop()) {
698  handleNop(exec_desc);
699  return;
700  }
701  const ExecutionOptions eo_work_unit{
702  eo.output_columnar_hint,
703  eo.allow_multifrag,
704  eo.just_explain,
705  eo.allow_loop_joins,
706  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
707  eo.jit_debug,
708  eo.just_validate,
709  eo.with_dynamic_watchdog,
710  eo.dynamic_watchdog_time_limit,
711  eo.find_push_down_candidates,
712  eo.just_calcite_explain,
713  eo.gpu_input_mem_limit_percent,
714  eo.allow_runtime_query_interrupt,
715  eo.running_query_interrupt_freq,
716  eo.pending_query_interrupt_freq,
717  eo.executor_type,
718  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
719 
720  // Notify foreign tables to load prior to execution
721  prepare_foreign_table_for_execution(*body, cat_);
722 
723  const auto compound = dynamic_cast<const RelCompound*>(body);
724  if (compound) {
725  if (compound->isDeleteViaSelect()) {
726  executeDelete(compound, co, eo_work_unit, queue_time_ms);
727  } else if (compound->isUpdateViaSelect()) {
728  executeUpdate(compound, co, eo_work_unit, queue_time_ms);
729  } else {
730  exec_desc.setResult(
731  executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
732  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
733  << static_cast<int>(-compound->getId()) << ", ...)"
734  << " exec_desc.getResult().getDataPtr()->rowCount()="
735  << exec_desc.getResult().getDataPtr()->rowCount();
736  if (exec_desc.getResult().isFilterPushDownEnabled()) {
737  return;
738  }
739  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
740  }
741  return;
742  }
743  const auto project = dynamic_cast<const RelProject*>(body);
744  if (project) {
745  if (project->isDeleteViaSelect()) {
746  executeDelete(project, co, eo_work_unit, queue_time_ms);
747  } else if (project->isUpdateViaSelect()) {
748  executeUpdate(project, co, eo_work_unit, queue_time_ms);
749  } else {
750  std::optional<size_t> prev_count;
751  // Disabling the intermediate count optimization in distributed, as the previous
752  // execution descriptor will likely not hold the aggregated result.
753  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
754  auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
755  CHECK(prev_exec_desc);
756  RelAlgNode const* prev_body = prev_exec_desc->getBody();
757  // This optimization needs to be restricted in its application for UNION, which
758  // can have 2 input nodes in which neither should restrict the count of the other.
759  // However some non-UNION queries are measurably slower with this restriction, so
760  // it is only applied when g_enable_union is true.
761  bool const parent_check =
762  !g_enable_union || project->getInput(0)->getId() == prev_body->getId();
763  // If the previous node produced a reliable count, skip the pre-flight count
764  if (parent_check && (dynamic_cast<const RelCompound*>(prev_body) ||
765  dynamic_cast<const RelLogicalValues*>(prev_body))) {
766  const auto& prev_exe_result = prev_exec_desc->getResult();
767  const auto prev_result = prev_exe_result.getRows();
768  if (prev_result) {
769  prev_count = prev_result->rowCount();
770  VLOG(3) << "Setting output row count for projection node to previous node ("
771  << prev_exec_desc->getBody()->toString() << ") to " << *prev_count;
772  }
773  }
774  }
775  exec_desc.setResult(executeProject(
776  project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
777  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
778  << static_cast<int>(-project->getId()) << ", ...)"
779  << " exec_desc.getResult().getDataPtr()->rowCount()="
780  << exec_desc.getResult().getDataPtr()->rowCount();
781  if (exec_desc.getResult().isFilterPushDownEnabled()) {
782  return;
783  }
784  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
785  }
786  return;
787  }
788  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
789  if (aggregate) {
790  exec_desc.setResult(
791  executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
792  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
793  return;
794  }
795  const auto filter = dynamic_cast<const RelFilter*>(body);
796  if (filter) {
797  exec_desc.setResult(
798  executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
799  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
800  return;
801  }
802  const auto sort = dynamic_cast<const RelSort*>(body);
803  if (sort) {
804  exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
805  if (exec_desc.getResult().isFilterPushDownEnabled()) {
806  return;
807  }
808  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
809  return;
810  }
811  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
812  if (logical_values) {
813  exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
814  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
815  return;
816  }
817  const auto modify = dynamic_cast<const RelModify*>(body);
818  if (modify) {
819  exec_desc.setResult(executeModify(modify, eo_work_unit));
820  return;
821  }
822  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
823  if (logical_union) {
824  exec_desc.setResult(
825  executeUnion(logical_union, seq, co, eo_work_unit, render_info, queue_time_ms));
826  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
827  return;
828  }
829  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
830  if (table_func) {
831  exec_desc.setResult(
832  executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
833  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
834  return;
835  }
836  LOG(FATAL) << "Unhandled body type: " << body->toString();
837 }
838 
839 void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {
840  // just set the result of the previous node as the result of no op
841  auto body = ed.getBody();
842  CHECK(dynamic_cast<const RelAggregate*>(body));
843  CHECK_EQ(size_t(1), body->inputCount());
844  const auto input = body->getInput(0);
845  body->setOutputMetainfo(input->getOutputMetainfo());
846  const auto it = temporary_tables_.find(-input->getId());
847  CHECK(it != temporary_tables_.end());
848  // set up temp table as it could be used by the outer query or next step
849  addTemporaryTable(-body->getId(), it->second);
850 
851  ed.setResult({it->second, input->getOutputMetainfo()});
852 }
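// Example: when the sequence builder marks an aggregate step as a no-op, its
// input's result set is forwarded unchanged; re-registering it under
// -body->getId() keeps the temporary-table map consistent, since later steps
// resolve their inputs by negated node id (see table_id_from_ra() below).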
853 
854 namespace {
855 
856 class RexUsedInputsVisitor : public RexVisitor<std::unordered_set<const RexInput*>> {
857  public:
858  RexUsedInputsVisitor(const Catalog_Namespace::Catalog& cat) : cat_(cat) {}
859 
860  const std::vector<std::shared_ptr<RexInput>>& get_inputs_owned() const {
861  return synthesized_physical_inputs_owned;
862  }
863 
864  std::unordered_set<const RexInput*> visitInput(
865  const RexInput* rex_input) const override {
866  const auto input_ra = rex_input->getSourceNode();
867  CHECK(input_ra);
868  const auto scan_ra = dynamic_cast<const RelScan*>(input_ra);
869  if (scan_ra) {
870  const auto td = scan_ra->getTableDescriptor();
871  if (td) {
872  const auto col_id = rex_input->getIndex();
873  const auto cd = cat_.getMetadataForColumnBySpi(td->tableId, col_id + 1);
874  if (cd && cd->columnType.get_physical_cols() > 0) {
875  CHECK(IS_GEO(cd->columnType.get_type()));
876  std::unordered_set<const RexInput*> synthesized_physical_inputs;
877  for (auto i = 0; i < cd->columnType.get_physical_cols(); i++) {
878  auto physical_input =
879  new RexInput(scan_ra, SPIMAP_GEO_PHYSICAL_INPUT(col_id, i));
880  synthesized_physical_inputs_owned.emplace_back(physical_input);
881  synthesized_physical_inputs.insert(physical_input);
882  }
883  return synthesized_physical_inputs;
884  }
885  }
886  }
887  return {rex_input};
888  }
889 
890  protected:
891  std::unordered_set<const RexInput*> aggregateResult(
892  const std::unordered_set<const RexInput*>& aggregate,
893  const std::unordered_set<const RexInput*>& next_result) const override {
894  auto result = aggregate;
895  result.insert(next_result.begin(), next_result.end());
896  return result;
897  }
898 
899  private:
900  mutable std::vector<std::shared_ptr<RexInput>> synthesized_physical_inputs_owned;
901  const Catalog_Namespace::Catalog& cat_;
902 };
903 
904 const RelAlgNode* get_data_sink(const RelAlgNode* ra_node) {
905  if (auto table_func = dynamic_cast<const RelTableFunction*>(ra_node)) {
906  return table_func;
907  }
908  if (auto join = dynamic_cast<const RelJoin*>(ra_node)) {
909  CHECK_EQ(size_t(2), join->inputCount());
910  return join;
911  }
912  if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
913  CHECK_EQ(size_t(1), ra_node->inputCount());
914  }
915  auto only_src = ra_node->getInput(0);
916  const bool is_join = dynamic_cast<const RelJoin*>(only_src) ||
917  dynamic_cast<const RelLeftDeepInnerJoin*>(only_src);
918  return is_join ? only_src : ra_node;
919 }
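// The "data sink" is the node whose inputs define the tables a step scans:
// joins act as their own sink (all join inputs are scanned together), while a
// node sitting directly on top of a join delegates to the join beneath it.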
920 
921 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
922 get_used_inputs(const RelCompound* compound, const Catalog_Namespace::Catalog& cat) {
923  RexUsedInputsVisitor visitor(cat);
924  const auto filter_expr = compound->getFilterExpr();
925  std::unordered_set<const RexInput*> used_inputs =
926  filter_expr ? visitor.visit(filter_expr) : std::unordered_set<const RexInput*>{};
927  const auto sources_size = compound->getScalarSourcesSize();
928  for (size_t i = 0; i < sources_size; ++i) {
929  const auto source_inputs = visitor.visit(compound->getScalarSource(i));
930  used_inputs.insert(source_inputs.begin(), source_inputs.end());
931  }
932  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
933  return std::make_pair(used_inputs, used_inputs_owned);
934 }
935 
936 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
937 get_used_inputs(const RelAggregate* aggregate, const Catalog_Namespace::Catalog& cat) {
938  CHECK_EQ(size_t(1), aggregate->inputCount());
939  std::unordered_set<const RexInput*> used_inputs;
940  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
941  const auto source = aggregate->getInput(0);
942  const auto& in_metainfo = source->getOutputMetainfo();
943  const auto group_count = aggregate->getGroupByCount();
944  CHECK_GE(in_metainfo.size(), group_count);
945  for (size_t i = 0; i < group_count; ++i) {
946  auto synthesized_used_input = new RexInput(source, i);
947  used_inputs_owned.emplace_back(synthesized_used_input);
948  used_inputs.insert(synthesized_used_input);
949  }
950  for (const auto& agg_expr : aggregate->getAggExprs()) {
951  for (size_t i = 0; i < agg_expr->size(); ++i) {
952  const auto operand_idx = agg_expr->getOperand(i);
953  CHECK_GE(in_metainfo.size(), static_cast<size_t>(operand_idx));
954  auto synthesized_used_input = new RexInput(source, operand_idx);
955  used_inputs_owned.emplace_back(synthesized_used_input);
956  used_inputs.insert(synthesized_used_input);
957  }
958  }
959  return std::make_pair(used_inputs, used_inputs_owned);
960 }
961 
962 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
963 get_used_inputs(const RelProject* project, const Catalog_Namespace::Catalog& cat) {
964  RexUsedInputsVisitor visitor(cat);
965  std::unordered_set<const RexInput*> used_inputs;
966  for (size_t i = 0; i < project->size(); ++i) {
967  const auto proj_inputs = visitor.visit(project->getProjectAt(i));
968  used_inputs.insert(proj_inputs.begin(), proj_inputs.end());
969  }
970  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
971  return std::make_pair(used_inputs, used_inputs_owned);
972 }
973 
974 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
975 get_used_inputs(const RelTableFunction* table_func,
976  const Catalog_Namespace::Catalog& cat) {
977  RexUsedInputsVisitor visitor(cat);
978  std::unordered_set<const RexInput*> used_inputs;
979  for (size_t i = 0; i < table_func->getTableFuncInputsSize(); ++i) {
980  const auto table_func_inputs = visitor.visit(table_func->getTableFuncInputAt(i));
981  used_inputs.insert(table_func_inputs.begin(), table_func_inputs.end());
982  }
983  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
984  return std::make_pair(used_inputs, used_inputs_owned);
985 }
986 
987 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
988 get_used_inputs(const RelFilter* filter, const Catalog_Namespace::Catalog& cat) {
989  std::unordered_set<const RexInput*> used_inputs;
990  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
991  const auto data_sink_node = get_data_sink(filter);
992  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
993  const auto source = data_sink_node->getInput(nest_level);
994  const auto scan_source = dynamic_cast<const RelScan*>(source);
995  if (scan_source) {
996  CHECK(source->getOutputMetainfo().empty());
997  for (size_t i = 0; i < scan_source->size(); ++i) {
998  auto synthesized_used_input = new RexInput(scan_source, i);
999  used_inputs_owned.emplace_back(synthesized_used_input);
1000  used_inputs.insert(synthesized_used_input);
1001  }
1002  } else {
1003  const auto& partial_in_metadata = source->getOutputMetainfo();
1004  for (size_t i = 0; i < partial_in_metadata.size(); ++i) {
1005  auto synthesized_used_input = new RexInput(source, i);
1006  used_inputs_owned.emplace_back(synthesized_used_input);
1007  used_inputs.insert(synthesized_used_input);
1008  }
1009  }
1010  }
1011  return std::make_pair(used_inputs, used_inputs_owned);
1012 }
1013 
1014 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1015 get_used_inputs(const RelLogicalUnion* logical_union, const Catalog_Namespace::Catalog&) {
1016  std::unordered_set<const RexInput*> used_inputs(logical_union->inputCount());
1017  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1018  used_inputs_owned.reserve(logical_union->inputCount());
1019  VLOG(3) << "logical_union->inputCount()=" << logical_union->inputCount();
1020  auto const n_inputs = logical_union->inputCount();
1021  for (size_t nest_level = 0; nest_level < n_inputs; ++nest_level) {
1022  auto input = logical_union->getInput(nest_level);
1023  for (size_t i = 0; i < input->size(); ++i) {
1024  used_inputs_owned.emplace_back(std::make_shared<RexInput>(input, i));
1025  used_inputs.insert(used_inputs_owned.back().get());
1026  }
1027  }
1028  return std::make_pair(std::move(used_inputs), std::move(used_inputs_owned));
1029 }
1030 
1031 int table_id_from_ra(const RelAlgNode* ra_node) {
1032  const auto scan_ra = dynamic_cast<const RelScan*>(ra_node);
1033  if (scan_ra) {
1034  const auto td = scan_ra->getTableDescriptor();
1035  CHECK(td);
1036  return td->tableId;
1037  }
1038  return -ra_node->getId();
1039 }
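// Convention: physical scans resolve to the catalog's positive tableId; every
// other node exposes its intermediate result as a synthetic table identified
// by the negated node id, matching the addTemporaryTable(-id, ...) calls in
// executeRelAlgStep().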
1040 
1041 std::unordered_map<const RelAlgNode*, int> get_input_nest_levels(
1042  const RelAlgNode* ra_node,
1043  const std::vector<size_t>& input_permutation) {
1044  const auto data_sink_node = get_data_sink(ra_node);
1045  std::unordered_map<const RelAlgNode*, int> input_to_nest_level;
1046  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1047  const auto input_node_idx =
1048  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1049  const auto input_ra = data_sink_node->getInput(input_node_idx);
1050  // Having a non-zero mapped value (input_idx) results in the query being interpreted
1051  // as a JOIN within CodeGenerator::codegenColVar() due to rte_idx being set to the
1052  // mapped value (input_idx) which originates here. This would be incorrect for UNION.
1053  size_t const idx = dynamic_cast<const RelLogicalUnion*>(ra_node) ? 0 : input_idx;
1054  const auto it_ok = input_to_nest_level.emplace(input_ra, idx);
1055  CHECK(it_ok.second);
1056  LOG_IF(INFO, !input_permutation.empty())
1057  << "Assigned input " << input_ra->toString() << " to nest level " << input_idx;
1058  }
1059  return input_to_nest_level;
1060 }
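// For "A JOIN B", A receives nest level 0 and B nest level 1; the nest level
// later becomes rte_idx during code generation. UNION inputs are all forced
// to level 0 precisely so that codegen does not treat them as join tables.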
1061 
1062 std::pair<std::unordered_set<const RexInput*>, std::vector<std::shared_ptr<RexInput>>>
1063 get_join_source_used_inputs(const RelAlgNode* ra_node,
1064  const Catalog_Namespace::Catalog& cat) {
1065  const auto data_sink_node = get_data_sink(ra_node);
1066  if (auto join = dynamic_cast<const RelJoin*>(data_sink_node)) {
1067  CHECK_EQ(join->inputCount(), 2u);
1068  const auto condition = join->getCondition();
1069  RexUsedInputsVisitor visitor(cat);
1070  auto condition_inputs = visitor.visit(condition);
1071  std::vector<std::shared_ptr<RexInput>> condition_inputs_owned(
1072  visitor.get_inputs_owned());
1073  return std::make_pair(condition_inputs, condition_inputs_owned);
1074  }
1075 
1076  if (auto left_deep_join = dynamic_cast<const RelLeftDeepInnerJoin*>(data_sink_node)) {
1077  CHECK_GE(left_deep_join->inputCount(), 2u);
1078  const auto condition = left_deep_join->getInnerCondition();
1079  RexUsedInputsVisitor visitor(cat);
1080  auto result = visitor.visit(condition);
1081  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
1082  ++nesting_level) {
1083  const auto outer_condition = left_deep_join->getOuterCondition(nesting_level);
1084  if (outer_condition) {
1085  const auto outer_result = visitor.visit(outer_condition);
1086  result.insert(outer_result.begin(), outer_result.end());
1087  }
1088  }
1089  std::vector<std::shared_ptr<RexInput>> used_inputs_owned(visitor.get_inputs_owned());
1090  return std::make_pair(result, used_inputs_owned);
1091  }
1092 
1093  if (dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1094  CHECK_GT(ra_node->inputCount(), 1u) << ra_node->toString();
1095  } else if (dynamic_cast<const RelTableFunction*>(ra_node)) {
1096  CHECK_GT(ra_node->inputCount(), 0u) << ra_node->toString();
1097  } else {
1098  CHECK_EQ(ra_node->inputCount(), 1u) << ra_node->toString();
1099  }
1100  return std::make_pair(std::unordered_set<const RexInput*>{},
1101  std::vector<std::shared_ptr<RexInput>>{});
1102 }
1103 
1104 void collect_used_input_desc(
1105  std::vector<InputDescriptor>& input_descs,
1106  const Catalog_Namespace::Catalog& cat,
1107  std::unordered_set<std::shared_ptr<const InputColDescriptor>>& input_col_descs_unique,
1108  const RelAlgNode* ra_node,
1109  const std::unordered_set<const RexInput*>& source_used_inputs,
1110  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
1111  VLOG(3) << "ra_node=" << ra_node->toString()
1112  << " input_col_descs_unique.size()=" << input_col_descs_unique.size()
1113  << " source_used_inputs.size()=" << source_used_inputs.size();
1114  for (const auto used_input : source_used_inputs) {
1115  const auto input_ra = used_input->getSourceNode();
1116  const int table_id = table_id_from_ra(input_ra);
1117  const auto col_id = used_input->getIndex();
1118  auto it = input_to_nest_level.find(input_ra);
1119  if (it != input_to_nest_level.end()) {
1120  const int input_desc = it->second;
1121  input_col_descs_unique.insert(std::make_shared<const InputColDescriptor>(
1122  dynamic_cast<const RelScan*>(input_ra)
1123  ? cat.getColumnIdBySpi(table_id, col_id + 1)
1124  : col_id,
1125  table_id,
1126  input_desc));
1127  } else if (!dynamic_cast<const RelLogicalUnion*>(ra_node)) {
1128  throw std::runtime_error("Bushy joins not supported");
1129  }
1130  }
1131 }
1132 
1133 template <class RA>
1134 std::pair<std::vector<InputDescriptor>,
1135  std::list<std::shared_ptr<const InputColDescriptor>>>
1136 get_input_desc_impl(const RA* ra_node,
1137  const std::unordered_set<const RexInput*>& used_inputs,
1138  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1139  const std::vector<size_t>& input_permutation,
1140  const Catalog_Namespace::Catalog& cat) {
1141  std::vector<InputDescriptor> input_descs;
1142  const auto data_sink_node = get_data_sink(ra_node);
1143  for (size_t input_idx = 0; input_idx < data_sink_node->inputCount(); ++input_idx) {
1144  const auto input_node_idx =
1145  input_permutation.empty() ? input_idx : input_permutation[input_idx];
1146  auto input_ra = data_sink_node->getInput(input_node_idx);
1147  const int table_id = table_id_from_ra(input_ra);
1148  input_descs.emplace_back(table_id, input_idx);
1149  }
1150  std::sort(input_descs.begin(),
1151  input_descs.end(),
1152  [](const InputDescriptor& lhs, const InputDescriptor& rhs) {
1153  return lhs.getNestLevel() < rhs.getNestLevel();
1154  });
1155  std::unordered_set<std::shared_ptr<const InputColDescriptor>> input_col_descs_unique;
1156  collect_used_input_desc(input_descs,
1157  cat,
1158  input_col_descs_unique, // modified
1159  ra_node,
1160  used_inputs,
1161  input_to_nest_level);
1162  std::unordered_set<const RexInput*> join_source_used_inputs;
1163  std::vector<std::shared_ptr<RexInput>> join_source_used_inputs_owned;
1164  std::tie(join_source_used_inputs, join_source_used_inputs_owned) =
1165  get_join_source_used_inputs(ra_node, cat);
1166  collect_used_input_desc(input_descs,
1167  cat,
1168  input_col_descs_unique, // modified
1169  ra_node,
1170  join_source_used_inputs,
1171  input_to_nest_level);
1172  std::vector<std::shared_ptr<const InputColDescriptor>> input_col_descs(
1173  input_col_descs_unique.begin(), input_col_descs_unique.end());
1174 
1175  std::sort(input_col_descs.begin(),
1176  input_col_descs.end(),
1177  [](std::shared_ptr<const InputColDescriptor> const& lhs,
1178  std::shared_ptr<const InputColDescriptor> const& rhs) {
1179  return std::make_tuple(lhs->getScanDesc().getNestLevel(),
1180  lhs->getColId(),
1181  lhs->getScanDesc().getTableId()) <
1182  std::make_tuple(rhs->getScanDesc().getNestLevel(),
1183  rhs->getColId(),
1184  rhs->getScanDesc().getTableId());
1185  });
1186  return {input_descs,
1187  std::list<std::shared_ptr<const InputColDescriptor>>(input_col_descs.begin(),
1188  input_col_descs.end())};
1189 }
1190 
1191 template <class RA>
1192 std::tuple<std::vector<InputDescriptor>,
1193  std::list<std::shared_ptr<const InputColDescriptor>>,
1194  std::vector<std::shared_ptr<RexInput>>>
1195 get_input_desc(const RA* ra_node,
1196  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
1197  const std::vector<size_t>& input_permutation,
1198  const Catalog_Namespace::Catalog& cat) {
1199  std::unordered_set<const RexInput*> used_inputs;
1200  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
1201  std::tie(used_inputs, used_inputs_owned) = get_used_inputs(ra_node, cat);
1202  VLOG(3) << "used_inputs.size() = " << used_inputs.size();
1203  auto input_desc_pair = get_input_desc_impl(
1204  ra_node, used_inputs, input_to_nest_level, input_permutation, cat);
1205  return std::make_tuple(
1206  input_desc_pair.first, input_desc_pair.second, used_inputs_owned);
1207 }
1208 
1209 size_t get_scalar_sources_size(const RelCompound* compound) {
1210  return compound->getScalarSourcesSize();
1211 }
1212 
1213 size_t get_scalar_sources_size(const RelProject* project) {
1214  return project->size();
1215 }
1216 
1217 size_t get_scalar_sources_size(const RelTableFunction* table_func) {
1218  return table_func->getTableFuncInputsSize();
1219 }
1220 
1221 const RexScalar* scalar_at(const size_t i, const RelCompound* compound) {
1222  return compound->getScalarSource(i);
1223 }
1224 
1225 const RexScalar* scalar_at(const size_t i, const RelProject* project) {
1226  return project->getProjectAt(i);
1227 }
1228 
1229 const RexScalar* scalar_at(const size_t i, const RelTableFunction* table_func) {
1230  return table_func->getTableFuncInputAt(i);
1231 }
1232 
1233 std::shared_ptr<Analyzer::Expr> set_transient_dict(
1234  const std::shared_ptr<Analyzer::Expr> expr) {
1235  const auto& ti = expr->get_type_info();
1236  if (!ti.is_string() || ti.get_compression() != kENCODING_NONE) {
1237  return expr;
1238  }
1239  auto transient_dict_ti = ti;
1240  transient_dict_ti.set_compression(kENCODING_DICT);
1241  transient_dict_ti.set_comp_param(TRANSIENT_DICT_ID);
1242  transient_dict_ti.set_fixed_size();
1243  return expr->add_cast(transient_dict_ti);
1244 }
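// A none-encoded string expression is given a cast to a transient dictionary
// (comp_param TRANSIENT_DICT_ID), letting it participate in hash-based
// group-by and comparisons over compact integer ids; dict-encoded or
// non-string expressions pass through unchanged.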
1245 
1246 void set_transient_dict_maybe(
1247  std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1248  const std::shared_ptr<Analyzer::Expr>& expr) {
1249  try {
1250  scalar_sources.push_back(set_transient_dict(fold_expr(expr.get())));
1251  } catch (...) {
1252  scalar_sources.push_back(fold_expr(expr.get()));
1253  }
1254 }
1255 
1256 std::shared_ptr<Analyzer::Expr> cast_dict_to_none(
1257  const std::shared_ptr<Analyzer::Expr>& input) {
1258  const auto& input_ti = input->get_type_info();
1259  if (input_ti.is_string() && input_ti.get_compression() == kENCODING_DICT) {
1260  return input->add_cast(SQLTypeInfo(kTEXT, input_ti.get_notnull()));
1261  }
1262  return input;
1263 }
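// The inverse direction of set_transient_dict(): dictionary-encoded strings
// are cast back to plain TEXT; this is used on the external-executor path
// below, where results must be real strings rather than ids into a
// server-side dictionary.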
1264 
1265 template <class RA>
1266 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources(
1267  const RA* ra_node,
1268  const RelAlgTranslator& translator,
1269  const ::ExecutorType executor_type) {
1270  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1271  const size_t scalar_sources_size = get_scalar_sources_size(ra_node);
1272  VLOG(3) << "get_scalar_sources_size(" << ra_node->toString()
1273  << ") = " << scalar_sources_size;
1274  for (size_t i = 0; i < scalar_sources_size; ++i) {
1275  const auto scalar_rex = scalar_at(i, ra_node);
1276  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1277  // RexRefs are synthetic scalars appended after the real ones
1278  // purely to take memory ownership; no real work is needed here.
1279  continue;
1280  }
1281 
1282  const auto scalar_expr =
1283  rewrite_array_elements(translator.translateScalarRex(scalar_rex).get());
1284  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1285  if (executor_type == ExecutorType::Native) {
1286  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1287  } else {
1288  scalar_sources.push_back(cast_dict_to_none(fold_expr(rewritten_expr.get())));
1289  }
1290  }
1291 
1292  return scalar_sources;
1293 }
1294 
1295 template <class RA>
1296 std::vector<std::shared_ptr<Analyzer::Expr>> translate_scalar_sources_for_update(
1297  const RA* ra_node,
1298  const RelAlgTranslator& translator,
1299  int32_t tableId,
1300  const Catalog_Namespace::Catalog& cat,
1301  const ColumnNameList& colNames,
1302  size_t starting_projection_column_idx) {
1303  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources;
1304  for (size_t i = 0; i < get_scalar_sources_size(ra_node); ++i) {
1305  const auto scalar_rex = scalar_at(i, ra_node);
1306  if (dynamic_cast<const RexRef*>(scalar_rex)) {
1307  // RexRefs are synthetic scalars appended after the real ones
1308  // purely to take memory ownership; no real work is needed here.
1309  continue;
1310  }
1311 
1312  std::shared_ptr<Analyzer::Expr> translated_expr;
1313  if (i >= starting_projection_column_idx && i < get_scalar_sources_size(ra_node) - 1) {
1314  translated_expr = cast_to_column_type(translator.translateScalarRex(scalar_rex),
1315  tableId,
1316  cat,
1317  colNames[i - starting_projection_column_idx]);
1318  } else {
1319  translated_expr = translator.translateScalarRex(scalar_rex);
1320  }
1321  const auto scalar_expr = rewrite_array_elements(translated_expr.get());
1322  const auto rewritten_expr = rewrite_expr(scalar_expr.get());
1323  set_transient_dict_maybe(scalar_sources, rewritten_expr);
1324  }
1325 
1326  return scalar_sources;
1327 }
1328 
1329 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1330  const RelCompound* compound,
1331  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1332  if (!compound->isAggregate()) {
1333  return {nullptr};
1334  }
1335  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1336  for (size_t group_idx = 0; group_idx < compound->getGroupByCount(); ++group_idx) {
1337  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1338  }
1339  return groupby_exprs;
1340 }
1341 
1342 std::list<std::shared_ptr<Analyzer::Expr>> translate_groupby_exprs(
1343  const RelAggregate* aggregate,
1344  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources) {
1345  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
1346  for (size_t group_idx = 0; group_idx < aggregate->getGroupByCount(); ++group_idx) {
1347  groupby_exprs.push_back(set_transient_dict(scalar_sources[group_idx]));
1348  }
1349  return groupby_exprs;
1350 }
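// Group-by keys are routed through set_transient_dict() as well, so grouping
// on a none-encoded string column happens over dictionary ids rather than
// full string payloads.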
1351 
1352 std::list<std::shared_ptr<Analyzer::Expr>> translate_quals(const RelCompound* compound,
1353  const RelAlgTranslator& translator) {
1354  const auto filter_rex = compound->getFilterExpr();
1355  const auto filter_expr =
1356  filter_rex ? translator.translateScalarRex(filter_rex) : nullptr;
1357  return filter_expr ? qual_to_conjunctive_form(fold_expr(filter_expr.get()))
1358  : std::list<std::shared_ptr<Analyzer::Expr>>{};
1359 }
1360 
1361 std::vector<Analyzer::Expr*> translate_targets(
1362  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1363  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1364  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1365  const RelCompound* compound,
1366  const RelAlgTranslator& translator,
1367  const ExecutorType executor_type) {
1368  std::vector<Analyzer::Expr*> target_exprs;
1369  for (size_t i = 0; i < compound->size(); ++i) {
1370  const auto target_rex = compound->getTargetExpr(i);
1371  const auto target_rex_agg = dynamic_cast<const RexAgg*>(target_rex);
1372  std::shared_ptr<Analyzer::Expr> target_expr;
1373  if (target_rex_agg) {
1374  target_expr =
1375  RelAlgTranslator::translateAggregateRex(target_rex_agg, scalar_sources);
1376  } else {
1377  const auto target_rex_scalar = dynamic_cast<const RexScalar*>(target_rex);
1378  const auto target_rex_ref = dynamic_cast<const RexRef*>(target_rex_scalar);
1379  if (target_rex_ref) {
1380  const auto ref_idx = target_rex_ref->getIndex();
1381  CHECK_GE(ref_idx, size_t(1));
1382  CHECK_LE(ref_idx, groupby_exprs.size());
1383  const auto groupby_expr = *std::next(groupby_exprs.begin(), ref_idx - 1);
1384  target_expr = var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, ref_idx);
1385  } else {
1386  target_expr = translator.translateScalarRex(target_rex_scalar);
1387  auto rewritten_expr = rewrite_expr(target_expr.get());
1388  target_expr = fold_expr(rewritten_expr.get());
1389  if (executor_type == ExecutorType::Native) {
1390  try {
1391  target_expr = set_transient_dict(target_expr);
1392  } catch (...) {
1393  // noop
1394  }
1395  } else {
1396  target_expr = cast_dict_to_none(target_expr);
1397  }
1398  }
1399  }
1400  CHECK(target_expr);
1401  target_exprs_owned.push_back(target_expr);
1402  target_exprs.push_back(target_expr.get());
1403  }
1404  return target_exprs;
1405 }
1406 
1407 std::vector<Analyzer::Expr*> translate_targets(
1408  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned,
1409  const std::vector<std::shared_ptr<Analyzer::Expr>>& scalar_sources,
1410  const std::list<std::shared_ptr<Analyzer::Expr>>& groupby_exprs,
1411  const RelAggregate* aggregate,
1412  const RelAlgTranslator& translator) {
1413  std::vector<Analyzer::Expr*> target_exprs;
1414  size_t group_key_idx = 1;
1415  for (const auto& groupby_expr : groupby_exprs) {
1416  auto target_expr =
1417  var_ref(groupby_expr.get(), Analyzer::Var::kGROUPBY, group_key_idx++);
1418  target_exprs_owned.push_back(target_expr);
1419  target_exprs.push_back(target_expr.get());
1420  }
1421 
1422  for (const auto& target_rex_agg : aggregate->getAggExprs()) {
1423  auto target_expr =
1424  RelAlgTranslator::translateAggregateRex(target_rex_agg.get(), scalar_sources);
1425  CHECK(target_expr);
1426  target_expr = fold_expr(target_expr.get());
1427  target_exprs_owned.push_back(target_expr);
1428  target_exprs.push_back(target_expr.get());
1429  }
1430  return target_exprs;
1431 }
1432 
1433 bool is_count_distinct(const Analyzer::Expr* expr) {
1434  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1435  return agg_expr && agg_expr->get_is_distinct();
1436 }
1437 
1438 bool is_agg(const Analyzer::Expr* expr) {
1439  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(expr);
1440  if (agg_expr && agg_expr->get_contains_agg()) {
1441  auto agg_type = agg_expr->get_aggtype();
1442  if (agg_type == SQLAgg::kMIN || agg_type == SQLAgg::kMAX ||
1443  agg_type == SQLAgg::kSUM || agg_type == SQLAgg::kAVG) {
1444  return true;
1445  }
1446  }
1447  return false;
1448 }
1449 
1450 SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr& expr) {
1451  if (is_count_distinct(&expr)) {
1452  return SQLTypeInfo(kBIGINT, false);
1453  } else if (is_agg(&expr)) {
1454  return get_nullable_logical_type_info(expr.get_type_info());
1455  }
1456  return get_logical_type_info(expr.get_type_info());
1457 }
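// Examples of the fixups above: COUNT(DISTINCT x) is always reported as
// BIGINT, and aggregates such as SUM over an integer column are widened to
// their nullable logical type (an empty group can yield NULL), while plain
// expressions keep their ordinary logical type.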
1458 
1459 template <class RA>
1460 std::vector<TargetMetaInfo> get_targets_meta(
1461  const RA* ra_node,
1462  const std::vector<Analyzer::Expr*>& target_exprs) {
1463  std::vector<TargetMetaInfo> targets_meta;
1464  CHECK_EQ(ra_node->size(), target_exprs.size());
1465  for (size_t i = 0; i < ra_node->size(); ++i) {
1466  CHECK(target_exprs[i]);
1467  // TODO(alex): remove the count distinct type fixup.
1468  targets_meta.emplace_back(ra_node->getFieldName(i),
1469  get_logical_type_for_expr(*target_exprs[i]),
1470  target_exprs[i]->get_type_info());
1471  }
1472  return targets_meta;
1473 }
1474 
1475 template <>
1476 std::vector<TargetMetaInfo> get_targets_meta(
1477  const RelFilter* filter,
1478  const std::vector<Analyzer::Expr*>& target_exprs) {
1479  RelAlgNode const* input0 = filter->getInput(0);
1480  if (auto const* input = dynamic_cast<RelCompound const*>(input0)) {
1481  return get_targets_meta(input, target_exprs);
1482  } else if (auto const* input = dynamic_cast<RelProject const*>(input0)) {
1483  return get_targets_meta(input, target_exprs);
1484  } else if (auto const* input = dynamic_cast<RelLogicalUnion const*>(input0)) {
1485  return get_targets_meta(input, target_exprs);
1486  } else if (auto const* input = dynamic_cast<RelAggregate const*>(input0)) {
1487  return get_targets_meta(input, target_exprs);
1488  } else if (auto const* input = dynamic_cast<RelScan const*>(input0)) {
1489  return get_targets_meta(input, target_exprs);
1490  }
1491  UNREACHABLE() << "Unhandled node type: " << input0->toString();
1492  return {};
1493 }
1494 
1495 } // namespace
1496 
1497 void RelAlgExecutor::executeUpdate(const RelAlgNode* node,
1498  const CompilationOptions& co_in,
1499  const ExecutionOptions& eo_in,
1500  const int64_t queue_time_ms) {
1501  CHECK(node);
1502  auto timer = DEBUG_TIMER(__func__);
1503 
1505 
1506  auto co = co_in;
1507  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1508  // encoded string updates
1509 
1510  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
1511  auto& work_unit,
1512  const bool is_aggregate) {
1513  dml_transaction_parameters_ =
1514  std::make_unique<UpdateTransactionParameters>(node->getModifiedTableDescriptor(),
1515  node->getTargetColumns(),
1516  node->getOutputMetainfo(),
1517  node->isVarlenUpdateRequired());
1518 
1519  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1520 
1521  auto execute_update_ra_exe_unit = [this, &co, &eo_in, &table_infos](
1522  const RelAlgExecutionUnit& ra_exe_unit,
1523  const bool is_aggregate) {
1524  CompilationOptions co_project = co;
1525 
1526  auto eo = eo_in;
1527  if (dml_transaction_parameters_->tableIsTemporary()) {
1528  eo.output_columnar_hint = true;
1529  co_project.allow_lazy_fetch = false;
1530  co_project.filter_on_deleted_column =
1531  false; // project the entire delete column for columnar update
1532  }
1533 
1534  auto update_transaction_parameters =
1535  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
1536  CHECK(update_transaction_parameters);
1537  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
1538  try {
1539  auto table_update_metadata =
1540  executor_->executeUpdate(ra_exe_unit,
1541  table_infos,
1542  co_project,
1543  eo,
1544  cat_,
1545  executor_->row_set_mem_owner_,
1546  update_callback,
1547  is_aggregate);
1548  post_execution_callback_ = [table_update_metadata, this]() {
1549  dml_transaction_parameters_->finalizeTransaction(cat_);
1550  TableOptimizer table_optimizer{
1551  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1552  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1553  };
1554  } catch (const QueryExecutionError& e) {
1555  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1556  }
1557  };
1558 
1559  if (dml_transaction_parameters_->tableIsTemporary()) {
1560  // hold owned target exprs during execution if rewriting
1561  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1562  // rewrite temp table updates to generate the full column by moving the where
1563  // clause into a case statement; if such a rewrite is not possible, bail on the
1564  // update operation, then build an expr for the update target
1565  auto update_transaction_params =
1566  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
1567  CHECK(update_transaction_params);
1568  const auto td = update_transaction_params->getTableDescriptor();
1569  CHECK(td);
1570  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
1571  if (update_column_names.size() > 1) {
1572  throw std::runtime_error(
1573  "Multi-column update is not yet supported for temporary tables.");
1574  }
1575 
1576  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1577  CHECK(cd);
1578  auto projected_column_to_update =
1579  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1580  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1581  work_unit.exe_unit, projected_column_to_update);
1582  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1583  throw std::runtime_error(
1584  "Variable length updates not yet supported on temporary tables.");
1585  }
1586  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1587  } else {
1588  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1589  }
1590  };
1591 
1592  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1593  auto work_unit =
1594  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1595 
1596  execute_update_for_node(compound, work_unit, compound->isAggregate());
1597  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1598  auto work_unit =
1599  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1600 
1601  if (project->isSimple()) {
1602  CHECK_EQ(size_t(1), project->inputCount());
1603  const auto input_ra = project->getInput(0);
1604  if (dynamic_cast<const RelSort*>(input_ra)) {
1605  const auto& input_table =
1606  get_temporary_table(&temporary_tables_, -input_ra->getId());
1607  CHECK(input_table);
1608  work_unit.exe_unit.scan_limit = input_table->rowCount();
1609  }
1610  }
1611 
1612  execute_update_for_node(project, work_unit, false);
1613  } else {
1614  throw std::runtime_error("Unsupported parent node for update: " + node->toString());
1615  }
1616 }
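// Worked example for the temporary-table branch above (hypothetical SQL):
// UPDATE t SET x = 5 WHERE y > 10 cannot update a temporary table in place,
// so rewriteColumnarUpdate turns it into a projection of the full column,
// conceptually
//
//   SELECT CASE WHEN y > 10 THEN 5 ELSE x END FROM t;
//
// which is then written back columnar-wise. Multi-column and variable-length
// updates on temporary tables are rejected before reaching that rewrite.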
1617 
1618 void RelAlgExecutor::executeDelete(const RelAlgNode* node,
1619  const CompilationOptions& co,
1620  const ExecutionOptions& eo_in,
1621  const int64_t queue_time_ms) {
1622  CHECK(node);
1623  auto timer = DEBUG_TIMER(__func__);
1624 
1625  DeleteTriggeredCacheInvalidator::invalidateCaches();
1626 
1627  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1628  auto& work_unit,
1629  const bool is_aggregate) {
1630  auto* table_descriptor = node->getModifiedTableDescriptor();
1631  CHECK(table_descriptor);
1632  if (!table_descriptor->hasDeletedCol) {
1633  throw std::runtime_error(
1634  "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
1635  }
1636 
1637  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1638 
1639  auto execute_delete_ra_exe_unit =
1640  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1641  const bool is_aggregate) {
1642  dml_transaction_parameters_ =
1643  std::make_unique<DeleteTransactionParameters>(table_descriptor);
1644  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
1645  dml_transaction_parameters_.get());
1646  CHECK(delete_params);
1647  auto delete_callback = yieldDeleteCallback(*delete_params);
1648  CompilationOptions co_delete = co;
1649 
1650  auto eo = eo_in;
1651  if (dml_transaction_parameters_->tableIsTemporary()) {
1652  eo.output_columnar_hint = true;
1653  co_delete.filter_on_deleted_column =
1654  false; // project the entire delete column for columnar update
1655  } else {
1656  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1657  }
1658 
1659  try {
1660  auto table_update_metadata =
1661  executor_->executeUpdate(exe_unit,
1662  table_infos,
1663  co_delete,
1664  eo,
1665  cat_,
1666  executor_->row_set_mem_owner_,
1667  delete_callback,
1668  is_aggregate);
1669  post_execution_callback_ = [table_update_metadata, this]() {
1670  dml_transaction_parameters_->finalizeTransaction(cat_);
1671  TableOptimizer table_optimizer{
1672  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1673  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1674  };
1675  } catch (const QueryExecutionError& e) {
1676  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1677  }
1678  };
1679 
1680  if (table_is_temporary(table_descriptor)) {
1681  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1682  auto cd = cat_.getDeletedColumn(table_descriptor);
1683  CHECK(cd);
1684  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1685  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1686  const auto rewritten_exe_unit =
1687  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1688  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1689  } else {
1690  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1691  }
1692  };
1693 
1694  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1695  const auto work_unit =
1696  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1697  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1698  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1699  auto work_unit =
1700  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1701  if (project->isSimple()) {
1702  CHECK_EQ(size_t(1), project->inputCount());
1703  const auto input_ra = project->getInput(0);
1704  if (dynamic_cast<const RelSort*>(input_ra)) {
1705  const auto& input_table =
1706  get_temporary_table(&temporary_tables_, -input_ra->getId());
1707  CHECK(input_table);
1708  work_unit.exe_unit.scan_limit = input_table->rowCount();
1709  }
1710  }
1711  execute_delete_for_node(project, work_unit, false);
1712  } else {
1713  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1714  }
1715 }
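// Analogous sketch for the temporary-table delete path (hypothetical SQL):
// DELETE FROM t WHERE y > 10 is rewritten by rewriteColumnarDelete into a
// projection over the hidden delete column, conceptually
//
//   SELECT CASE WHEN y > 10 THEN true ELSE $deleted$ END FROM t;
//
// while persistent tables run the filter directly and mark matching rows via
// the delete callback, with fragments vacuumed afterwards by TableOptimizer.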
1716 
1717 ExecutionResult RelAlgExecutor::executeCompound(const RelCompound* compound,
1718  const CompilationOptions& co,
1719  const ExecutionOptions& eo,
1720  RenderInfo* render_info,
1721  const int64_t queue_time_ms) {
1722  auto timer = DEBUG_TIMER(__func__);
1723  const auto work_unit =
1724  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1725  CompilationOptions co_compound = co;
1726  return executeWorkUnit(work_unit,
1727  compound->getOutputMetainfo(),
1728  compound->isAggregate(),
1729  co_compound,
1730  eo,
1731  render_info,
1732  queue_time_ms);
1733 }
1734 
1735 ExecutionResult RelAlgExecutor::executeAggregate(const RelAggregate* aggregate,
1736  const CompilationOptions& co,
1737  const ExecutionOptions& eo,
1738  RenderInfo* render_info,
1739  const int64_t queue_time_ms) {
1740  auto timer = DEBUG_TIMER(__func__);
1741  const auto work_unit = createAggregateWorkUnit(
1742  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1743  return executeWorkUnit(work_unit,
1744  aggregate->getOutputMetainfo(),
1745  true,
1746  co,
1747  eo,
1748  render_info,
1749  queue_time_ms);
1750 }
1751 
1752 namespace {
1753 
1754 // Returns true iff the execution unit contains window functions.
1755 bool is_window_execution_unit(const RelAlgExecutionUnit& ra_exe_unit) {
1756  return std::any_of(ra_exe_unit.target_exprs.begin(),
1757  ra_exe_unit.target_exprs.end(),
1758  [](const Analyzer::Expr* expr) {
1759  return dynamic_cast<const Analyzer::WindowFunction*>(expr);
1760  });
1761 }
1762 
1763 } // namespace
1764 
1765 ExecutionResult RelAlgExecutor::executeProject(
1766  const RelProject* project,
1767  const CompilationOptions& co,
1768  const ExecutionOptions& eo,
1769  RenderInfo* render_info,
1770  const int64_t queue_time_ms,
1771  const std::optional<size_t> previous_count) {
1772  auto timer = DEBUG_TIMER(__func__);
1773  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1774  CompilationOptions co_project = co;
1775  if (project->isSimple()) {
1776  CHECK_EQ(size_t(1), project->inputCount());
1777  const auto input_ra = project->getInput(0);
1778  if (dynamic_cast<const RelSort*>(input_ra)) {
1779  co_project.device_type = ExecutorDeviceType::CPU;
1780  const auto& input_table =
1781  get_temporary_table(&temporary_tables_, -input_ra->getId());
1782  CHECK(input_table);
1783  work_unit.exe_unit.scan_limit =
1784  std::min(input_table->getLimit(), input_table->rowCount());
1785  }
1786  }
1787  return executeWorkUnit(work_unit,
1788  project->getOutputMetainfo(),
1789  false,
1790  co_project,
1791  eo,
1792  render_info,
1793  queue_time_ms,
1794  previous_count);
1795 }
1796 
1797 ExecutionResult RelAlgExecutor::executeTableFunction(const RelTableFunction* table_func,
1798  const CompilationOptions& co_in,
1799  const ExecutionOptions& eo,
1800  const int64_t queue_time_ms) {
1801  INJECT_TIMER(executeTableFunction);
1802  auto timer = DEBUG_TIMER(__func__);
1803 
1804  auto co = co_in;
1805 
1806  if (g_cluster) {
1807  throw std::runtime_error("Table functions not supported in distributed mode yet");
1808  }
1809  if (!g_enable_table_functions) {
1810  throw std::runtime_error("Table function support is disabled");
1811  }
1812  auto table_func_work_unit = createTableFunctionWorkUnit(
1813  table_func,
1814  eo.just_explain,
1815  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
1816  const auto body = table_func_work_unit.body;
1817  CHECK(body);
1818 
1819  const auto table_infos =
1820  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1821 
1822  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1823  co.device_type,
1824  QueryMemoryDescriptor(),
1825  nullptr,
1826  executor_->getCatalog(),
1827  executor_->blockSize(),
1828  executor_->gridSize()),
1829  {}};
1830 
1831  try {
1832  result = {executor_->executeTableFunction(
1833  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1834  body->getOutputMetainfo()};
1835  } catch (const QueryExecutionError& e) {
1836  handlePersistentError(e.getErrorCode());
1837  CHECK(e.getErrorCode() == Executor::ERR_OUT_OF_GPU_MEM);
1838  throw std::runtime_error("Table function ran out of memory during execution");
1839  }
1840  result.setQueueTime(queue_time_ms);
1841  return result;
1842 }
1843 
1844 namespace {
1845 
1846 // Creates a new expression which has the range table index set to 1. This is needed to
1847 // reuse the hash join construction helpers to generate a hash table for the window
1848 // function partition: create an equals expression with left and right sides identical
1849 // except for the range table index.
1850 std::shared_ptr<Analyzer::Expr> transform_to_inner(const Analyzer::Expr* expr) {
1851  const auto tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr);
1852  if (tuple) {
1853  std::vector<std::shared_ptr<Analyzer::Expr>> transformed_tuple;
1854  for (const auto& element : tuple->getTuple()) {
1855  transformed_tuple.push_back(transform_to_inner(element.get()));
1856  }
1857  return makeExpr<Analyzer::ExpressionTuple>(transformed_tuple);
1858  }
1859  const auto col = dynamic_cast<const Analyzer::ColumnVar*>(expr);
1860  if (!col) {
1861  throw std::runtime_error("Only columns supported in the window partition for now");
1862  }
1863  return makeExpr<Analyzer::ColumnVar>(
1864  col->get_type_info(), col->get_table_id(), col->get_column_id(), 1);
1865 }
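// Example (illustrative; `ti`, `table_id` and `col_id` are placeholders): a
// partition key column at range table index 0 is cloned to index 1 so the two
// copies can form the sides of a self-join qualifier:
//
//   auto outer = makeExpr<Analyzer::ColumnVar>(ti, table_id, col_id, 0);
//   auto inner = transform_to_inner(outer.get());  // same column, rte_idx 1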
1866 
1867 } // namespace
1868 
1869 void RelAlgExecutor::computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
1870  const CompilationOptions& co,
1871  const ExecutionOptions& eo,
1872  ColumnCacheMap& column_cache_map,
1873  const int64_t queue_time_ms) {
1874  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1875  CHECK_EQ(query_infos.size(), size_t(1));
1876  if (query_infos.front().info.fragments.size() != 1) {
1877  throw std::runtime_error(
1878  "Only single fragment tables supported for window functions for now");
1879  }
1880  if (eo.executor_type == ::ExecutorType::Extern) {
1881  return;
1882  }
1883  query_infos.push_back(query_infos.front());
1884  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
1885  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1886  ++target_index) {
1887  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1888  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1889  if (!window_func) {
1890  continue;
1891  }
1892  // Always use baseline layout hash tables for now, make the expression a tuple.
1893  const auto& partition_keys = window_func->getPartitionKeys();
1894  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1895  if (partition_keys.size() > 1) {
1896  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1897  } else {
1898  if (partition_keys.empty()) {
1899  throw std::runtime_error(
1900  "Empty window function partitions are not supported yet");
1901  }
1902  CHECK_EQ(partition_keys.size(), size_t(1));
1903  partition_key_tuple = partition_keys.front();
1904  }
1905  // Creates a tautology equality with the partition expression on both sides.
1906  const auto partition_key_cond =
1907  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1908  kBW_EQ,
1909  kONE,
1910  partition_key_tuple,
1911  transform_to_inner(partition_key_tuple.get()));
1912  auto context = createWindowFunctionContext(window_func,
1913  partition_key_cond,
1914  ra_exe_unit,
1915  query_infos,
1916  co,
1917  column_cache_map,
1918  executor_->getRowSetMemoryOwner());
1919  context->compute();
1920  window_project_node_context->addWindowFunctionContext(std::move(context),
1921  target_index);
1922  }
1923 }
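// Worked example for the partition handling above (hypothetical query):
// AVG(x) OVER (PARTITION BY a, b) bundles (a, b) into an ExpressionTuple and
// builds the always-true qualifier
//
//   BinOper(kBOOLEAN, kBW_EQ, kONE, (a@0, b@0), (a@1, b@1))
//
// where @0/@1 are the range table indices; using kBW_EQ, the null-aware
// equality, keeps rows with NULL partition keys grouped together instead of
// filtering them out.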
1924 
1925 std::unique_ptr<WindowFunctionContext> RelAlgExecutor::createWindowFunctionContext(
1926  const Analyzer::WindowFunction* window_func,
1927  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
1928  const RelAlgExecutionUnit& ra_exe_unit,
1929  const std::vector<InputTableInfo>& query_infos,
1930  const CompilationOptions& co,
1931  ColumnCacheMap& column_cache_map,
1932  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
1933  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
1934  ? MemoryLevel::GPU_LEVEL
1935  : MemoryLevel::CPU_LEVEL;
1936  const auto join_table_or_err =
1937  executor_->buildHashTableForQualifier(partition_key_cond,
1938  query_infos,
1939  memory_level,
1940  HashType::OneToMany,
1941  column_cache_map,
1942  ra_exe_unit.query_hint);
1943  if (!join_table_or_err.fail_reason.empty()) {
1944  throw std::runtime_error(join_table_or_err.fail_reason);
1945  }
1946  CHECK(join_table_or_err.hash_table->getHashType() == HashType::OneToMany);
1947  const auto& order_keys = window_func->getOrderKeys();
1948  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
1949  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
1950  auto context = std::make_unique<WindowFunctionContext>(window_func,
1951  join_table_or_err.hash_table,
1952  elem_count,
1953  co.device_type,
1954  row_set_mem_owner);
1955  for (const auto& order_key : order_keys) {
1956  const auto order_col =
1957  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
1958  if (!order_col) {
1959  throw std::runtime_error("Only order by columns supported for now");
1960  }
1961  const int8_t* column;
1962  size_t join_col_elem_count;
1963  std::tie(column, join_col_elem_count) =
1964  ColumnFetcher::getOneColumnFragment(executor_,
1965  *order_col,
1966  query_infos.front().info.fragments.front(),
1967  memory_level,
1968  0,
1969  nullptr,
1970  /*thread_idx=*/0,
1971  chunks_owner,
1972  column_cache_map);
1973  CHECK_EQ(join_col_elem_count, elem_count);
1974  context->addOrderColumn(column, order_col.get(), chunks_owner);
1975  }
1976  return context;
1977 }
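// Note on the helper above: the partition hash table is the same OneToMany
// structure used for hash joins, so each partition's row ids can be read off
// the join table payload without a dedicated partitioning pass. Order-by
// columns are fetched for the single fragment only, matching the
// single-fragment restriction enforced in computeWindow.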
1978 
1979 ExecutionResult RelAlgExecutor::executeFilter(const RelFilter* filter,
1980  const CompilationOptions& co,
1981  const ExecutionOptions& eo,
1982  RenderInfo* render_info,
1983  const int64_t queue_time_ms) {
1984  auto timer = DEBUG_TIMER(__func__);
1985  const auto work_unit =
1986  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1987  return executeWorkUnit(
1988  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
1989 }
1990 
1991 bool sameTypeInfo(std::vector<TargetMetaInfo> const& lhs,
1992  std::vector<TargetMetaInfo> const& rhs) {
1993  if (lhs.size() == rhs.size()) {
1994  for (size_t i = 0; i < lhs.size(); ++i) {
1995  if (lhs[i].get_type_info() != rhs[i].get_type_info()) {
1996  return false;
1997  }
1998  }
1999  return true;
2000  }
2001  return false;
2002 }
2003 
2004 bool isGeometry(TargetMetaInfo const& target_meta_info) {
2005  return target_meta_info.get_type_info().is_geometry();
2006 }
2007 
2008 ExecutionResult RelAlgExecutor::executeUnion(const RelLogicalUnion* logical_union,
2009  const RaExecutionSequence& seq,
2010  const CompilationOptions& co,
2011  const ExecutionOptions& eo,
2012  RenderInfo* render_info,
2013  const int64_t queue_time_ms) {
2014  auto timer = DEBUG_TIMER(__func__);
2015  if (!logical_union->isAll()) {
2016  throw std::runtime_error("UNION without ALL is not supported yet.");
2017  }
2018  // Will throw a std::runtime_error if types don't match.
2019  logical_union->checkForMatchingMetaInfoTypes();
2020  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2021  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2022  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2023  }
2024  // Only Projections and Aggregates from a UNION are supported for now.
2025  query_dag_->eachNode([logical_union](RelAlgNode const* node) {
2026  if (node->hasInput(logical_union) &&
2027  !shared::dynamic_castable_to_any<RelProject, RelLogicalUnion, RelAggregate>(
2028  node)) {
2029  throw std::runtime_error("UNION ALL not yet supported in this context.");
2030  }
2031  });
2032 
2033  auto work_unit =
2034  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2035  return executeWorkUnit(work_unit,
2036  logical_union->getOutputMetainfo(),
2037  false,
2038  CompilationOptions::makeCpuOnly(co),
2039  eo,
2040  render_info,
2041  queue_time_ms);
2042 }
2043 
2044 ExecutionResult RelAlgExecutor::executeLogicalValues(
2045  const RelLogicalValues* logical_values,
2046  const ExecutionOptions& eo) {
2047  auto timer = DEBUG_TIMER(__func__);
2048  QueryMemoryDescriptor query_mem_desc(executor_,
2049  logical_values->getNumRows(),
2050  QueryDescriptionType::Projection,
2051  /*is_table_function=*/false);
2052 
2053  auto tuple_type = logical_values->getTupleType();
2054  for (size_t i = 0; i < tuple_type.size(); ++i) {
2055  auto& target_meta_info = tuple_type[i];
2056  if (target_meta_info.get_type_info().is_varlen()) {
2057  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2058  }
2059  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2060  // replace w/ bigint
2061  tuple_type[i] =
2062  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2063  }
2064  query_mem_desc.addColSlotInfo(
2065  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2066  }
2067  logical_values->setOutputMetainfo(tuple_type);
2068 
2069  std::vector<TargetInfo> target_infos;
2070  for (const auto& tuple_type_component : tuple_type) {
2071  target_infos.emplace_back(TargetInfo{false,
2072  kCOUNT,
2073  tuple_type_component.get_type_info(),
2074  SQLTypeInfo(kNULLT, false),
2075  false,
2076  false});
2077  }
2078 
2079  std::shared_ptr<ResultSet> rs{
2080  ResultSetLogicalValuesBuilder{logical_values,
2081  target_infos,
2082  ExecutorDeviceType::CPU,
2083  query_mem_desc,
2084  executor_->getRowSetMemoryOwner(),
2085  executor_}
2086  .build()};
2087 
2088  return {rs, tuple_type};
2089 }
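// Example (hypothetical SQL): SELECT * FROM (VALUES (1, NULL)) reaches this
// path with the second, typeless literal reported as kNULLT; the fixup above
// retypes it to BIGINT, and addColSlotInfo pads every slot to an 8-byte word.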
2090 
2091 namespace {
2092 
2093 template <class T>
2094 int64_t insert_one_dict_str(T* col_data,
2095  const std::string& columnName,
2096  const SQLTypeInfo& columnType,
2097  const Analyzer::Constant* col_cv,
2098  const Catalog_Namespace::Catalog& catalog) {
2099  if (col_cv->get_is_null()) {
2100  *col_data = inline_fixed_encoding_null_val(columnType);
2101  } else {
2102  const int dict_id = columnType.get_comp_param();
2103  const auto col_datum = col_cv->get_constval();
2104  const auto& str = *col_datum.stringval;
2105  const auto dd = catalog.getMetadataForDict(dict_id);
2106  CHECK(dd && dd->stringDict);
2107  int32_t str_id = dd->stringDict->getOrAdd(str);
2108  if (!dd->dictIsTemp) {
2109  const auto checkpoint_ok = dd->stringDict->checkpoint();
2110  if (!checkpoint_ok) {
2111  throw std::runtime_error("Failed to checkpoint dictionary for column " +
2112  columnName);
2113  }
2114  }
2115  const bool invalid = str_id > max_valid_int_value<T>();
2116  if (invalid || str_id == inline_int_null_value<int32_t>()) {
2117  if (invalid) {
2118  LOG(ERROR) << "Could not encode string: " << str
2119  << ", the encoded value doesn't fit in " << sizeof(T) * 8
2120  << " bits. Will store NULL instead.";
2121  }
2122  str_id = inline_fixed_encoding_null_val(columnType);
2123  }
2124  *col_data = str_id;
2125  }
2126  return *col_data;
2127 }
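// Example for the helper above (illustrative): a TEXT ENCODING DICT(8) column
// instantiates T = uint8_t, so any dictionary id above
// max_valid_int_value<uint8_t>() cannot be represented; such strings are
// stored as NULL and the failure is logged rather than aborting the insert.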
2128 
2129 template <class T>
2130 int64_t insert_one_dict_str(T* col_data,
2131  const ColumnDescriptor* cd,
2132  const Analyzer::Constant* col_cv,
2133  const Catalog_Namespace::Catalog& catalog) {
2134  return insert_one_dict_str(col_data, cd->columnName, cd->columnType, col_cv, catalog);
2135 }
2136 
2137 } // namespace
2138 
2139 ExecutionResult RelAlgExecutor::executeModify(const RelModify* modify,
2140  const ExecutionOptions& eo) {
2141  auto timer = DEBUG_TIMER(__func__);
2142  if (eo.just_explain) {
2143  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2144  }
2145 
2146  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2147  ExecutorDeviceType::CPU,
2148  QueryMemoryDescriptor(),
2149  executor_->getRowSetMemoryOwner(),
2150  executor_->getCatalog(),
2151  executor_->blockSize(),
2152  executor_->gridSize());
2153 
2154  std::vector<TargetMetaInfo> empty_targets;
2155  return {rs, empty_targets};
2156 }
2157 
2158 namespace {
2159 int64_t int_value_from_numbers_ptr(const SQLTypeInfo& type_info, const int8_t* data) {
2160  size_t sz = 0;
2161  switch (type_info.get_type()) {
2162  case kTINYINT:
2163  case kSMALLINT:
2164  case kINT:
2165  case kBIGINT:
2166  case kTIMESTAMP:
2167  case kTIME:
2168  case kDATE:
2169  sz = type_info.get_logical_size();
2170  break;
2171  case kTEXT:
2172  case kVARCHAR:
2173  case kCHAR:
2174  CHECK(type_info.get_compression() == kENCODING_DICT);
2175  sz = type_info.get_size();
2176  break;
2177  default:
2178  CHECK(false) << "Unexpected sharding key datatype";
2179  }
2180 
2181  switch (sz) {
2182  case 1:
2183  return *(reinterpret_cast<const int8_t*>(data));
2184  case 2:
2185  return *(reinterpret_cast<const int16_t*>(data));
2186  case 4:
2187  return *(reinterpret_cast<const int32_t*>(data));
2188  case 8:
2189  return *(reinterpret_cast<const int64_t*>(data));
2190  default:
2191  CHECK(false);
2192  return 0;
2193  }
2194 }
2195 
2196 const TableDescriptor* get_shard_for_key(const TableDescriptor* td,
2197  const Catalog_Namespace::Catalog& cat,
2198  const Fragmenter_Namespace::InsertData& data) {
2199  auto shard_column_md = cat.getShardColumnMetadataForTable(td);
2200  CHECK(shard_column_md);
2201  auto sharded_column_id = shard_column_md->columnId;
2202  const TableDescriptor* shard{nullptr};
2203  for (size_t i = 0; i < data.columnIds.size(); ++i) {
2204  if (data.columnIds[i] == sharded_column_id) {
2205  const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
2206  const auto shard_count = shard_tables.size();
2207  CHECK(data.data[i].numbersPtr);
2208  auto value = int_value_from_numbers_ptr(shard_column_md->columnType,
2209  data.data[i].numbersPtr);
2210  const size_t shard_idx = SHARD_FOR_KEY(value, shard_count);
2211  shard = shard_tables[shard_idx];
2212  break;
2213  }
2214  }
2215  return shard;
2216 }
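// Worked example (assuming SHARD_FOR_KEY from Shared/shard_key.h is a simple
// modulo on the key): a row whose shard key evaluates to 42 in a table with 4
// physical shards lands in shard_tables[42 % 4], i.e. shard 2, and only that
// physical table receives the insert.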
2217 
2218 } // namespace
2219 
2220 ExecutionResult RelAlgExecutor::executeSimpleInsert(const Analyzer::Query& query) {
2221  // Note: We currently obtain an executor for this method, but we do not need it.
2222  // Therefore, we skip the executor state setup in the regular execution path. In the
2223  // future, we will likely want to use the executor to evaluate expressions in the insert
2224  // statement.
2225 
2226  const auto& targets = query.get_targetlist();
2227  const int table_id = query.get_result_table_id();
2228  const auto& col_id_list = query.get_result_col_list();
2229 
2230  std::vector<const ColumnDescriptor*> col_descriptors;
2231  std::vector<int> col_ids;
2232  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2233  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2234  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2235 
2236  for (const int col_id : col_id_list) {
2237  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2238  const auto col_enc = cd->columnType.get_compression();
2239  if (cd->columnType.is_string()) {
2240  switch (col_enc) {
2241  case kENCODING_NONE: {
2242  auto it_ok =
2243  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2244  CHECK(it_ok.second);
2245  break;
2246  }
2247  case kENCODING_DICT: {
2248  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2249  CHECK(dd);
2250  const auto it_ok = col_buffers.emplace(
2251  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2252  CHECK(it_ok.second);
2253  break;
2254  }
2255  default:
2256  CHECK(false);
2257  }
2258  } else if (cd->columnType.is_geometry()) {
2259  auto it_ok =
2260  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2261  CHECK(it_ok.second);
2262  } else if (cd->columnType.is_array()) {
2263  auto it_ok =
2264  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2265  CHECK(it_ok.second);
2266  } else {
2267  const auto it_ok = col_buffers.emplace(
2268  col_id,
2269  std::unique_ptr<uint8_t[]>(
2270  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2271  // the buffer
2272  CHECK(it_ok.second);
2273  }
2274  col_descriptors.push_back(cd);
2275  col_ids.push_back(col_id);
2276  }
2277  size_t col_idx = 0;
2278  Fragmenter_Namespace::InsertData insert_data;
2279  insert_data.databaseId = cat_.getCurrentDB().dbId;
2280  insert_data.tableId = table_id;
2281  for (auto target_entry : targets) {
2282  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2283  if (!col_cv) {
2284  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2285  CHECK(col_cast);
2286  CHECK_EQ(kCAST, col_cast->get_optype());
2287  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2288  }
2289  CHECK(col_cv);
2290  const auto cd = col_descriptors[col_idx];
2291  auto col_datum = col_cv->get_constval();
2292  auto col_type = cd->columnType.get_type();
2293  uint8_t* col_data_bytes{nullptr};
2294  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2295  (!cd->columnType.is_string() ||
2296  cd->columnType.get_compression() == kENCODING_DICT)) {
2297  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2298  CHECK(col_data_bytes_it != col_buffers.end());
2299  col_data_bytes = col_data_bytes_it->second.get();
2300  }
2301  switch (col_type) {
2302  case kBOOLEAN: {
2303  auto col_data = col_data_bytes;
2304  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2305  : (col_datum.boolval ? 1 : 0);
2306  break;
2307  }
2308  case kTINYINT: {
2309  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2310  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2311  : col_datum.tinyintval;
2312  break;
2313  }
2314  case kSMALLINT: {
2315  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2316  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2317  : col_datum.smallintval;
2318  break;
2319  }
2320  case kINT: {
2321  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2322  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2323  : col_datum.intval;
2324  break;
2325  }
2326  case kBIGINT:
2327  case kDECIMAL:
2328  case kNUMERIC: {
2329  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2330  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2331  : col_datum.bigintval;
2332  break;
2333  }
2334  case kFLOAT: {
2335  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2336  *col_data = col_datum.floatval;
2337  break;
2338  }
2339  case kDOUBLE: {
2340  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2341  *col_data = col_datum.doubleval;
2342  break;
2343  }
2344  case kTEXT:
2345  case kVARCHAR:
2346  case kCHAR: {
2347  switch (cd->columnType.get_compression()) {
2348  case kENCODING_NONE:
2349  str_col_buffers[col_ids[col_idx]].push_back(
2350  col_datum.stringval ? *col_datum.stringval : "");
2351  break;
2352  case kENCODING_DICT: {
2353  switch (cd->columnType.get_size()) {
2354  case 1:
2355  insert_one_dict_str(
2356  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2357  break;
2358  case 2:
2359  insert_one_dict_str(
2360  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2361  break;
2362  case 4:
2363  insert_one_dict_str(
2364  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2365  break;
2366  default:
2367  CHECK(false);
2368  }
2369  break;
2370  }
2371  default:
2372  CHECK(false);
2373  }
2374  break;
2375  }
2376  case kTIME:
2377  case kTIMESTAMP:
2378  case kDATE: {
2379  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2380  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2381  : col_datum.bigintval;
2382  break;
2383  }
2384  case kARRAY: {
2385  const auto is_null = col_cv->get_is_null();
2386  const auto size = cd->columnType.get_size();
2387  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2388  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2389  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2390  if (is_null && !is_point_coords) {
2391  if (size > 0) {
2392  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2393  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2394  throw std::runtime_error("Column " + cd->columnName +
2395  " doesn't accept NULL values");
2396  }
2397  int8_t* buf = (int8_t*)checked_malloc(size);
2398  put_null_array(static_cast<void*>(buf), elem_ti, "");
2399  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2400  p += elem_ti.get_size()) {
2401  put_null(static_cast<void*>(p), elem_ti, "");
2402  }
2403  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2404  } else {
2405  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2406  }
2407  break;
2408  }
2409  const auto l = col_cv->get_value_list();
2410  size_t len = l.size() * elem_ti.get_size();
2411  if (size > 0 && static_cast<size_t>(size) != len) {
2412  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2413  std::to_string(size / elem_ti.get_size()) +
2414  " values, " + "received " + std::to_string(l.size()));
2415  }
2416  if (elem_ti.is_string()) {
2417  CHECK(kENCODING_DICT == elem_ti.get_compression());
2418  CHECK(4 == elem_ti.get_size());
2419 
2420  int8_t* buf = (int8_t*)checked_malloc(len);
2421  int32_t* p = reinterpret_cast<int32_t*>(buf);
2422 
2423  int elemIndex = 0;
2424  for (auto& e : l) {
2425  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2426  CHECK(c);
2427  insert_one_dict_str(&p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2428  elemIndex++;
2429  }
2430  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2431 
2432  } else {
2433  int8_t* buf = (int8_t*)checked_malloc(len);
2434  int8_t* p = buf;
2435  for (auto& e : l) {
2436  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2437  CHECK(c);
2438  p = appendDatum(p, c->get_constval(), elem_ti);
2439  }
2440  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2441  }
2442  break;
2443  }
2444  case kPOINT:
2445  case kLINESTRING:
2446  case kPOLYGON:
2447  case kMULTIPOLYGON:
2448  str_col_buffers[col_ids[col_idx]].push_back(
2449  col_datum.stringval ? *col_datum.stringval : "");
2450  break;
2451  default:
2452  CHECK(false);
2453  }
2454  ++col_idx;
2455  }
2456  for (const auto& kv : col_buffers) {
2457  insert_data.columnIds.push_back(kv.first);
2458  DataBlockPtr p;
2459  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2460  insert_data.data.push_back(p);
2461  }
2462  for (auto& kv : str_col_buffers) {
2463  insert_data.columnIds.push_back(kv.first);
2464  DataBlockPtr p;
2465  p.stringsPtr = &kv.second;
2466  insert_data.data.push_back(p);
2467  }
2468  for (auto& kv : arr_col_buffers) {
2469  insert_data.columnIds.push_back(kv.first);
2470  DataBlockPtr p;
2471  p.arraysPtr = &kv.second;
2472  insert_data.data.push_back(p);
2473  }
2474  insert_data.numRows = 1;
2475  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
2476  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2477  CHECK(table_descriptor);
2478  if (table_descriptor->nShards > 0) {
2479  auto shard = get_shard_for_key(table_descriptor, cat_, insert_data);
2480  CHECK(shard);
2481  shard->fragmenter->insertDataNoCheckpoint(insert_data);
2482  } else {
2483  table_descriptor->fragmenter->insertDataNoCheckpoint(insert_data);
2484  }
2485 
2486  // Ensure checkpoint happens across all shards, if not in distributed
2487  // mode (aggregator handles checkpointing in distributed mode)
2488  if (!g_cluster &&
2489  table_descriptor->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
2490  const_cast<Catalog_Namespace::Catalog&>(cat_).checkpointWithAutoRollback(table_id);
2491  }
2492 
2493  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2494  ExecutorDeviceType::CPU,
2495  QueryMemoryDescriptor(),
2496  executor_->getRowSetMemoryOwner(),
2497  nullptr,
2498  0,
2499  0);
2500  std::vector<TargetMetaInfo> empty_targets;
2501  return {rs, empty_targets};
2502 }
2503 
2504 namespace {
2505 
2506 // TODO(alex): Once we're fully migrated to the relational algebra model, change
2507 // the executor interface to use the collation directly and remove this conversion.
2508 std::list<Analyzer::OrderEntry> get_order_entries(const RelSort* sort) {
2509  std::list<Analyzer::OrderEntry> result;
2510  for (size_t i = 0; i < sort->collationCount(); ++i) {
2511  const auto sort_field = sort->getCollation(i);
2512  result.emplace_back(sort_field.getField() + 1,
2513  sort_field.getSortDir() == SortDirection::Descending,
2514  sort_field.getNullsPosition() == NullSortedPosition::First);
2515  }
2516  return result;
2517 }
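// Example (hypothetical SQL): ORDER BY x DESC NULLS FIRST on the first
// projected column becomes OrderEntry{1, /*is_desc=*/true,
// /*nulls_first=*/true}; note tle_no is 1-based, hence the +1 above.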
2518 
2519 size_t get_scan_limit(const RelAlgNode* ra, const size_t limit) {
2520  const auto aggregate = dynamic_cast<const RelAggregate*>(ra);
2521  if (aggregate) {
2522  return 0;
2523  }
2524  const auto compound = dynamic_cast<const RelCompound*>(ra);
2525  return (compound && compound->isAggregate()) ? 0 : limit;
2526 }
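// Example: a plain projection with LIMIT 10 can stop scanning once 10 rows
// pass the filter, so the limit is usable as a scan limit; under an aggregate
// the LIMIT applies to post-aggregation rows, so the scan limit must stay 0.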
2527 
2528 bool first_oe_is_desc(const std::list<Analyzer::OrderEntry>& order_entries) {
2529  return !order_entries.empty() && order_entries.front().is_desc;
2530 }
2531 
2532 } // namespace
2533 
2534 ExecutionResult RelAlgExecutor::executeSort(const RelSort* sort,
2535  const CompilationOptions& co,
2536  const ExecutionOptions& eo,
2537  RenderInfo* render_info,
2538  const int64_t queue_time_ms) {
2539  auto timer = DEBUG_TIMER(__func__);
2540  check_sort_node_source_constraint(sort);
2541  const auto source = sort->getInput(0);
2542  const bool is_aggregate = node_is_aggregate(source);
2543  auto it = leaf_results_.find(sort->getId());
2544  if (it != leaf_results_.end()) {
2545  // Add any transient string literals to the string dictionary proxy (sdp) on the aggregator
2546  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2547  GroupByAndAggregate::addTransientStringLiterals(
2548  source_work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2549  // Handle push-down for LIMIT for multi-node
2550  auto& aggregated_result = it->second;
2551  auto& result_rows = aggregated_result.rs;
2552  const size_t limit = sort->getLimit();
2553  const size_t offset = sort->getOffset();
2554  const auto order_entries = get_order_entries(sort);
2555  if (limit || offset) {
2556  if (!order_entries.empty()) {
2557  result_rows->sort(order_entries, limit + offset, executor_);
2558  }
2559  result_rows->dropFirstN(offset);
2560  if (limit) {
2561  result_rows->keepFirstN(limit);
2562  }
2563  }
2564  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2565  sort->setOutputMetainfo(aggregated_result.targets_meta);
2566  return result;
2567  }
2568 
2569  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2570  bool is_desc{false};
2571 
2572  auto execute_sort_query = [this,
2573  sort,
2574  &source,
2575  &is_aggregate,
2576  &eo,
2577  &co,
2578  render_info,
2579  queue_time_ms,
2580  &groupby_exprs,
2581  &is_desc]() -> ExecutionResult {
2582  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2583  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2584  ExecutionOptions eo_copy = {
2585  eo.output_columnar_hint,
2586  eo.allow_multifrag,
2587  eo.just_explain,
2588  eo.allow_loop_joins,
2589  eo.with_watchdog,
2590  eo.jit_debug,
2591  eo.just_validate || sort->isEmptyResult(),
2592  eo.with_dynamic_watchdog,
2593  eo.dynamic_watchdog_time_limit,
2594  eo.find_push_down_candidates,
2595  eo.just_calcite_explain,
2596  eo.gpu_input_mem_limit_percent,
2597  eo.allow_runtime_query_interrupt,
2598  eo.running_query_interrupt_freq,
2599  eo.pending_query_interrupt_freq,
2600  eo.executor_type,
2601  };
2602 
2603  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2604  auto source_result = executeWorkUnit(source_work_unit,
2605  source->getOutputMetainfo(),
2606  is_aggregate,
2607  co,
2608  eo_copy,
2609  render_info,
2610  queue_time_ms);
2611  if (render_info && render_info->isPotentialInSituRender()) {
2612  return source_result;
2613  }
2614  if (source_result.isFilterPushDownEnabled()) {
2615  return source_result;
2616  }
2617  auto rows_to_sort = source_result.getRows();
2618  if (eo.just_explain) {
2619  return {rows_to_sort, {}};
2620  }
2621  const size_t limit = sort->getLimit();
2622  const size_t offset = sort->getOffset();
2623  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2624  !use_speculative_top_n(source_work_unit.exe_unit,
2625  rows_to_sort->getQueryMemDesc())) {
2626  const size_t top_n = limit == 0 ? 0 : limit + offset;
2627  rows_to_sort->sort(
2628  source_work_unit.exe_unit.sort_info.order_entries, top_n, executor_);
2629  }
2630  if (limit || offset) {
2631  if (g_cluster && sort->collationCount() == 0) {
2632  if (offset >= rows_to_sort->rowCount()) {
2633  rows_to_sort->dropFirstN(offset);
2634  } else {
2635  rows_to_sort->keepFirstN(limit + offset);
2636  }
2637  } else {
2638  rows_to_sort->dropFirstN(offset);
2639  if (limit) {
2640  rows_to_sort->keepFirstN(limit);
2641  }
2642  }
2643  }
2644  return {rows_to_sort, source_result.getTargetsMeta()};
2645  };
2646 
2647  try {
2648  return execute_sort_query();
2649  } catch (const SpeculativeTopNFailed& e) {
2650  CHECK_EQ(size_t(1), groupby_exprs.size());
2651  CHECK(groupby_exprs.front());
2652  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2653  return execute_sort_query();
2654  }
2655 }
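// Worked example for the trimming above (hypothetical query): with ORDER BY c
// LIMIT 10 OFFSET 5, the rows are sorted on c keeping the top
// limit + offset = 15, then dropFirstN(5) and keepFirstN(10) leave rows 6..15
// of the sorted order. If the speculative top-n sort fails, the group-by
// expression is blacklisted and the same query is re-run with the default
// sort algorithm.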
2656 
2657 RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit(
2658  const RelSort* sort,
2659  const ExecutionOptions& eo) {
2660  const auto source = sort->getInput(0);
2661  const size_t limit = sort->getLimit();
2662  const size_t offset = sort->getOffset();
2663  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2664  const size_t scan_total_limit =
2665  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2666  size_t max_groups_buffer_entry_guess{
2667  scan_total_limit ? scan_total_limit : max_groups_buffer_entry_default_guess};
2668  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
2669  const auto order_entries = get_order_entries(sort);
2670  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2671  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2672  const auto& source_exe_unit = source_work_unit.exe_unit;
2673 
2674  // we do not allow sorting geometry or array types
2675  for (auto order_entry : order_entries) {
2676  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
2677  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2678  const auto& ti = get_target_info(te, false);
2679  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2680  throw std::runtime_error(
2681  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2682  }
2683  }
2684 
2685  if (source_exe_unit.groupby_exprs.size() == 1) {
2686  if (!source_exe_unit.groupby_exprs.front()) {
2687  sort_algorithm = SortAlgorithm::StreamingTopN;
2688  } else {
2689  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2690  first_oe_is_desc(order_entries))) {
2691  sort_algorithm = SortAlgorithm::Default;
2692  }
2693  }
2694  }
2695 
2696  sort->setOutputMetainfo(source->getOutputMetainfo());
2697  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2698  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2699  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2700  std::move(source_exe_unit.input_col_descs),
2701  source_exe_unit.simple_quals,
2702  source_exe_unit.quals,
2703  source_exe_unit.join_quals,
2704  source_exe_unit.groupby_exprs,
2705  source_exe_unit.target_exprs,
2706  nullptr,
2707  {sort_info.order_entries, sort_algorithm, limit, offset},
2708  scan_total_limit,
2709  source_exe_unit.query_hint,
2710  source_exe_unit.use_bump_allocator,
2711  source_exe_unit.union_all,
2712  source_exe_unit.query_state},
2713  source,
2714  max_groups_buffer_entry_guess,
2715  std::move(source_work_unit.query_rewriter),
2716  source_work_unit.input_permutation,
2717  source_work_unit.left_deep_join_input_sizes};
2718 }
2719 
2720 namespace {
2721 
2722 /**
2723  * Upper bound estimation for the number of groups. Not strictly correct and not
2724  * tight, but if the tables involved are really small we shouldn't waste time doing
2725  * the NDV estimation. We don't account for cross-joins and / or group by unnested
2726  * array, which is the reason this estimation isn't entirely reliable.
2727  */
2728 size_t groups_approx_upper_bound(const std::vector<InputTableInfo>& table_infos) {
2729  CHECK(!table_infos.empty());
2730  const auto& first_table = table_infos.front();
2731  size_t max_num_groups = first_table.info.getNumTuplesUpperBound();
2732  for (const auto& table_info : table_infos) {
2733  if (table_info.info.getNumTuplesUpperBound() > max_num_groups) {
2734  max_num_groups = table_info.info.getNumTuplesUpperBound();
2735  }
2736  }
2737  return std::max(max_num_groups, size_t(1));
2738 }
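// Example: joining a 1,000-row table against a 1,000,000-row table yields an
// upper bound of 1,000,000 groups; per the comment above this ignores
// cross-join blowup, and the result is clamped below at 1.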
2739 
2740 /**
2741  * Determines whether a query needs to compute the size of its output buffer. Returns
2742  * true for projection queries with no LIMIT or a LIMIT that exceeds the high scan
2743  * limit threshold (meaning it would be cheaper to compute the number of rows passing
2744  * the filter than allocate the max output buffer for each fragment).
2745  */
2746 bool compute_output_buffer_size(const RelAlgExecutionUnit& ra_exe_unit) {
2747  for (const auto target_expr : ra_exe_unit.target_exprs) {
2748  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
2749  return false;
2750  }
2751  }
2752  if (ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
2753  (!ra_exe_unit.scan_limit || ra_exe_unit.scan_limit > Executor::high_scan_limit)) {
2754  return true;
2755  }
2756  return false;
2757 }
2758 
2759 inline bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit) {
2760  return !(ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
2761  ra_exe_unit.simple_quals.empty());
2762 }
2763 
2764 RelAlgExecutionUnit decide_approx_count_distinct_implementation(
2765  const RelAlgExecutionUnit& ra_exe_unit_in,
2766  const std::vector<InputTableInfo>& table_infos,
2767  const Executor* executor,
2768  const ExecutorDeviceType device_type_in,
2769  std::vector<std::shared_ptr<Analyzer::Expr>>& target_exprs_owned) {
2770  RelAlgExecutionUnit ra_exe_unit = ra_exe_unit_in;
2771  for (size_t i = 0; i < ra_exe_unit.target_exprs.size(); ++i) {
2772  const auto target_expr = ra_exe_unit.target_exprs[i];
2773  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2774  if (agg_info.agg_kind != kAPPROX_COUNT_DISTINCT) {
2775  continue;
2776  }
2777  CHECK(dynamic_cast<const Analyzer::AggExpr*>(target_expr));
2778  const auto arg = static_cast<Analyzer::AggExpr*>(target_expr)->get_own_arg();
2779  CHECK(arg);
2780  const auto& arg_ti = arg->get_type_info();
2781  // Avoid calling getExpressionRange for variable length types (string and array),
2782  // it'd trigger an assertion since that API expects to be called only for types
2783  // for which the notion of range is well-defined. A bit of a kludge, but the
2784  // logic to reject these types anyway is at lower levels in the stack and not
2785  // really worth pulling into a separate function for now.
2786  if (!(arg_ti.is_number() || arg_ti.is_boolean() || arg_ti.is_time() ||
2787  (arg_ti.is_string() && arg_ti.get_compression() == kENCODING_DICT))) {
2788  continue;
2789  }
2790  const auto arg_range = getExpressionRange(arg.get(), table_infos, executor);
2791  if (arg_range.getType() != ExpressionRangeType::Integer) {
2792  continue;
2793  }
2794  // When running distributed, the threshold for using the precise implementation
2795  // must be consistent across all leaves, otherwise we could have a mix of precise
2796  // and approximate bitmaps and we cannot aggregate them.
2797  const auto device_type = g_cluster ? ExecutorDeviceType::GPU : device_type_in;
2798  const auto bitmap_sz_bits = arg_range.getIntMax() - arg_range.getIntMin() + 1;
2799  const auto sub_bitmap_count =
2800  get_count_distinct_sub_bitmap_count(bitmap_sz_bits, ra_exe_unit, device_type);
2801  int64_t approx_bitmap_sz_bits{0};
2802  const auto error_rate =
2803  static_cast<Analyzer::AggExpr*>(target_expr)->get_error_rate();
2804  if (error_rate) {
2805  CHECK(error_rate->get_type_info().get_type() == kINT);
2806  CHECK_GE(error_rate->get_constval().intval, 1);
2807  approx_bitmap_sz_bits = hll_size_for_rate(error_rate->get_constval().intval);
2808  } else {
2809  approx_bitmap_sz_bits = g_hll_precision_bits;
2810  }
2811  CountDistinctDescriptor approx_count_distinct_desc{CountDistinctImplType::Bitmap,
2812  arg_range.getIntMin(),
2813  approx_bitmap_sz_bits,
2814  true,
2815  device_type,
2816  sub_bitmap_count};
2817  CountDistinctDescriptor precise_count_distinct_desc{CountDistinctImplType::Bitmap,
2818  arg_range.getIntMin(),
2819  bitmap_sz_bits,
2820  false,
2821  device_type,
2822  sub_bitmap_count};
2823  if (approx_count_distinct_desc.bitmapPaddedSizeBytes() >=
2824  precise_count_distinct_desc.bitmapPaddedSizeBytes()) {
2825  auto precise_count_distinct = makeExpr<Analyzer::AggExpr>(
2826  get_agg_type(kCOUNT, arg.get()), kCOUNT, arg, true, nullptr);
2827  target_exprs_owned.push_back(precise_count_distinct);
2828  ra_exe_unit.target_exprs[i] = precise_count_distinct.get();
2829  }
2830  }
2831  return ra_exe_unit;
2832 }
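// Worked example for the sizing trade-off above: APPROX_COUNT_DISTINCT(x)
// with x ranging over [0, 1000) needs a precise bitmap of only 1000 bits
// (roughly 128 bytes once padded), while the approximate descriptor reserves
// on the order of 2^g_hll_precision_bits registers; whenever the approximate
// buffer would be at least as large, the rewrite above swaps in an exact
// COUNT(DISTINCT x) at no extra memory cost.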
2833 
2834 void build_render_targets(RenderInfo& render_info,
2835  const std::vector<Analyzer::Expr*>& work_unit_target_exprs,
2836  const std::vector<TargetMetaInfo>& targets_meta) {
2837  CHECK_EQ(work_unit_target_exprs.size(), targets_meta.size());
2838  render_info.targets.clear();
2839  for (size_t i = 0; i < targets_meta.size(); ++i) {
2840  render_info.targets.emplace_back(std::make_shared<Analyzer::TargetEntry>(
2841  targets_meta[i].get_resname(),
2842  work_unit_target_exprs[i]->get_shared_ptr(),
2843  false));
2844  }
2845 }
2846 
2847 inline bool can_use_bump_allocator(const RelAlgExecutionUnit& ra_exe_unit,
2848  const CompilationOptions& co,
2849  const ExecutionOptions& eo) {
2850  return g_enable_bump_allocator && co.device_type == ExecutorDeviceType::GPU &&
2851  !eo.output_columnar_hint && ra_exe_unit.sort_info.order_entries.empty();
2852 }
2853 
2854 } // namespace
2855 
2856 ExecutionResult RelAlgExecutor::executeWorkUnit(
2857  const RelAlgExecutor::WorkUnit& work_unit,
2858  const std::vector<TargetMetaInfo>& targets_meta,
2859  const bool is_agg,
2860  const CompilationOptions& co_in,
2861  const ExecutionOptions& eo,
2862  RenderInfo* render_info,
2863  const int64_t queue_time_ms,
2864  const std::optional<size_t> previous_count) {
2865  INJECT_TIMER(executeWorkUnit);
2866  auto timer = DEBUG_TIMER(__func__);
2867 
2868  auto co = co_in;
2869  ColumnCacheMap column_cache;
2870  if (is_window_execution_unit(work_unit.exe_unit)) {
2871  if (!g_enable_window_functions) {
2872  throw std::runtime_error("Window functions support is disabled");
2873  }
2874  co.device_type = ExecutorDeviceType::CPU;
2875  co.allow_lazy_fetch = false;
2876  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
2877  }
2878  if (!eo.just_explain && eo.find_push_down_candidates) {
2879  // find potential candidates:
2880  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
2881  if (!selected_filters.empty() || eo.just_calcite_explain) {
2882  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
2883  }
2884  }
2885  if (render_info && render_info->isPotentialInSituRender()) {
2886  co.allow_lazy_fetch = false;
2887  }
2888  const auto body = work_unit.body;
2889  CHECK(body);
2890  auto it = leaf_results_.find(body->getId());
2891  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
2892  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
2893  if (it != leaf_results_.end()) {
2894  GroupByAndAggregate::addTransientStringLiterals(
2895  work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2896  auto& aggregated_result = it->second;
2897  auto& result_rows = aggregated_result.rs;
2898  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2899  body->setOutputMetainfo(aggregated_result.targets_meta);
2900  if (render_info) {
2901  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2902  }
2903  return result;
2904  }
2905  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2906 
2907  auto ra_exe_unit = decide_approx_count_distinct_implementation(
2908  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
2909 
2910  // register query hint if query_dag_ is valid
2911  ra_exe_unit.query_hint =
2912  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults();
2913 
2914  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2915  if (is_window_execution_unit(ra_exe_unit)) {
2916  CHECK_EQ(table_infos.size(), size_t(1));
2917  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
2918  max_groups_buffer_entry_guess =
2919  table_infos.front().info.fragments.front().getNumTuples();
2920  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
2921  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
2922  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
2923  ra_exe_unit.scan_limit = *previous_count;
2924  } else {
2925  // TODO(adb): enable bump allocator path for render queries
2926  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
2927  ra_exe_unit.scan_limit = 0;
2928  ra_exe_unit.use_bump_allocator = true;
2929  } else if (eo.executor_type == ::ExecutorType::Extern) {
2930  ra_exe_unit.scan_limit = 0;
2931  } else if (!eo.just_explain) {
2932  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
2933  if (filter_count_all) {
2934  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
2935  }
2936  }
2937  }
2938  }
2939  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2940  co.device_type,
2941  QueryMemoryDescriptor(),
2942  nullptr,
2943  executor_->getCatalog(),
2944  executor_->blockSize(),
2945  executor_->gridSize()),
2946  {}};
2947 
2948  auto execute_and_handle_errors =
2949  [&](const auto max_groups_buffer_entry_guess_in,
2950  const bool has_cardinality_estimation) -> ExecutionResult {
2951  // Note that the groups buffer entry guess may be modified during query execution.
2952  // Create a local copy so we can track those changes if we need to attempt a retry
2953  // due to OOM
2954  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
2955  try {
2956  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
2957  is_agg,
2958  table_infos,
2959  ra_exe_unit,
2960  co,
2961  eo,
2962  cat_,
2963  render_info,
2964  has_cardinality_estimation,
2965  column_cache),
2966  targets_meta};
2967  } catch (const QueryExecutionError& e) {
2968  handlePersistentError(e.getErrorCode());
2969  return handleOutOfMemoryRetry(
2970  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
2971  targets_meta,
2972  is_agg,
2973  co,
2974  eo,
2975  render_info,
2976  e.wasMultifragKernelLaunch(),
2977  queue_time_ms);
2978  }
2979  };
2980 
2981  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
2982  try {
2983  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2984  auto card = cached_cardinality.second;
2985  if (cached_cardinality.first && card >= 0) {
2986  result = execute_and_handle_errors(card, true);
2987  } else {
2988  result = execute_and_handle_errors(
2989  max_groups_buffer_entry_guess,
2990  groups_approx_upper_bound(table_infos) <= g_big_group_threshold);
2991  }
2992  } catch (const CardinalityEstimationRequired& e) {
2993  // check the cardinality cache
2994  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2995  auto card = cached_cardinality.second;
2996  if (cached_cardinality.first && card >= 0) {
2997  result = execute_and_handle_errors(card, true);
2998  } else {
2999  const auto estimated_groups_buffer_entry_guess =
3000  2 * std::min(groups_approx_upper_bound(table_infos),
3001  getNDVEstimation(work_unit, e.range(), is_agg, co, eo));
3002  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3003  result = execute_and_handle_errors(estimated_groups_buffer_entry_guess, true);
3004  if (!(eo.just_validate || eo.just_explain)) {
3005  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3006  }
3007  }
3008  }
3009 
3010  result.setQueueTime(queue_time_ms);
3011  if (render_info) {
3012  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3013  if (render_info->isPotentialInSituRender()) {
3014  // return an empty result (with the same queue time, and zero render time)
3015  return {std::make_shared<ResultSet>(
3016  queue_time_ms,
3017  0,
3018  executor_->row_set_mem_owner_
3019  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3020  : nullptr),
3021  {}};
3022  }
3023  }
3024  return result;
3025 }
3026 
3027 std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_unit,
3028  const bool is_agg,
3029  const CompilationOptions& co,
3030  const ExecutionOptions& eo) {
3031  const auto count =
3032  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
3033  kCOUNT,
3034  nullptr,
3035  false,
3036  nullptr);
3037  const auto count_all_exe_unit =
3038  create_count_all_execution_unit(work_unit.exe_unit, count);
3039  size_t one{1};
3040  ResultSetPtr count_all_result;
3041  try {
3042  ColumnCacheMap column_cache;
3043  count_all_result =
3044  executor_->executeWorkUnit(one,
3045  is_agg,
3046  get_table_infos(work_unit.exe_unit, executor_),
3047  count_all_exe_unit,
3048  co,
3049  eo,
3050  cat_,
3051  nullptr,
3052  false,
3053  column_cache);
3054  } catch (const foreign_storage::ForeignStorageException& error) {
3055  throw error;
3056  } catch (const QueryMustRunOnCpu&) {
3057  // force a retry of the top level query on CPU
3058  throw;
3059  } catch (const std::exception& e) {
3060  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
3061  return std::nullopt;
3062  }
3063  const auto count_row = count_all_result->getNextRow(false, false);
3064  CHECK_EQ(size_t(1), count_row.size());
3065  const auto& count_tv = count_row.front();
3066  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
3067  CHECK(count_scalar_tv);
3068  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
3069  CHECK(count_ptr);
3070  CHECK_GE(*count_ptr, 0);
3071  auto count_upper_bound = static_cast<size_t>(*count_ptr);
3072  return std::max(count_upper_bound, size_t(1));
3073 }
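// Note on the helper above: it runs a pre-flight COUNT(*) with the same
// filters so projections can size their output buffers exactly. Foreign
// storage errors and CPU-retry requests propagate; any other failure just
// degrades to std::nullopt and the caller falls back to its other sizing
// heuristics.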
3074 
3075 bool RelAlgExecutor::isRowidLookup(const WorkUnit& work_unit) {
3076  const auto& ra_exe_unit = work_unit.exe_unit;
3077  if (ra_exe_unit.input_descs.size() != 1) {
3078  return false;
3079  }
3080  const auto& table_desc = ra_exe_unit.input_descs.front();
3081  if (table_desc.getSourceType() != InputSourceType::TABLE) {
3082  return false;
3083  }
3084  const int table_id = table_desc.getTableId();
3085  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
3086  const auto comp_expr =
3087  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
3088  if (!comp_expr || comp_expr->get_optype() != kEQ) {
3089  return false;
3090  }
3091  const auto lhs = comp_expr->get_left_operand();
3092  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
3093  if (!lhs_col || !lhs_col->get_table_id() || lhs_col->get_rte_idx()) {
3094  return false;
3095  }
3096  const auto rhs = comp_expr->get_right_operand();
3097  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
3098  if (!rhs_const) {
3099  return false;
3100  }
3101  auto cd = get_column_descriptor(lhs_col->get_column_id(), table_id, cat_);
3102  if (cd->isVirtualCol) {
3103  CHECK_EQ("rowid", cd->columnName);
3104  return true;
3105  }
3106  }
3107  return false;
3108 }
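// Example (hypothetical SQL): SELECT * FROM t WHERE rowid = 42 qualifies as a
// rowid lookup (single table, equality on the virtual rowid column, constant
// right-hand side), so the pre-flight count used for output buffer sizing can
// be skipped; WHERE rowid > 42 or a multi-table unit does not qualify.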
3109 
3110 ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
3111  const RelAlgExecutor::WorkUnit& work_unit,
3112  const std::vector<TargetMetaInfo>& targets_meta,
3113  const bool is_agg,
3114  const CompilationOptions& co,
3115  const ExecutionOptions& eo,
3116  RenderInfo* render_info,
3117  const bool was_multifrag_kernel_launch,
3118  const int64_t queue_time_ms) {
3119  // Disable the bump allocator
3120  // Note that this will have basically the same effect as using the bump allocator for
3121  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
3122  // path and the bump allocator path for kernel per fragment execution.
3123  auto ra_exe_unit_in = work_unit.exe_unit;
3124  ra_exe_unit_in.use_bump_allocator = false;
3125 
3126  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
 3127  co.device_type,
 3128  QueryMemoryDescriptor(),
 3129  nullptr,
3130  executor_->getCatalog(),
3131  executor_->blockSize(),
3132  executor_->gridSize()),
3133  {}};
3134 
3135  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
3136  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
 3137  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
 3138  false,
 3139  false,
 3140  eo.allow_loop_joins,
 3141  eo.with_watchdog,
 3142  eo.jit_debug,
 3143  false,
 3144  eo.with_dynamic_watchdog,
 3145  eo.dynamic_watchdog_time_limit,
 3146  false,
 3147  false,
 3148  eo.gpu_input_mem_limit_percent,
 3149  eo.allow_runtime_query_interrupt,
 3150  eo.running_query_interrupt_freq,
 3151  eo.pending_query_interrupt_freq,
 3152  eo.executor_type,
 3153  eo.outer_fragment_indices};
 3154 
3155  if (was_multifrag_kernel_launch) {
3156  try {
3157  // Attempt to retry using the kernel per fragment path. The smaller input size
3158  // required may allow the entire kernel to execute in GPU memory.
3159  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
3160  "kernels disabled.";
3161  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
3162  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
3163  ColumnCacheMap column_cache;
3164  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3165  is_agg,
3166  table_infos,
3167  ra_exe_unit,
3168  co,
3169  eo_no_multifrag,
3170  cat_,
3171  nullptr,
3172  true,
3173  column_cache),
3174  targets_meta};
3175  result.setQueueTime(queue_time_ms);
 3176  } catch (const QueryExecutionError& e) {
 3177  handlePersistentError(e.getErrorCode());
 3178  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
3179  }
3180  }
3181 
3182  if (render_info) {
3183  render_info->setForceNonInSituData();
3184  }
3185 
3186  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
 3187  // Only reset the group buffer entry guess if we ran out of slots, which
 3188  // suggests a highly pathological input that prevented a good estimation of
 3189  // the distinct tuple count. For projection queries, this will force a
 3190  // per-fragment scan limit, which is compatible with the CPU path.
3192  VLOG(1) << "Resetting max groups buffer entry guess.";
3193  max_groups_buffer_entry_guess = 0;
3194 
3195  int iteration_ctr = -1;
3196  while (true) {
 3197  iteration_ctr++;
 3198  auto ra_exe_unit = decide_approx_count_distinct_implementation(
 3199  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
3200  ColumnCacheMap column_cache;
3201  try {
3202  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3203  is_agg,
3204  table_infos,
3205  ra_exe_unit,
3206  co_cpu,
3207  eo_no_multifrag,
3208  cat_,
3209  nullptr,
3210  true,
3211  column_cache),
3212  targets_meta};
3213  } catch (const QueryExecutionError& e) {
3214  // Ran out of slots
3215  if (e.getErrorCode() < 0) {
3216  // Even the conservative guess failed; it should only happen when we group
3217  // by a huge cardinality array. Maybe we should throw an exception instead?
3218  // Such a heavy query is entirely capable of exhausting all the host memory.
3219  CHECK(max_groups_buffer_entry_guess);
3220  // Only allow two iterations of increasingly large entry guesses up to a maximum
3221  // of 512MB per column per kernel
3222  if (g_enable_watchdog || iteration_ctr > 1) {
3223  throw std::runtime_error("Query ran out of output slots in the result");
3224  }
3225  max_groups_buffer_entry_guess *= 2;
3226  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
3227  "groups buffer entry "
3228  "guess equal to "
3229  << max_groups_buffer_entry_guess;
 3230  } else {
 3231  handlePersistentError(e.getErrorCode());
 3232  }
3233  continue;
3234  }
3235  result.setQueueTime(queue_time_ms);
3236  return result;
3237  }
3238  return result;
3239 }
3240 
3241 void RelAlgExecutor::handlePersistentError(const int32_t error_code) {
3242  LOG(ERROR) << "Query execution failed with error "
3243  << getErrorMessageFromCode(error_code);
3244  if (error_code == Executor::ERR_OUT_OF_GPU_MEM) {
 3245  // Running out of GPU memory does not count as an error if the query is
 3246  // allowed to continue on CPU, because retry on CPU is explicitly enabled
 3247  // through --allow-cpu-retry.
3248  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
3249  if (!g_allow_cpu_retry) {
3250  throw std::runtime_error(
3251  "Query ran out of GPU memory, unable to automatically retry on CPU");
3252  }
3253  return;
3254  }
3255  throw std::runtime_error(getErrorMessageFromCode(error_code));
3256 }
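// Worked example (for illustration): with --allow-cpu-retry enabled,
// handlePersistentError(Executor::ERR_OUT_OF_GPU_MEM) only logs and returns,
// letting the caller re-run the step on CPU; any other code, e.g.
// Executor::ERR_DIV_BY_ZERO, is immediately surfaced as
// std::runtime_error("Division by zero").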
3257 
3258 namespace {
3259 struct ErrorInfo {
3260  const char* code{nullptr};
3261  const char* description{nullptr};
3262 };
3263 ErrorInfo getErrorDescription(const int32_t error_code) {
3264  switch (error_code) {
3265  case Executor::ERR_DIV_BY_ZERO:
3266  return {.code = "ERR_DIV_BY_ZERO", .description = "Division by zero"};
3267  case Executor::ERR_OUT_OF_GPU_MEM:
3268  return {.code = "ERR_OUT_OF_GPU_MEM",
3269  .description =
3270  "Query couldn't keep the entire working set of columns in GPU memory"};
3271  case Executor::ERR_UNSUPPORTED_SELF_JOIN:
3272  return {.code = "ERR_UNSUPPORTED_SELF_JOIN",
3273  .description = "Self joins not supported yet"};
3274  case Executor::ERR_OUT_OF_CPU_MEM:
3275  return {.code = "ERR_OUT_OF_CPU_MEM",
3276  .description = "Not enough host memory to execute the query"};
3277  case Executor::ERR_OVERFLOW_OR_UNDERFLOW:
3278  return {.code = "ERR_OVERFLOW_OR_UNDERFLOW",
3279  .description = "Overflow or underflow"};
3280  case Executor::ERR_OUT_OF_TIME:
3281  return {.code = "ERR_OUT_OF_TIME",
3282  .description = "Query execution has exceeded the time limit"};
3283  case Executor::ERR_INTERRUPTED:
3284  return {.code = "ERR_INTERRUPTED",
3285  .description = "Query execution has been interrupted"};
3286  case Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED:
3287  return {
3288  .code = "ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED",
3289  .description = "Columnar conversion not supported for variable length types"};
3290  case Executor::ERR_TOO_MANY_LITERALS:
3291  return {.code = "ERR_TOO_MANY_LITERALS",
3292  .description = "Too many literals in the query"};
3293  case Executor::ERR_STRING_CONST_IN_RESULTSET:
3294  return {.code = "ERR_STRING_CONST_IN_RESULTSET",
3295  .description =
3296  "NONE ENCODED String types are not supported as input result set."};
3297  case Executor::ERR_OUT_OF_RENDER_MEM:
3298  return {.code = "ERR_OUT_OF_RENDER_MEM",
3299  .description = "Not enough OpenGL memory to render the query results"};
3300  case Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY:
3301  return {.code = "ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY",
3302  .description = "Streaming-Top-N not supported in Render Query"};
3303  case Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES:
3304  return {.code = "ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES",
3305  .description = "Multiple distinct values encountered"};
3306  case Executor::ERR_GEOS:
3307  return {.code = "ERR_GEOS", .description = "ERR_GEOS"};
3308  default:
3309  return {.code = nullptr, .description = nullptr};
3310  }
3311 }
3312 
3313 } // namespace
3314 
3315 std::string RelAlgExecutor::getErrorMessageFromCode(const int32_t error_code) {
3316  if (error_code < 0) {
3317  return "Ran out of slots in the query output buffer";
3318  }
3319  const auto errorInfo = getErrorDescription(error_code);
3320 
3321  if (errorInfo.code) {
3322  return errorInfo.code + ": "s + errorInfo.description;
3323  } else {
3324  return "Other error: code "s + std::to_string(error_code);
3325  }
3326 }
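// Example outputs (for illustration, assuming 12345 is not a defined code):
//   getErrorMessageFromCode(-1)                        // "Ran out of slots in the query output buffer"
//   getErrorMessageFromCode(Executor::ERR_DIV_BY_ZERO) // "ERR_DIV_BY_ZERO: Division by zero"
//   getErrorMessageFromCode(12345)                     // "Other error: code 12345"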
3327 
3328 void RelAlgExecutor::executePostExecutionCallback() {
3329  if (post_execution_callback_) {
3330  VLOG(1) << "Running post execution callback.";
3331  (*post_execution_callback_)();
3332  }
3333 }
3334 
3335 RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit(const RelAlgNode* node,
3336  const SortInfo& sort_info,
3337  const ExecutionOptions& eo) {
3338  const auto compound = dynamic_cast<const RelCompound*>(node);
3339  if (compound) {
3340  return createCompoundWorkUnit(compound, sort_info, eo);
3341  }
3342  const auto project = dynamic_cast<const RelProject*>(node);
3343  if (project) {
3344  return createProjectWorkUnit(project, sort_info, eo);
3345  }
3346  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3347  if (aggregate) {
3348  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3349  }
3350  const auto filter = dynamic_cast<const RelFilter*>(node);
3351  if (filter) {
3352  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3353  }
3354  LOG(FATAL) << "Unhandled node type: " << node->toString();
3355  return {};
3356 }
3357 
3358 namespace {
3359 
3360 JoinType get_join_type(const RelAlgNode* ra) {
3361  auto sink = get_data_sink(ra);
3362  if (auto join = dynamic_cast<const RelJoin*>(sink)) {
3363  return join->getJoinType();
3364  }
3365  if (dynamic_cast<const RelLeftDeepInnerJoin*>(sink)) {
3366  return JoinType::INNER;
3367  }
3368 
3369  return JoinType::INVALID;
3370 }
3371 
3372 std::unique_ptr<const RexOperator> get_bitwise_equals(const RexScalar* scalar) {
3373  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3374  if (!condition || condition->getOperator() != kOR || condition->size() != 2) {
3375  return nullptr;
3376  }
3377  const auto equi_join_condition =
3378  dynamic_cast<const RexOperator*>(condition->getOperand(0));
3379  if (!equi_join_condition || equi_join_condition->getOperator() != kEQ) {
3380  return nullptr;
3381  }
3382  const auto both_are_null_condition =
3383  dynamic_cast<const RexOperator*>(condition->getOperand(1));
3384  if (!both_are_null_condition || both_are_null_condition->getOperator() != kAND ||
3385  both_are_null_condition->size() != 2) {
3386  return nullptr;
3387  }
3388  const auto lhs_is_null =
3389  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(0));
3390  const auto rhs_is_null =
3391  dynamic_cast<const RexOperator*>(both_are_null_condition->getOperand(1));
3392  if (!lhs_is_null || !rhs_is_null || lhs_is_null->getOperator() != kISNULL ||
3393  rhs_is_null->getOperator() != kISNULL) {
3394  return nullptr;
3395  }
3396  CHECK_EQ(size_t(1), lhs_is_null->size());
3397  CHECK_EQ(size_t(1), rhs_is_null->size());
3398  CHECK_EQ(size_t(2), equi_join_condition->size());
3399  const auto eq_lhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(0));
3400  const auto eq_rhs = dynamic_cast<const RexInput*>(equi_join_condition->getOperand(1));
3401  const auto is_null_lhs = dynamic_cast<const RexInput*>(lhs_is_null->getOperand(0));
3402  const auto is_null_rhs = dynamic_cast<const RexInput*>(rhs_is_null->getOperand(0));
3403  if (!eq_lhs || !eq_rhs || !is_null_lhs || !is_null_rhs) {
3404  return nullptr;
3405  }
3406  std::vector<std::unique_ptr<const RexScalar>> eq_operands;
3407  if (*eq_lhs == *is_null_lhs && *eq_rhs == *is_null_rhs) {
3408  RexDeepCopyVisitor deep_copy_visitor;
3409  auto lhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(0));
3410  auto rhs_op_copy = deep_copy_visitor.visit(equi_join_condition->getOperand(1));
3411  eq_operands.emplace_back(lhs_op_copy.release());
3412  eq_operands.emplace_back(rhs_op_copy.release());
3413  return boost::make_unique<const RexOperator>(
3414  kBW_EQ, eq_operands, equi_join_condition->getType());
3415  }
3416  return nullptr;
3417 }
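// Example of the pattern get_bitwise_equals() rewrites (for illustration): a
// null-safe join condition of the form
//   t0.x = t1.y OR (t0.x IS NULL AND t1.y IS NULL)
// becomes the single bitwise-equality operator kBW_EQ(t0.x, t1.y), which the
// join framework can treat like an ordinary equi-join key.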
3418 
3419 std::unique_ptr<const RexOperator> get_bitwise_equals_conjunction(
3420  const RexScalar* scalar) {
3421  const auto condition = dynamic_cast<const RexOperator*>(scalar);
3422  if (condition && condition->getOperator() == kAND) {
3423  CHECK_GE(condition->size(), size_t(2));
3424  auto acc = get_bitwise_equals(condition->getOperand(0));
3425  if (!acc) {
3426  return nullptr;
3427  }
3428  for (size_t i = 1; i < condition->size(); ++i) {
3429  std::vector<std::unique_ptr<const RexScalar>> and_operands;
3430  and_operands.emplace_back(std::move(acc));
3431  and_operands.emplace_back(get_bitwise_equals_conjunction(condition->getOperand(i)));
3432  acc =
3433  boost::make_unique<const RexOperator>(kAND, and_operands, condition->getType());
3434  }
3435  return acc;
3436  }
3437  return get_bitwise_equals(scalar);
3438 }
3439 
3440 std::vector<JoinType> left_deep_join_types(const RelLeftDeepInnerJoin* left_deep_join) {
3441  CHECK_GE(left_deep_join->inputCount(), size_t(2));
3442  std::vector<JoinType> join_types(left_deep_join->inputCount() - 1, JoinType::INNER);
3443  for (size_t nesting_level = 1; nesting_level <= left_deep_join->inputCount() - 1;
3444  ++nesting_level) {
3445  if (left_deep_join->getOuterCondition(nesting_level)) {
3446  join_types[nesting_level - 1] = JoinType::LEFT;
3447  }
3448  }
3449  return join_types;
3450 }
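// Worked example (for illustration): for a three-input left-deep join where
// only nesting level 2 has an outer condition, inputCount() == 3 and the
// result is {JoinType::INNER, JoinType::LEFT}, one entry per nesting level
// starting at level 1.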
3451 
3452 template <class RA>
3453 std::vector<size_t> do_table_reordering(
3454  std::vector<InputDescriptor>& input_descs,
3455  std::list<std::shared_ptr<const InputColDescriptor>>& input_col_descs,
3456  const JoinQualsPerNestingLevel& left_deep_join_quals,
3457  std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3458  const RA* node,
3459  const std::vector<InputTableInfo>& query_infos,
3460  const Executor* executor) {
3461  if (g_cluster) {
3462  // Disable table reordering in distributed mode. The aggregator does not have enough
3463  // information to break ties
3464  return {};
3465  }
3466  const auto& cat = *executor->getCatalog();
3467  for (const auto& table_info : query_infos) {
3468  if (table_info.table_id < 0) {
3469  continue;
3470  }
3471  const auto td = cat.getMetadataForTable(table_info.table_id);
3472  CHECK(td);
3473  if (table_is_replicated(td)) {
3474  return {};
3475  }
3476  }
3477  const auto input_permutation =
3478  get_node_input_permutation(left_deep_join_quals, query_infos, executor);
3479  input_to_nest_level = get_input_nest_levels(node, input_permutation);
3480  std::tie(input_descs, input_col_descs, std::ignore) =
3481  get_input_desc(node, input_to_nest_level, input_permutation, cat);
3482  return input_permutation;
3483 }
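// Sketch of the permutation semantics (for illustration): if
// get_node_input_permutation() returns {2, 0, 1} for inputs {A, B, C}, then C
// is assigned nesting level 0, A level 1, and B level 2, and the input
// descriptors are rebuilt to match. An empty return value means the original
// input order is kept (distributed mode or a replicated input table).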
3484 
3485 std::vector<size_t> get_left_deep_join_input_sizes(
3486  const RelLeftDeepInnerJoin* left_deep_join) {
3487  std::vector<size_t> input_sizes;
3488  for (size_t i = 0; i < left_deep_join->inputCount(); ++i) {
3489  const auto inputs = get_node_output(left_deep_join->getInput(i));
3490  input_sizes.push_back(inputs.size());
3491  }
3492  return input_sizes;
3493 }
3494 
3495 std::list<std::shared_ptr<Analyzer::Expr>> rewrite_quals(
3496  const std::list<std::shared_ptr<Analyzer::Expr>>& quals) {
3497  std::list<std::shared_ptr<Analyzer::Expr>> rewritten_quals;
3498  for (const auto& qual : quals) {
3499  const auto rewritten_qual = rewrite_expr(qual.get());
3500  rewritten_quals.push_back(rewritten_qual ? rewritten_qual : qual);
3501  }
3502  return rewritten_quals;
3503 }
3504 
3505 } // namespace
3506 
3507 RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit(
3508  const RelCompound* compound,
3509  const SortInfo& sort_info,
3510  const ExecutionOptions& eo) {
3511  std::vector<InputDescriptor> input_descs;
3512  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3513  auto input_to_nest_level = get_input_nest_levels(compound, {});
3514  std::tie(input_descs, input_col_descs, std::ignore) =
3515  get_input_desc(compound, input_to_nest_level, {}, cat_);
3516  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
3517  const auto query_infos = get_table_infos(input_descs, executor_);
3518  CHECK_EQ(size_t(1), compound->inputCount());
3519  const auto left_deep_join =
3520  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3521  JoinQualsPerNestingLevel left_deep_join_quals;
3522  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3523  : std::vector<JoinType>{get_join_type(compound)};
3524  std::vector<size_t> input_permutation;
3525  std::vector<size_t> left_deep_join_input_sizes;
3526  if (left_deep_join) {
3527  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3528  left_deep_join_quals = translateLeftDeepJoinFilter(
3529  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
 3530  if (g_from_table_reordering &&
 3531  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3532  join_types.end()) {
3533  input_permutation = do_table_reordering(input_descs,
3534  input_col_descs,
3535  left_deep_join_quals,
3536  input_to_nest_level,
3537  compound,
3538  query_infos,
3539  executor_);
3540  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3541  std::tie(input_descs, input_col_descs, std::ignore) =
3542  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3543  left_deep_join_quals = translateLeftDeepJoinFilter(
3544  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3545  }
3546  }
3547  RelAlgTranslator translator(cat_,
3548  query_state_,
3549  executor_,
3550  input_to_nest_level,
3551  join_types,
3552  now_,
3553  eo.just_explain);
3554  const auto scalar_sources =
3555  translate_scalar_sources(compound, translator, eo.executor_type);
3556  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3557  const auto quals_cf = translate_quals(compound, translator);
3558  const auto target_exprs = translate_targets(target_exprs_owned_,
3559  scalar_sources,
3560  groupby_exprs,
3561  compound,
3562  translator,
3563  eo.executor_type);
3564  CHECK_EQ(compound->size(), target_exprs.size());
3565  const RelAlgExecutionUnit exe_unit = {
3566  input_descs,
3567  input_col_descs,
3568  quals_cf.simple_quals,
3569  rewrite_quals(quals_cf.quals),
3570  left_deep_join_quals,
3571  groupby_exprs,
3572  target_exprs,
3573  nullptr,
3574  sort_info,
3575  0,
3576  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
3577  false,
3578  std::nullopt,
3579  query_state_};
3580  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3581  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3582  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3583  compound->setOutputMetainfo(targets_meta);
3584  return {rewritten_exe_unit,
3585  compound,
 3586  max_groups_buffer_entry_default_guess,
 3587  std::move(query_rewriter),
3588  input_permutation,
3589  left_deep_join_input_sizes};
3590 }
3591 
3592 namespace {
3593 
3594 std::vector<const RexScalar*> rex_to_conjunctive_form(const RexScalar* qual_expr) {
3595  CHECK(qual_expr);
3596  const auto bin_oper = dynamic_cast<const RexOperator*>(qual_expr);
3597  if (!bin_oper || bin_oper->getOperator() != kAND) {
3598  return {qual_expr};
3599  }
3600  CHECK_GE(bin_oper->size(), size_t(2));
3601  auto lhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(0));
3602  for (size_t i = 1; i < bin_oper->size(); ++i) {
3603  const auto rhs_cf = rex_to_conjunctive_form(bin_oper->getOperand(i));
3604  lhs_cf.insert(lhs_cf.end(), rhs_cf.begin(), rhs_cf.end());
3605  }
3606  return lhs_cf;
3607 }
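// Example (for illustration): the qualifier a AND (b AND c) flattens to the
// conjunct list {a, b, c}, while a lone disjunction a OR b is returned as the
// single conjunct {a OR b}.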
3608 
3609 std::shared_ptr<Analyzer::Expr> build_logical_expression(
3610  const std::vector<std::shared_ptr<Analyzer::Expr>>& factors,
3611  const SQLOps sql_op) {
3612  CHECK(!factors.empty());
3613  auto acc = factors.front();
3614  for (size_t i = 1; i < factors.size(); ++i) {
3615  acc = Parser::OperExpr::normalize(sql_op, kONE, acc, factors[i]);
3616  }
3617  return acc;
3618 }
3619 
3620 template <class QualsList>
3621 bool list_contains_expression(const QualsList& haystack,
3622  const std::shared_ptr<Analyzer::Expr>& needle) {
3623  for (const auto& qual : haystack) {
3624  if (*qual == *needle) {
3625  return true;
3626  }
3627  }
3628  return false;
3629 }
3630 
3631 // Transform `(p AND q) OR (p AND r)` to `p AND (q OR r)`. Avoids redundant
3632 // evaluations of `p` and allows use of the original form in joins if `p`
3633 // can be used for hash joins.
3634 std::shared_ptr<Analyzer::Expr> reverse_logical_distribution(
3635  const std::shared_ptr<Analyzer::Expr>& expr) {
3636  const auto expr_terms = qual_to_disjunctive_form(expr);
3637  CHECK_GE(expr_terms.size(), size_t(1));
3638  const auto& first_term = expr_terms.front();
3639  const auto first_term_factors = qual_to_conjunctive_form(first_term);
3640  std::vector<std::shared_ptr<Analyzer::Expr>> common_factors;
3641  // First, collect the conjunctive components common to all the disjunctive components.
3642  // Don't do it for simple qualifiers, we only care about expensive or join qualifiers.
3643  for (const auto& first_term_factor : first_term_factors.quals) {
3644  bool is_common =
3645  expr_terms.size() > 1; // Only report common factors for disjunction.
3646  for (size_t i = 1; i < expr_terms.size(); ++i) {
3647  const auto crt_term_factors = qual_to_conjunctive_form(expr_terms[i]);
3648  if (!list_contains_expression(crt_term_factors.quals, first_term_factor)) {
3649  is_common = false;
3650  break;
3651  }
3652  }
3653  if (is_common) {
3654  common_factors.push_back(first_term_factor);
3655  }
3656  }
3657  if (common_factors.empty()) {
3658  return expr;
3659  }
3660  // Now that the common expressions are known, collect the remaining expressions.
3661  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_terms;
3662  for (const auto& term : expr_terms) {
3663  const auto term_cf = qual_to_conjunctive_form(term);
3664  std::vector<std::shared_ptr<Analyzer::Expr>> remaining_quals(
3665  term_cf.simple_quals.begin(), term_cf.simple_quals.end());
3666  for (const auto& qual : term_cf.quals) {
3667  if (!list_contains_expression(common_factors, qual)) {
3668  remaining_quals.push_back(qual);
3669  }
3670  }
3671  if (!remaining_quals.empty()) {
3672  remaining_terms.push_back(build_logical_expression(remaining_quals, kAND));
3673  }
3674  }
3675  // Reconstruct the expression with the transformation applied.
3676  const auto common_expr = build_logical_expression(common_factors, kAND);
3677  if (remaining_terms.empty()) {
3678  return common_expr;
3679  }
3680  const auto remaining_expr = build_logical_expression(remaining_terms, kOR);
3681  return Parser::OperExpr::normalize(kAND, kONE, common_expr, remaining_expr);
3682 }
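// Worked example (for illustration): with p = (t0.a = t1.b), q = (t0.c > 1)
// and r = (t1.d < 2), the condition (p AND q) OR (p AND r) is rewritten to
// p AND (q OR r), so the common factor p can still drive a hash join while
// (q OR r) is evaluated once as a residual filter.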
3683 
3684 } // namespace
3685 
3686 std::list<std::shared_ptr<Analyzer::Expr>> RelAlgExecutor::makeJoinQuals(
3687  const RexScalar* join_condition,
3688  const std::vector<JoinType>& join_types,
3689  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3690  const bool just_explain) const {
3691  RelAlgTranslator translator(
3692  cat_, query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
3693  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
3694  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
3695  for (const auto rex_condition_component : rex_condition_cf) {
3696  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
3697  const auto join_condition =
 3698  reverse_logical_distribution(translator.translateScalarRex(
 3699  bw_equals ? bw_equals.get() : rex_condition_component));
3700  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
3701  join_condition_quals.insert(join_condition_quals.end(),
3702  join_condition_cf.quals.begin(),
3703  join_condition_cf.quals.end());
3704  join_condition_quals.insert(join_condition_quals.end(),
3705  join_condition_cf.simple_quals.begin(),
3706  join_condition_cf.simple_quals.end());
3707  }
3708  return combine_equi_join_conditions(join_condition_quals);
3709 }
3710 
3711 // Translate left deep join filter and separate the conjunctive form qualifiers
3712 // per nesting level. The code generated for hash table lookups on each level
3713 // must dominate its uses in deeper nesting levels.
3714 JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter(
3715  const RelLeftDeepInnerJoin* join,
3716  const std::vector<InputDescriptor>& input_descs,
3717  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
3718  const bool just_explain) {
3719  const auto join_types = left_deep_join_types(join);
3720  const auto join_condition_quals = makeJoinQuals(
3721  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
3722  MaxRangeTableIndexVisitor rte_idx_visitor;
3723  JoinQualsPerNestingLevel result(input_descs.size() - 1);
3724  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
3725  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
3726  const auto outer_condition = join->getOuterCondition(rte_idx);
3727  if (outer_condition) {
3728  result[rte_idx - 1].quals =
3729  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
3730  CHECK_LE(rte_idx, join_types.size());
3731  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
3732  result[rte_idx - 1].type = JoinType::LEFT;
3733  continue;
3734  }
3735  for (const auto& qual : join_condition_quals) {
3736  if (visited_quals.count(qual)) {
3737  continue;
3738  }
3739  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
3740  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
3741  const auto it_ok = visited_quals.emplace(qual);
3742  CHECK(it_ok.second);
3743  result[rte_idx - 1].quals.push_back(qual);
3744  }
3745  }
3746  CHECK_LE(rte_idx, join_types.size());
3747  CHECK(join_types[rte_idx - 1] == JoinType::INNER);
3748  result[rte_idx - 1].type = JoinType::INNER;
3749  }
3750  return result;
3751 }
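// Placement example (for illustration): a qual referencing nesting levels
// {0, 1} is attached at level 1 and a qual referencing {0, 2} at level 2,
// i.e. the innermost level at which every referenced input is in scope; the
// visited_quals set guarantees each qual is attached exactly once.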
3752 
3753 namespace {
3754 
3755 std::vector<std::shared_ptr<Analyzer::Expr>> synthesize_inputs(
3756  const RelAlgNode* ra_node,
3757  const size_t nest_level,
3758  const std::vector<TargetMetaInfo>& in_metainfo,
3759  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
3760  CHECK_LE(size_t(1), ra_node->inputCount());
3761  CHECK_GE(size_t(2), ra_node->inputCount());
3762  const auto input = ra_node->getInput(nest_level);
3763  const auto it_rte_idx = input_to_nest_level.find(input);
3764  CHECK(it_rte_idx != input_to_nest_level.end());
3765  const int rte_idx = it_rte_idx->second;
3766  const int table_id = table_id_from_ra(input);
3767  std::vector<std::shared_ptr<Analyzer::Expr>> inputs;
3768  const auto scan_ra = dynamic_cast<const RelScan*>(input);
3769  int input_idx = 0;
3770  for (const auto& input_meta : in_metainfo) {
3771  inputs.push_back(
3772  std::make_shared<Analyzer::ColumnVar>(input_meta.get_type_info(),
3773  table_id,
3774  scan_ra ? input_idx + 1 : input_idx,
3775  rte_idx));
3776  ++input_idx;
3777  }
3778  return inputs;
3779 }
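// Note on the column-id arithmetic above (for illustration): physical table
// columns are 1-based while intermediate results are 0-based, hence
// `scan_ra ? input_idx + 1 : input_idx`; the first output of a RelScan maps
// to column id 1, but the first column of a temporary result maps to 0.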
3780 
3781 } // namespace
3782 
3783 RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit(
3784  const RelAggregate* aggregate,
3785  const SortInfo& sort_info,
3786  const bool just_explain) {
3787  std::vector<InputDescriptor> input_descs;
3788  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3789  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3790  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
3791  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3792  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
3793  const auto join_type = get_join_type(aggregate);
3794 
3795  RelAlgTranslator translator(cat_,
3796  query_state_,
3797  executor_,
3798  input_to_nest_level,
3799  {join_type},
3800  now_,
3801  just_explain);
3802  CHECK_EQ(size_t(1), aggregate->inputCount());
3803  const auto source = aggregate->getInput(0);
3804  const auto& in_metainfo = source->getOutputMetainfo();
3805  const auto scalar_sources =
3806  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
3807  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
3808  const auto target_exprs = translate_targets(
3809  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
3810  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
3811  aggregate->setOutputMetainfo(targets_meta);
3812  return {RelAlgExecutionUnit{
3813  input_descs,
3814  input_col_descs,
3815  {},
3816  {},
3817  {},
3818  groupby_exprs,
3819  target_exprs,
3820  nullptr,
3821  sort_info,
3822  0,
3823  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
3824  false,
3825  std::nullopt,
3826  query_state_},
3827  aggregate,
 3828  max_groups_buffer_entry_default_guess,
 3829  nullptr};
3830 }
3831 
3832 RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit(
3833  const RelProject* project,
3834  const SortInfo& sort_info,
3835  const ExecutionOptions& eo) {
3836  std::vector<InputDescriptor> input_descs;
3837  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3838  auto input_to_nest_level = get_input_nest_levels(project, {});
3839  std::tie(input_descs, input_col_descs, std::ignore) =
3840  get_input_desc(project, input_to_nest_level, {}, cat_);
3841  const auto query_infos = get_table_infos(input_descs, executor_);
3842 
3843  const auto left_deep_join =
3844  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
3845  JoinQualsPerNestingLevel left_deep_join_quals;
3846  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3847  : std::vector<JoinType>{get_join_type(project)};
3848  std::vector<size_t> input_permutation;
3849  std::vector<size_t> left_deep_join_input_sizes;
3850  if (left_deep_join) {
3851  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3852  const auto query_infos = get_table_infos(input_descs, executor_);
3853  left_deep_join_quals = translateLeftDeepJoinFilter(
3854  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
 3855  if (g_from_table_reordering) {
 3856  input_permutation = do_table_reordering(input_descs,
3857  input_col_descs,
3858  left_deep_join_quals,
3859  input_to_nest_level,
3860  project,
3861  query_infos,
3862  executor_);
3863  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3864  std::tie(input_descs, input_col_descs, std::ignore) =
3865  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3866  left_deep_join_quals = translateLeftDeepJoinFilter(
3867  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3868  }
3869  }
3870 
3871  RelAlgTranslator translator(cat_,
3872  query_state_,
3873  executor_,
3874  input_to_nest_level,
3875  join_types,
3876  now_,
3877  eo.just_explain);
3878  const auto target_exprs_owned =
3879  translate_scalar_sources(project, translator, eo.executor_type);
3880  target_exprs_owned_.insert(
3881  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3882  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3883 
3884  const RelAlgExecutionUnit exe_unit = {
3885  input_descs,
3886  input_col_descs,
3887  {},
3888  {},
3889  left_deep_join_quals,
3890  {nullptr},
3891  target_exprs,
3892  nullptr,
3893  sort_info,
3894  0,
3895  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
3896  false,
3897  std::nullopt,
3898  query_state_};
3899  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3900  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3901  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3902  project->setOutputMetainfo(targets_meta);
3903  return {rewritten_exe_unit,
3904  project,
 3905  max_groups_buffer_entry_default_guess,
 3906  std::move(query_rewriter),
3907  input_permutation,
3908  left_deep_join_input_sizes};
3909 }
3910 
3911 namespace {
3912 
3913 std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_for_union(
3914  RelAlgNode const* input_node) {
3915  std::vector<TargetMetaInfo> const& tmis = input_node->getOutputMetainfo();
3916  VLOG(3) << "input_node->getOutputMetainfo()=" << shared::printContainer(tmis);
3917  const int negative_node_id = -input_node->getId();
3918  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs;
3919  target_exprs.reserve(tmis.size());
3920  for (size_t i = 0; i < tmis.size(); ++i) {
3921  target_exprs.push_back(std::make_shared<Analyzer::ColumnVar>(
3922  tmis[i].get_type_info(), negative_node_id, i, 0));
3923  }
3924  return target_exprs;
3925 }
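// Note (for illustration): UNION inputs are addressed by the negative of the
// input node id, so an input node with id 7 contributes ColumnVars whose
// "table id" is -7; negative ids denote temporary tables, which keeps these
// inputs out of the physical table-id space.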
3926 
3927 } // namespace
3928 
3929 RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit(
3930  const RelLogicalUnion* logical_union,
3931  const SortInfo& sort_info,
3932  const ExecutionOptions& eo) {
3933  std::vector<InputDescriptor> input_descs;
3934  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3935  // Map ra input ptr to index (0, 1).
3936  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
3937  std::tie(input_descs, input_col_descs, std::ignore) =
3938  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
3939  const auto query_infos = get_table_infos(input_descs, executor_);
3940  auto const max_num_tuples =
3941  std::accumulate(query_infos.cbegin(),
3942  query_infos.cend(),
3943  size_t(0),
3944  [](auto max, auto const& query_info) {
3945  return std::max(max, query_info.info.getNumTuples());
3946  });
3947 
3948  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
3949  for (auto& pair : input_to_nest_level) {
3950  VLOG(3) << " (" << pair.first->toString() << ", " << pair.second << ')';
3951  }
3952 
3953  RelAlgTranslator translator(
3954  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
3955 
3956  auto const input_exprs_owned = target_exprs_for_union(logical_union->getInput(0));
3957  CHECK(!input_exprs_owned.empty())
3958  << "No metainfo found for input node " << logical_union->getInput(0)->toString();
3959  VLOG(3) << "input_exprs_owned.size()=" << input_exprs_owned.size();
3960  for (auto& input_expr : input_exprs_owned) {
3961  VLOG(3) << " " << input_expr->toString();
3962  }
3963  target_exprs_owned_.insert(
3964  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3965  const auto target_exprs = get_exprs_not_owned(input_exprs_owned);
3966 
3967  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
3968  << " input_col_descs=" << shared::printContainer(input_col_descs)
3969  << " target_exprs.size()=" << target_exprs.size()
3970  << " max_num_tuples=" << max_num_tuples;
3971 
3972  const RelAlgExecutionUnit exe_unit = {
3973  input_descs,
3974  input_col_descs,
3975  {}, // quals_cf.simple_quals,
3976  {}, // rewrite_quals(quals_cf.quals),
3977  {},
3978  {nullptr},
3979  target_exprs,
3980  nullptr,
3981  sort_info,
3982  max_num_tuples,
3983  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
3984  false,
3985  logical_union->isAll(),
3986  query_state_};
3987  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3988  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3989 
3990  RelAlgNode const* input0 = logical_union->getInput(0);
3991  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
3992  logical_union->setOutputMetainfo(
3993  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3994  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
3995  logical_union->setOutputMetainfo(
3996  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3997  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
3998  logical_union->setOutputMetainfo(
3999  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4000  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4001  logical_union->setOutputMetainfo(
4002  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4003  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
4004  logical_union->setOutputMetainfo(
4005  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4006  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
4007  logical_union->setOutputMetainfo(
4008  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4009  } else if (dynamic_cast<const RelSort*>(input0)) {
4010  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
4011  } else {
4012  throw QueryNotSupported("Unsupported input type: " + input0->toString());
4013  }
4014  VLOG(3) << "logical_union->getOutputMetainfo()="
4015  << shared::printContainer(logical_union->getOutputMetainfo())
4016  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
4017  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
4018 
4019  return {rewritten_exe_unit,
4020  logical_union,
 4021  max_groups_buffer_entry_default_guess,
 4022  std::move(query_rewriter)};
4023 }
4024 
4025 RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit(
4026  const RelTableFunction* rel_table_func,
4027  const bool just_explain,
4028  const bool is_gpu) {
4029  std::vector<InputDescriptor> input_descs;
4030  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4031  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
4032  std::tie(input_descs, input_col_descs, std::ignore) =
4033  get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
4034  const auto query_infos = get_table_infos(input_descs, executor_);
4035  RelAlgTranslator translator(
4036  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
4037  const auto input_exprs_owned =
4038  translate_scalar_sources(rel_table_func, translator, ::ExecutorType::Native);
4039  target_exprs_owned_.insert(
4040  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
4041  auto input_exprs = get_exprs_not_owned(input_exprs_owned);
4042 
4043  const auto table_function_impl_and_type_infos = [=]() {
4044  if (is_gpu) {
4045  try {
4046  return bind_table_function(
4047  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4048  } catch (ExtensionFunctionBindingError& e) {
4049  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
4050  << " Redirecting " << rel_table_func->getFunctionName()
4051  << " to run on CPU.";
4052  throw QueryMustRunOnCpu();
4053  }
4054  } else {
4055  try {
4056  return bind_table_function(
4057  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4058  } catch (ExtensionFunctionBindingError& e) {
4059  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
4060  throw;
4061  }
4062  }
4063  }();
4064  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4065  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4066 
4067  size_t output_row_sizing_param = 0;
4068  if (table_function_impl.hasUserSpecifiedOutputSizeMultiplier() ||
4069  table_function_impl.hasUserSpecifiedOutputSizeConstant()) {
4070  const auto parameter_index =
4071  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
4072  CHECK_GT(parameter_index, size_t(0));
4073  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
4074  const auto parameter_expr =
4075  rel_table_func->getTableFuncInputAt(parameter_index - 1);
4076  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
4077  if (!parameter_expr_literal) {
4078  throw std::runtime_error(
4079  "Provided output buffer sizing parameter is not a literal. Only literal "
4080  "values are supported with output buffer sizing configured table "
4081  "functions.");
4082  }
4083  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
4084  if (literal_val < 0) {
 4085  throw std::runtime_error("Provided output sizing parameter " +
 4086  std::to_string(literal_val) +
 4087  " must be a positive integer.");
4088  }
4089  output_row_sizing_param = static_cast<size_t>(literal_val);
4090  } else {
4091  // RowMultiplier not specified in the SQL query. Set it to 1
4092  output_row_sizing_param = 1; // default value for RowMultiplier
 4093  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
 4094  static auto DEFAULT_ROW_MULTIPLIER_EXPR =
4095  makeExpr<Analyzer::Constant>(kINT, false, d);
4096  // Push the constant 1 to input_exprs
4097  input_exprs.insert(input_exprs.begin() + parameter_index - 1,
4098  DEFAULT_ROW_MULTIPLIER_EXPR.get());
4099  }
4100  } else if (table_function_impl.hasNonUserSpecifiedOutputSizeConstant()) {
4101  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
4102  } else {
4103  UNREACHABLE();
4104  }
4105 
4106  std::vector<Analyzer::ColumnVar*> input_col_exprs;
4107  size_t input_index = 0;
4108  for (const auto& ti : table_function_type_infos) {
4109  if (ti.is_column_list()) {
4110  for (int i = 0; i < ti.get_dimension(); i++) {
4111  auto& input_expr = input_exprs[input_index];
4112  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4113  CHECK(col_var);
4114  input_expr->set_type_info(
4115  ti); // ti is shared in between all columns in the same column_list
4116  input_col_exprs.push_back(col_var);
4117  input_index++;
4118  }
4119  } else if (ti.is_column()) {
4120  auto& input_expr = input_exprs[input_index];
4121  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4122  CHECK(col_var);
4123  input_expr->set_type_info(ti);
4124  input_col_exprs.push_back(col_var);
4125  input_index++;
4126  } else {
4127  input_index++;
4128  }
4129  }
4130  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
4131  std::vector<Analyzer::Expr*> table_func_outputs;
4132  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
4133  const auto ti = table_function_impl.getOutputSQLType(i);
4134  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
4135  table_func_outputs.push_back(target_exprs_owned_.back().get());
4136  }
4137  const TableFunctionExecutionUnit exe_unit = {
4138  input_descs,
4139  input_col_descs,
4140  input_exprs, // table function inputs
4141  input_col_exprs, // table function column inputs (duplicates w/ above)
4142  table_func_outputs, // table function projected exprs
4143  output_row_sizing_param, // output buffer sizing param
4144  table_function_impl};
4145  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
4146  rel_table_func->setOutputMetainfo(targets_meta);
4147  return {exe_unit, rel_table_func};
4148 }
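// Sizing example (for illustration, with a hypothetical table function my_tf
// declared with a user-specified row multiplier): a call such as my_tf(..., 4)
// yields output_row_sizing_param == 4, so the output buffer is sized to 4x the
// input row count, whereas a function with a declared output size constant
// fixes the row count independently of the input size.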
4149 
4150 namespace {
4151 
4152 std::pair<std::vector<TargetMetaInfo>, std::vector<std::shared_ptr<Analyzer::Expr>>>
4153 get_inputs_meta(const RelFilter* filter,
4154  const RelAlgTranslator& translator,
4155  const std::vector<std::shared_ptr<RexInput>>& inputs_owned,
4156  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level) {
4157  std::vector<TargetMetaInfo> in_metainfo;
4158  std::vector<std::shared_ptr<Analyzer::Expr>> exprs_owned;
4159  const auto data_sink_node = get_data_sink(filter);
4160  auto input_it = inputs_owned.begin();
4161  for (size_t nest_level = 0; nest_level < data_sink_node->inputCount(); ++nest_level) {
4162  const auto source = data_sink_node->getInput(nest_level);
4163  const auto scan_source = dynamic_cast<const RelScan*>(source);
4164  if (scan_source) {
4165  CHECK(source->getOutputMetainfo().empty());
4166  std::vector<std::shared_ptr<Analyzer::Expr>> scalar_sources_owned;
4167  for (size_t i = 0; i < scan_source->size(); ++i, ++input_it) {
4168  scalar_sources_owned.push_back(translator.translateScalarRex(input_it->get()));
4169  }
4170  const auto source_metadata =
4171  get_targets_meta(scan_source, get_exprs_not_owned(scalar_sources_owned));
4172  in_metainfo.insert(
4173  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4174  exprs_owned.insert(
4175  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4176  } else {
4177  const auto& source_metadata = source->getOutputMetainfo();
4178  input_it += source_metadata.size();
4179  in_metainfo.insert(
4180  in_metainfo.end(), source_metadata.begin(), source_metadata.end());
4181  const auto scalar_sources_owned = synthesize_inputs(
4182  data_sink_node, nest_level, source_metadata, input_to_nest_level);
4183  exprs_owned.insert(
4184  exprs_owned.end(), scalar_sources_owned.begin(), scalar_sources_owned.end());
4185  }
4186  }
4187  return std::make_pair(in_metainfo, exprs_owned);
4188 }
4189 
4190 } // namespace
4191 
4192 RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit(const RelFilter* filter,
4193  const SortInfo& sort_info,
4194  const bool just_explain) {
4195  CHECK_EQ(size_t(1), filter->inputCount());
4196  std::vector<InputDescriptor> input_descs;
4197  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4198  std::vector<TargetMetaInfo> in_metainfo;
4199  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4200  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
4201 
4202  const auto input_to_nest_level = get_input_nest_levels(filter, {});
4203  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4204  get_input_desc(filter, input_to_nest_level, {}, cat_);
4205  const auto join_type = get_join_type(filter);
4206  RelAlgTranslator translator(cat_,
4207  query_state_,
4208  executor_,
4209  input_to_nest_level,
4210  {join_type},
4211  now_,
4212  just_explain);
4213  std::tie(in_metainfo, target_exprs_owned) =
4214  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
4215  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
4216  const auto qual = fold_expr(filter_expr.get());
4217  target_exprs_owned_.insert(
4218  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4219  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
4220  filter->setOutputMetainfo(in_metainfo);
4221  const auto rewritten_qual = rewrite_expr(qual.get());
4222  return {{input_descs,
4223  input_col_descs,
4224  {},
4225  {rewritten_qual ? rewritten_qual : qual},
4226  {},
4227  {nullptr},
4228  target_exprs,
4229  nullptr,
4230  sort_info,
4231  0,
4232  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults()},
4233  filter,
 4234  max_groups_buffer_entry_default_guess,
 4235  nullptr};
4236 }
4237 
const size_t getGroupByCount() const
bool isAll() const
bool is_agg(const Analyzer::Expr *expr)
Analyzer::ExpressionPtr rewrite_array_elements(Analyzer::Expr const *expr)
std::vector< Analyzer::Expr * > target_exprs
SortField getCollation(const size_t i) const
const foreign_storage::ForeignTable * getForeignTable(const std::string &tableName) const
Definition: Catalog.cpp:1448
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:211
void collect_used_input_desc(std::vector< InputDescriptor > &input_descs, const Catalog_Namespace::Catalog &cat, std::unordered_set< std::shared_ptr< const InputColDescriptor >> &input_col_descs_unique, const RelAlgNode *ra_node, const std::unordered_set< const RexInput * > &source_used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
size_t getOffset() const
std::vector< int > ChunkKey
Definition: types.h:37
std::optional< std::function< void()> > post_execution_callback_
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:108
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool g_enable_watchdog
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
std::string cat(Ts &&...args)
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
const Rex * getTargetExpr(const size_t i) const
AggregatedColRange computeColRangesCache()
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1229
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1116
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:102
Definition: sqltypes.h:48
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
size_t size() const override
int hll_size_for_rate(const int err_percent)
Definition: HyperLogLog.h:115
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:221
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const RexScalar * getFilterExpr() const
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:3118
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:222
tuple d
Definition: test_fsi.py:9
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
const RelAlgNode * body
ExecutorDeviceType
ErrorInfo getErrorDescription(const int32_t error_code)
void setForceNonInSituData()
Definition: RenderInfo.cpp:45
size_t getIndex() const
#define SPIMAP_GEO_PHYSICAL_INPUT(c, i)
Definition: Catalog.h:76
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
bool g_skip_intermediate_count
std::unique_ptr< const RexOperator > get_bitwise_equals_conjunction(const RexScalar *scalar)
RexUsedInputsVisitor(const Catalog_Namespace::Catalog &cat)
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:117
const RexScalar * getOuterCondition(const size_t nesting_level) const
TableGenerations computeTableGenerations()
SQLTypeInfo get_nullable_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:932
std::shared_ptr< Analyzer::Expr > translateScalarRex(const RexScalar *rex) const
#define LOG(tag)
Definition: Logger.h:194
std::pair< int, int > ParallelismHint
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:79
size_t size() const override
std::vector< size_t > outer_fragment_indices
static SpeculativeTopNBlacklist speculative_topn_blacklist_
size_t get_scalar_sources_size(const RelCompound *compound)
RelAlgExecutionUnit exe_unit
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< std::shared_ptr< Analyzer::TargetEntry > > targets
Definition: RenderInfo.h:37
SQLOps
Definition: sqldefs.h:29
TemporaryTables temporary_tables_
size_t getNumRows() const
static const size_t max_groups_buffer_entry_default_guess
const std::list< Analyzer::OrderEntry > order_entries
PersistentStorageMgr * getPersistentStorageMgr() const
Definition: DataMgr.cpp:571
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const RexScalar * getCondition() const
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5369
std::string join(T const &container, std::string const &delim)
const std::vector< TargetMetaInfo > getTupleType() const
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources_for_update(const RA *ra_node, const RelAlgTranslator &translator, int32_t tableId, const Catalog_Namespace::Catalog &cat, const ColumnNameList &colNames, size_t starting_projection_column_idx)
bool get_is_null() const
Definition: Analyzer.h:334
Definition: sqldefs.h:38
std::vector< InputDescriptor > input_descs
std::shared_ptr< Analyzer::Var > var_ref(const Analyzer::Expr *expr, const Analyzer::Var::WhichRow which_row, const int varno)
Definition: Analyzer.h:1675
#define UNREACHABLE()
Definition: Logger.h:247
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
#define CHECK_GE(x, y)
Definition: Logger.h:216
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
static WindowProjectNodeContext * create(Executor *executor)
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:911
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
Definition: sqldefs.h:49
Definition: sqldefs.h:30
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
int64_t int_value_from_numbers_ptr(const SQLTypeInfo &type_info, const int8_t *data)
void prepare_string_dictionaries(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
static const int32_t ERR_GEOS
Definition: Execute.h:1122
SQLTypeInfo get_agg_type(const SQLAgg agg_kind, const Analyzer::Expr *arg_expr)
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:1118
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
std::shared_ptr< Analyzer::Expr > set_transient_dict(const std::shared_ptr< Analyzer::Expr > expr)
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:252
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
static void invalidateCaches()
int g_hll_precision_bits
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
const SQLTypeInfo & get_type_info() const
void checkForMatchingMetaInfoTypes() const
const ColumnDescriptor * getShardColumnMetadataForTable(const TableDescriptor *td) const
Definition: Catalog.cpp:4045
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1455
#define CHECK_GT(x, y)
Definition: Logger.h:215
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
std::vector< const RexScalar * > rex_to_conjunctive_form(const RexScalar *qual_expr)
bool is_count_distinct(const Analyzer::Expr *expr)
QueryStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::string to_string(char const *&&v)
double running_query_interrupt_freq
foreign_storage::ForeignStorageMgr * getForeignStorageMgr() const
void handleNop(RaExecutionDesc &ed)
#define LOG_IF(severity, condition)
Definition: Logger.h:293
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::shared_ptr< Analyzer::Expr > cast_dict_to_none(const std::shared_ptr< Analyzer::Expr > &input)
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
static const size_t high_scan_limit
Definition: Execute.h:452
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
bool list_contains_expression(const QualsList &haystack, const std::shared_ptr< Analyzer::Expr > &needle)
size_t get_count_distinct_sub_bitmap_count(const size_t bitmap_sz_bits, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:1119
ExecutorType
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:202
size_t getColInputsSize() const
std::map< int, std::shared_ptr< ChunkMetadata >> ChunkMetadataMap
bool g_enable_interop
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
virtual T visit(const RexScalar *rex_scalar) const
Definition: RexVisitor.h:27
const size_t getScalarSourcesSize() const
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:1120
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:1117
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:229
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
unsigned getIndex() const
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:1108
size_t getOuterFragmentCount(const CompilationOptions &co, const ExecutionOptions &eo)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool g_from_table_reordering
Definition: Execute.cpp:84
CONSTEXPR DEVICE bool is_null(const T &value)
unsigned getId() const
Classes representing a parse tree.
int get_logical_size() const
Definition: sqltypes.h:325
bool isGeometry(TargetMetaInfo const &target_meta_info)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:222
const SortInfo sort_info
ExecutorType executor_type
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1112
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
int count
const JoinQualsPerNestingLevel join_quals
static void reset(Executor *executor)
SQLTypeInfo get_logical_type_for_expr(const Analyzer::Expr &expr)
const TableDescriptor * get_shard_for_key(const TableDescriptor *td, const Catalog_Namespace::Catalog &cat, const Fragmenter_Namespace::InsertData &data)
std::vector< std::shared_ptr< RexInput > > synthesized_physical_inputs_owned
std::pair< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > > > get_input_desc_impl(const RA *ra_node, const std::unordered_set< const RexInput * > &used_inputs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
SortAlgorithm
void set_parallelism_hints(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
MergeType
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
A container for relational algebra descriptors defining the execution order for a relational algebra query.
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const Catalog_Namespace::Catalog & cat_
size_t g_big_group_threshold
Definition: Execute.cpp:102
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:1114
const std::shared_ptr< ResultSet > & getRows() const
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1115
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
bool g_bigint_count
size_t groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
const RelAlgNode * getInput(const size_t idx) const
void executePostExecutionCallback()
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
int getDatabaseId() const
Definition: Catalog.h:277
std::shared_ptr< Analyzer::Expr > build_logical_expression(const std::vector< std::shared_ptr< Analyzer::Expr >> &factors, const SQLOps sql_op)
bool is_metadata_placeholder(const ChunkMetadata &metadata)
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
size_t getNDVEstimation(const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
static const int32_t ERR_UNSUPPORTED_SELF_JOIN
Definition: Execute.h:1111
bool isSimple() const
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1494
ColumnDescriptor
specifies the content in-memory of a row in the column metadata table
bool get_is_distinct() const
Definition: Analyzer.h:1098
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:1121
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
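getExpressionRange derives a conservative value range for an expression from the input tables' metadata; such ranges drive decisions like group-by buffer sizing and integer-width selection. Below is a self-contained sketch of the underlying interval arithmetic with toy types (IntRange is an assumption for illustration, not the engine's ExpressionRange, which also tracks nulls, overflow, and floating-point ranges).

// Toy interval arithmetic: given integer ranges for two operands,
// derive a conservative range for the result of a binary op.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <iterator>

struct IntRange {
  int64_t lo, hi;
};

IntRange add_range(IntRange a, IntRange b) {
  return {a.lo + b.lo, a.hi + b.hi};  // assumes no overflow for the demo
}

IntRange mul_range(IntRange a, IntRange b) {
  // The extrema of a product lie at the corner combinations.
  const int64_t c[] = {a.lo * b.lo, a.lo * b.hi, a.hi * b.lo, a.hi * b.hi};
  return {*std::min_element(std::begin(c), std::end(c)),
          *std::max_element(std::begin(c), std::end(c))};
}

int main() {
  IntRange x{0, 9}, y{-3, 3};
  const auto s = add_range(x, y);
  const auto m = mul_range(x, y);
  std::cout << "x+y in [" << s.lo << ", " << s.hi << "]\n";  // [-3, 12]
  std::cout << "x*y in [" << m.lo << ", " << m.hi << "]\n";  // [-27, 27]
}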
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
JoinType get_join_type(const RelAlgNode *ra)
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1109
size_t getTableFuncInputsSize() const
unsigned pending_query_interrupt_freq
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4063
const ColumnDescriptor * getMetadataForColumnBySpi(const int tableId, const size_t spi) const
Definition: Catalog.cpp:1598
std::shared_ptr< Analyzer::Expr > reverse_logical_distribution(const std::shared_ptr< Analyzer::Expr > &expr)
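Judging by its name and signature, reverse_logical_distribution hoists a common factor out of a disjunction, i.e. applies the distributive law in reverse: (P AND Q) OR (P AND R) becomes P AND (Q OR R). The tiny program below merely verifies that identity by exhaustive enumeration; this reading of the function is an assumption, and the code is not taken from the engine.

// Check (p && q) || (p && r) == p && (q || r) for all boolean assignments.
#include <cassert>

int main() {
  for (int p = 0; p <= 1; ++p)
    for (int q = 0; q <= 1; ++q)
      for (int r = 0; r <= 1; ++r)
        assert(((p && q) || (p && r)) == (p && (q || r)));
}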
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
std::optional< size_t > getFilteredCountAll(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
Executor * getExecutor() const
std::string * stringval
Definition: sqltypes.h:214
int get_result_table_id() const
Definition: Analyzer.h:1632
static std::shared_ptr< Analyzer::Expr > normalize(const SQLOps optype, const SQLQualifier qual, std::shared_ptr< Analyzer::Expr > left_expr, std::shared_ptr< Analyzer::Expr > right_expr)
Definition: ParserNode.cpp:284
const std::vector< std::unique_ptr< const RexAgg > > & getAggExprs() const
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
bool g_enable_window_functions
Definition: Execute.cpp:103
bool isEmptyResult() const
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
const RexScalar * getProjectAt(const size_t idx) const
bool hasInput(const RelAlgNode *needle) const
static RegisteredQueryHint defaults()
Definition: QueryHint.h:175
int32_t countRexLiteralArgs() const
ExecutionResult executeSimpleInsert(const Analyzer::Query &insert_query)
#define TRANSIENT_DICT_ID
Definition: sqltypes.h:253
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_used_inputs(const RelCompound *compound, const Catalog_Namespace::Catalog &cat)
#define CHECK_LE(x, y)
Definition: Logger.h:214
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: sqltypes.h:940
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:322
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool table_is_temporary(const TableDescriptor *const td)
#define DEFAULT_ROW_MULTIPLIER_VALUE
std::vector< DataBlockPtr > data
the data being inserted
Definition: Fragmenter.h:64
std::shared_ptr< const query_state::QueryState > query_state_
bool table_is_replicated(const TableDescriptor *td)
std::unordered_set< const RexInput * > visitInput(const RexInput *rex_input) const override
std::pair< std::unordered_set< const RexInput * >, std::vector< std::shared_ptr< RexInput > > > get_join_source_used_inputs(const RelAlgNode *ra_node, const Catalog_Namespace::Catalog &cat)
Basic constructors and methods of the row set interface.
Datum get_constval() const
Definition: Analyzer.h:335
const size_t getGroupByCount() const
std::unordered_set< const RexInput * > aggregateResult(const std::unordered_set< const RexInput * > &aggregate, const std::unordered_set< const RexInput * > &next_result) const override
const RelAlgNode & getRootRelAlgNode() const
size_t collationCount() const
const RelAlgNode * getSourceNode() const
bool isAggregate() const
size_t getLimit() const
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool isRowidLookup(const WorkUnit &work_unit)
bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
int table_id_from_ra(const RelAlgNode *ra_node)
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
static void handlePersistentError(const int32_t error_code)
const std::tuple< table_functions::TableFunction, std::vector< SQLTypeInfo > > bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs, const bool is_gpu)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:323
const RexScalar * getTableFuncInputAt(const size_t idx) const
bool compute_output_buffer_size(const RelAlgExecutionUnit &ra_exe_unit)
SQLAgg get_aggtype() const
Definition: Analyzer.h:1095
const size_t max_groups_buffer_entry_guess
std::string getFunctionName() const
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool wasMultifragKernelLaunch() const
Definition: ErrorHandling.h:57
bool g_enable_bump_allocator
Definition: Execute.cpp:109
StringDictionaryGenerations computeStringDictionaryGenerations()
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
const RexScalar * getInnerCondition() const
RegisteredQueryHint query_hint
virtual std::string toString() const =0
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
#define CHECK(condition)
Definition: Logger.h:203
bool is_geometry() const
Definition: sqltypes.h:501
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:319
std::vector< std::shared_ptr< Analyzer::Expr > > qual_to_disjunctive_form(const std::shared_ptr< Analyzer::Expr > &qual_expr)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
const std::vector< std::shared_ptr< RexInput > > & get_inputs_owned() const
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
std::unique_ptr< RelAlgDagBuilder > query_dag_
const RelAlgNode * getBody() const
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
double gpu_input_mem_limit_percent
RANodeOutput get_node_output(const RelAlgNode *ra_node)
bool g_enable_union
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
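inline_fixed_encoding_null_val returns the sentinel that stands in for NULL in a fixed-width encoded column, since such columns carry no separate null bitmap. A sketch of the idea, assuming the minimum of the domain is the reserved sentinel (the engine's exact sentinels per type and encoding are not shown here):

// One value of the integer domain is reserved to mean NULL.
#include <cstdint>
#include <iostream>
#include <limits>

template <typename T>
constexpr T inline_null_sentinel() {
  return std::numeric_limits<T>::min();  // assumption: min() is reserved
}

int main() {
  const int32_t v = inline_null_sentinel<int32_t>();
  const bool is_null = (v == inline_null_sentinel<int32_t>());
  std::cout << "sentinel=" << v << " is_null=" << std::boolalpha << is_null
            << '\n';
}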
std::unique_ptr< const RexOperator > get_bitwise_equals(const RexScalar *scalar)
Estimators to be used when precise cardinality isn't useful.
bool g_cluster
bool g_allow_cpu_retry
Definition: Execute.cpp:81
static std::string getErrorMessageFromCode(const int32_t error_code)
Fragmenter_Namespace::InsertData
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
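To make the InsertData members documented above concrete, here is a hedged sketch of a single-row, single-column insert using toy stand-ins (ToyInsertData and const void* blocks are illustrative assumptions; the engine uses DataBlockPtr and routes the struct through the fragment manager):

// Mirror of the documented InsertData layout, with toy types.
#include <cstddef>
#include <cstdint>
#include <vector>

struct ToyInsertData {
  int databaseId;                 // database receiving the data
  int tableId;                    // table receiving the data
  std::vector<int> columnIds;     // one entry per inserted column
  size_t numRows;                 // rows carried by each data block
  std::vector<const void*> data;  // one opaque block per column
};

int main() {
  int64_t col0_value = 42;
  ToyInsertData insert_data;
  insert_data.databaseId = 1;
  insert_data.tableId = 7;
  insert_data.columnIds = {1};       // a single target column
  insert_data.numRows = 1;           // one row
  insert_data.data = {&col0_value};  // one block per column id
  return 0;
}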